mem_memberhealthdoc
Normal
/
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
TRANSNAME
Y
TRANSNAME
STATUS
Y
STATUS
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
STARTDATE
Y
STARTDATE
ENDDATE
Y
ENDDATE
LOGDATE
Y
LOGDATE
DEPDATE
Y
DEPDATE
REPLAYDATE
Y
REPLAYDATE
LOG_FIELD
Y
LOG_FIELD
EXECUTING_SERVER
N
EXECUTING_SERVER
EXECUTING_USER
N
EXECUTING_USER
CLIENT
N
CLIENT
ID_BATCH
Y
ID_BATCH
SEQ_NR
Y
SEQ_NR
LOGDATE
Y
LOGDATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
INPUT_BUFFER_ROWS
Y
INPUT_BUFFER_ROWS
OUTPUT_BUFFER_ROWS
Y
OUTPUT_BUFFER_ROWS
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
LOGGING_OBJECT_TYPE
Y
LOGGING_OBJECT_TYPE
OBJECT_NAME
Y
OBJECT_NAME
OBJECT_COPY
Y
OBJECT_COPY
REPOSITORY_DIRECTORY
Y
REPOSITORY_DIRECTORY
FILENAME
Y
FILENAME
OBJECT_ID
Y
OBJECT_ID
OBJECT_REVISION
Y
OBJECT_REVISION
PARENT_CHANNEL_ID
Y
PARENT_CHANNEL_ID
ROOT_CHANNEL_ID
Y
ROOT_CHANNEL_ID
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
LOG_FIELD
N
LOG_FIELD
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
METRICS_DATE
Y
METRICS_DATE
METRICS_CODE
Y
METRICS_CODE
METRICS_DESCRIPTION
Y
METRICS_DESCRIPTION
METRICS_SUBJECT
Y
METRICS_SUBJECT
METRICS_TYPE
Y
METRICS_TYPE
METRICS_VALUE
Y
METRICS_VALUE
0.0
0.0
10000
50
50
N
Y
50000
Y
N
1000
100
-
2019/06/11 11:23:16.606
-
2019/06/11 11:23:16.606
N
mall
192.168.50.32
MSSQLNATIVE
Native
Mall
1433
sa
Encrypted 2be98afc819c69e8ea300ff228dd38f99
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
IS_CLUSTERED
N
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
N
Java 代码
Elasticsearch bulk insert 2
Y
memberhealth表输入
Java 代码
Y
memberhealth表输入 2
Java 代码
Y
memberhealth表输入 3
Java 代码
Y
Elasticsearch bulk insert 2
ElasticSearchBulk
Y
1
none
crm_memberhealth
_doc
1000
SECONDS
N
MemberPhone
Y
N
Y
MemberAge
MemberAge
MemberGender
MemberGender
MemberHeight
MemberHeight
MemberId
MemberId
MemberName
MemberName
MemberPhone
MemberPhone
MemberWeight
MemberWeight
192.168.50.32
9300
cluster.name
es
custom.fields.aliase
mem_memberhealth
745
204
Y
Java 代码
UserDefinedJavaClass
N
20
none
TRANSFORM_CLASS
Processor
import java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.microsoft.sqlserver.jdbc.SQLServerException;
// Kettle database wrapper; stays null until processRow opens the connection on the first row.
Database database = null;
// Prepared INSERT statement; created lazily by processRow and reused for every later row.
PreparedStatement stat = null;
// Never assigned in the visible code; processRow only closes it when it is non-null.
PreparedStatement stat1 = null;
// Elasticsearch REST client bound to a hard-coded host (192.168.50.32, port 9200, http).
// It is created when the step class is instantiated and closed by processRow once input is exhausted.
// NOTE(review): the explicit HttpHost[] array is presumably there for the step's embedded compiler - confirm before switching to varargs.
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
// Counter incremented by processRow for each row that reaches the insert; not read anywhere in the visible code.
Integer index = 0;
/**
 * Processes one input row of this step.
 *
 * For each row the method reads the MemberId and MemberPhone fields, asks
 * Elasticsearch whether a document with the MemberPhone value as its id already
 * exists in the index "crm_memberheathytemp", and
 *   - drops the row (nothing is inserted or emitted) when such a document exists;
 *   - otherwise inserts a record into the table DataImport_heathytemp and passes
 *     the row on to the next step.
 *
 * The database connection and the prepared statement are created lazily on the
 * first row and reused afterwards. They are released, together with the
 * Elasticsearch client, when the input is exhausted or when the step gives up
 * because of an error.
 *
 * @param smi step metadata supplied by Kettle (not used here)
 * @param sdi step data supplied by Kettle (not used here)
 * @return true when the step should be called again for the next row; false when
 *         the input is exhausted or the database connection could not be opened
 * @throws KettleException when releasing resources fails at end of input, or when
 *         anything other than a SQLServerException fails while handling the row
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        // No more input: release everything this step opened, then signal completion.
        releaseResources();
        setOutputDone();
        return false;
    }

    synchronized(this) {
        r = createOutputRow(r, data.outputRowMeta.size());

        // Connection, table and field names are fixed for this transformation.
        String dbName = "MemberSqlServer";
        String tablename = "DataImport_heathytemp";
        String idname = "MemberId";
        String sourceidname = "MemberPhone";
        String sourcetablename = "Orders";

        // Open the database connection once, on the first row.
        if (database == null) {
            try {
                DatabaseMeta databaseMeta = getTransMeta().findDatabase(dbName);
                if (databaseMeta == null) {
                    logError("A connection with name "+dbName+" could not be found!");
                    setErrors(1);
                    // The step stops here, so do not leave the Elasticsearch client open.
                    releaseResourcesQuietly();
                    return false;
                }
                database = new Database(getTrans(), databaseMeta);
                database.connect();
            } catch(Exception e) {
                logError("Connecting to database "+dbName+" failed.", e);
                setErrors(1);
                releaseResourcesQuietly();
                return false;
            }
        }

        // Declared outside the try block so the skip message below can report it.
        String memberId = null;
        try {
            RowMetaInterface rowMeta = data.outputRowMeta;
            memberId = rowMeta.getString(r, idname, null);
            String dataId = rowMeta.getString(r, sourceidname, null);

            // Existence check only: neither _source nor stored fields are fetched.
            GetRequest getRequest = new GetRequest(
                "crm_memberheathytemp", // index
                "_doc",                 // type
                dataId);                // document id
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            getRequest.storedFields(new String[]{"_none_"});
            if (client.exists(getRequest, RequestOptions.DEFAULT)) {
                // Already indexed: drop the row without inserting or emitting it.
                return true;
            }

            index = index + 1;
            if (stat == null) {
                stat = database.prepareSQL(
                    "insert into "+tablename+"(Id,DataName,DataId,Type) values (?,?,?,?);");
            }
            stat.setString(1, memberId);        // Id
            stat.setString(2, sourcetablename); // DataName
            stat.setString(3, dataId);          // DataId
            stat.setString(4, "phone1,phone2"); // Type
            stat.execute();
        }
        catch(SQLServerException e) {
            // Best effort, as before: a failed insert drops the row and processing
            // continues. The failure is now logged instead of vanishing silently.
            // The phone number is deliberately left out of the message.
            logBasic("Skipping row with " + idname + "=" + memberId + ": insert into "
                + tablename + " failed: " + e.getMessage());
            return true;
        } catch(Exception e) {
            // Fatal for the step: free resources before propagating the failure.
            releaseResourcesQuietly();
            throw new KettleException(e);
        }
    }

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);
    return true;
}

/**
 * Closes both prepared statements, the database connection and the Elasticsearch
 * client. Every resource is attempted even when an earlier one fails; the first
 * failure is rethrown once all of them have been attempted.
 *
 * @throws KettleException wrapping the first failure encountered while closing
 */
