etl_all_OrdersPrescription
Normal
/
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
TRANSNAME
Y
TRANSNAME
STATUS
Y
STATUS
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
STARTDATE
Y
STARTDATE
ENDDATE
Y
ENDDATE
LOGDATE
Y
LOGDATE
DEPDATE
Y
DEPDATE
REPLAYDATE
Y
REPLAYDATE
LOG_FIELD
Y
LOG_FIELD
EXECUTING_SERVER
N
EXECUTING_SERVER
EXECUTING_USER
N
EXECUTING_USER
CLIENT
N
CLIENT
ID_BATCH
Y
ID_BATCH
SEQ_NR
Y
SEQ_NR
LOGDATE
Y
LOGDATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
INPUT_BUFFER_ROWS
Y
INPUT_BUFFER_ROWS
OUTPUT_BUFFER_ROWS
Y
OUTPUT_BUFFER_ROWS
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
LOGGING_OBJECT_TYPE
Y
LOGGING_OBJECT_TYPE
OBJECT_NAME
Y
OBJECT_NAME
OBJECT_COPY
Y
OBJECT_COPY
REPOSITORY_DIRECTORY
Y
REPOSITORY_DIRECTORY
FILENAME
Y
FILENAME
OBJECT_ID
Y
OBJECT_ID
OBJECT_REVISION
Y
OBJECT_REVISION
PARENT_CHANNEL_ID
Y
PARENT_CHANNEL_ID
ROOT_CHANNEL_ID
Y
ROOT_CHANNEL_ID
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
LOG_FIELD
N
LOG_FIELD
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
METRICS_DATE
Y
METRICS_DATE
METRICS_CODE
Y
METRICS_CODE
METRICS_DESCRIPTION
Y
METRICS_DESCRIPTION
METRICS_SUBJECT
Y
METRICS_SUBJECT
METRICS_TYPE
Y
METRICS_TYPE
METRICS_VALUE
Y
METRICS_VALUE
0.0
0.0
10000
50
50
N
Y
50000
Y
N
1000
100
-
2019/07/12 17:22:11.899
-
2019/07/12 17:22:11.899
H4sIAAAAAAAAAAMAAAAAAAAAAAA=
N
219.128.77.96Mall
219.128.77.96
MSSQLNATIVE
Native
Mall
1433
caixukun
Encrypted 2be98afc86aa7f2e4a801a5508cc2fe83
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
IS_CLUSTERED
N
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
N
mall
192.168.50.32
MSSQLNATIVE
Native
Mall
1433
sa
Encrypted 2be98afc86aa7f297aa15a478c7d38f99
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
IS_CLUSTERED
N
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
N
Java 代码
表输出
Y
表输入
Java 代码
Y
Java 代码
UserDefinedJavaClass
Y
10
none
TRANSFORM_CLASS
Processor
import java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.microsoft.sqlserver.jdbc.SQLServerException;
// Connection to the target database. processRow opens it when the first row
// arrives and disconnects it once the input is exhausted.
Database database = null;

// Prepared statements that processRow closes at end of input. In the code
// visible here nothing assigns them.
PreparedStatement stat = null, stat1 = null;

// Elasticsearch client for the node at 192.168.50.32:9200 (plain http). It is
// created together with the step instance and closed by processRow at end of input.
// NOTE(review): no active code in this step queries it - confirm it is still needed.
RestHighLevelClient client = new RestHighLevelClient(
    RestClient.builder(
        new HttpHost[] { new HttpHost("192.168.50.32", 9200, "http") }
    )
);

