mem_memberbase_append_memberId
Normal
0
/
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
TRANSNAME
Y
TRANSNAME
STATUS
Y
STATUS
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
STARTDATE
Y
STARTDATE
ENDDATE
Y
ENDDATE
LOGDATE
Y
LOGDATE
DEPDATE
Y
DEPDATE
REPLAYDATE
Y
REPLAYDATE
LOG_FIELD
Y
LOG_FIELD
EXECUTING_SERVER
N
EXECUTING_SERVER
EXECUTING_USER
N
EXECUTING_USER
CLIENT
N
CLIENT
ID_BATCH
Y
ID_BATCH
SEQ_NR
Y
SEQ_NR
LOGDATE
Y
LOGDATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
INPUT_BUFFER_ROWS
Y
INPUT_BUFFER_ROWS
OUTPUT_BUFFER_ROWS
Y
OUTPUT_BUFFER_ROWS
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
LOGGING_OBJECT_TYPE
Y
LOGGING_OBJECT_TYPE
OBJECT_NAME
Y
OBJECT_NAME
OBJECT_COPY
Y
OBJECT_COPY
REPOSITORY_DIRECTORY
Y
REPOSITORY_DIRECTORY
FILENAME
Y
FILENAME
OBJECT_ID
Y
OBJECT_ID
OBJECT_REVISION
Y
OBJECT_REVISION
PARENT_CHANNEL_ID
Y
PARENT_CHANNEL_ID
ROOT_CHANNEL_ID
Y
ROOT_CHANNEL_ID
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
LOG_FIELD
N
LOG_FIELD
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
METRICS_DATE
Y
METRICS_DATE
METRICS_CODE
Y
METRICS_CODE
METRICS_DESCRIPTION
Y
METRICS_DESCRIPTION
METRICS_SUBJECT
Y
METRICS_SUBJECT
METRICS_TYPE
Y
METRICS_TYPE
METRICS_VALUE
Y
METRICS_VALUE
0.0
0.0
3
50
50
N
Y
50000
Y
N
1000
100
-
2019/05/21 19:06:33.293
-
2019/05/21 19:06:33.293
H4sIAAAAAAAAAAMAAAAAAAAAAAA=
N
MemberSqlserver
192.168.50.32
MSSQLNATIVE
Native
MemberData
1433
sa
Encrypted 2be98afc819c69e8ea300ff228dd38f99
EXTRA_OPTION_MSSQLNATIVE.instance
MemberData
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
INITIAL_POOL_SIZE
100
IS_CLUSTERED
N
MAXIMUM_POOL_SIZE
300
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
Y
表输入 2
Java 代码
Y
Java 代码
UserDefinedJavaClass
Y
10
none
TRANSFORM_CLASS
Processor
import java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
Database database = null;
PreparedStatement stat = null;
PreparedStatement stat1 = null;
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
//logBasic("start---");
Object[] r = getRow();
if (r == null) {
try {
if (stat!=null) {
stat.close();
}
if (stat1!=null) {
stat1.close();
}
if (database!=null) {
database.disconnect();
}
if(client!=null){
client.close();
}
}
catch(Exception e) {
throw new KettleException(e);
}
setOutputDone();
return false;
}
synchronized(this) {
r = createOutputRow(r, data.outputRowMeta.size());
//获取数据库名和表名
String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
String tablename = "DataImport";//getInputRowMeta().getString(r, "tablename", null );
String idname = "Id";//getInputRowMeta().getString(r, "idname", null );
String idname1 = "MemPhone1";//getInputRowMeta().getString(r, "idname", null );
String idname2 = "MemPhone2";//getInputRowMeta().getString(r, "idname", null );
String idname3 = "MemPhone3";//getInputRowMeta().getString(r, "idname", null );
String idname4 = "MemberId";//getInputRowMeta().getString(r, "idname", null );
String idname5 = "OrdersCode";//getInputRowMeta().getString(r, "idname", null );
String sourceidname = "DataId";//getInputRowMeta().getString(r, "sourceidname", null );
String sourcetablename = "Order";//getInputRowMeta().getString(r, "sourcetablename", null );
if (dbName==null||tablename==null) {
throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
}
//logBasic("table---"+tablename);
if(database == null){
//数据库连接
DatabaseMeta databaseMeta=null;
try {
databaseMeta = getTransMeta().findDatabase(dbName);
if (databaseMeta==null) {
logError("A connection with name "+dbName+" could not be found!");
setErrors(1);
return false;
}
database = new Database(getTrans(), databaseMeta);
database.connect();
//logBasic("success!");
} catch(Exception e) {
logError("Connecting to database "+dbName+" failed.", e);
setErrors(1);
return false;
}
}
//查询表数据
try {
RowMetaInterface idxRowMeta =data.outputRowMeta;
int i=0;
r = createOutputRow(r, data.outputRowMeta.size());
//int index = getInputRowMeta().size();
// Add the index name
//
String Id = idxRowMeta.getString(r, idname, null);
String Id1 = idxRowMeta.getString(r, idname1, null);
String Id2 = idxRowMeta.getString(r, idname2, null);
String Id3 = idxRowMeta.getString(r, idname3, null);
String Id4 = idxRowMeta.getString(r, idname4, null);
String Id5 = idxRowMeta.getString(r, idname5, null);
// Add the column name
String DataId = idxRowMeta.getString(r, sourceidname, null);
/*//String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
String sqlSelect = "select Id from "+tablename + "_mem where Id = '"+ Id +"'";
ResultSet resultSet = null;
resultSet = database.openQuery(sqlSelect);
Object[] idxRow = database.getRow(resultSet);
if(idxRow!=null){
if (database!=null) {
database.closeQuery(resultSet);
}
return true;
}*/
//logBasic("idxRow--Id"+Id);
//logBasic("idxRow--sourcetablename"+sourcetablename);
//logBasic("idxRow--DataId"+DataId);
GetRequest getRequest = new GetRequest(
"crm_memberbase", // Index
"_doc", // Type
Id); // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
//client.close();
if(exists) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("crm_memberbase");
updateRequest.type("_doc");
updateRequest.id(Id);
updateRequest.doc(XContentFactory.jsonBuilder()
.startObject()
.field(idname1, Id1)
.field(idname2, Id2)
.field(idname3, Id3)
.field(idname4, Id4)
.field(idname5, Id5)
.endObject());
client.update(updateRequest,RequestOptions.DEFAULT);
}
//logBasic("idxRow--length"+i);
}
catch(Exception e) {
throw new KettleException(e);
}
//释放连接
//if (database!=null) {
// database.disconnect();
//}
// Send the row on to the next step.
}
putRow(data.outputRowMeta, r);
return true;
}
N
288
96
Y
表输入 2
TableInput
Y
1
none
MemberSqlserver
select *
--,concat('{"name": "Order","parent": "',MemKey,'"}')as order_join_field
from (
SELECT
row_number() over (order by AConsigneePhone2 asc ) as MemKey,
AconsigneePhone2,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone1 ) as MemPhone1,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone2 ) as MemPhone2,
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AconsigneePhone3 ) as MemPhone3,
BuyUserId as MemberId
, OrdersCode
from Mall..Orders
where 1=1 and (AconsigneePhone1!='' or AconsigneePhone2!='')
--order by AConsigneePhone2 asc
--offset 0 rows fetch NEXT 2 rows only
)a
left join MemberData..DataImport as b on a.MemKey = b.Id
--dbo.DesEncryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',b.dataid ) = (case when AConsigneePhone2= '' then
--(case
-- when AconsigneePhone1!='' then AconsigneePhone1
-- else ''end)
-- ELSE AconsigneePhone2 end)
where
1=1
and b.Id is not null
and MemKey >(108-1)*5000 --and MemKey <= 10
--MemKey >((${page}-1)*${pagesize}) and MemKey <= ((${page}-1)*${pagesize} + (${pagesize}))
--order by a.AConsigneePhone2 asc
--offset ((14-1)*5000) rows --fetch next (10) rows only
0
N
N
N
192
32
Y
N