1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471 |
- <?xml version="1.0" encoding="UTF-8"?>
- <transformation>
- <info>
- <name>mem_memberterminal</name>
- <description/>
- <extended_description/>
- <trans_version/>
- <trans_type>Normal</trans_type>
- <trans_status>0</trans_status>
- <directory>/</directory>
- <parameters>
- </parameters>
- <log>
- <trans-log-table>
- <connection/>
- <schema/>
- <table/>
- <size_limit_lines/>
- <interval/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STATUS</id>
- <enabled>Y</enabled>
- <name>STATUS</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- <subject/>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- <subject/>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- <subject/>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- <subject/>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- <subject/>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- <subject/>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>STARTDATE</id>
- <enabled>Y</enabled>
- <name>STARTDATE</name>
- </field>
- <field>
- <id>ENDDATE</id>
- <enabled>Y</enabled>
- <name>ENDDATE</name>
- </field>
- <field>
- <id>LOGDATE</id>
- <enabled>Y</enabled>
- <name>LOGDATE</name>
- </field>
- <field>
- <id>DEPDATE</id>
- <enabled>Y</enabled>
- <name>DEPDATE</name>
- </field>
- <field>
- <id>REPLAYDATE</id>
- <enabled>Y</enabled>
- <name>REPLAYDATE</name>
- </field>
- <field>
- <id>LOG_FIELD</id>
- <enabled>Y</enabled>
- <name>LOG_FIELD</name>
- </field>
- <field>
- <id>EXECUTING_SERVER</id>
- <enabled>N</enabled>
- <name>EXECUTING_SERVER</name>
- </field>
- <field>
- <id>EXECUTING_USER</id>
- <enabled>N</enabled>
- <name>EXECUTING_USER</name>
- </field>
- <field>
- <id>CLIENT</id>
- <enabled>N</enabled>
- <name>CLIENT</name>
- </field>
- </trans-log-table>
- <perf-log-table>
- <connection/>
- <schema/>
- <table/>
- <interval/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>SEQ_NR</id>
- <enabled>Y</enabled>
- <name>SEQ_NR</name>
- </field>
- <field>
- <id>LOGDATE</id>
- <enabled>Y</enabled>
- <name>LOGDATE</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STEPNAME</id>
- <enabled>Y</enabled>
- <name>STEPNAME</name>
- </field>
- <field>
- <id>STEP_COPY</id>
- <enabled>Y</enabled>
- <name>STEP_COPY</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>INPUT_BUFFER_ROWS</id>
- <enabled>Y</enabled>
- <name>INPUT_BUFFER_ROWS</name>
- </field>
- <field>
- <id>OUTPUT_BUFFER_ROWS</id>
- <enabled>Y</enabled>
- <name>OUTPUT_BUFFER_ROWS</name>
- </field>
- </perf-log-table>
- <channel-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>LOGGING_OBJECT_TYPE</id>
- <enabled>Y</enabled>
- <name>LOGGING_OBJECT_TYPE</name>
- </field>
- <field>
- <id>OBJECT_NAME</id>
- <enabled>Y</enabled>
- <name>OBJECT_NAME</name>
- </field>
- <field>
- <id>OBJECT_COPY</id>
- <enabled>Y</enabled>
- <name>OBJECT_COPY</name>
- </field>
- <field>
- <id>REPOSITORY_DIRECTORY</id>
- <enabled>Y</enabled>
- <name>REPOSITORY_DIRECTORY</name>
- </field>
- <field>
- <id>FILENAME</id>
- <enabled>Y</enabled>
- <name>FILENAME</name>
- </field>
- <field>
- <id>OBJECT_ID</id>
- <enabled>Y</enabled>
- <name>OBJECT_ID</name>
- </field>
- <field>
- <id>OBJECT_REVISION</id>
- <enabled>Y</enabled>
- <name>OBJECT_REVISION</name>
- </field>
- <field>
- <id>PARENT_CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>PARENT_CHANNEL_ID</name>
- </field>
- <field>
- <id>ROOT_CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>ROOT_CHANNEL_ID</name>
- </field>
- </channel-log-table>
- <step-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STEPNAME</id>
- <enabled>Y</enabled>
- <name>STEPNAME</name>
- </field>
- <field>
- <id>STEP_COPY</id>
- <enabled>Y</enabled>
- <name>STEP_COPY</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>LOG_FIELD</id>
- <enabled>N</enabled>
- <name>LOG_FIELD</name>
- </field>
- </step-log-table>
- <metrics-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>METRICS_DATE</id>
- <enabled>Y</enabled>
- <name>METRICS_DATE</name>
- </field>
- <field>
- <id>METRICS_CODE</id>
- <enabled>Y</enabled>
- <name>METRICS_CODE</name>
- </field>
- <field>
- <id>METRICS_DESCRIPTION</id>
- <enabled>Y</enabled>
- <name>METRICS_DESCRIPTION</name>
- </field>
- <field>
- <id>METRICS_SUBJECT</id>
- <enabled>Y</enabled>
- <name>METRICS_SUBJECT</name>
- </field>
- <field>
- <id>METRICS_TYPE</id>
- <enabled>Y</enabled>
- <name>METRICS_TYPE</name>
- </field>
- <field>
- <id>METRICS_VALUE</id>
- <enabled>Y</enabled>
- <name>METRICS_VALUE</name>
- </field>
- </metrics-log-table>
- </log>
- <maxdate>
- <connection/>
- <table/>
- <field/>
- <offset>0.0</offset>
- <maxdiff>0.0</maxdiff>
- </maxdate>
- <size_rowset>2</size_rowset>
- <sleep_time_empty>50</sleep_time_empty>
- <sleep_time_full>50</sleep_time_full>
- <unique_connections>N</unique_connections>
- <feedback_shown>Y</feedback_shown>
- <feedback_size>50000</feedback_size>
- <using_thread_priorities>Y</using_thread_priorities>
- <shared_objects_file/>
- <capture_step_performance>N</capture_step_performance>
- <step_performance_capturing_delay>1000</step_performance_capturing_delay>
- <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
- <dependencies>
- </dependencies>
- <partitionschemas>
- </partitionschemas>
- <slaveservers>
- </slaveservers>
- <clusterschemas>
- </clusterschemas>
- <created_user>-</created_user>
- <created_date>2019/05/14 14:56:48.089</created_date>
- <modified_user>-</modified_user>
- <modified_date>2019/05/14 14:56:48.089</modified_date>
- <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
- <is_key_private>N</is_key_private>
- </info>
- <notepads>
- </notepads>
- <connection>
- <name>MallSqlserver</name>
- <server>192.168.50.32</server>
- <type>MSSQLNATIVE</type>
- <access>Native</access>
- <database>Mall</database>
- <port>1433</port>
- <username>sa</username>
- <password>Encrypted 2be98afc819c69e8ea300ff228dd38f99</password>
- <servername/>
- <data_tablespace/>
- <index_tablespace/>
- <attributes>
- <attribute>
- <code>EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch</code>
- <attribute>200</attribute>
- </attribute>
- <attribute>
- <code>EXTRA_OPTION_MSSQLNATIVE.readTimeout</code>
- <attribute>60</attribute>
- </attribute>
- <attribute>
- <code>FORCE_IDENTIFIERS_TO_LOWERCASE</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>FORCE_IDENTIFIERS_TO_UPPERCASE</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>INITIAL_POOL_SIZE</code>
- <attribute>100</attribute>
- </attribute>
- <attribute>
- <code>IS_CLUSTERED</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>MAXIMUM_POOL_SIZE</code>
- <attribute>300</attribute>
- </attribute>
- <attribute>
- <code>MSSQLUseIntegratedSecurity</code>
- <attribute>false</attribute>
- </attribute>
- <attribute>
- <code>MSSQL_DOUBLE_DECIMAL_SEPARATOR</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>PORT_NUMBER</code>
- <attribute>1433</attribute>
- </attribute>
- <attribute>
- <code>PRESERVE_RESERVED_WORD_CASE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>QUOTE_ALL_FIELDS</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>SUPPORTS_BOOLEAN_DATA_TYPE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>SUPPORTS_TIMESTAMP_DATA_TYPE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>USE_POOLING</code>
- <attribute>Y</attribute>
- </attribute>
- </attributes>
- </connection>
- <order>
- <hop>
- <from>terminaldoc表输入</from>
- <to>Java 代码</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>terminaldoc表输入 2</from>
- <to>Java 代码 2</to>
- <enabled>N</enabled>
- </hop>
- <hop>
- <from>terminaldoc表输入 3</from>
- <to>Java 代码 3</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码 2</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码 3</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- </order>
- <step>
- <name>Elasticsearch bulk insert</name>
- <type>ElasticSearchBulk</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <general>
- <index>crm_memberterminal</index>
- <type>_doc</type>
- <batchSize>100</batchSize>
- <timeout>100</timeout>
- <timeoutUnit>SECONDS</timeoutUnit>
- <isJson>N</isJson>
- <idField>termphoneid</idField>
- <overwriteIfExists>Y</overwriteIfExists>
- <useOutput>N</useOutput>
- <stopOnError>Y</stopOnError>
- </general>
- <fields>
- </fields>
- <servers>
- <server>
- <address>192.168.50.32</address>
- <port>9300</port>
- </server>
- </servers>
- <settings>
- <setting>
- <name>cluster.name</name>
- <value>es</value>
- </setting>
- <setting>
- <name>custom.fields.aliase</name>
- <value>mem_memberterminal</value>
- </setting>
- </settings>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>368</xloc>
- <yloc>128</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码</name>
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- Database database = null;
- PreparedStatement stat = null;
- PreparedStatement stat1 = null;
- RestHighLevelClient client = new RestHighLevelClient(
- RestClient.builder(
- new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- Integer index = 0;
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
- //logBasic("start---");
- String type = getVariable("type");
- String indexs = getVariable("index");
- //if (indexs != null) index = Integer.parseInt(indexs);
- Object[] r = getRow();
- if (r == null) {
-
- try {
- if (stat!=null) {
- stat.close();
- }
-
- if (stat1!=null) {
- stat1.close();
- }
- if (database!=null) {
- database.disconnect();
- }
- if(client!=null){
- client.close();
- }
- }
- catch(Exception e) {
- throw new KettleException(e);
- }
- setOutputDone();
- return false;
- }
- synchronized(this) {
- r = createOutputRow(r, data.outputRowMeta.size());
- //获取数据库名和表名
- String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
- String tablename = "DataImport_memberterminal";//getInputRowMeta().getString(r, "tablename", null );
- String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
- String sourceidname = "termphoneid";//getInputRowMeta().getString(r, "sourceidname", null );
- String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
- if (dbName==null||tablename==null) {
- throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
- }
- //logBasic("table---"+tablename);
- if(database == null){
- //数据库连接
- DatabaseMeta databaseMeta=null;
- try {
- databaseMeta = getTransMeta().findDatabase(dbName);
- if (databaseMeta==null) {
- logError("A connection with name "+dbName+" could not be found!");
- setErrors(1);
- return false;
- }
- database = new Database(getTrans(), databaseMeta);
- database.connect();
- //logBasic("success!");
- } catch(Exception e) {
- logError("Connecting to database "+dbName+" failed.", e);
- setErrors(1);
- return false;
- }
- }
- //查询表数据
- try {
- RowMetaInterface idxRowMeta =data.outputRowMeta;
-
- int i=0;
- r = createOutputRow(r, data.outputRowMeta.size());
-
- //int index = getInputRowMeta().size();
-
- // Add the index name
- //
- String Id = idxRowMeta.getString(r, idname, null);
- // Add the column name
-
- String DataId = idxRowMeta.getString(r, sourceidname, null);
-
-
- /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
- ResultSet resultSet = null;
- resultSet = database.openQuery(sqlSelect);
- Object[] idxRow = database.getRow(resultSet);
- if (database!=null) {
- database.closeQuery(resultSet);
- resultSet = null;
- }
- //if(idxRow != null){
- // return true;
- //}
- */
-
- //logBasic("idxRow--Id"+Id);
- //logBasic("idxRow--sourcetablename"+sourcetablename);
- //logBasic("idxRow--DataId"+DataId);
-
-
- GetRequest getRequest = new GetRequest(
- "crm_memberberterminal", // Index
- "_doc", // /Type
- DataId); // Document id
- getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
- getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
- boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
- //client.close();
- if(exists ){
- return true;
- }
- //if(!exists && idxRow == null){
- // return true;
- //}
-
-
- //3.获得预处理对象
- String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
- //logBasic("idxRow--database"+ database);
- if(stat == null)
- stat = database.prepareSQL(sql);
- //logBasic("idxRow--database"+ stat);
- //stat.addBatch(sql);
- //4.SQL语句占位符设置实际参数
- stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
- stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(4, "phone1");
- //stat.setInt(5, index);
- //5.执行SQL语句
- boolean line = stat.execute();
- //int[] line = stat.executeBatch();
- //System.out.println("更新记录数"+ line);
- //6.释放资源
- //stat.close();
- setVariable("index",String.valueOf(index));
- //Integer pn = Integer.parseInt(Id);
- //Integer curpageNum = Integer.parseInt(Id) % pagesize;
- //if(pn > 0 && curpageNum == 0){
- // setVariable("page",String.valueOf(page));
- //}
- //logBasic("idxRow--getVariable"+getVariable("page"));
- //logBasic("idxRow--curpageNum"+curpageNum);
- //logBasic("idxRow--length"+i);
-
-
-
- }
- catch(SQLServerException e) {
- return true;
- }catch(Exception e) {
-
- throw new KettleException(e);
- }
- //释放连接
- //if (database!=null) {
- // database.disconnect();
- //}
- // Send the row on to the next step.
- }
- putRow(data.outputRowMeta, r);
- return true;
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>224</xloc>
- <yloc>80</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码 2</name>
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- Database database = null;
- PreparedStatement stat = null;
- PreparedStatement stat1 = null;
- RestHighLevelClient client = new RestHighLevelClient(
- RestClient.builder(
- new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- Integer index = 0;
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
- //logBasic("start---");
- String type = getVariable("type");
- String indexs = getVariable("index");
- //if (indexs != null) index = Integer.parseInt(indexs);
- Object[] r = getRow();
- if (r == null) {
-
- try {
- if (stat!=null) {
- stat.close();
- }
-
- if (stat1!=null) {
- stat1.close();
- }
- if (database!=null) {
- database.disconnect();
- }
- if(client!=null){
- client.close();
- }
- }
- catch(Exception e) {
- throw new KettleException(e);
- }
- setOutputDone();
- return false;
- }
- synchronized(this) {
- r = createOutputRow(r, data.outputRowMeta.size());
- //获取数据库名和表名
- String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
- String tablename = "DataImport_memberterminal";//getInputRowMeta().getString(r, "tablename", null );
- String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
- String sourceidname = "termphoneid";//getInputRowMeta().getString(r, "sourceidname", null );
- String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
- if (dbName==null||tablename==null) {
- throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
- }
- //logBasic("table---"+tablename);
- if(database == null){
- //数据库连接
- DatabaseMeta databaseMeta=null;
- try {
- databaseMeta = getTransMeta().findDatabase(dbName);
- if (databaseMeta==null) {
- logError("A connection with name "+dbName+" could not be found!");
- setErrors(1);
- return false;
- }
- database = new Database(getTrans(), databaseMeta);
- database.connect();
- //logBasic("success!");
- } catch(Exception e) {
- logError("Connecting to database "+dbName+" failed.", e);
- setErrors(1);
- return false;
- }
- }
- //查询表数据
- try {
- RowMetaInterface idxRowMeta =data.outputRowMeta;
-
- int i=0;
- r = createOutputRow(r, data.outputRowMeta.size());
-
- //int index = getInputRowMeta().size();
-
- // Add the index name
- //
- String Id = idxRowMeta.getString(r, idname, null);
- // Add the column name
-
- String DataId = idxRowMeta.getString(r, sourceidname, null);
-
-
- /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
- ResultSet resultSet = null;
- resultSet = database.openQuery(sqlSelect);
- Object[] idxRow = database.getRow(resultSet);
- if (database!=null) {
- database.closeQuery(resultSet);
- resultSet = null;
- }
- //if(idxRow != null){
- // return true;
- //}
- */
-
- //logBasic("idxRow--Id"+Id);
- //logBasic("idxRow--sourcetablename"+sourcetablename);
- //logBasic("idxRow--DataId"+DataId);
-
-
- GetRequest getRequest = new GetRequest(
- "crm_memberberterminal", // Index
- "_doc", // /Type
- DataId); // Document id
- getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
- getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
- boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
- //client.close();
- if(exists ){
- return true;
- }
- //if(!exists && idxRow == null){
- // return true;
- //}
-
-
- //3.获得预处理对象
- String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
- //logBasic("idxRow--database"+ database);
- if(stat == null)
- stat = database.prepareSQL(sql);
- //logBasic("idxRow--database"+ stat);
- //stat.addBatch(sql);
- //4.SQL语句占位符设置实际参数
- stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
- stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(4, "phone2");
- //stat.setInt(5, index);
- //5.执行SQL语句
- boolean line = stat.execute();
- //int[] line = stat.executeBatch();
- //System.out.println("更新记录数"+ line);
- //6.释放资源
- //stat.close();
- setVariable("index",String.valueOf(index));
- //Integer pn = Integer.parseInt(Id);
- //Integer curpageNum = Integer.parseInt(Id) % pagesize;
- //if(pn > 0 && curpageNum == 0){
- // setVariable("page",String.valueOf(page));
- //}
- //logBasic("idxRow--getVariable"+getVariable("page"));
- //logBasic("idxRow--curpageNum"+curpageNum);
- //logBasic("idxRow--length"+i);
-
-
-
- }
- catch(SQLServerException e) {
- return true;
- }catch(Exception e) {
-
- throw new KettleException(e);
- }
- //释放连接
- //if (database!=null) {
- // database.disconnect();
- //}
- // Send the row on to the next step.
- }
- putRow(data.outputRowMeta, r);
- return true;
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>224</xloc>
- <yloc>160</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码 3</name>
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- Database database = null;
- PreparedStatement stat = null;
- PreparedStatement stat1 = null;
- RestHighLevelClient client = new RestHighLevelClient(
- RestClient.builder(
- new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- Integer index = 0;
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
- //logBasic("start---");
- String type = getVariable("type");
- String indexs = getVariable("index");
- //if (indexs != null) index = Integer.parseInt(indexs);
- Object[] r = getRow();
- if (r == null) {
-
- try {
- if (stat!=null) {
- stat.close();
- }
-
- if (stat1!=null) {
- stat1.close();
- }
- if (database!=null) {
- database.disconnect();
- }
- if(client!=null){
- client.close();
- }
- }
- catch(Exception e) {
- throw new KettleException(e);
- }
- setOutputDone();
- return false;
- }
- synchronized(this) {
- r = createOutputRow(r, data.outputRowMeta.size());
- //获取数据库名和表名
- String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
- String tablename = "DataImport_memberterminal";//getInputRowMeta().getString(r, "tablename", null );
- String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
- String sourceidname = "termphoneid";//getInputRowMeta().getString(r, "sourceidname", null );
- String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
- if (dbName==null||tablename==null) {
- throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
- }
- //logBasic("table---"+tablename);
- if(database == null){
- //数据库连接
- DatabaseMeta databaseMeta=null;
- try {
- databaseMeta = getTransMeta().findDatabase(dbName);
- if (databaseMeta==null) {
- logError("A connection with name "+dbName+" could not be found!");
- setErrors(1);
- return false;
- }
- database = new Database(getTrans(), databaseMeta);
- database.connect();
- //logBasic("success!");
- } catch(Exception e) {
- logError("Connecting to database "+dbName+" failed.", e);
- setErrors(1);
- return false;
- }
- }
- //查询表数据
- try {
- RowMetaInterface idxRowMeta =data.outputRowMeta;
-
- int i=0;
- r = createOutputRow(r, data.outputRowMeta.size());
-
- //int index = getInputRowMeta().size();
-
- // Add the index name
- //
- String Id = idxRowMeta.getString(r, idname, null);
- // Add the column name
-
- String DataId = idxRowMeta.getString(r, sourceidname, null);
-
-
- /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
- ResultSet resultSet = null;
- resultSet = database.openQuery(sqlSelect);
- Object[] idxRow = database.getRow(resultSet);
- if (database!=null) {
- database.closeQuery(resultSet);
- resultSet = null;
- }
- //if(idxRow != null){
- // return true;
- //}
- */
-
- //logBasic("idxRow--Id"+Id);
- //logBasic("idxRow--sourcetablename"+sourcetablename);
- //logBasic("idxRow--DataId"+DataId);
-
-
- GetRequest getRequest = new GetRequest(
- "crm_memberberterminal", // Index
- "_doc", // /Type
- DataId); // Document id
- getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
- getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
- boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
- //client.close();
- if(exists ){
- return true;
- }
- //if(!exists && idxRow == null){
- // return true;
- //}
-
-
- //3.获得预处理对象
- String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
- //logBasic("idxRow--database"+ database);
- if(stat == null)
- stat = database.prepareSQL(sql);
- //logBasic("idxRow--database"+ stat);
- //stat.addBatch(sql);
- //4.SQL语句占位符设置实际参数
- stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
- stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
- stat.setString(4, "phone3");
- //5.执行SQL语句
- boolean line = stat.execute();
- //int[] line = stat.executeBatch();
- //System.out.println("更新记录数"+ line);
- //6.释放资源
- //stat.close();
- setVariable("index",String.valueOf(index));
- //Integer pn = Integer.parseInt(Id);
- //Integer curpageNum = Integer.parseInt(Id) % pagesize;
- //if(pn > 0 && curpageNum == 0){
- // setVariable("page",String.valueOf(page));
- //}
- //logBasic("idxRow--getVariable"+getVariable("page"));
- //logBasic("idxRow--curpageNum"+curpageNum);
- //logBasic("idxRow--length"+i);
-
-
-
- }
- catch(SQLServerException e) {
- return true;
- }catch(Exception e) {
-
- throw new KettleException(e);
- }
- //释放连接
- //if (database!=null) {
- // database.disconnect();
- //}
- // Send the row on to the next step.
- }
- putRow(data.outputRowMeta, r);
- return true;
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>240</xloc>
- <yloc>240</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入</name>
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone1 !='' and AConsigneePhone1!= AConsigneePhone2 and AConsigneePhone1!=AConsigneePhone3</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>128</xloc>
- <yloc>112</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入 2</name>
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone2 !='' and AConsigneePhone2!=AConsigneePhone1 and AConsigneePhone2!=AConsigneePhone3</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>112</xloc>
- <yloc>176</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入 3</name>
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone3 !='' and AConsigneePhone3!=AConsigneePhone2 and AConsigneePhone3 != AConsigneePhone1</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>128</xloc>
- <yloc>240</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step_error_handling>
- </step_error_handling>
- <slave-step-copy-partition-distribution>
- </slave-step-copy-partition-distribution>
- <slave_transformation>N</slave_transformation>
- <attributes/>
- </transformation>
|