mem_memberbase_query_page Normal 0 / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/05/26 19:55:18.893 - 2019/05/26 19:55:18.893 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N MallSqlserver 192.168.50.32 MSSQLNATIVE Native Mall 1433 sa Encrypted 2be98afc819c69e8ea300ff228dd38f99 EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch 200 EXTRA_OPTION_MSSQLNATIVE.readTimeout 60 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N INITIAL_POOL_SIZE 100 IS_CLUSTERED N MAXIMUM_POOL_SIZE 300 MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING Y Java 代码 Elasticsearch bulk insert 2 Y 表输入 2 Java 代码 Y Elasticsearch bulk insert 2 ElasticSearchBulk Y 1 none crm_memberbase _doc 5000 100 SECONDS N MemberKey Y N Y HasOrder HasOrder MemReceiverMobile MemReceiverMobile MemReceiverPhone MemReceiverPhone MemUsualAddress MemUsualAddress MemUsualAreaCode MemUsualAreaCode MemUsualCityCode MemUsualCityCode MemUsualPhone MemUsualPhone MemUsualProvinceCode MemUsualProvinceCode MemberGender MemberGender MemberKey MemberKey MemberNickName MemberNickName OriginType OriginType RegisterDay RegisterDay RegisterMonth RegisterMonth RegisterTime RegisterTime RegisterYear RegisterYear order_join_field order_join_field
192.168.50.32
9300
cluster.name es 464 128 Y
Java 代码 UserDefinedJavaClass Y 100 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; Database database = null; PreparedStatement stat = null; ResultSet resultSet = null; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { int page = Integer.parseInt(getVariable("page")); int pagesize = Integer.parseInt(getVariable("pagesize")); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (database!=null) { database.disconnect(); } //if (database!=null) { //database.closeQuery(resultSet); //} } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "MemberSqlserver";//getInputRowMeta().getString(r, "conname", null ); String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null ); String idname = "MemberKey";//getInputRowMeta().getString(r, "idname", null ); String sourceidname = "MemUsualPhone";//getInputRowMeta().getString(r, "sourceidname", null ); String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } }//else{ // database.connect(); //} //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the index name // String Id = idxRowMeta.getString(r, idname, null); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'"; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if(idxRow!=null){ if (database!=null) { database.closeQuery(resultSet); resultSet = null; } return true; } //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); //3.获得预处理对象 String sql=" insert into "+tablename+" values (?,?,?)"; //logBasic("idxRow--database"+ database); stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); //System.out.println("更新记录数"+ line); //6.释放资源 //stat.close(); Integer pn = Integer.parseInt(Id); Integer curpageNum = Integer.parseInt(Id) % pagesize; //logBasic("idxRow--getVariable"+getVariable("page")); //logBasic("idxRow--curpageNum"+curpageNum); if(pn > 0 && curpageNum == 0){ setVariable("page",String.valueOf(page)); } //logBasic("idxRow--length"+i); } catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //database = null; //} //if (stat!=null) { // stat.close(); // stat = null; //} // Send the row on to the next step. putRow(data.outputRowMeta, r); return true; } N 352 64 Y 表输入 2 TableInput Y 1 none MallSqlserver select ROW_NUMBER() over (order by AConsigneePhone2 asc ) as MemberKey, case when sex= ''then 1 else sex end as MemberGender, Consignee as MemberNickName, case when AConsigneePhone2= '' then (case when AconsigneePhone1!='' then dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) else ''end) ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) end as MemUsualPhone,--电话号码 case when AConsigneePhone3= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) end as MemPhone1,-- case when AConsigneePhone3= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) end as MemPhone2,-- case when AConsigneePhone1= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) end as MemPhone3,-- case when ADeliveryAddress= '' then '' ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) end as MemUsualAddress, OriginType, ( select CONVERT(varchar(100), min(b.OrderTime), 20) from orders b where b.AConsigneePhone2=a.AConsigneePhone2 and b.OrderTime!='1800/01/01 00:00:00' group by b.AConsigneePhone2 ) as RegisterTime, case when b0.Level= 3 then a.RegionCode else null end as MemUsualAreaCode, case when b1.Level= 2 then b.ParentCode else b.AreaCode end as MemUsualCityCode, case when c1.Level= 1 then c.ParentCode else c.AreaCode end as MemUsualProvinceCode ,'true' as HasOrder ,convert(varchar,datepart(year,getdate())) as RegisterYear ,convert(varchar,datepart(month,getdate())) as RegisterMonth ,convert(varchar,datepart(day,getdate())) as RegisterDay ,'Member' as order_join_field from orders a left join Areamap b0 on b0.MappingCode=a.RegionCode left join AreaRegion b on b.AreaCode=a.RegionCode left join Areamap b1 on b1.MappingCode=b.ParentCode left join AreaRegion c on c.AreaCode=b.ParentCode left join Areamap c1 on c1.MappingCode=c.ParentCode where 1=1 and (a.AConsigneePhone2!= '' or a.AConsigneePhone1!= '') order by AConsigneePhone2 asc offset ((${page}-1)*${pagesize}) rows fetch NEXT (${pagesize}) rows only --offset 0 rows fetch NEXT 2 rows only 0 N Y N 224 64 Y N