0.00.055050NY50000YN1000100-2019/06/10 16:15:14.565-2019/06/10 16:15:14.565Nmall192.168.50.32MSSQLNATIVENativeMall1433saEncrypted 2be98afc819c69e8ea300ff228dd38f99FORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENIS_CLUSTEREDNMSSQLUseIntegratedSecurityfalseMSSQL_DOUBLE_DECIMAL_SEPARATORNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGNmemberdata192.168.50.32MSSQLNATIVENativeMemberData1433saEncrypted 2be98afc819c69e8ea300ff228dd38f99FORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENIS_CLUSTEREDNMSSQLUseIntegratedSecurityfalseMSSQL_DOUBLE_DECIMAL_SEPARATORNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGNmemberfriends表输入Java 代码YJava 代码表输出YJava 代码Elasticsearch bulk insertYmemberfriends表输入 3Java 代码Ymemberfriends表输入 2Java 代码Ymemberfriends表输入TableInputY1nonemallselect
-- Friend pair built from consignee phones 1 and 2.
-- PhoneId is the two dbo.DesDecryptFixKey results joined by '_'.
concat(
    dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone1),
    '_',
    dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone2)
) as PhoneId,
CreationDate as MemberCrtTime,
BuyUserId as MemberId,
dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone1) as MemberPhone,
dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone2) as MemberFriendsPhone
from Orders
where
    -- phone 1 must be non-empty and must differ from phone 2
    AConsigneePhone1 != ''
    AND AConsigneePhone1 != AConsigneePhone2
AND AConsigneePhone2 !=''0NNN64112YJava 代码UserDefinedJavaClassN20noneTRANSFORM_CLASSProcessorimport java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.microsoft.sqlserver.jdbc.SQLServerException;
// Kettle database handle; processRow opens it on the first row and disconnects it at end of input.
Database database = null;
// Prepared INSERT statement; processRow creates it once and reuses it for every row.
PreparedStatement stat = null;
// Only ever closed in the visible code, never assigned.
// NOTE(review): looks unused — confirm before removing.
PreparedStatement stat1 = null;
// Elasticsearch REST client, built by this field initializer against a hard-coded host and port.
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
// Counter that processRow increments before each insert attempt; not read anywhere in the visible code.
Integer index = 0;
/**
 * Processes one incoming row.
 *
 * For each row:
 *   1. Reads the "MemberPhone" and "PhoneId" fields from the row.
 *   2. Asks Elasticsearch whether a document with id = PhoneId exists in index
 *      "crm_memberfriends". If it does, the row is dropped (not forwarded).
 *   3. Otherwise inserts (MemberPhone, "Orders", PhoneId, "phone1,phone2") into
 *      table DataImport_Friend through the "MemberSqlServer" connection and
 *      forwards the row to the next step.
 *
 * A SQLServerException raised by the insert is non-fatal: the row is dropped and
 * processing continues. The failure is now logged instead of being swallowed.
 *
 * @param smi step metadata supplied by Kettle (unused here)
 * @param sdi step data supplied by Kettle (unused here)
 * @return true to be called again for the next row; false when input is exhausted
 *         or the database connection could not be established
 * @throws KettleException on any other failure while processing the row, or when
 *         releasing resources at end of input fails
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        // End of input: release every handle, then tell Kettle this step is done.
        // As before, a cleanup failure is raised before setOutputDone() is reached.
        Exception cleanupFailure = releaseResources();
        if (cleanupFailure != null) {
            throw new KettleException(cleanupFailure);
        }
        setOutputDone();
        return false;
    }

    synchronized(this) {
        r = createOutputRow(r, data.outputRowMeta.size());

        // Fixed settings for this job: connection name, target table and field names.
        String dbName = "MemberSqlServer";
        String tablename = "DataImport_Friend";
        String idname = "MemberPhone";
        String sourceidname = "PhoneId";
        String sourcetablename = "Orders";

        // Open the database connection on the first row only.
        if (database == null) {
            try {
                DatabaseMeta databaseMeta = getTransMeta().findDatabase(dbName);
                if (databaseMeta == null) {
                    logError("A connection with name "+dbName+" could not be found!");
                    setErrors(1);
                    // The step stops here, so do not leave the Elasticsearch client open.
                    releaseResources();
                    return false;
                }
                database = new Database(getTrans(), databaseMeta);
                database.connect();
            } catch(Exception e) {
                logError("Connecting to database "+dbName+" failed.", e);
                setErrors(1);
                releaseResources();
                return false;
            }
        }

        // Declared outside the try so the catch block can report which row failed.
        String DataId = null;
        try {
            RowMetaInterface idxRowMeta = data.outputRowMeta;
            String Id = idxRowMeta.getString(r, idname, null);
            DataId = idxRowMeta.getString(r, sourceidname, null);

            // Existence check only: do not fetch _source or any stored fields.
            GetRequest getRequest = new GetRequest(
                "crm_memberfriends", // index
                "_doc",              // type
                DataId);             // document id
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            getRequest.storedFields(new String[]{"_none_"});
            if (client.exists(getRequest, RequestOptions.DEFAULT)) {
                // Already indexed: drop the row without forwarding it.
                return true;
            }

            index = index + 1;
            if (stat == null) {
                // The table name is a local constant, so concatenating it is safe;
                // all row values are bound as parameters.
                stat = database.prepareSQL(
                    "insert into "+tablename+"(Id,DataName,DataId,Type) values (?,?,?,?);");
            }
            stat.setString(1, Id);
            stat.setString(2, sourcetablename);
            stat.setString(3, DataId);
            stat.setString(4, "phone1,phone2");
            stat.execute();
        }
        catch(SQLServerException e) {
            // Same outcome as before: drop the row and keep going.
            // SQL Server error 2627 is a PRIMARY KEY/UNIQUE constraint violation and
            // 2601 a duplicate key in a unique index. Those are presumably the case
            // this handler was written for, so they only show at detailed log level.
            // Anything else (lost connection, bad schema, ...) is logged as an error.
            int code = e.getErrorCode();
            if (code == 2627 || code == 2601) {
                logDetailed("Skipping row, DataId "+DataId+" already exists in "+tablename+".");
            } else {
                logError("Insert into "+tablename+" failed for DataId "+DataId+"; row skipped.", e);
            }
            return true;
        } catch(Exception e) {
            // Fatal for the step: release the handles before propagating.
            releaseResources();
            throw new KettleException(e);
        }
    }

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);
    return true;
}

/**
 * Closes the prepared statements, the database connection and the Elasticsearch
 * client. Every resource is attempted even if an earlier one fails to close.
 *
 * @return the first exception raised while closing, or null if all closed cleanly
 */
