0.00.0100005050NY50000YN1000100-2019/05/21 19:06:33.293-2019/05/21 19:06:33.293H4sIAAAAAAAAAAMAAAAAAAAAAAA=NMallSqlserver192.168.50.32MSSQLNATIVENativeMall1433saEncrypted 2be98afc819c69e8ea300ff228dd38f99EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch200EXTRA_OPTION_MSSQLNATIVE.readTimeout60FORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENINITIAL_POOL_SIZE100IS_CLUSTEREDNMAXIMUM_POOL_SIZE300MSSQLUseIntegratedSecurityfalseMSSQL_DOUBLE_DECIMAL_SEPARATORNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGYJava 代码Elasticsearch bulk insert 2Y表输入 2Java 代码YElasticsearch bulk insert 2ElasticSearchBulkY1nonecrm_memberbase_doc5000100SECONDSNMemberKeyYNYGenderGenderMemReceiverMobileMemReceiverMobileMemReceiverPhoneMemReceiverPhoneMemUsualAddressMemUsualAddressMemUsualAreaCodeMemUsualAreaCodeMemUsualCityCodeMemUsualCityCodeMemUsualPhoneMemUsualPhoneMemUsualProvinceCodeMemUsualProvinceCodeMemberKeyMemberKeyMemberNickNameMemberNickNameOriginTypeOriginTypeRegisterTimeRegisterTimeorder_join_fieldorder_join_field
192.168.50.32
9300cluster.nameescustom.fields.order_join_field{ "type": "join","relations": {"Member": "Order,OriginType" }}256272YJava 代码UserDefinedJavaClassY1noneTRANSFORM_CLASSProcessorimport java.sql.*;
import org.pentaho.di.core.database.*;
Database database = null;
PreparedStatement stat = null;
ResultSet resultSet = null;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
Object[] r = getRow();
if (r == null) {
try {
if (stat!=null) {
stat.close();
}
if (database!=null) {
database.disconnect();
}
}
catch(Exception e) {
throw new KettleException(e);
}
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
//获取数据库名和表名
String dbName = "testSqlServer";//getInputRowMeta().getString(r, "conname", null );
String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null );
String idname = "MemberKey";//getInputRowMeta().getString(r, "idname", null );
String sourceidname = "MemUsualPhone";//getInputRowMeta().getString(r, "sourceidname", null );
String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null );
if (dbName==null||tablename==null) {
throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
}
//logBasic("table---"+tablename);
if(database == null){
//数据库连接
DatabaseMeta databaseMeta=null;
try {
databaseMeta = getTransMeta().findDatabase(dbName);
if (databaseMeta==null) {
logError("A connection with name "+dbName+" could not be found!");
setErrors(1);
return false;
}
database = new Database(getTrans(), databaseMeta);
database.connect();
//logBasic("success!");
} catch(Exception e) {
logError("Connecting to database "+dbName+" failed.", e);
setErrors(1);
return false;
}
}
//查询表数据
try {
RowMetaInterface idxRowMeta =data.outputRowMeta;
int i=0;
r = createOutputRow(r, data.outputRowMeta.size());
//int index = getInputRowMeta().size();
// Add the index name
//
String Id = idxRowMeta.getString(r, idname, null);
// Add the column name
String DataId = idxRowMeta.getString(r, sourceidname, null);
String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
resultSet = database.openQuery(sqlSelect);
Object[] idxRow = database.getRow(resultSet);
if(idxRow!=null){
if (database!=null) {
database.closeQuery(resultSet);
}
return true;
}
//logBasic("idxRow--Id"+Id);
//logBasic("idxRow--sourcetablename"+sourcetablename);
//logBasic("idxRow--DataId"+DataId);
//3.获得预处理对象
String sql=" insert into "+tablename+" values (?,?,?)";
//logBasic("idxRow--database"+ database);
stat = database.prepareSQL(sql);
//logBasic("idxRow--database"+ stat);
//stat.addBatch(sql);
//4.SQL语句占位符设置实际参数
stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
//5.执行SQL语句
boolean line = stat.execute();
//int[] line = stat.executeBatch();
System.out.println("更新记录数"+ line);
//6.释放资源
//stat.close();
//logBasic("idxRow--length"+i);
}
catch(Exception e) {
throw new KettleException(e);
}
//释放连接
//if (database!=null) {
// database.disconnect();
//}
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}N368176Y表输入 2TableInputY1noneMallSqlserverselect ROW_NUMBER() over (order by AConsigneePhone2 asc ) as MemberKey,
sex as MemberGender,
Consignee as MemberNickName,
case when AConsigneePhone2= '' then
(case
when AconsigneePhone1!='' then dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1)
else ''end)
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) end as MemUsualPhone,--电话号码
case when AConsigneePhone1= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) end as MemReceiverMobile,--手机号码
case when AConsigneePhone3= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) end as MemReceiverPhone,--电话号码
case when ADeliveryAddress= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) end as MemUsualAddress,
OriginType,
( select CONVERT(varchar(100), min(b.OrderTime), 20) from orders b where b.AConsigneePhone2=a.AConsigneePhone2
and b.OrderTime!='1800/01/01 00:00:00'
group by b.AConsigneePhone2
)
as RegisterTime,
case when b0.Level= 3 then a.RegionCode else null end as MemUsualAreaCode,
case when b1.Level= 2 then b.ParentCode else b.AreaCode end as MemUsualCityCode,
case when c1.Level= 1 then c.ParentCode else c.AreaCode end as MemUsualProvinceCode
,1 as HasOrder
,convert(varchar,datepart(year,getdate())) as RegisterYear
,convert(varchar,datepart(month,getdate())) as RegisterMonth
,convert(varchar,datepart(day,getdate())) as RegisterDay
,'Member' as order_join_field
from orders a
left join Areamap b0 on b0.MappingCode=a.RegionCode
left join AreaRegion b on b.AreaCode=a.RegionCode left join Areamap b1 on b1.MappingCode=b.ParentCode
left join AreaRegion c on c.AreaCode=b.ParentCode left join Areamap c1 on c1.MappingCode=c.ParentCode
where 1=1 and (a.AConsigneePhone2!= '' or a.AConsigneePhone1!= '')
0NNN256112YN