mem_Product_parent Normal / ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID TRANSNAME Y TRANSNAME STATUS Y STATUS LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS STARTDATE Y STARTDATE ENDDATE Y ENDDATE LOGDATE Y LOGDATE DEPDATE Y DEPDATE REPLAYDATE Y REPLAYDATE LOG_FIELD Y LOG_FIELD EXECUTING_SERVER N EXECUTING_SERVER EXECUTING_USER N EXECUTING_USER CLIENT N CLIENT
ID_BATCH Y ID_BATCH SEQ_NR Y SEQ_NR LOGDATE Y LOGDATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS INPUT_BUFFER_ROWS Y INPUT_BUFFER_ROWS OUTPUT_BUFFER_ROWS Y OUTPUT_BUFFER_ROWS
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE LOGGING_OBJECT_TYPE Y LOGGING_OBJECT_TYPE OBJECT_NAME Y OBJECT_NAME OBJECT_COPY Y OBJECT_COPY REPOSITORY_DIRECTORY Y REPOSITORY_DIRECTORY FILENAME Y FILENAME OBJECT_ID Y OBJECT_ID OBJECT_REVISION Y OBJECT_REVISION PARENT_CHANNEL_ID Y PARENT_CHANNEL_ID ROOT_CHANNEL_ID Y ROOT_CHANNEL_ID
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE TRANSNAME Y TRANSNAME STEPNAME Y STEPNAME STEP_COPY Y STEP_COPY LINES_READ Y LINES_READ LINES_WRITTEN Y LINES_WRITTEN LINES_UPDATED Y LINES_UPDATED LINES_INPUT Y LINES_INPUT LINES_OUTPUT Y LINES_OUTPUT LINES_REJECTED Y LINES_REJECTED ERRORS Y ERRORS LOG_FIELD N LOG_FIELD
ID_BATCH Y ID_BATCH CHANNEL_ID Y CHANNEL_ID LOG_DATE Y LOG_DATE METRICS_DATE Y METRICS_DATE METRICS_CODE Y METRICS_CODE METRICS_DESCRIPTION Y METRICS_DESCRIPTION METRICS_SUBJECT Y METRICS_SUBJECT METRICS_TYPE Y METRICS_TYPE METRICS_VALUE Y METRICS_VALUE
0.0 0.0 10000 50 50 N Y 50000 Y N 1000 100 - 2019/05/16 17:49:47.596 - 2019/05/16 17:49:47.596 H4sIAAAAAAAAAAMAAAAAAAAAAAA= N testSqlserver 192.168.20.122 MSSQLNATIVE Native MemberData 1433 ljhyPortal2016 Encrypted 746573746ed7f9c1af00ce938a8b5aef398cc2fe8c FORCE_IDENTIFIERS_TO_LOWERCASE N FORCE_IDENTIFIERS_TO_UPPERCASE N INITIAL_POOL_SIZE 100 IS_CLUSTERED N MAXIMUM_POOL_SIZE 300 MSSQLUseIntegratedSecurity false MSSQL_DOUBLE_DECIMAL_SEPARATOR N PORT_NUMBER 1433 PRESERVE_RESERVED_WORD_CASE Y QUOTE_ALL_FIELDS N SUPPORTS_BOOLEAN_DATA_TYPE Y SUPPORTS_TIMESTAMP_DATA_TYPE Y USE_POOLING Y 表输入 2 Java 代码 Y Java 代码 Elasticsearch bulk insert 2 Y Elasticsearch bulk insert 2 ElasticSearchBulk Y 1 none product_parent _doc 500 100 SECONDS N ProductCode Y N Y Adverse Adverse AppreciationPrice AppreciationPrice ApprovalNumber ApprovalNumber BarCode BarCode BranchCode BranchCode BrandCode BrandCode BtManual BtManual BusinessIds BusinessIds CQuantity CQuantity CategoryType CategoryType Characters Characters CheckPendingPrice CheckPendingPrice ChemicalName ChemicalName Child Child ClassCode ClassCode Comment Comment CommonTitle CommonTitle CommonTitlePinyin CommonTitlePinyin Composition Composition ConsumeDays ConsumeDays ControlAreaRegion ControlAreaRegion ControlNumber ControlNumber CreationDate CreationDate Displaytab Displaytab DivisionCode DivisionCode Dosage Dosage DrugInteractions DrugInteractions EnglishName EnglishName ExternalID ExternalID FinancialType FinancialType Formula Formula Formulation Formulation FormulationCode FormulationCode GiftCategory GiftCategory GmpCertificateUrl GmpCertificateUrl GmpCode GmpCode HealthInsuranceType HealthInsuranceType Img100 Img100 Img180 Img180 Img320 Img320 Inspection Inspection Introduction Introduction IsControl IsControl IsDrugRecord IsDrugRecord IsGifts IsGifts IsLimitPrice IsLimitPrice IsRecommend IsRecommend IsSuit IsSuit IsVisible IsVisible LastModified LastModified Manual Manual Manufacturer Manufacturer ManufacturerCode ManufacturerCode MarketPrice MarketPrice MassDate MassDate MerchantManageCode MerchantManageCode MinMarketPrice MinMarketPrice MobilephoneUrl MobilephoneUrl Molecular Molecular OlderPatients OlderPatients OurPrice OurPrice OutOfStockRecommendCodes OutOfStockRecommendCodes Overdose Overdose Packing Packing PackingNumber PackingNumber PackingProduct PackingProduct Pharmaco Pharmaco Pharmacokinetics Pharmacokinetics PinyinCode PinyinCode PinyinFullCode PinyinFullCode PlaceCode PlaceCode PlureProductStatusType PlureProductStatusType PostageLogo PostageLogo Precautions Precautions PregnantWoman PregnantWoman PrescriptionType PrescriptionType PriceSectorPricing PriceSectorPricing ProductAttribute ProductAttribute ProductCode ProductCode ProductCodeForOutOfStock ProductCodeForOutOfStock ProductHotType ProductHotType ProductImageUrl ProductImageUrl ProductInventory ProductInventory ProductKey ProductKey ProductLine ProductLine ProductName ProductName ProductRecommend ProductRecommend ProductStatusType ProductStatusType ProductTag ProductTag ProductType ProductType Productmainmaterial Productmainmaterial Productusecrowd Productusecrowd ProductusecrowdCode ProductusecrowdCode ProfitLevel ProfitLevel ProudctDescription ProudctDescription PurchasePrice PurchasePrice PurchaseWhile PurchaseWhile QcClass QcClass Quantity Quantity Remark Remark SFDAApprovalValidDate SFDAApprovalValidDate SFDACode SFDACode SFDAGmpCertificateDate SFDAGmpCertificateDate SFDAStatus SFDAStatus ShowMall ShowMall StandardCode StandardCode Standards Standards Storage Storage Structure Structure SubProductLine SubProductLine SuitDescription SuitDescription SyncStatus SyncStatus Taboo Taboo ThumbnailUrl ThumbnailUrl ToErp ToErp Unit Unit UsageCode UsageCode UserName UserName Views Views WapManual WapManual Weight Weight XQuantity XQuantity product_join_field product_join_field recordState recordState
192.168.50.32
9300
cluster.name es custom.aliase.source mem_product custom.fields.Adverse {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.Characters {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.ChemicalName {"type":"text"} custom.fields.Child {"type":"text"} custom.fields.CreationDate {"type":"date","format":"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"} custom.fields.Dosage {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.DrugInteractions {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.EnglishName {"type":"text"} custom.fields.Introduction {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":5000} custom.fields.LastModified {"type":"date","format":"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"} custom.fields.Manual {"type":"text"} custom.fields.Manufacturer {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.OlderPatients {"type":"text"} custom.fields.Overdose {"type":"text"} custom.fields.PackingProduct {"type":"text"} custom.fields.Pharmaco {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.Pharmacokinetics {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.Precautions {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.PregnantWoman {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.ProductName {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.ProductTag {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","index":true} custom.fields.ProudctDescription {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.ProudctMainMaterial {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.ProudctName {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.Proudctusercrowd {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.SFDAApprovalValidDate {"type":"date","format":"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"} custom.fields.SFDAGmpCertificateDate {"type":"date","format":"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"} custom.fields.Standards {"type":"text"} custom.fields.Storage {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.Taboo {"type":"text","analyzer": "ik_max_word", "search_analyzer":"ik_smart","ignore_above":300} custom.fields.product_join_field { "type": "join", "relations": { "Product": "Order" }} custom.index.number_of_replicas 1 custom.index.number_of_shards 1 368 176 Y
Java 代码 UserDefinedJavaClass Y 10 none TRANSFORM_CLASS Processor import java.sql.*; import org.pentaho.di.core.database.*; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //获取数据库名和表名 String dbName = "testSqlServer";//getInputRowMeta().getString(r, "conname", null ); String tablename = "DataImport_product";//getInputRowMeta().getString(r, "tablename", null ); String idname = "ProductKey";//getInputRowMeta().getString(r, "idname", null ); String sourceidname = "ProductCode";//getInputRowMeta().getString(r, "sourceidname", null ); String sourcetablename = "product";//getInputRowMeta().getString(r, "sourcetablename", null ); if (dbName==null||tablename==null) { throw new KettleException("Unable to find field with name "+tablename+" in the input row."); } //logBasic("table---"+tablename); //数据库连接 Database database=null; DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta==null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); //logBasic("success!"); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } //查询表数据 ResultSet resultSet; try { RowMetaInterface idxRowMeta =data.outputRowMeta; int i=0; r = createOutputRow(r, data.outputRowMeta.size()); //int index = getInputRowMeta().size(); // Add the index name // String Id = idxRowMeta.getString(r, idname, null); // Add the column name String DataId = idxRowMeta.getString(r, sourceidname, null); String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'"; resultSet = database.openQuery(sqlSelect); Object[] idxRow = database.getRow(resultSet); if(idxRow!=null){ return true; } //logBasic("idxRow--Id"+Id); //logBasic("idxRow--sourcetablename"+sourcetablename); //logBasic("idxRow--DataId"+DataId); //3.获得预处理对象 String sql=" insert into "+tablename+" values (?,?,?)"; //logBasic("idxRow--database"+ database); PreparedStatement stat = database.prepareSQL(sql); //logBasic("idxRow--database"+ stat); //stat.addBatch(sql); //4.SQL语句占位符设置实际参数 stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试” stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3 //5.执行SQL语句 boolean line = stat.execute(); //int[] line = stat.executeBatch(); System.out.println("更新记录数"+ line); //6.释放资源 stat.close(); //logBasic("idxRow--length"+i); } catch(Exception e) { throw new KettleException(e); } //释放连接 if (database!=null) { database.disconnect(); } // Send the row on to the next step. putRow(data.outputRowMeta, r); return true; } N 497 58 Y 表输入 2 TableInput Y 1 none testSqlserver SELECT row_number() over (order by productCode asc ) as ProductKey ,[ProductCode] ,convert(varchar(19),[CreationDate],120) as [CreationDate] ,convert(varchar(19),[LastModified],120) as [LastModified] ,[ProductName] ,ltrim(rtrim([ClassCode])) as ClassCode ,[CommonTitle] ,[Introduction] ,[Unit] ,[Manufacturer] ,[Packing] ,[ProductStatusType] ,[IsVisible] ,[PurchasePrice] ,[MarketPrice] ,[OurPrice] ,[AppreciationPrice] ,[ProfitLevel] ,[Manual] ,[WapManual] ,[Views] ,[PriceSectorPricing] ,[BrandCode] ,[ProductType] ,[Inspection] ,[Composition] ,[Weight] ,[GmpCode] ,[GmpCertificateUrl] ,[ProductImageUrl] ,[ThumbnailUrl] ,[Img100] ,[Img180] ,[Img320] ,[StandardCode] ,[BarCode] ,[ApprovalNumber] ,[ProductAttribute] ,[Formulation] ,[PrescriptionType] ,[PurchaseWhile] ,[HealthInsuranceType] ,[ConsumeDays] ,[ProductTag] ,[IsGifts] ,[Standards] ,[Dosage] ,[MassDate] ,[Storage] ,[IsControl] ,[ControlNumber] ,[PackingNumber] ,[PinyinCode] ,[PinyinFullCode] ,[ProductLine] ,[SubProductLine] ,[IsRecommend] ,[SFDACode] ,convert(varchar(19),[SFDAApprovalValidDate],120) as [SFDAApprovalValidDate] ,convert(varchar(19),[SFDAGmpCertificateDate],120) as [SFDAGmpCertificateDate] ,[SFDAStatus] ,[QcClass] ,[SyncStatus] ,[recordState] ,[IsSuit] ,[SuitDescription] ,[FinancialType] ,[ProudctDescription] ,[PlureProductStatusType] ,[Productmainmaterial] ,[Productusecrowd] ,[Displaytab] ,[EnglishName] ,[ChemicalName] ,[Structure] ,[Formula] ,[Molecular] ,[Characters] ,[Adverse] ,[Taboo] ,[Precautions] ,[Child] ,[OlderPatients] ,[PregnantWoman] ,[DrugInteractions] ,[Overdose] ,[Pharmaco] ,[Pharmacokinetics] ,[PackingProduct] ,[ExternalID] ,[Remark] ,[ProductHotType] ,[MobilephoneUrl] ,[BtManual] ,[ProductRecommend] ,[ToErp] ,[ProductusecrowdCode] ,[PlaceCode] ,[FormulationCode] ,[UsageCode] ,[ManufacturerCode] ,[ShowMall] ,[DivisionCode] ,[BranchCode] ,[MerchantManageCode] ,[MinMarketPrice] ,[CheckPendingPrice] ,[ControlAreaRegion] ,[ProductCodeForOutOfStock] ,[IsLimitPrice] ,[CommonTitlePinyin] ,[ProductInventory] ,[PostageLogo] ,[Quantity] ,[IsDrugRecord] ,[OutOfStockRecommendCodes] ,[GiftCategory] ,[BusinessIds] ,[CategoryType] ,[XQuantity] ,[UserName] ,[Comment] ,[CQuantity] ,'Product' as product_join_field FROM [MemberData].[dbo].[mem_Product] 0 N N N 288 32 Y N