mem_memberbase_append_memberId Normal 0 / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 3 50 50 N Y 50000 Y N 1000 100 - 2019/05/21 19:06:33.293 - 2019/05/21 19:06:33.293 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N MemberSqlserver 192.168.50.32 MSSQLNATIVE Native MemberData 1433 sa Encrypted 2be98afc819c69e8ea300ff228dd38f99 EXTRA_OPTION_MSSQLNATIVE.instance MemberData FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N INITIAL_POOL_SIZE 100 IS_CLUSTERED N MAXIMUM_POOL_SIZE 300 MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING Y 表输入 2 Java 代码 Y Java 代码 UserDefinedJavaClass Y 10 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; Database database = null; PreparedStatement stat = null; PreparedStatement stat1 = null; RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")})); public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //logBasic("start---"); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (stat1!=null) { stat1.close(); } if (database!=null) { database.disconnect(); } if(client!=null){ client.close(); } } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } synchronized(this) { r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null ); String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null ); String idname = "Id";//getInputRowMeta().getString(r, "idname", null ); String idname1 = "MemPhone1";//getInputRowMeta().getString(r, "idname", null ); String idname2 = "MemPhone2";//getInputRowMeta().getString(r, "idname", null ); String idname3 = "MemPhone3";//getInputRowMeta().getString(r, "idname", null ); String idname4 = "MemberId";//getInputRowMeta().getString(r, "idname", null ); String idname5 = "OrdersCode";//getInputRowMeta().getString(r, "idname", null ); String sourceidname = "DataId";//getInputRowMeta().getString(r, "sourceidname", null ); String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } } //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the index name // String Id = idxRowMeta.getString(r, idname, null); String Id1 = idxRowMeta.getString(r, idname1, null); String Id2 = idxRowMeta.getString(r, idname2, null); String Id3 = idxRowMeta.getString(r, idname3, null); String Id4 = idxRowMeta.getString(r, idname4, null); String Id5 = idxRowMeta.getString(r, idname5, null); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); /*//String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'"; String sqlSelect = "select Id from "+tablename + "_mem where Id = '"+ Id +"'"; ResultSet resultSet = null; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if(idxRow!=null){ if (database!=null) { database.closeQuery(resultSet); } return true; }*/ //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); GetRequest getRequest = new GetRequest( "crm_memberbase", // Index "_doc", // Type Id); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段 boolean exists = client.exists(getRequest,RequestOptions.DEFAULT); //client.close(); if(exists) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("crm_memberbase"); updateRequest.type("_doc"); updateRequest.id(Id); updateRequest.doc(XContentFactory.jsonBuilder() .startObject() .field(idname1, Id1) .field(idname2, Id2) .field(idname3, Id3) .field(idname4, Id4) .field(idname5, Id5) .endObject()); client.update(updateRequest,RequestOptions.DEFAULT); } //logBasic("idxRow--length"+i); } catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //} // Send the row on to the next step. } putRow(data.outputRowMeta, r); return true; } N 288 96 Y 表输入 2 TableInput Y 1 none MemberSqlserver select * --,concat('{"name": "Order","parent": "',MemKey,'"}')as order_join_field from ( SELECT row_number() over (order by AConsigneePhone2 asc ) as MemKey, AconsigneePhone2, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone1 ) as MemPhone1, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone2 ) as MemPhone2, Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone3 ) as MemPhone3, BuyUserId as MemberId , OrdersCode from Mall..Orders where 1=1 and (AconsigneePhone1!='' or AconsigneePhone2!='') --order by AConsigneePhone2 asc --offset 0 rows fetch NEXT 2 rows only )a left join MemberData..DataImport as b on a.MemKey = b.Id --dbo.DesEncryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',b.dataid ) = (case when AConsigneePhone2= '' then --(case -- when AconsigneePhone1!='' then AconsigneePhone1 -- else ''end) -- ELSE AconsigneePhone2 end) where 1=1 and b.Id is not null and MemKey >(108-1)*5000 --and MemKey <= 10 --MemKey >((${page}-1)*${pagesize}) and MemKey <= ((${page}-1)*${pagesize} + (${pagesize})) --order by a.AConsigneePhone2 asc --offset ((14-1)*5000) rows --fetch next (10) rows only 0 N N N 192 32 Y N