mem_memberheathytemp_doc Normal / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/06/10 13:44:56.679 - 2019/06/10 13:44:56.679 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N mall 192.168.50.32 MSSQLNATIVE Native Mall 1433 sa Encrypted 2be98afc819c69e8ea300ff228dd38f99 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N IS_CLUSTERED N MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING N memberheathytemp表输入 Java 代码 Y Java 代码 Elasticsearch bulk insert 2 Y memberheathytemp表输入 2 Java 代码 Y memberheathytemp表输入 3 Java 代码 Y Elasticsearch bulk insert 2 ElasticSearchBulk Y 1 none crm_memberhealth_temp _doc 1000 SECONDS N Id Y N Y DiseaseClassCode DiseaseClassCode MemTerminalId MemTerminalId MemberAge MemberAge MemberGender MemberGender MemberHeight MemberHeight MemberId MemberId MemberName MemberName MemberPhone1 MemberPhone1 MemberPhone2 MemberPhone2 MemberPhone3 MemberPhone3 MemberUpdateTime MemberUpdateTime MemberWeight MemberWeight
192.168.50.32
9300
cluster.name es custom.fields.aliase mem_memberhealth_temp 784 128 Y
Java 代码 UserDefinedJavaClass N 20 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.microsoft.sqlserver.jdbc.SQLServerException; Database database = null; PreparedStatement stat = null; PreparedStatement stat1 = null; RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")})); Integer index = 0; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //logBasic("start---"); String type = getVariable("type"); String indexs = getVariable("index"); //if (indexs != null) index = Integer.parseInt(indexs); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (stat1!=null) { stat1.close(); } if (database!=null) { database.disconnect(); } if(client!=null){ client.close(); } } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } synchronized(this) { r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null ); String tablename = "DataImport_heathytemp";//getInputRowMeta().getString(r, "tablename", null ); String idname = "MemberId";//getInputRowMeta().getString(r, "idname", null ); String sourceidname = "Id";//getInputRowMeta().getString(r, "sourceidname", null ); String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } } //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the index name // String Id = idxRowMeta.getString(r, idname, null); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'"; ResultSet resultSet = null; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if (database!=null) { database.closeQuery(resultSet); resultSet = null; } //if(idxRow != null){ // return true; //} */ //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); GetRequest getRequest = new GetRequest( "crm_memberheathytemp", // Index "_doc", // /Type DataId); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段 boolean exists = client.exists(getRequest,RequestOptions.DEFAULT); //client.close(); if(exists ){ return true; } //if(!exists && idxRow == null){ // return true; //} //3.获得预处理对象 String sql="insert into "+tablename+"(Id,DataName,DataId,Type) values (?,?,?,?);";//begin tran t2; commit tran t2 //logBasic("idxRow--database"+ database); index = index + 1; if(stat == null) stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(4, "phone1,phone2"); //stat.setString(5,MemberFriendsPhone) //stat.setString(5, index); //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); //System.out.println("更新记录数"+ line); //6.释放资源 //stat.close(); //setVariable("index",String.valueOf(index)); //Integer pn = Integer.parseInt(Id); //Integer curpageNum = Integer.parseInt(Id) % pagesize; //if(pn > 0 && curpageNum == 0){ // setVariable("page",String.valueOf(page)); //} //logBasic("idxRow--getVariable"+getVariable("page")); //logBasic("idxRow--curpageNum"+curpageNum); //logBasic("idxRow--length"+i); } catch(SQLServerException e) { return true; }catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //} // Send the row on to the next step. } putRow(data.outputRowMeta, r); return true; } N 576 192 Y memberheathytemp表输入 TableInput Y 1 none mall SELECT a.BuyUserId AS Id , a.BuyUserId AS MemberId ,a.Consignee AS MemberName, a.Height AS MemberHeight, a.Weight AS MemberWeight, CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender, a.OrderTime AS MemberUpdateTime, a.DiseaseClassCode, a.OriginType AS MemTerminalId, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone1, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone2, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone3, b.age as MemberAge FROM [Mall].[dbo].[Orders] a LEFT JOIN [Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode WHERE AConsigneePhone1 !='' order by a.BuyUserId 0 N N N 432 80 Y memberheathytemp表输入 2 TableInput Y 1 none mall SELECT a.BuyUserId AS Id , a.BuyUserId AS MemberId ,a.Consignee AS MemberName, a.Height AS MemberHeight, a.Weight AS MemberWeight, CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender, a.OrderTime AS MemberUpdateTime, a.DiseaseClassCode, a.OriginType AS MemTerminalId, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone1, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone2, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone3, b.age as MemberAge FROM [Mall].[dbo].[Orders] a LEFT JOIN [Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode where AConsigneePhone2 !='' order by a.BuyUserId 0 N N N 400 160 Y memberheathytemp表输入 3 TableInput Y 1 none mall SELECT a.BuyUserId AS Id , a.BuyUserId AS MemberId ,a.Consignee AS MemberName, a.Height AS MemberHeight, a.Weight AS MemberWeight, CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender, a.OrderTime AS MemberUpdateTime, a.DiseaseClassCode, a.OriginType AS MemTerminalId, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone1, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone2, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone3, b.age as MemberAge FROM [Mall].[dbo].[Orders] a LEFT JOIN [Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode where AConsigneePhone3 !='' order by a.BuyUserId 0 N N N 451 256 Y N