0.00.0100005050NY50000YN1000100-2019/05/26 19:55:18.893-2019/05/26 19:55:18.893H4sIAAAAAAAAAAMAAAAAAAAAAAA=NMallSqlserver192.168.50.32MSSQLNATIVENativeMall1433saEncrypted 2be98afc819c69e8ea300ff228dd38f99EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch200EXTRA_OPTION_MSSQLNATIVE.readTimeout60FORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENINITIAL_POOL_SIZE500IS_CLUSTEREDNMAXIMUM_POOL_SIZE1000MSSQLUseIntegratedSecurityfalseMSSQL_DOUBLE_DECIMAL_SEPARATORNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGY表输入 2Java 代码YJava 代码Elasticsearch bulk insert 2YElasticsearch bulk insert 2ElasticSearchBulkY1nonecrm_memberbase_doc5000100SECONDSNMemberKeyYNYGenderGenderMemReceiverMobileMemReceiverMobileMemReceiverPhoneMemReceiverPhoneMemUsualAddressMemUsualAddressMemUsualAreaCodeMemUsualAreaCodeMemUsualCityCodeMemUsualCityCodeMemUsualPhoneMemUsualPhoneMemUsualProvinceCodeMemUsualProvinceCodeMemberKeyMemberKeyMemberNickNameMemberNickNameOriginTypeOriginTypeRegisterTimeRegisterTimeorder_join_fieldorder_join_field
192.168.50.32
9300cluster.nameescustom.fields.order_join_field{ "type": "join","relations": {"Member": "Order,OriginType" }}54496YJava 代码UserDefinedJavaClassN120noneTRANSFORM_CLASSProcessorimport java.sql.*;
import org.pentaho.di.core.database.*;
Database database = null;
PreparedStatement stat = null;
ResultSet resultSet = null;
/**
 * Processes one input row: records the row's (MemberKey, source table, MemUsualPhone)
 * triple in the DataImport table unless a record with the same DataId already exists,
 * then forwards the row to the next step.
 *
 * Rows whose DataId is already present in the table are dropped (not forwarded).
 * The database connection and the insert statement are created lazily on the first
 * row, kept in the step-level fields, and released when the input is exhausted.
 *
 * @param smi step metadata supplied by the caller (not used here)
 * @param sdi step data supplied by the caller (not used here)
 * @return true when a row was handled and more rows may follow; false when the input
 *         is exhausted or the database connection could not be established
 * @throws KettleException if the paging variables are not integers, if a database
 *         operation fails, or if resources cannot be released at end of input
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        // End of input. Close the statement and the connection independently so that a
        // failure while closing the statement cannot leave the connection open.
        KettleException failure = null;
        try {
            if (stat != null) {
                stat.close();
            }
        } catch (Exception e) {
            failure = new KettleException(e);
        }
        stat = null;
        try {
            if (database != null) {
                database.disconnect();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = new KettleException(e);
            }
        }
        database = null;
        if (failure != null) {
            throw failure;
        }
        setOutputDone();
        return false;
    }

    // Parsed after the end-of-input check so that bad variables cannot block cleanup.
    int page;
    int pagesize;
    try {
        page = Integer.parseInt(getVariable("page"));
        pagesize = Integer.parseInt(getVariable("pagesize"));
    } catch (NumberFormatException e) {
        throw new KettleException("Variables 'page' and 'pagesize' must be integers (page="
            + getVariable("page") + ", pagesize=" + getVariable("pagesize") + ")", e);
    }

    r = createOutputRow(r, data.outputRowMeta.size());

    // Connection name, target table and field names. They are fixed here; the
    // commented-out alternatives read them from the incoming row instead.
    String dbName = "testSqlServer";//getInputRowMeta().getString(r, "conname", null );
    String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null );
    String idname = "MemberKey";//getInputRowMeta().getString(r, "idname", null );
    String sourceidname = "MemUsualPhone";//getInputRowMeta().getString(r, "sourceidname", null );
    String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null );
    if (dbName == null || tablename == null) {
        throw new KettleException("Connection name and table name must both be set (connection="
            + dbName + ", table=" + tablename + ").");
    }

    if (database == null) {
        // Open the connection once and keep it for all following rows.
        try {
            DatabaseMeta databaseMeta = getTransMeta().findDatabase(dbName);
            if (databaseMeta == null) {
                logError("A connection with name "+dbName+" could not be found!");
                setErrors(1);
                return false;
            }
            database = new Database(getTrans(), databaseMeta);
            database.connect();
        } catch (Exception e) {
            logError("Connecting to database "+dbName+" failed.", e);
            setErrors(1);
            return false;
        }
    }

    try {
        RowMetaInterface idxRowMeta = data.outputRowMeta;
        String memberId = idxRowMeta.getString(r, idname, null);
        String dataId = idxRowMeta.getString(r, sourceidname, null);

        // Existence check. The DataId value is bound as a parameter instead of being
        // concatenated into the SQL text, and both the result set and the statement
        // are always closed, whether or not a row was found.
        boolean alreadyImported;
        PreparedStatement lookup = database.prepareSQL("select Id from " + tablename + " where DataId = ?");
        try {
            lookup.setString(1, dataId);
            ResultSet found = lookup.executeQuery();
            try {
                alreadyImported = found.next();
            } finally {
                found.close();
            }
        } finally {
            lookup.close();
        }
        if (alreadyImported) {
            // Already recorded: skip the insert and do not forward the row.
            return true;
        }

        // Prepare the insert once and reuse it for every row. This is safe only while
        // tablename is constant; if it ever comes from the row, re-prepare per table.
        if (stat == null) {
            stat = database.prepareSQL(" insert into "+tablename+" values (?,?,?)");
        }
        stat.setString(1, memberId);        // first column: value of the MemberKey field
        stat.setString(2, sourcetablename); // second column: source table name
        stat.setString(3, dataId);          // third column: value of the MemUsualPhone field
        // executeUpdate() returns the affected row count, which is what the message reports.
        int inserted = stat.executeUpdate();
        System.out.println("更新记录数"+ inserted);

        // On every row whose key is a positive multiple of pagesize, write the page
        // variable back and log it.
        // NOTE(review): this stores the same value that was just read; confirm whether
        // page + 1 was intended.
        int pn = Integer.parseInt(memberId);
        if (pn > 0 && pn % pagesize == 0) {
            setVariable("page",String.valueOf(page));
            logBasic("idxRow--getVariable"+getVariable("page"));
        }
    }
    catch (Exception e) {
        throw new KettleException(e);
    }

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);
    return true;
}N32064Y表输入 2TableInputN1noneMallSqlserverselect ROW_NUMBER() over (order by AConsigneePhone2 asc ) as MemberKey,
-- Remaining select list of the member extraction query. Source rows come from
-- orders (alias a); only rows with a non-empty AConsigneePhone2 or AConsigneePhone1
-- are kept, and the result is paged with OFFSET / FETCH at the end.
-- NOTE(review): the decryption key arguments below are hard-coded literals in the query text.
sex as MemberGender,
Consignee as MemberNickName,
-- MemUsualPhone: decrypted AConsigneePhone2; when that column is empty, decrypted
-- AConsigneePhone1; empty string when both are empty.
case when AConsigneePhone2= '' then
(case
when AconsigneePhone1!='' then dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1)
else ''end)
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) end as MemUsualPhone,-- phone number
-- NOTE(review): the next two expressions test one column for emptiness but decrypt
-- the other (Phone1 tested / Phone3 decrypted, then Phone3 tested / Phone1 decrypted).
-- Confirm whether the conditions are swapped.
case when AConsigneePhone1= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) end as MemReceiverMobile,-- mobile number
case when AConsigneePhone3= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) end as MemReceiverPhone,-- phone number
case when ADeliveryAddress= '' then ''
ELSE dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) end as MemUsualAddress,
OriginType,
-- RegisterTime: earliest OrderTime among orders with the same AConsigneePhone2,
-- excluding rows whose OrderTime equals 1800/01/01 00:00:00, rendered as text with
-- CONVERT style 20. The alias b inside this subquery refers to orders and is
-- separate from the AreaRegion alias b used in the joins below.
( select CONVERT(varchar(100), min(b.OrderTime), 20) from orders b where b.AConsigneePhone2=a.AConsigneePhone2
and b.OrderTime!='1800/01/01 00:00:00'
group by b.AConsigneePhone2
)
as RegisterTime,
-- Area / city / province codes: each is chosen according to the Level found in
-- Areamap for the order region code, its parent, and its grandparent respectively.
case when b0.Level= 3 then a.RegionCode else null end as MemUsualAreaCode,
case when b1.Level= 2 then b.ParentCode else b.AreaCode end as MemUsualCityCode,
case when c1.Level= 1 then c.ParentCode else c.AreaCode end as MemUsualProvinceCode
,1 as HasOrder
-- NOTE(review): RegisterYear / RegisterMonth / RegisterDay come from the current
-- date (getdate()), not from RegisterTime. Confirm this is intended.
,convert(varchar,datepart(year,getdate())) as RegisterYear
,convert(varchar,datepart(month,getdate())) as RegisterMonth
,convert(varchar,datepart(day,getdate())) as RegisterDay
-- Constant marker column; every row produced here carries the value Member.
,'Member' as order_join_field
from orders a
left join Areamap b0 on b0.MappingCode=a.RegionCode
left join AreaRegion b on b.AreaCode=a.RegionCode left join Areamap b1 on b1.MappingCode=b.ParentCode
left join AreaRegion c on c.AreaCode=b.ParentCode left join Areamap c1 on c1.MappingCode=c.ParentCode
where 1=1 and (a.AConsigneePhone2!= '' or a.AConsigneePhone1!= '')
-- Paging window: skip (page - 1) * pagesize rows and return the next pagesize rows.
-- The page and pagesize placeholders are presumably filled in by variable
-- substitution before the query is sent to the server (verify in the step settings).
order by AConsigneePhone2 asc offset ((${page}-1)*${pagesize}) rows fetch NEXT (${pagesize}) rows only
0数据检验NYN192112YN