etl_all_OrdersPrescription Normal / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/07/12 17:22:11.899 - 2019/07/12 17:22:11.899 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N 219.128.77.96Mall 219.128.77.96 MSSQLNATIVE Native Mall 1433 caixukun Encrypted 2be98afc86aa7f2e4a801a5508cc2fe83 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N IS_CLUSTERED N MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING N mall 192.168.50.32 MSSQLNATIVE Native Mall 1433 sa Encrypted 2be98afc86aa7f297aa15a478c7d38f99 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N IS_CLUSTERED N MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING N Java 代码 表输出 Y 表输入 Java 代码 Y Java 代码 UserDefinedJavaClass Y 10 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.microsoft.sqlserver.jdbc.SQLServerException; Database database = null; PreparedStatement stat = null; PreparedStatement stat1 = null; RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")})); Integer index = 0; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //logBasic("start---"); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (stat1!=null) { stat1.close(); } if (database!=null) { database.disconnect(); } if(client!=null){ client.close(); } } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } synchronized(this) { r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "mall"; String tablename = "PrescriptionRecord";//getInputRowMeta().getString(r, "tablename", null ); String sourceidname = "Id";//getInputRowMeta().getString(r, "sourceidname", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } } //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); String sqlSelect = "select Id from "+tablename + " where Id = '"+ DataId +"'"; ResultSet resultSet = null; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if (database!=null) { database.closeQuery(resultSet); resultSet = null; idxRowMeta = null; } if(idxRow != null){ return true; } //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); /* GetRequest getRequest = new GetRequest( "crm_order_routingphone", // Index "_doc", // /Type DataId); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段 boolean exists = client.exists(getRequest,RequestOptions.DEFAULT); //client.close(); if(exists ){ return true; } //if(!exists && idxRow == null){ // return true; //} */ /* //3.获得预处理对象 String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2 //logBasic("idxRow--database"+ database); if(stat == null) stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(4, "phone1"); //stat.setInt(5, index); //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); //System.out.println("更新记录数"+ line); //6.释放资源 //stat.close(); */ } catch(SQLServerException e) { return true; }catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //} // Send the row on to the next step. } putRow(data.outputRowMeta, r); return true; } N 352 80 Y 表输入 TableInput Y 1 none 219.128.77.96Mall SELECT * FROM PrescriptionRecord a where a.CreationDate > '2019-05-01 17:56:29' and datediff(day,DATEADD(d,0,DATEDIFF(d,0,getdate())-10),CreationDate )>0 order by CreationDate asc 0 N N N 192 96 Y 表输出 TableOutput Y 1 none mall dbo
PrescriptionRecord
1000 N N Y N N N Y N Y N 400 192 Y N