etl_all_Orders
Normal
/
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
TRANSNAME
Y
TRANSNAME
STATUS
Y
STATUS
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
STARTDATE
Y
STARTDATE
ENDDATE
Y
ENDDATE
LOGDATE
Y
LOGDATE
DEPDATE
Y
DEPDATE
REPLAYDATE
Y
REPLAYDATE
LOG_FIELD
Y
LOG_FIELD
EXECUTING_SERVER
N
EXECUTING_SERVER
EXECUTING_USER
N
EXECUTING_USER
CLIENT
N
CLIENT
ID_BATCH
Y
ID_BATCH
SEQ_NR
Y
SEQ_NR
LOGDATE
Y
LOGDATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
INPUT_BUFFER_ROWS
Y
INPUT_BUFFER_ROWS
OUTPUT_BUFFER_ROWS
Y
OUTPUT_BUFFER_ROWS
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
LOGGING_OBJECT_TYPE
Y
LOGGING_OBJECT_TYPE
OBJECT_NAME
Y
OBJECT_NAME
OBJECT_COPY
Y
OBJECT_COPY
REPOSITORY_DIRECTORY
Y
REPOSITORY_DIRECTORY
FILENAME
Y
FILENAME
OBJECT_ID
Y
OBJECT_ID
OBJECT_REVISION
Y
OBJECT_REVISION
PARENT_CHANNEL_ID
Y
PARENT_CHANNEL_ID
ROOT_CHANNEL_ID
Y
ROOT_CHANNEL_ID
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
TRANSNAME
Y
TRANSNAME
STEPNAME
Y
STEPNAME
STEP_COPY
Y
STEP_COPY
LINES_READ
Y
LINES_READ
LINES_WRITTEN
Y
LINES_WRITTEN
LINES_UPDATED
Y
LINES_UPDATED
LINES_INPUT
Y
LINES_INPUT
LINES_OUTPUT
Y
LINES_OUTPUT
LINES_REJECTED
Y
LINES_REJECTED
ERRORS
Y
ERRORS
LOG_FIELD
N
LOG_FIELD
ID_BATCH
Y
ID_BATCH
CHANNEL_ID
Y
CHANNEL_ID
LOG_DATE
Y
LOG_DATE
METRICS_DATE
Y
METRICS_DATE
METRICS_CODE
Y
METRICS_CODE
METRICS_DESCRIPTION
Y
METRICS_DESCRIPTION
METRICS_SUBJECT
Y
METRICS_SUBJECT
METRICS_TYPE
Y
METRICS_TYPE
METRICS_VALUE
Y
METRICS_VALUE
0.0
0.0
10000
50
50
N
Y
50000
Y
N
1000
100
-
2019/07/11 15:20:46.215
-
2019/07/11 15:20:46.215
H4sIAAAAAAAAAAMAAAAAAAAAAAA=
N
219.128.77.96Mall
219.128.77.96
MSSQLNATIVE
Native
Mall
1433
caixukun
Encrypted 2be98afc86aa7f2e4a801a5508cc2fe83
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
IS_CLUSTERED
N
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
N
mall
192.168.50.32
MSSQLNATIVE
Native
Mall
1433
sa
Encrypted 2be98afc86aa7f297aa15a478c7d38f99
FORCE_IDENTIFIERS_TO_LOWERCASE
N
FORCE_IDENTIFIERS_TO_UPPERCASE
N
INITIAL_POOL_SIZE
30
IS_CLUSTERED
N
MAXIMUM_POOL_SIZE
1000
MSSQLUseIntegratedSecurity
false
MSSQL_DOUBLE_DECIMAL_SEPARATOR
N
PORT_NUMBER
1433
PRESERVE_RESERVED_WORD_CASE
Y
QUOTE_ALL_FIELDS
N
SUPPORTS_BOOLEAN_DATA_TYPE
Y
SUPPORTS_TIMESTAMP_DATA_TYPE
Y
USE_POOLING
Y
表输入
Java 代码
Y
Java 代码
表输出
Y
Java 代码
UserDefinedJavaClass
Y
10
none
TRANSFORM_CLASS
Processor
import java.sql.*;
import org.pentaho.di.core.database.*;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import com.microsoft.sqlserver.jdbc.SQLServerException;
Database database = null;
PreparedStatement stat = null;
PreparedStatement stat1 = null;
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
Integer index = 0;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
//logBasic("start---");
Object[] r = getRow();
if (r == null) {
try {
if (stat!=null) {
stat.close();
}
if (stat1!=null) {
stat1.close();
}
if (database!=null) {
database.disconnect();
}
if(client!=null){
client.close();
}
}
catch(Exception e) {
throw new KettleException(e);
}
setOutputDone();
return false;
}
synchronized(this) {
r = createOutputRow(r, data.outputRowMeta.size());
//获取数据库名和表名
String dbName = "mall";
String tablename = "Orders";//getInputRowMeta().getString(r, "tablename", null );
String sourceidname = "OrdersCode";//getInputRowMeta().getString(r, "sourceidname", null );
if (dbName==null||tablename==null) {
throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
}
//logBasic("table---"+tablename);
if(database == null){
//数据库连接
DatabaseMeta databaseMeta=null;
try {
databaseMeta = getTransMeta().findDatabase(dbName);
if (databaseMeta==null) {
logError("A connection with name "+dbName+" could not be found!");
setErrors(1);
return false;
}
database = new Database(getTrans(), databaseMeta);
database.connect();
//logBasic("success!");
} catch(Exception e) {
logError("Connecting to database "+dbName+" failed.", e);
setErrors(1);
return false;
}
}
//查询表数据
try {
RowMetaInterface idxRowMeta =data.outputRowMeta;
int i=0;
r = createOutputRow(r, data.outputRowMeta.size());
//int index = getInputRowMeta().size();
// Add the column name
String DataId = idxRowMeta.getString(r, sourceidname, null);
String sqlSelect = "select OrdersCode from "+tablename + " where OrdersCode = '"+ DataId +"'";
ResultSet resultSet = null;
resultSet = database.openQuery(sqlSelect);
Object[] idxRow = database.getRow(resultSet);
if (database!=null) {
database.closeQuery(resultSet);
resultSet = null;
idxRowMeta = null;
}
if(idxRow != null){
return true;
}
//logBasic("idxRow--Id"+Id);
//logBasic("idxRow--sourcetablename"+sourcetablename);
//logBasic("idxRow--DataId"+DataId);
/*
GetRequest getRequest = new GetRequest(
"crm_order_routingphone", // Index
"_doc", // /Type
DataId); // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
//client.close();
if(exists ){
return true;
}
//if(!exists && idxRow == null){
// return true;
//}
*/
/*
//3.获得预处理对象
String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
//logBasic("idxRow--database"+ database);
if(stat == null)
stat = database.prepareSQL(sql);
//logBasic("idxRow--database"+ stat);
//stat.addBatch(sql);
//4.SQL语句占位符设置实际参数
stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
stat.setString(4, "phone1");
//stat.setInt(5, index);
//5.执行SQL语句
boolean line = stat.execute();
//int[] line = stat.executeBatch();
//System.out.println("更新记录数"+ line);
//6.释放资源
//stat.close();
*/
}
catch(SQLServerException e) {
return true;
}catch(Exception e) {
throw new KettleException(e);
}
//释放连接
//if (database!=null) {
// database.disconnect();
//}
// Send the row on to the next step.
}
putRow(data.outputRowMeta, r);
return true;
}
N
352
96
Y
表输入
TableInput
Y
1
none
219.128.77.96Mall
SELECT
OrdersCode
, CreationDate
, LastModified
, AccountId
, AccountName
, Consignee
, isnull(DeliveryZipCode,'') as DeliveryZipCode
, Invoice
, InvoiceNo
, OrderNotes
, Sum
, InitSum
, ProductCost
, Reducing
, Discountprice
, PaymentType
, DeliveryType
, OrderTime
, ShippingTime
, ReceivingTime
, ShippingNo
, OrderStatus
, IsReturn
, TransportCosts
, RegionCode
, IsRxDrug
, OriginType
, OperateID
, DeptId
, DeptCode
, BuyUserId
, ProductLine
, SubProductLine
, DiseaseClassCode
, Age
, Sex
, SyncStatus
, IsVisible
, ErrorCount
, IsNeedReceipt
, OrdersIntegral
, CouponCode
, CouponValue
, Ispayment
, AuditorId
, AllocationTime
, SourcePlatforms
, DeviceType
, IsReward
, IsEmailmark
, CustomerDataId
, IsDrug
, HuaWuOriginType
, InvoiceSum
, AdvanceSum
, AgainOrderCount
, ConfimLevel
, BusinessId
, IsPayByCard
, PreferentialRate
, VerificationCode
, AConsigneePhone1
, AConsigneePhone2
, AConsigneePhone3
, ADeliveryAddress
, IsAutomaticSigned
, WarehouseCode
, CashCouponValue
, CashCouponCode
, IsDealWith
, IsNewOrder
, ConsigneeType
, CustomerCode
, CustomerName
, InvoiceContent
, InvoiceContentType
, OrdersType
, Weight
, Height
, HealthPrice
, LeverNumber
, IsO2O
, AdvancePayType
, InventoryAuditDate
, OrderProcessDate
, AllocationFlag
, BonusDate
, IntegralValue
, IsToERP
, MerchantNote
, OtherNote
, IsPrescription
, ConfirmId
, OrderType
, OrderFlag
FROM Orders
where 1=1
and OrderTime>'2019-06-09 22:22:47'--'2019-05-03 03:51:59'
and datediff(day,DATEADD(d,0,DATEDIFF(d,0,getdate())-5),OrderTime )>0
order by OrderTime asc
0
N
N
N
200
100
Y
表输出
TableOutput
Y
1
none
mall
dbo
1000
N
N
Y
N
N
N
Y
N
Y
N
OrdersCode
OrdersCode
CreationDate
CreationDate
LastModified
LastModified
AccountId
AccountId
AccountName
AccountName
Consignee
Consignee
DeliveryZipCode
DeliveryZipCode
Invoice
Invoice
InvoiceNo
InvoiceNo
OrderNotes
OrderNotes
Sum
Sum
InitSum
InitSum
ProductCost
ProductCost
Reducing
Reducing
Discountprice
Discountprice
PaymentType
PaymentType
DeliveryType
DeliveryType
OrderTime
OrderTime
ShippingTime
ShippingTime
ReceivingTime
ReceivingTime
ShippingNo
ShippingNo
OrderStatus
OrderStatus
IsReturn
IsReturn
TransportCosts
TransportCosts
RegionCode
RegionCode
IsRxDrug
IsRxDrug
OriginType
OriginType
OperateID
OperateID
DeptId
DeptId
DeptCode
DeptCode
BuyUserId
BuyUserId
ProductLine
ProductLine
SubProductLine
SubProductLine
DiseaseClassCode
DiseaseClassCode
Age
Age
Sex
Sex
SyncStatus
SyncStatus
IsVisible
IsVisible
ErrorCount
ErrorCount
IsNeedReceipt
IsNeedReceipt
OrdersIntegral
OrdersIntegral
CouponCode
CouponCode
CouponValue
CouponValue
Ispayment
Ispayment
AuditorId
AuditorId
AllocationTime
AllocationTime
SourcePlatforms
SourcePlatforms
DeviceType
DeviceType
IsReward
IsReward
IsEmailmark
IsEmailmark
CustomerDataId
CustomerDataId
IsDrug
IsDrug
HuaWuOriginType
HuaWuOriginType
InvoiceSum
InvoiceSum
AdvanceSum
AdvanceSum
AgainOrderCount
AgainOrderCount
ConfimLevel
ConfimLevel
BusinessId
BusinessId
IsPayByCard
IsPayByCard
PreferentialRate
PreferentialRate
VerificationCode
VerificationCode
AConsigneePhone1
AConsigneePhone1
AConsigneePhone2
AConsigneePhone2
AConsigneePhone3
AConsigneePhone3
ADeliveryAddress
ADeliveryAddress
IsAutomaticSigned
IsAutomaticSigned
WarehouseCode
WarehouseCode
CashCouponValue
CashCouponValue
CashCouponCode
CashCouponCode
IsDealWith
IsDealWith
IsNewOrder
IsNewOrder
ConsigneeType
ConsigneeType
CustomerCode
CustomerCode
CustomerName
CustomerName
InvoiceContent
InvoiceContent
InvoiceContentType
InvoiceContentType
OrdersType
OrdersType
Weight
Weight
Height
Height
HealthPrice
HealthPrice
LeverNumber
LeverNumber
IsO2O
IsO2O
AdvancePayType
AdvancePayType
InventoryAuditDate
InventoryAuditDate
OrderProcessDate
OrderProcessDate
AllocationFlag
AllocationFlag
BonusDate
BonusDate
IntegralValue
IntegralValue
IsToERP
IsToERP
MerchantNote
MerchantNote
OtherNote
OtherNote
IsPrescription
IsPrescription
ConfirmId
ConfirmId
OrderType
OrderType
OrderFlag
OrderFlag
400
200
Y
N