private Exception releaseResources()
{
    Exception firstFailure = null;
    if (stat != null) {
        try { stat.close(); } catch (Exception e) { firstFailure = e; }
        stat = null;
    }
    if (stat1 != null) {
        try { stat1.close(); } catch (Exception e) { if (firstFailure == null) firstFailure = e; }
        stat1 = null;
    }
    if (database != null) {
        try { database.disconnect(); } catch (Exception e) { if (firstFailure == null) firstFailure = e; }
        database = null;
    }
    if (client != null) {
        try { client.close(); } catch (Exception e) { if (firstFailure == null) firstFailure = e; }
        client = null;
    }
    return firstFailure;
    // NOTE: the text fused after the closing brace below is serialized step
    // metadata from the surrounding file, reproduced verbatim.
}N224112Y表输出TableOutputY1nonememberdata
mem_MemberFriends
1000NNYYNNYNYNphoneidphoneidMemberCrtTimeMemberCrtTimeMemberIdMemberIdMemberPhoneMemberPhoneMemberFriendsPhoneMemberFriendsPhone224256YElasticsearch bulk insertElasticSearchBulkY1nonecrm_memberfriends_doc1000SECONDSNPhoneIdYNYMemberCrtTimeMemberCrtTimeMemberFriendsPhoneMemberFriendsPhoneMemberIdMemberIdMemberPhoneMemberPhonePhoneIdPhoneId
192.168.50.32
9300cluster.nameescustom.fields.aliasemem_memberfriends41680Ymemberfriends表输入 2TableInputY1nonemallselect
-- Friend pair built from consignee phones 2 and 3.
-- PhoneId is the two dbo.DesDecryptFixKey results joined by '_'.
concat(dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2),'_',dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3))
as PhoneId
,CreationDate AS MemberCrtTime
,BuyUserId as MemberId
-- Fixed: these two columns used phones 1 and 2 (copied from the phone1/phone2 query),
-- which disagreed with PhoneId and the where clause of this query.
,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as MemberPhone
,dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as MemberFriendsPhone
from Orders
where
AConsigneePhone2 !=''
AND AConsigneePhone2 !=AConsigneePhone3
AND AConsigneePhone3 !=''0NNN80288Ymemberfriends表输入 3TableInputY1nonemallselect
-- Friend pair built from consignee phones 1 and 3.
-- PhoneId is the two dbo.DesDecryptFixKey results joined by '_'.
concat(
    dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone1),
    '_',
    dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone3)
) as PhoneId,
CreationDate as MemberCrtTime,
BuyUserId as MemberId,
dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone1) as MemberPhone,
dbo.DesDecryptFixKey('123456', '123456e10adc3949ba59abbe56e057f20f883e', AConsigneePhone3) as MemberFriendsPhone
from Orders
where
    -- phone 1 must be non-empty and must differ from phone 3
    AConsigneePhone1 != ''
    AND AConsigneePhone1 != AConsigneePhone3
AND AConsigneePhone3 !=''0NNN80208YN