0.00.0100005050NY50000YN1000100-2019/06/10 13:44:56.679-2019/06/10 13:44:56.679H4sIAAAAAAAAAAMAAAAAAAAAAAA=Nmall192.168.50.32MSSQLNATIVENativeMall1433saEncrypted 2be98afc819c69e8ea300ff228dd38f99FORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENIS_CLUSTEREDNMSSQLUseIntegratedSecurityfalseMSSQL_DOUBLE_DECIMAL_SEPARATORNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGNmemberheathytemp表输入Java 代码YJava 代码Elasticsearch bulk insert 2Ymemberheathytemp表输入 2Java 代码Ymemberheathytemp表输入 3Java 代码YElasticsearch bulk insert 2ElasticSearchBulkY1nonecrm_memberhealth_temp_doc1000SECONDSNIdYNYDiseaseClassCodeDiseaseClassCodeMemTerminalIdMemTerminalIdMemberAgeMemberAgeMemberGenderMemberGenderMemberHeightMemberHeightMemberIdMemberIdMemberNameMemberNameMemberPhone1MemberPhone1MemberPhone2MemberPhone2MemberPhone3MemberPhone3MemberUpdateTimeMemberUpdateTimeMemberWeightMemberWeight
192.168.50.32
9300cluster.nameescustom.fields.aliasemem_memberhealth_temp784128YJava 代码UserDefinedJavaClassN20noneTRANSFORM_CLASSProcessorimport java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.microsoft.sqlserver.jdbc.SQLServerException;
Database database = null;
PreparedStatement stat = null;
PreparedStatement stat1 = null;
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
Integer index = 0;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
//logBasic("start---");
String type = getVariable("type");
String indexs = getVariable("index");
//if (indexs != null) index = Integer.parseInt(indexs);
Object[] r = getRow();
if (r == null) {
try {
if (stat!=null) {
stat.close();
}
if (stat1!=null) {
stat1.close();
}
if (database!=null) {
database.disconnect();
}
if(client!=null){
client.close();
}
}
catch(Exception e) {
throw new KettleException(e);
}
setOutputDone();
return false;
}
synchronized(this) {
r = createOutputRow(r, data.outputRowMeta.size());
//获取数据库名和表名
String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
String tablename = "DataImport_heathytemp";//getInputRowMeta().getString(r, "tablename", null );
String idname = "MemberId";//getInputRowMeta().getString(r, "idname", null );
String sourceidname = "Id";//getInputRowMeta().getString(r, "sourceidname", null );
String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
if (dbName==null||tablename==null) {
throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
}
//logBasic("table---"+tablename);
if(database == null){
//数据库连接
DatabaseMeta databaseMeta=null;
try {
databaseMeta = getTransMeta().findDatabase(dbName);
if (databaseMeta==null) {
logError("A connection with name "+dbName+" could not be found!");
setErrors(1);
return false;
}
database = new Database(getTrans(), databaseMeta);
database.connect();
//logBasic("success!");
} catch(Exception e) {
logError("Connecting to database "+dbName+" failed.", e);
setErrors(1);
return false;
}
}
//查询表数据
try {
RowMetaInterface idxRowMeta =data.outputRowMeta;
int i=0;
r = createOutputRow(r, data.outputRowMeta.size());
//int index = getInputRowMeta().size();
// Add the index name
//
String Id = idxRowMeta.getString(r, idname, null);
// Add the column name
String DataId = idxRowMeta.getString(r, sourceidname, null);
/*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
ResultSet resultSet = null;
resultSet = database.openQuery(sqlSelect);
Object[] idxRow = database.getRow(resultSet);
if (database!=null) {
database.closeQuery(resultSet);
resultSet = null;
}
//if(idxRow != null){
// return true;
//}
*/
//logBasic("idxRow--Id"+Id);
//logBasic("idxRow--sourcetablename"+sourcetablename);
//logBasic("idxRow--DataId"+DataId);
GetRequest getRequest = new GetRequest(
"crm_memberheathytemp", // Index
"_doc", // /Type
DataId); // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
//client.close();
if(exists ){
return true;
}
//if(!exists && idxRow == null){
// return true;
//}
//3.获得预处理对象
String sql="insert into "+tablename+"(Id,DataName,DataId,Type) values (?,?,?,?);";//begin tran t2; commit tran t2
//logBasic("idxRow--database"+ database);
index = index + 1;
if(stat == null)
stat = database.prepareSQL(sql);
//logBasic("idxRow--database"+ stat);
//stat.addBatch(sql);
//4.SQL语句占位符设置实际参数
stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
stat.setString(4, "phone1,phone2");
//stat.setString(5,MemberFriendsPhone)
//stat.setString(5, index);
//5.执行SQL语句
boolean line = stat.execute();
//int[] line = stat.executeBatch();
//System.out.println("更新记录数"+ line);
//6.释放资源
//stat.close();
//setVariable("index",String.valueOf(index));
//Integer pn = Integer.parseInt(Id);
//Integer curpageNum = Integer.parseInt(Id) % pagesize;
//if(pn > 0 && curpageNum == 0){
// setVariable("page",String.valueOf(page));
//}
//logBasic("idxRow--getVariable"+getVariable("page"));
//logBasic("idxRow--curpageNum"+curpageNum);
//logBasic("idxRow--length"+i);
}
catch(SQLServerException e) {
return true;
}catch(Exception e) {
throw new KettleException(e);
}
//释放连接
//if (database!=null) {
// database.disconnect();
//}
// Send the row on to the next step.
}
putRow(data.outputRowMeta, r);
return true;
}N576192Ymemberheathytemp表输入TableInputY1nonemallSELECT
a.BuyUserId AS Id ,
a.BuyUserId AS MemberId
,a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender,
a.OrderTime AS MemberUpdateTime,
a.DiseaseClassCode,
a.OriginType AS MemTerminalId,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone1,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone2,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone3,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
WHERE AConsigneePhone1 !=''
order by a.BuyUserId0NNN43280Ymemberheathytemp表输入 2TableInputY1nonemallSELECT
a.BuyUserId AS Id ,
a.BuyUserId AS MemberId
,a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender,
a.OrderTime AS MemberUpdateTime,
a.DiseaseClassCode,
a.OriginType AS MemTerminalId,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone1,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone2,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone3,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
where AConsigneePhone2 !=''
order by a.BuyUserId0NNN400160Ymemberheathytemp表输入 3TableInputY1nonemallSELECT
a.BuyUserId AS Id ,
a.BuyUserId AS MemberId
,a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
CASE WHEN b.Gender IS NULL THEN a.Sex END AS MemberGender,
a.OrderTime AS MemberUpdateTime,
a.DiseaseClassCode,
a.OriginType AS MemTerminalId,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone1,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone2,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone3,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
where AConsigneePhone3 !=''
order by a.BuyUserId0NNN451256YN