etl_all_Orders Normal / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/07/11 15:20:46.215 - 2019/07/11 15:20:46.215 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N 219.128.77.96Mall 219.128.77.96 MSSQLNATIVE Native Mall 1433 caixukun Encrypted 2be98afc86aa7f2e4a801a5508cc2fe83 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N IS_CLUSTERED N MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING N mall 192.168.50.32 MSSQLNATIVE Native Mall 1433 sa Encrypted 2be98afc86aa7f297aa15a478c7d38f99 FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N INITIAL_POOL_SIZE 30 IS_CLUSTERED N MAXIMUM_POOL_SIZE 1000 MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING Y 表输入 Java 代码 Y Java 代码 表输出 Y Java 代码 UserDefinedJavaClass Y 10 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.microsoft.sqlserver.jdbc.SQLServerException; Database database = null; PreparedStatement stat = null; PreparedStatement stat1 = null; RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")})); Integer index = 0; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //logBasic("start---"); Object[] r = getRow(); if (r == null) { try { if (stat!=null) { stat.close(); } if (stat1!=null) { stat1.close(); } if (database!=null) { database.disconnect(); } if(client!=null){ client.close(); } } catch(Exception e) { throw new KettleException(e); } setOutputDone(); return false; } synchronized(this) { r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "mall"; String tablename = "Orders";//getInputRowMeta().getString(r, "tablename", null ); String sourceidname = "OrdersCode";//getInputRowMeta().getString(r, "sourceidname", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); if(database == null){ //数据库连接 DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } } //查询表数据 try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); String sqlSelect = "select OrdersCode from "+tablename + " where OrdersCode = '"+ DataId +"'"; ResultSet resultSet = null; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if (database!=null) { database.closeQuery(resultSet); resultSet = null; idxRowMeta = null; } if(idxRow != null){ return true; } //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); /* GetRequest getRequest = new GetRequest( "crm_order_routingphone", // Index "_doc", // /Type DataId); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段 boolean exists = client.exists(getRequest,RequestOptions.DEFAULT); //client.close(); if(exists ){ return true; } //if(!exists && idxRow == null){ // return true; //} */ /* //3.获得预处理对象 String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2 //logBasic("idxRow--database"+ database); if(stat == null) stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(4, "phone1"); //stat.setInt(5, index); //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); //System.out.println("更新记录数"+ line); //6.释放资源 //stat.close(); */ } catch(SQLServerException e) { return true; }catch(Exception e) { throw new KettleException(e); } //释放连接 //if (database!=null) { // database.disconnect(); //} // Send the row on to the next step. } putRow(data.outputRowMeta, r); return true; } N 352 96 Y 表输入 TableInput Y 1 none 219.128.77.96Mall SELECT OrdersCode , CreationDate , LastModified , AccountId , AccountName , Consignee , isnull(DeliveryZipCode,'') as DeliveryZipCode , Invoice , InvoiceNo , OrderNotes , Sum , InitSum , ProductCost , Reducing , Discountprice , PaymentType , DeliveryType , OrderTime , ShippingTime , ReceivingTime , ShippingNo , OrderStatus , IsReturn , TransportCosts , RegionCode , IsRxDrug , OriginType , OperateID , DeptId , DeptCode , BuyUserId , ProductLine , SubProductLine , DiseaseClassCode , Age , Sex , SyncStatus , IsVisible , ErrorCount , IsNeedReceipt , OrdersIntegral , CouponCode , CouponValue , Ispayment , AuditorId , AllocationTime , SourcePlatforms , DeviceType , IsReward , IsEmailmark , CustomerDataId , IsDrug , HuaWuOriginType , InvoiceSum , AdvanceSum , AgainOrderCount , ConfimLevel , BusinessId , IsPayByCard , PreferentialRate , VerificationCode , AConsigneePhone1 , AConsigneePhone2 , AConsigneePhone3 , ADeliveryAddress , IsAutomaticSigned , WarehouseCode , CashCouponValue , CashCouponCode , IsDealWith , IsNewOrder , ConsigneeType , CustomerCode , CustomerName , InvoiceContent , InvoiceContentType , OrdersType , Weight , Height , HealthPrice , LeverNumber , IsO2O , AdvancePayType , InventoryAuditDate , OrderProcessDate , AllocationFlag , BonusDate , IntegralValue , IsToERP , MerchantNote , OtherNote , IsPrescription , ConfirmId , OrderType , OrderFlag FROM Orders where 1=1 and OrderTime>'2019-06-09 22:22:47'--'2019-05-03 03:51:59' and datediff(day,DATEADD(d,0,DATEDIFF(d,0,getdate())-5),OrderTime )>0 order by OrderTime asc 0 N N N 200 100 Y 表输出 TableOutput Y 1 none mall dbo
Orders
1000 N N Y N N N Y N Y N OrdersCode OrdersCode CreationDate CreationDate LastModified LastModified AccountId AccountId AccountName AccountName Consignee Consignee DeliveryZipCode DeliveryZipCode Invoice Invoice InvoiceNo InvoiceNo OrderNotes OrderNotes Sum Sum InitSum InitSum ProductCost ProductCost Reducing Reducing Discountprice Discountprice PaymentType PaymentType DeliveryType DeliveryType OrderTime OrderTime ShippingTime ShippingTime ReceivingTime ReceivingTime ShippingNo ShippingNo OrderStatus OrderStatus IsReturn IsReturn TransportCosts TransportCosts RegionCode RegionCode IsRxDrug IsRxDrug OriginType OriginType OperateID OperateID DeptId DeptId DeptCode DeptCode BuyUserId BuyUserId ProductLine ProductLine SubProductLine SubProductLine DiseaseClassCode DiseaseClassCode Age Age Sex Sex SyncStatus SyncStatus IsVisible IsVisible ErrorCount ErrorCount IsNeedReceipt IsNeedReceipt OrdersIntegral OrdersIntegral CouponCode CouponCode CouponValue CouponValue Ispayment Ispayment AuditorId AuditorId AllocationTime AllocationTime SourcePlatforms SourcePlatforms DeviceType DeviceType IsReward IsReward IsEmailmark IsEmailmark CustomerDataId CustomerDataId IsDrug IsDrug HuaWuOriginType HuaWuOriginType InvoiceSum InvoiceSum AdvanceSum AdvanceSum AgainOrderCount AgainOrderCount ConfimLevel ConfimLevel BusinessId BusinessId IsPayByCard IsPayByCard PreferentialRate PreferentialRate VerificationCode VerificationCode AConsigneePhone1 AConsigneePhone1 AConsigneePhone2 AConsigneePhone2 AConsigneePhone3 AConsigneePhone3 ADeliveryAddress ADeliveryAddress IsAutomaticSigned IsAutomaticSigned WarehouseCode WarehouseCode CashCouponValue CashCouponValue CashCouponCode CashCouponCode IsDealWith IsDealWith IsNewOrder IsNewOrder ConsigneeType ConsigneeType CustomerCode CustomerCode CustomerName CustomerName InvoiceContent InvoiceContent InvoiceContentType InvoiceContentType OrdersType OrdersType Weight Weight Height Height HealthPrice HealthPrice LeverNumber LeverNumber IsO2O IsO2O AdvancePayType AdvancePayType InventoryAuditDate InventoryAuditDate OrderProcessDate OrderProcessDate AllocationFlag AllocationFlag BonusDate BonusDate IntegralValue IntegralValue IsToERP IsToERP MerchantNote MerchantNote OtherNote OtherNote IsPrescription IsPrescription ConfirmId ConfirmId OrderType OrderType OrderFlag OrderFlag 400 200 Y N