private void releaseResources() throws KettleException
{
    Exception firstFailure = null;
    if (stat != null) {
        try {
            stat.close();
        } catch (Exception e) {
            firstFailure = e;
        }
        stat = null;
    }
    if (stat1 != null) {
        try {
            stat1.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        stat1 = null;
    }
    if (database != null) {
        try {
            database.disconnect();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        database = null;
    }
    if (client != null) {
        try {
            client.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        client = null;
    }
    if (firstFailure != null) {
        throw new KettleException(firstFailure);
    }
}

/**
 * Releases resources on an error path, where a cleanup failure must not hide the
 * original problem; cleanup failures are logged instead of thrown.
 */
private void releaseResourcesQuietly()
{
    try {
        releaseResources();
    } catch (KettleException e) {
        logError("Releasing resources failed.", e);
    }
}
N
537
268
Y
memberhealth表输入
TableInput
Y
1
none
mall
SELECT
a.BuyUserId AS MemberId,
a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
/* Use the prescription's gender when present, otherwise fall back to the order's Sex.
   The original CASE had no ELSE branch, so it returned NULL whenever b.Gender was set.
   Assumes Gender and Sex have compatible types - confirm against the schema. */
CASE WHEN b.Gender IS NULL THEN a.Sex ELSE b.Gender END AS MemberGender,
/* NOTE(review): the literal arguments look like hard-coded key material - confirm and consider moving them out of the query text. */
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) AS MemberPhone,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
/* Only orders that carry a first consignee phone. */
WHERE AConsigneePhone1 !=''
order by a.BuyUserId
0
N
N
N
393
156
Y
memberhealth表输入 2
TableInput
Y
1
none
mall
SELECT
a.BuyUserId AS MemberId,
a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
/* Use the prescription's gender when present, otherwise fall back to the order's Sex.
   The original CASE had no ELSE branch, so it returned NULL whenever b.Gender was set.
   Assumes Gender and Sex have compatible types - confirm against the schema. */
CASE WHEN b.Gender IS NULL THEN a.Sex ELSE b.Gender END AS MemberGender,
/* NOTE(review): the literal arguments look like hard-coded key material - confirm and consider moving them out of the query text. */
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) AS MemberPhone,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
/* Only orders that carry a second consignee phone. */
WHERE AConsigneePhone2 !=''
order by a.BuyUserId
0
N
N
N
361
236
Y
memberhealth表输入 3
TableInput
Y
1
none
mall
SELECT
a.BuyUserId AS MemberId,
a.Consignee AS MemberName,
a.Height AS MemberHeight,
a.Weight AS MemberWeight,
/* Use the prescription's gender when present, otherwise fall back to the order's Sex.
   The original CASE had no ELSE branch, so it returned NULL whenever b.Gender was set.
   Assumes Gender and Sex have compatible types - confirm against the schema. */
CASE WHEN b.Gender IS NULL THEN a.Sex ELSE b.Gender END AS MemberGender,
/* NOTE(review): the literal arguments look like hard-coded key material - confirm and consider moving them out of the query text. */
Mall.dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) AS MemberPhone,
b.age as MemberAge
FROM
[Mall].[dbo].[Orders] a
LEFT JOIN
[Mall]..PrescriptionRecord AS b ON a.OrdersCode = b.OrdersCode
/* Only orders that carry a third consignee phone. */
WHERE AConsigneePhone3 !=''
order by a.BuyUserId
0
N
N
N
412
332
Y
N