mem_memberbase_query_page_java Normal 0 / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/05/26 19:55:18.893 - 2019/05/26 19:55:18.893 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N MallSqlserver 192.168.50.32 MSSQLNATIVE Native Mall 1433 sa Encrypted 2be98afc819c69e8ea300ff228dd38f99 EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch 200 EXTRA_OPTION_MSSQLNATIVE.readTimeout 60 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N INITIAL_POOL_SIZE 500 IS_CLUSTERED N MAXIMUM_POOL_SIZE 1000 MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING Y 表输入 2 Java 代码 Y Java 代码 Elasticsearch bulk insert 2 Y Elasticsearch bulk insert 2 ElasticSearchBulk Y 1 none crm_memberbase _doc 5000 100 SECONDS N MemberKey Y N Y Gender Gender MemReceiverMobile MemReceiverMobile MemReceiverPhone MemReceiverPhone MemUsualAddress MemUsualAddress MemUsualAreaCode MemUsualAreaCode MemUsualCityCode MemUsualCityCode MemUsualPhone MemUsualPhone MemUsualProvinceCode MemUsualProvinceCode MemberKey MemberKey MemberNickName MemberNickName OriginType OriginType RegisterTime RegisterTime order_join_field order_join_field
192.168.50.32
9300
cluster.name es custom.fields.order_join_field { "type": "join","relations": {"Member": "Order,OriginType" }} 544 96 Y
Java 代码 UserDefinedJavaClass N 120 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; Database database = null; PreparedStatement stat = null; ResultSet resultSet = null; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { int page = Integer.parseInt(getVariable("page")); int pagesize = Integer.parseInt(getVariable("pagesize")); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (database!=null) { database.disconnect(); } } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "testSqlServer";//getInputRowMeta().getString(r, "conname", null ); String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null ); String idname = "MemberKey";//getInputRowMeta().getString(r, "idname", null ); String sourceidname = "MemUsualPhone";//getInputRowMeta().getString(r, "sourceidname", null ); String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } } //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the index name // String Id = idxRowMeta.getString(r, idname, null); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'"; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if(idxRow!=null){ if (database!=null) { database.closeQuery(resultSet); } return true; } //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); //3.获得预处理对象 String sql=" insert into "+tablename+" values (?,?,?)"; //logBasic("idxRow--database"+ database); stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); System.out.println("更新记录数"+ line); //6.释放资源 //stat.close(); Integer pn = Integer.parseInt(Id); Integer curpageNum = Integer.parseInt(Id) % pagesize; if(pn > 0 && curpageNum == 0){ setVariable("page",String.valueOf(page)); logBasic("idxRow--getVariable"+getVariable("page")); } //logBasic("idxRow--length"+i); } catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //} // Send the row on to the next step. putRow(data.outputRowMeta, r); return true; } N 320 64 Y 表输入 2 TableInput N 1 none MallSqlserver select ROW_NUMBER() over (order by AConsigneePhone2 asc ) as MemberKey, sex as MemberGender, Consignee as MemberNickName, case when AConsigneePhone2= '' then (case when AconsigneePhone1!='' then dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) else ''end) ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) end as MemUsualPhone,--电话号码 case when AConsigneePhone1= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) end as MemReceiverMobile,--手机号码 case when AConsigneePhone3= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) end as MemReceiverPhone,--电话号码 case when ADeliveryAddress= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) end as MemUsualAddress, OriginType, ( select CONVERT(varchar(100), min(b.OrderTime), 20) from orders b where b.AConsigneePhone2=a.AConsigneePhone2 and b.OrderTime!='1800/01/01 00:00:00' group by b.AConsigneePhone2 ) as RegisterTime, case when b0.Level= 3 then a.RegionCode else null end as MemUsualAreaCode, case when b1.Level= 2 then b.ParentCode else b.AreaCode end as MemUsualCityCode, case when c1.Level= 1 then c.ParentCode else c.AreaCode end as MemUsualProvinceCode ,1 as HasOrder ,convert(varchar,datepart(year,getdate())) as RegisterYear ,convert(varchar,datepart(month,getdate())) as RegisterMonth ,convert(varchar,datepart(day,getdate())) as RegisterDay ,'Member' as order_join_field from orders a left join Areamap b0 on b0.MappingCode=a.RegionCode left join AreaRegion b on b.AreaCode=a.RegionCode left join Areamap b1 on b1.MappingCode=b.ParentCode left join AreaRegion c on c.AreaCode=b.ParentCode left join Areamap c1 on c1.MappingCode=c.ParentCode where 1=1 and (a.AConsigneePhone2!= '' or a.AConsigneePhone1!= '') order by AConsigneePhone2 asc offset ((${page}-1)*${pagesize}) rows fetch NEXT (${pagesize}) rows only 0 数据检验 N Y N 192 112 Y N