|
- <?xml version="1.0" encoding="UTF-8"?>
- <transformation>
- <info>
- <!-- Transformation-level metadata: name, logging tables, row-set sizing and audit fields. -->
- <name>mem_memberterminal</name>
- <description/>
- <extended_description/>
- <trans_version/>
- <trans_type>Normal</trans_type>
- <trans_status>0</trans_status>
- <directory>/</directory>
- <parameters>
- </parameters>
- <!-- Every log table below has an empty connection and table name, so database logging is not configured for this transformation. -->
- <log>
- <trans-log-table>
- <connection/>
- <schema/>
- <table/>
- <size_limit_lines/>
- <interval/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STATUS</id>
- <enabled>Y</enabled>
- <name>STATUS</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- <subject/>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- <subject/>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- <subject/>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- <subject/>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- <subject/>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- <subject/>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>STARTDATE</id>
- <enabled>Y</enabled>
- <name>STARTDATE</name>
- </field>
- <field>
- <id>ENDDATE</id>
- <enabled>Y</enabled>
- <name>ENDDATE</name>
- </field>
- <field>
- <id>LOGDATE</id>
- <enabled>Y</enabled>
- <name>LOGDATE</name>
- </field>
- <field>
- <id>DEPDATE</id>
- <enabled>Y</enabled>
- <name>DEPDATE</name>
- </field>
- <field>
- <id>REPLAYDATE</id>
- <enabled>Y</enabled>
- <name>REPLAYDATE</name>
- </field>
- <field>
- <id>LOG_FIELD</id>
- <enabled>Y</enabled>
- <name>LOG_FIELD</name>
- </field>
- <field>
- <id>EXECUTING_SERVER</id>
- <enabled>N</enabled>
- <name>EXECUTING_SERVER</name>
- </field>
- <field>
- <id>EXECUTING_USER</id>
- <enabled>N</enabled>
- <name>EXECUTING_USER</name>
- </field>
- <field>
- <id>CLIENT</id>
- <enabled>N</enabled>
- <name>CLIENT</name>
- </field>
- </trans-log-table>
- <perf-log-table>
- <connection/>
- <schema/>
- <table/>
- <interval/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>SEQ_NR</id>
- <enabled>Y</enabled>
- <name>SEQ_NR</name>
- </field>
- <field>
- <id>LOGDATE</id>
- <enabled>Y</enabled>
- <name>LOGDATE</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STEPNAME</id>
- <enabled>Y</enabled>
- <name>STEPNAME</name>
- </field>
- <field>
- <id>STEP_COPY</id>
- <enabled>Y</enabled>
- <name>STEP_COPY</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>INPUT_BUFFER_ROWS</id>
- <enabled>Y</enabled>
- <name>INPUT_BUFFER_ROWS</name>
- </field>
- <field>
- <id>OUTPUT_BUFFER_ROWS</id>
- <enabled>Y</enabled>
- <name>OUTPUT_BUFFER_ROWS</name>
- </field>
- </perf-log-table>
- <channel-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>LOGGING_OBJECT_TYPE</id>
- <enabled>Y</enabled>
- <name>LOGGING_OBJECT_TYPE</name>
- </field>
- <field>
- <id>OBJECT_NAME</id>
- <enabled>Y</enabled>
- <name>OBJECT_NAME</name>
- </field>
- <field>
- <id>OBJECT_COPY</id>
- <enabled>Y</enabled>
- <name>OBJECT_COPY</name>
- </field>
- <field>
- <id>REPOSITORY_DIRECTORY</id>
- <enabled>Y</enabled>
- <name>REPOSITORY_DIRECTORY</name>
- </field>
- <field>
- <id>FILENAME</id>
- <enabled>Y</enabled>
- <name>FILENAME</name>
- </field>
- <field>
- <id>OBJECT_ID</id>
- <enabled>Y</enabled>
- <name>OBJECT_ID</name>
- </field>
- <field>
- <id>OBJECT_REVISION</id>
- <enabled>Y</enabled>
- <name>OBJECT_REVISION</name>
- </field>
- <field>
- <id>PARENT_CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>PARENT_CHANNEL_ID</name>
- </field>
- <field>
- <id>ROOT_CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>ROOT_CHANNEL_ID</name>
- </field>
- </channel-log-table>
- <step-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>TRANSNAME</id>
- <enabled>Y</enabled>
- <name>TRANSNAME</name>
- </field>
- <field>
- <id>STEPNAME</id>
- <enabled>Y</enabled>
- <name>STEPNAME</name>
- </field>
- <field>
- <id>STEP_COPY</id>
- <enabled>Y</enabled>
- <name>STEP_COPY</name>
- </field>
- <field>
- <id>LINES_READ</id>
- <enabled>Y</enabled>
- <name>LINES_READ</name>
- </field>
- <field>
- <id>LINES_WRITTEN</id>
- <enabled>Y</enabled>
- <name>LINES_WRITTEN</name>
- </field>
- <field>
- <id>LINES_UPDATED</id>
- <enabled>Y</enabled>
- <name>LINES_UPDATED</name>
- </field>
- <field>
- <id>LINES_INPUT</id>
- <enabled>Y</enabled>
- <name>LINES_INPUT</name>
- </field>
- <field>
- <id>LINES_OUTPUT</id>
- <enabled>Y</enabled>
- <name>LINES_OUTPUT</name>
- </field>
- <field>
- <id>LINES_REJECTED</id>
- <enabled>Y</enabled>
- <name>LINES_REJECTED</name>
- </field>
- <field>
- <id>ERRORS</id>
- <enabled>Y</enabled>
- <name>ERRORS</name>
- </field>
- <field>
- <id>LOG_FIELD</id>
- <enabled>N</enabled>
- <name>LOG_FIELD</name>
- </field>
- </step-log-table>
- <metrics-log-table>
- <connection/>
- <schema/>
- <table/>
- <timeout_days/>
- <field>
- <id>ID_BATCH</id>
- <enabled>Y</enabled>
- <name>ID_BATCH</name>
- </field>
- <field>
- <id>CHANNEL_ID</id>
- <enabled>Y</enabled>
- <name>CHANNEL_ID</name>
- </field>
- <field>
- <id>LOG_DATE</id>
- <enabled>Y</enabled>
- <name>LOG_DATE</name>
- </field>
- <field>
- <id>METRICS_DATE</id>
- <enabled>Y</enabled>
- <name>METRICS_DATE</name>
- </field>
- <field>
- <id>METRICS_CODE</id>
- <enabled>Y</enabled>
- <name>METRICS_CODE</name>
- </field>
- <field>
- <id>METRICS_DESCRIPTION</id>
- <enabled>Y</enabled>
- <name>METRICS_DESCRIPTION</name>
- </field>
- <field>
- <id>METRICS_SUBJECT</id>
- <enabled>Y</enabled>
- <name>METRICS_SUBJECT</name>
- </field>
- <field>
- <id>METRICS_TYPE</id>
- <enabled>Y</enabled>
- <name>METRICS_TYPE</name>
- </field>
- <field>
- <id>METRICS_VALUE</id>
- <enabled>Y</enabled>
- <name>METRICS_VALUE</name>
- </field>
- </metrics-log-table>
- </log>
- <maxdate>
- <connection/>
- <table/>
- <field/>
- <offset>0.0</offset>
- <maxdiff>0.0</maxdiff>
- </maxdate>
- <!-- NOTE(review): a row set of only 2 rows per hop is far below typical values and makes producer and consumer steps block almost constantly; confirm this throttling is intentional. -->
- <size_rowset>2</size_rowset>
- <sleep_time_empty>50</sleep_time_empty>
- <sleep_time_full>50</sleep_time_full>
- <unique_connections>N</unique_connections>
- <feedback_shown>Y</feedback_shown>
- <feedback_size>50000</feedback_size>
- <using_thread_priorities>Y</using_thread_priorities>
- <!-- Empty: presumably the default shared objects file is used, which may define connections referenced by the Java steps. -->
- <shared_objects_file/>
- <capture_step_performance>N</capture_step_performance>
- <step_performance_capturing_delay>1000</step_performance_capturing_delay>
- <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
- <dependencies>
- </dependencies>
- <partitionschemas>
- </partitionschemas>
- <slaveservers>
- </slaveservers>
- <clusterschemas>
- </clusterschemas>
- <created_user>-</created_user>
- <created_date>2019/05/14 14:56:48.089</created_date>
- <modified_user>-</modified_user>
- <modified_date>2019/05/14 14:56:48.089</modified_date>
- <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
- <is_key_private>N</is_key_private>
- </info>
- <!-- No notes are attached to the transformation canvas. -->
- <notepads>
- </notepads>
- <connection>
- <!-- SQL Server connection used by the three "terminaldoc" Table Input steps, which read table Orders. -->
- <!-- NOTE(review): the Java steps look up a connection named "MemberSqlServer", which is not defined in this file; presumably it comes from the shared objects file. Confirm. -->
- <name>MallSqlserver</name>
- <server>192.168.50.32</server>
- <type>MSSQLNATIVE</type>
- <access>Native</access>
- <database>Mall</database>
- <port>1433</port>
- <!-- NOTE(review): this logs in as the "sa" administrator account; a least-privilege, read-only login would be safer for a reporting extract. -->
- <username>sa</username>
- <!-- NOTE(review): the Kettle "Encrypted" password prefix is, to my understanding, reversible obfuscation rather than real encryption; treat this file as containing a credential. -->
- <password>Encrypted 2be98afc819c69e8ea300ff228dd38f99</password>
- <servername/>
- <data_tablespace/>
- <index_tablespace/>
- <!-- Driver and pooling options; pooling is enabled with an initial size of 100 and a maximum of 300 connections. -->
- <attributes>
- <attribute>
- <code>EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch</code>
- <attribute>200</attribute>
- </attribute>
- <attribute>
- <code>EXTRA_OPTION_MSSQLNATIVE.readTimeout</code>
- <attribute>60</attribute>
- </attribute>
- <attribute>
- <code>FORCE_IDENTIFIERS_TO_LOWERCASE</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>FORCE_IDENTIFIERS_TO_UPPERCASE</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>INITIAL_POOL_SIZE</code>
- <attribute>100</attribute>
- </attribute>
- <attribute>
- <code>IS_CLUSTERED</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>MAXIMUM_POOL_SIZE</code>
- <attribute>300</attribute>
- </attribute>
- <attribute>
- <code>MSSQLUseIntegratedSecurity</code>
- <attribute>false</attribute>
- </attribute>
- <attribute>
- <code>MSSQL_DOUBLE_DECIMAL_SEPARATOR</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>PORT_NUMBER</code>
- <attribute>1433</attribute>
- </attribute>
- <attribute>
- <code>PRESERVE_RESERVED_WORD_CASE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>QUOTE_ALL_FIELDS</code>
- <attribute>N</attribute>
- </attribute>
- <attribute>
- <code>SUPPORTS_BOOLEAN_DATA_TYPE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>SUPPORTS_TIMESTAMP_DATA_TYPE</code>
- <attribute>Y</attribute>
- </attribute>
- <attribute>
- <code>USE_POOLING</code>
- <attribute>Y</attribute>
- </attribute>
- </attributes>
- </connection>
- <order>
- <!-- Hop list: each Table Input feeds its own Java step, and every Java step feeds the Elasticsearch bulk insert. -->
- <hop>
- <from>terminaldoc表输入</from>
- <to>Java 代码</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- <!-- NOTE(review): this hop is disabled (enabled=N), so rows from the second Table Input (consignee phone 2) never reach Java step 2. Confirm this is intentional. -->
- <hop>
- <from>terminaldoc表输入 2</from>
- <to>Java 代码 2</to>
- <enabled>N</enabled>
- </hop>
- <hop>
- <from>terminaldoc表输入 3</from>
- <to>Java 代码 3</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码 2</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- <hop>
- <from>Java 代码 3</from>
- <to>Elasticsearch bulk insert</to>
- <enabled>Y</enabled>
- </hop>
- </order>
- <step>
- <name>Elasticsearch bulk insert</name>
- <!-- Bulk-indexes every incoming row into index crm_memberterminal (type _doc), using field termphoneid as the document id and overwriting existing documents. -->
- <type>ElasticSearchBulk</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <general>
- <!-- The Java steps check document existence against this index name; the two must stay in sync. -->
- <index>crm_memberterminal</index>
- <type>_doc</type>
- <batchSize>100</batchSize>
- <timeout>100</timeout>
- <timeoutUnit>SECONDS</timeoutUnit>
- <isJson>N</isJson>
- <idField>termphoneid</idField>
- <overwriteIfExists>Y</overwriteIfExists>
- <useOutput>N</useOutput>
- <stopOnError>Y</stopOnError>
- </general>
- <!-- No explicit field list. NOTE(review): presumably all input fields are indexed in that case; confirm against the plugin version. -->
- <fields>
- </fields>
- <!-- Port 9300 is presumably the Elasticsearch transport port; the Java steps talk to the REST port 9200 on the same host. -->
- <servers>
- <server>
- <address>192.168.50.32</address>
- <port>9300</port>
- </server>
- </servers>
- <!-- NOTE(review): custom.fields.aliase is not a standard Elasticsearch client setting and a stock transport client may reject unknown settings; confirm it is consumed by a customized plugin. -->
- <settings>
- <setting>
- <name>cluster.name</name>
- <value>es</value>
- </setting>
- <setting>
- <name>custom.fields.aliase</name>
- <value>mem_memberterminal</value>
- </setting>
- </settings>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>368</xloc>
- <yloc>128</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码</name>
- <!-- Records each not-yet-indexed consignee phone (slot "phone1") in the tracking table and forwards it to the Elasticsearch bulk insert. -->
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- /*
-  * Deduplicating recorder for consignee phone slot "phone1".
-  *
-  * For each input row: if the Elasticsearch index already holds a document whose id is the
-  * row's termphoneid, the row is dropped. Otherwise the row is inserted into the tracking
-  * table and forwarded to the next step. Rows whose insert fails are dropped as well.
-  *
-  * Kettle gives every step copy its own Processor instance driven by a single thread,
-  * so the fields below are per copy and need no locking.
-  */
- /** Kettle connection name of the database that holds the tracking table. */
- private static final String DB_NAME = "MemberSqlServer";
- /** Tracking table; the insert supplies its four columns positionally. */
- private static final String TABLE_NAME = "DataImport_memberterminal";
- /** Input field with the decrypted phone number (first insert column). */
- private static final String ID_FIELD = "MemberPhone";
- /** Input field with the Elasticsearch document id (third insert column). */
- private static final String SOURCE_ID_FIELD = "termphoneid";
- /** Source table name written to the second insert column. */
- private static final String SOURCE_TABLE_NAME = "Orders";
- /** Phone slot handled by this step, written to the fourth insert column. */
- private static final String PHONE_SLOT = "phone1";
- /** Must equal the index configured on the "Elasticsearch bulk insert" step. */
- private static final String ES_INDEX = "crm_memberterminal";
- private static final String ES_TYPE = "_doc";
- /** SQL Server error numbers for duplicate keys (unique constraint / unique index). */
- private static final int ERR_DUPLICATE_KEY_CONSTRAINT = 2627;
- private static final int ERR_DUPLICATE_KEY_INDEX = 2601;
- private Database database = null;
- private PreparedStatement stat = null;
- private RestHighLevelClient client = new RestHighLevelClient(
-     RestClient.builder(new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- /**
-  * Handles one row: drops it when already indexed, otherwise records it and passes it on.
-  * Returns false once input is exhausted or the tracking database cannot be opened.
-  */
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
-   Object[] r = getRow();
-   if (r == null) {
-     // End of input: release everything, then signal the downstream steps.
-     try {
-       closeResources();
-     } catch (Exception e) {
-       throw new KettleException(e);
-     }
-     setOutputDone();
-     return false;
-   }
-   r = createOutputRow(r, data.outputRowMeta.size());
-   if (database == null) {
-     if (!connect()) {
-       return false;
-     }
-   }
-   try {
-     RowMetaInterface rowMeta = data.outputRowMeta;
-     String id = rowMeta.getString(r, ID_FIELD, null);
-     String dataId = rowMeta.getString(r, SOURCE_ID_FIELD, null);
-     if (existsInIndex(dataId)) {
-       // Already indexed: neither record nor re-send it.
-       return true;
-     }
-     if (stat == null) {
-       stat = database.prepareSQL("insert into " + TABLE_NAME + " values (?,?,?,?);");
-     }
-     stat.setString(1, id);
-     stat.setString(2, SOURCE_TABLE_NAME);
-     stat.setString(3, dataId);
-     stat.setString(4, PHONE_SLOT);
-     stat.execute();
-   } catch (SQLServerException e) {
-     int code = e.getErrorCode();
-     if (code == ERR_DUPLICATE_KEY_CONSTRAINT || code == ERR_DUPLICATE_KEY_INDEX) {
-       // Already recorded (for example by another step copy): drop the row quietly.
-       logDetailed("Duplicate key in " + TABLE_NAME + "; row skipped.");
-     } else {
-       // Any other SQL Server error used to be swallowed silently; still skip the row, but report it.
-       logError("Insert into " + TABLE_NAME + " failed; row skipped.", e);
-     }
-     return true;
-   } catch (Exception e) {
-     throw new KettleException(e);
-   }
-   putRow(data.outputRowMeta, r);
-   return true;
- }
- /** Opens DB_NAME; on failure logs the reason, flags one error and returns false. */
- private boolean connect()
- {
-   try {
-     DatabaseMeta databaseMeta = getTransMeta().findDatabase(DB_NAME);
-     if (databaseMeta == null) {
-       logError("A connection with name "+DB_NAME+" could not be found!");
-       setErrors(1);
-       return false;
-     }
-     Database db = new Database(getTrans(), databaseMeta);
-     db.connect();
-     database = db;
-     return true;
-   } catch (Exception e) {
-     logError("Connecting to database "+DB_NAME+" failed.", e);
-     setErrors(1);
-     return false;
-   }
- }
- /** True when ES_INDEX already has a document with this id; no source or stored fields are fetched. */
- private boolean existsInIndex(String docId) throws Exception
- {
-   GetRequest request = new GetRequest(ES_INDEX, ES_TYPE, docId);
-   request.fetchSourceContext(new FetchSourceContext(false));
-   request.storedFields(new String[]{"_none_"});
-   return client.exists(request, RequestOptions.DEFAULT);
- }
- /** Closes the statement, the database and the REST client; each field is cleared first so repeated calls are harmless. */
- private void closeResources() throws Exception
- {
-   if (stat != null) {
-     PreparedStatement s = stat;
-     stat = null;
-     s.close();
-   }
-   if (database != null) {
-     Database db = database;
-     database = null;
-     db.disconnect();
-   }
-   if (client != null) {
-     RestHighLevelClient c = client;
-     client = null;
-     c.close();
-   }
- }
- /** Safety net: releases resources even when the step stops on an error before the end of input. */
- public void dispose(StepMetaInterface smi, StepDataInterface sdi)
- {
-   try {
-     closeResources();
-   } catch (Exception e) {
-     logError("Releasing resources failed.", e);
-   }
-   super.dispose(smi, sdi);
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>224</xloc>
- <yloc>80</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码 2</name>
- <!-- Records each not-yet-indexed consignee phone (slot "phone2") in the tracking table and forwards it to the Elasticsearch bulk insert. -->
- <!-- NOTE(review): the hop into this step is disabled in the hop list, so it currently receives no rows. -->
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- /*
-  * Deduplicating recorder for consignee phone slot "phone2".
-  *
-  * For each input row: if the Elasticsearch index already holds a document whose id is the
-  * row's termphoneid, the row is dropped. Otherwise the row is inserted into the tracking
-  * table and forwarded to the next step. Rows whose insert fails are dropped as well.
-  *
-  * Kettle gives every step copy its own Processor instance driven by a single thread,
-  * so the fields below are per copy and need no locking.
-  */
- /** Kettle connection name of the database that holds the tracking table. */
- private static final String DB_NAME = "MemberSqlServer";
- /** Tracking table; the insert supplies its four columns positionally. */
- private static final String TABLE_NAME = "DataImport_memberterminal";
- /** Input field with the decrypted phone number (first insert column). */
- private static final String ID_FIELD = "MemberPhone";
- /** Input field with the Elasticsearch document id (third insert column). */
- private static final String SOURCE_ID_FIELD = "termphoneid";
- /** Source table name written to the second insert column. */
- private static final String SOURCE_TABLE_NAME = "Orders";
- /** Phone slot handled by this step, written to the fourth insert column. */
- private static final String PHONE_SLOT = "phone2";
- /** Must equal the index configured on the "Elasticsearch bulk insert" step. */
- private static final String ES_INDEX = "crm_memberterminal";
- private static final String ES_TYPE = "_doc";
- /** SQL Server error numbers for duplicate keys (unique constraint / unique index). */
- private static final int ERR_DUPLICATE_KEY_CONSTRAINT = 2627;
- private static final int ERR_DUPLICATE_KEY_INDEX = 2601;
- private Database database = null;
- private PreparedStatement stat = null;
- private RestHighLevelClient client = new RestHighLevelClient(
-     RestClient.builder(new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- /**
-  * Handles one row: drops it when already indexed, otherwise records it and passes it on.
-  * Returns false once input is exhausted or the tracking database cannot be opened.
-  */
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
-   Object[] r = getRow();
-   if (r == null) {
-     // End of input: release everything, then signal the downstream steps.
-     try {
-       closeResources();
-     } catch (Exception e) {
-       throw new KettleException(e);
-     }
-     setOutputDone();
-     return false;
-   }
-   r = createOutputRow(r, data.outputRowMeta.size());
-   if (database == null) {
-     if (!connect()) {
-       return false;
-     }
-   }
-   try {
-     RowMetaInterface rowMeta = data.outputRowMeta;
-     String id = rowMeta.getString(r, ID_FIELD, null);
-     String dataId = rowMeta.getString(r, SOURCE_ID_FIELD, null);
-     if (existsInIndex(dataId)) {
-       // Already indexed: neither record nor re-send it.
-       return true;
-     }
-     if (stat == null) {
-       stat = database.prepareSQL("insert into " + TABLE_NAME + " values (?,?,?,?);");
-     }
-     stat.setString(1, id);
-     stat.setString(2, SOURCE_TABLE_NAME);
-     stat.setString(3, dataId);
-     stat.setString(4, PHONE_SLOT);
-     stat.execute();
-   } catch (SQLServerException e) {
-     int code = e.getErrorCode();
-     if (code == ERR_DUPLICATE_KEY_CONSTRAINT || code == ERR_DUPLICATE_KEY_INDEX) {
-       // Already recorded (for example by another step copy): drop the row quietly.
-       logDetailed("Duplicate key in " + TABLE_NAME + "; row skipped.");
-     } else {
-       // Any other SQL Server error used to be swallowed silently; still skip the row, but report it.
-       logError("Insert into " + TABLE_NAME + " failed; row skipped.", e);
-     }
-     return true;
-   } catch (Exception e) {
-     throw new KettleException(e);
-   }
-   putRow(data.outputRowMeta, r);
-   return true;
- }
- /** Opens DB_NAME; on failure logs the reason, flags one error and returns false. */
- private boolean connect()
- {
-   try {
-     DatabaseMeta databaseMeta = getTransMeta().findDatabase(DB_NAME);
-     if (databaseMeta == null) {
-       logError("A connection with name "+DB_NAME+" could not be found!");
-       setErrors(1);
-       return false;
-     }
-     Database db = new Database(getTrans(), databaseMeta);
-     db.connect();
-     database = db;
-     return true;
-   } catch (Exception e) {
-     logError("Connecting to database "+DB_NAME+" failed.", e);
-     setErrors(1);
-     return false;
-   }
- }
- /** True when ES_INDEX already has a document with this id; no source or stored fields are fetched. */
- private boolean existsInIndex(String docId) throws Exception
- {
-   GetRequest request = new GetRequest(ES_INDEX, ES_TYPE, docId);
-   request.fetchSourceContext(new FetchSourceContext(false));
-   request.storedFields(new String[]{"_none_"});
-   return client.exists(request, RequestOptions.DEFAULT);
- }
- /** Closes the statement, the database and the REST client; each field is cleared first so repeated calls are harmless. */
- private void closeResources() throws Exception
- {
-   if (stat != null) {
-     PreparedStatement s = stat;
-     stat = null;
-     s.close();
-   }
-   if (database != null) {
-     Database db = database;
-     database = null;
-     db.disconnect();
-   }
-   if (client != null) {
-     RestHighLevelClient c = client;
-     client = null;
-     c.close();
-   }
- }
- /** Safety net: releases resources even when the step stops on an error before the end of input. */
- public void dispose(StepMetaInterface smi, StepDataInterface sdi)
- {
-   try {
-     closeResources();
-   } catch (Exception e) {
-     logError("Releasing resources failed.", e);
-   }
-   super.dispose(smi, sdi);
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>224</xloc>
- <yloc>160</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>Java 代码 3</name>
- <!-- Records each not-yet-indexed consignee phone (slot "phone3") in the tracking table and forwards it to the Elasticsearch bulk insert. -->
- <type>UserDefinedJavaClass</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>10</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <definitions>
- <definition>
- <class_type>TRANSFORM_CLASS</class_type>
- <class_name>Processor</class_name>
- <class_source>import java.sql.*;
- import org.pentaho.di.core.database.*;
- import org.apache.http.HttpHost;
- import org.elasticsearch.ElasticsearchException;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.common.Strings;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
- import org.elasticsearch.action.update.UpdateRequest;
- import org.elasticsearch.action.update.UpdateResponse;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.script.Script;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import com.microsoft.sqlserver.jdbc.SQLServerException;
- /*
-  * Deduplicating recorder for consignee phone slot "phone3".
-  *
-  * For each input row: if the Elasticsearch index already holds a document whose id is the
-  * row's termphoneid, the row is dropped. Otherwise the row is inserted into the tracking
-  * table and forwarded to the next step. Rows whose insert fails are dropped as well.
-  *
-  * Kettle gives every step copy its own Processor instance driven by a single thread,
-  * so the fields below are per copy and need no locking.
-  */
- /** Kettle connection name of the database that holds the tracking table. */
- private static final String DB_NAME = "MemberSqlServer";
- /** Tracking table; the insert supplies its four columns positionally. */
- private static final String TABLE_NAME = "DataImport_memberterminal";
- /** Input field with the decrypted phone number (first insert column). */
- private static final String ID_FIELD = "MemberPhone";
- /** Input field with the Elasticsearch document id (third insert column). */
- private static final String SOURCE_ID_FIELD = "termphoneid";
- /** Source table name written to the second insert column. */
- private static final String SOURCE_TABLE_NAME = "Orders";
- /** Phone slot handled by this step, written to the fourth insert column. */
- private static final String PHONE_SLOT = "phone3";
- /** Must equal the index configured on the "Elasticsearch bulk insert" step. */
- private static final String ES_INDEX = "crm_memberterminal";
- private static final String ES_TYPE = "_doc";
- /** SQL Server error numbers for duplicate keys (unique constraint / unique index). */
- private static final int ERR_DUPLICATE_KEY_CONSTRAINT = 2627;
- private static final int ERR_DUPLICATE_KEY_INDEX = 2601;
- private Database database = null;
- private PreparedStatement stat = null;
- private RestHighLevelClient client = new RestHighLevelClient(
-     RestClient.builder(new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
- /**
-  * Handles one row: drops it when already indexed, otherwise records it and passes it on.
-  * Returns false once input is exhausted or the tracking database cannot be opened.
-  */
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
- {
-   Object[] r = getRow();
-   if (r == null) {
-     // End of input: release everything, then signal the downstream steps.
-     try {
-       closeResources();
-     } catch (Exception e) {
-       throw new KettleException(e);
-     }
-     setOutputDone();
-     return false;
-   }
-   r = createOutputRow(r, data.outputRowMeta.size());
-   if (database == null) {
-     if (!connect()) {
-       return false;
-     }
-   }
-   try {
-     RowMetaInterface rowMeta = data.outputRowMeta;
-     String id = rowMeta.getString(r, ID_FIELD, null);
-     String dataId = rowMeta.getString(r, SOURCE_ID_FIELD, null);
-     if (existsInIndex(dataId)) {
-       // Already indexed: neither record nor re-send it.
-       return true;
-     }
-     if (stat == null) {
-       stat = database.prepareSQL("insert into " + TABLE_NAME + " values (?,?,?,?);");
-     }
-     stat.setString(1, id);
-     stat.setString(2, SOURCE_TABLE_NAME);
-     stat.setString(3, dataId);
-     stat.setString(4, PHONE_SLOT);
-     stat.execute();
-   } catch (SQLServerException e) {
-     int code = e.getErrorCode();
-     if (code == ERR_DUPLICATE_KEY_CONSTRAINT || code == ERR_DUPLICATE_KEY_INDEX) {
-       // Already recorded (for example by another step copy): drop the row quietly.
-       logDetailed("Duplicate key in " + TABLE_NAME + "; row skipped.");
-     } else {
-       // Any other SQL Server error used to be swallowed silently; still skip the row, but report it.
-       logError("Insert into " + TABLE_NAME + " failed; row skipped.", e);
-     }
-     return true;
-   } catch (Exception e) {
-     throw new KettleException(e);
-   }
-   putRow(data.outputRowMeta, r);
-   return true;
- }
- /** Opens DB_NAME; on failure logs the reason, flags one error and returns false. */
- private boolean connect()
- {
-   try {
-     DatabaseMeta databaseMeta = getTransMeta().findDatabase(DB_NAME);
-     if (databaseMeta == null) {
-       logError("A connection with name "+DB_NAME+" could not be found!");
-       setErrors(1);
-       return false;
-     }
-     Database db = new Database(getTrans(), databaseMeta);
-     db.connect();
-     database = db;
-     return true;
-   } catch (Exception e) {
-     logError("Connecting to database "+DB_NAME+" failed.", e);
-     setErrors(1);
-     return false;
-   }
- }
- /** True when ES_INDEX already has a document with this id; no source or stored fields are fetched. */
- private boolean existsInIndex(String docId) throws Exception
- {
-   GetRequest request = new GetRequest(ES_INDEX, ES_TYPE, docId);
-   request.fetchSourceContext(new FetchSourceContext(false));
-   request.storedFields(new String[]{"_none_"});
-   return client.exists(request, RequestOptions.DEFAULT);
- }
- /** Closes the statement, the database and the REST client; each field is cleared first so repeated calls are harmless. */
- private void closeResources() throws Exception
- {
-   if (stat != null) {
-     PreparedStatement s = stat;
-     stat = null;
-     s.close();
-   }
-   if (database != null) {
-     Database db = database;
-     database = null;
-     db.disconnect();
-   }
-   if (client != null) {
-     RestHighLevelClient c = client;
-     client = null;
-     c.close();
-   }
- }
- /** Safety net: releases resources even when the step stops on an error before the end of input. */
- public void dispose(StepMetaInterface smi, StepDataInterface sdi)
- {
-   try {
-     closeResources();
-   } catch (Exception e) {
-     logError("Releasing resources failed.", e);
-   }
-   super.dispose(smi, sdi);
- }</class_source>
- </definition>
- </definitions>
- <fields>
- </fields>
- <clear_result_fields>N</clear_result_fields>
- <info_steps/>
- <target_steps/>
- <usage_parameters/>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>240</xloc>
- <yloc>240</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入</name>
- <!-- Reads the first consignee phone (AConsigneePhone1) of every order; termphoneid (OriginType, underscore, decrypted phone) becomes the Elasticsearch document id. -->
- <!-- Every non-empty phone1 is emitted. The former filter also required phone1 to differ from phone2 and phone3: when phone1 equalled phone2 the number was emitted by neither this input nor the phone2 input (which rejects values equal to phone1), and under ANSI_NULLS a NULL phone2 or phone3 dropped the row as well. Values equal to phone1 are skipped by the phone2 and phone3 inputs instead. -->
- <!-- NOTE(review): the DES decryption key is hard-coded in the SQL below; consider moving it into a protected variable or a database-side secret. -->
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone1 != ''</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>128</xloc>
- <yloc>112</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入 2</name>
- <!-- Reads the second consignee phone (AConsigneePhone2); termphoneid (OriginType, underscore, decrypted phone) becomes the Elasticsearch document id. -->
- <!-- A non-empty phone2 is emitted unless it equals phone1. The former filter also required phone2 to differ from phone3; when phone2 equalled phone3 the number was emitted by neither this input nor the phone3 input (which rejects values equal to phone2). ISNULL keeps rows whose phone1 is NULL, which a plain comparison would drop under ANSI_NULLS. -->
- <!-- NOTE(review): the hop from this step is disabled in the hop list, and the DES decryption key is hard-coded in the SQL below. -->
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone2 != '' and AConsigneePhone2 != ISNULL(AConsigneePhone1, '')</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>112</xloc>
- <yloc>176</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <step>
- <name>terminaldoc表输入 3</name>
- <!-- Reads the third consignee phone (AConsigneePhone3); termphoneid (OriginType, underscore, decrypted phone) becomes the Elasticsearch document id. -->
- <!-- A non-empty phone3 is emitted unless it equals phone1 or phone2. ISNULL keeps rows whose phone1 or phone2 is NULL; under ANSI_NULLS a plain comparison with NULL is never true and silently dropped them. -->
- <!-- NOTE(review): the DES decryption key is hard-coded in the SQL below; consider moving it into a protected variable or a database-side secret. -->
- <type>TableInput</type>
- <description/>
- <distribute>Y</distribute>
- <custom_distribution/>
- <copies>1</copies>
- <partitioning>
- <method>none</method>
- <schema_name/>
- </partitioning>
- <connection>MallSqlserver</connection>
- <sql>select
- concat(OriginType,'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3)) as termphoneid
- ,OriginType as TerminalCode
- ,BuyUserId as MemberId
- ,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as MemberPhone
- ,SourcePlatforms as MemSourcePlatforms
- ,DeviceType as MemDeviceType
- ,OrdersCode
- from Orders
- where AConsigneePhone3 != '' and AConsigneePhone3 != ISNULL(AConsigneePhone1, '') and AConsigneePhone3 != ISNULL(AConsigneePhone2, '')</sql>
- <limit>0</limit>
- <lookup/>
- <execute_each_row>N</execute_each_row>
- <variables_active>N</variables_active>
- <lazy_conversion_active>N</lazy_conversion_active>
- <attributes/>
- <cluster_schema/>
- <remotesteps>
- <input>
- </input>
- <output>
- </output>
- </remotesteps>
- <GUI>
- <xloc>128</xloc>
- <yloc>240</yloc>
- <draw>Y</draw>
- </GUI>
- </step>
- <!-- No step error handling hops are configured; presumably an exception in any step aborts the whole run. -->
- <step_error_handling>
- </step_error_handling>
- <!-- Not a clustered/slave execution: no partition distribution and not a slave transformation. -->
- <slave-step-copy-partition-distribution>
- </slave-step-copy-partition-distribution>
- <slave_transformation>N</slave_transformation>
- <attributes/>
- </transformation>
|