// NOTE(review): not read by the code visible in this step - confirm before removing.
Integer index = Integer.valueOf(0);
/**
 * Handles one input row: the row is forwarded to the next step only when the
 * target table does not yet contain a row with the same Id.
 *
 * The method reads the next row, and at end of input releases every resource
 * held by this step and marks the output as done. Otherwise it connects to the
 * "mall" connection on first use, looks the row's Id up in PrescriptionRecord,
 * and either skips the row (Id already present) or passes it on.
 *
 * @param smi step metadata supplied by Kettle (not used here)
 * @param sdi step data supplied by Kettle (not used here)
 * @return false when the input is exhausted or the database connection could
 *         not be opened, true when Kettle should call this method again
 * @throws KettleException when the lookup fails or resources cannot be released
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        // End of input: release everything this step opened, then signal completion.
        releaseResources();
        setOutputDone();
        return false;
    }

    // Kettle connection name, target table and key field used for the duplicate check.
    String dbName = "mall";
    String tablename = "PrescriptionRecord";
    String sourceidname = "Id";

    synchronized (this) {
        r = createOutputRow(r, data.outputRowMeta.size());

        // Open the database connection lazily, on the first row only.
        if (database == null) {
            if (!openTargetDatabase(dbName)) {
                // The step stops here, so do not leave the client or a half-open connection behind.
                releaseResourcesQuietly();
                return false;
            }
        }

        try {
            String DataId = data.outputRowMeta.getString(r, sourceidname, null);
            if (existsInTarget(tablename, sourceidname, DataId)) {
                // Already present in the target table: drop the row, keep processing.
                return true;
            }
        }
        catch (SQLServerException e) {
            // Deliberate best effort: the row is skipped, but no longer silently.
            // NOTE(review): the Kettle Database calls used here declare Kettle exceptions,
            // so this branch may never fire - confirm before relying on it.
            logError("Lookup in " + tablename + " failed, row skipped.", e);
            return true;
        }
        catch (Exception e) {
            // The transformation is about to fail; free the connection and the client first.
            releaseResourcesQuietly();
            throw new KettleException(e);
        }
    }

    // Send the row on to the next step.
    putRow(data.outputRowMeta, r);
    return true;
}

/**
 * Looks up the named Kettle connection and connects to it, storing the result in {@code database}.
 *
 * @return true when connected; false after logging the problem and flagging one error
 */
private boolean openTargetDatabase(String dbName)
{
    try {
        DatabaseMeta databaseMeta = getTransMeta().findDatabase(dbName);
        if (databaseMeta == null) {
            logError("A connection with name "+dbName+" could not be found!");
            setErrors(1);
            return false;
        }
        database = new Database(getTrans(), databaseMeta);
        database.connect();
        return true;
    } catch (Exception e) {
        logError("Connecting to database "+dbName+" failed.", e);
        setErrors(1);
        return false;
    }
}

/**
 * Tells whether the target table already holds a row whose Id equals the given value.
 * The value is bound as a statement parameter instead of being concatenated into the
 * SQL text, so quotes in the value cannot break the query and a null value matches nothing.
 */
private boolean existsInTarget(String tablename, String idFieldName, String DataId) throws KettleException
{
    RowMetaInterface paramMeta = new RowMeta();
    paramMeta.addValueMeta(new org.pentaho.di.core.row.value.ValueMetaString(idFieldName));

    String sqlSelect = "select Id from " + tablename + " where Id = ?";
    ResultSet resultSet = database.openQuery(sqlSelect, paramMeta, new Object[] { DataId });
    try {
        return database.getRow(resultSet) != null;
    } finally {
        // Close the query even when reading the row fails.
        database.closeQuery(resultSet);
    }
}

/**
 * Closes the statements, the database connection and the Elasticsearch client.
 * Every close is attempted even if an earlier one fails; the first failure is rethrown.
 */
private void releaseResources() throws KettleException
{
    Exception firstFailure = null;

    if (stat != null) {
        try {
            stat.close();
        } catch (Exception e) {
            firstFailure = e;
        }
        stat = null;
    }
    if (stat1 != null) {
        try {
            stat1.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        stat1 = null;
    }
    if (database != null) {
        try {
            database.disconnect();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        database = null;
    }
    if (client != null) {
        try {
            client.close();
        } catch (Exception e) {
            if (firstFailure == null) {
                firstFailure = e;
            }
        }
        client = null;
    }

    if (firstFailure != null) {
        throw new KettleException(firstFailure);
    }
}

/** Releases resources on a failure path; a cleanup problem is logged so it cannot mask the original error. */
private void releaseResourcesQuietly()
{
    try {
        releaseResources();
    } catch (KettleException closeFailure) {
        logError("Releasing step resources failed.", closeFailure);
    }
}
N
352
80
Y
表输入
TableInput
Y
1
none
219.128.77.96Mall
-- Source query: every column of PrescriptionRecord for recently created rows, oldest first.
SELECT * FROM PrescriptionRecord a
-- Fixed lower bound on the creation timestamp.
where a.CreationDate > '2019-05-01 17:56:29'
-- DATEADD(d,0,DATEDIFF(d,0,getdate())-10) is midnight ten days before today; the row is kept
-- when its CreationDate falls on a later calendar day than that.
-- NOTE(review): wrapping CreationDate in datediff may prevent index use - confirm against the table's indexes.
and datediff(day,DATEADD(d,0,DATEDIFF(d,0,getdate())-10),CreationDate )>0
order by CreationDate asc
0
N
N
N
192
96
Y
表输出
TableOutput
Y
1
none
mall
dbo
1000
N
N
Y
N
N
N
Y
N
Y
N
400
192
Y
N