mem_memberext_etl_perday.ktr 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <transformation>
  3. <info>
  4. <name>mem_memberext_etl_perday</name>
  5. <description/>
  6. <extended_description/>
  7. <trans_version/>
  8. <trans_type>Normal</trans_type>
  9. <trans_status>0</trans_status>
  10. <directory>/</directory>
  11. <parameters>
  12. </parameters>
  13. <log>
  14. <trans-log-table>
  15. <connection/>
  16. <schema/>
  17. <table/>
  18. <size_limit_lines/>
  19. <interval/>
  20. <timeout_days/>
  21. <field>
  22. <id>ID_BATCH</id>
  23. <enabled>Y</enabled>
  24. <name>ID_BATCH</name>
  25. </field>
  26. <field>
  27. <id>CHANNEL_ID</id>
  28. <enabled>Y</enabled>
  29. <name>CHANNEL_ID</name>
  30. </field>
  31. <field>
  32. <id>TRANSNAME</id>
  33. <enabled>Y</enabled>
  34. <name>TRANSNAME</name>
  35. </field>
  36. <field>
  37. <id>STATUS</id>
  38. <enabled>Y</enabled>
  39. <name>STATUS</name>
  40. </field>
  41. <field>
  42. <id>LINES_READ</id>
  43. <enabled>Y</enabled>
  44. <name>LINES_READ</name>
  45. <subject/>
  46. </field>
  47. <field>
  48. <id>LINES_WRITTEN</id>
  49. <enabled>Y</enabled>
  50. <name>LINES_WRITTEN</name>
  51. <subject/>
  52. </field>
  53. <field>
  54. <id>LINES_UPDATED</id>
  55. <enabled>Y</enabled>
  56. <name>LINES_UPDATED</name>
  57. <subject/>
  58. </field>
  59. <field>
  60. <id>LINES_INPUT</id>
  61. <enabled>Y</enabled>
  62. <name>LINES_INPUT</name>
  63. <subject/>
  64. </field>
  65. <field>
  66. <id>LINES_OUTPUT</id>
  67. <enabled>Y</enabled>
  68. <name>LINES_OUTPUT</name>
  69. <subject/>
  70. </field>
  71. <field>
  72. <id>LINES_REJECTED</id>
  73. <enabled>Y</enabled>
  74. <name>LINES_REJECTED</name>
  75. <subject/>
  76. </field>
  77. <field>
  78. <id>ERRORS</id>
  79. <enabled>Y</enabled>
  80. <name>ERRORS</name>
  81. </field>
  82. <field>
  83. <id>STARTDATE</id>
  84. <enabled>Y</enabled>
  85. <name>STARTDATE</name>
  86. </field>
  87. <field>
  88. <id>ENDDATE</id>
  89. <enabled>Y</enabled>
  90. <name>ENDDATE</name>
  91. </field>
  92. <field>
  93. <id>LOGDATE</id>
  94. <enabled>Y</enabled>
  95. <name>LOGDATE</name>
  96. </field>
  97. <field>
  98. <id>DEPDATE</id>
  99. <enabled>Y</enabled>
  100. <name>DEPDATE</name>
  101. </field>
  102. <field>
  103. <id>REPLAYDATE</id>
  104. <enabled>Y</enabled>
  105. <name>REPLAYDATE</name>
  106. </field>
  107. <field>
  108. <id>LOG_FIELD</id>
  109. <enabled>Y</enabled>
  110. <name>LOG_FIELD</name>
  111. </field>
  112. <field>
  113. <id>EXECUTING_SERVER</id>
  114. <enabled>N</enabled>
  115. <name>EXECUTING_SERVER</name>
  116. </field>
  117. <field>
  118. <id>EXECUTING_USER</id>
  119. <enabled>N</enabled>
  120. <name>EXECUTING_USER</name>
  121. </field>
  122. <field>
  123. <id>CLIENT</id>
  124. <enabled>N</enabled>
  125. <name>CLIENT</name>
  126. </field>
  127. </trans-log-table>
  128. <perf-log-table>
  129. <connection/>
  130. <schema/>
  131. <table/>
  132. <interval/>
  133. <timeout_days/>
  134. <field>
  135. <id>ID_BATCH</id>
  136. <enabled>Y</enabled>
  137. <name>ID_BATCH</name>
  138. </field>
  139. <field>
  140. <id>SEQ_NR</id>
  141. <enabled>Y</enabled>
  142. <name>SEQ_NR</name>
  143. </field>
  144. <field>
  145. <id>LOGDATE</id>
  146. <enabled>Y</enabled>
  147. <name>LOGDATE</name>
  148. </field>
  149. <field>
  150. <id>TRANSNAME</id>
  151. <enabled>Y</enabled>
  152. <name>TRANSNAME</name>
  153. </field>
  154. <field>
  155. <id>STEPNAME</id>
  156. <enabled>Y</enabled>
  157. <name>STEPNAME</name>
  158. </field>
  159. <field>
  160. <id>STEP_COPY</id>
  161. <enabled>Y</enabled>
  162. <name>STEP_COPY</name>
  163. </field>
  164. <field>
  165. <id>LINES_READ</id>
  166. <enabled>Y</enabled>
  167. <name>LINES_READ</name>
  168. </field>
  169. <field>
  170. <id>LINES_WRITTEN</id>
  171. <enabled>Y</enabled>
  172. <name>LINES_WRITTEN</name>
  173. </field>
  174. <field>
  175. <id>LINES_UPDATED</id>
  176. <enabled>Y</enabled>
  177. <name>LINES_UPDATED</name>
  178. </field>
  179. <field>
  180. <id>LINES_INPUT</id>
  181. <enabled>Y</enabled>
  182. <name>LINES_INPUT</name>
  183. </field>
  184. <field>
  185. <id>LINES_OUTPUT</id>
  186. <enabled>Y</enabled>
  187. <name>LINES_OUTPUT</name>
  188. </field>
  189. <field>
  190. <id>LINES_REJECTED</id>
  191. <enabled>Y</enabled>
  192. <name>LINES_REJECTED</name>
  193. </field>
  194. <field>
  195. <id>ERRORS</id>
  196. <enabled>Y</enabled>
  197. <name>ERRORS</name>
  198. </field>
  199. <field>
  200. <id>INPUT_BUFFER_ROWS</id>
  201. <enabled>Y</enabled>
  202. <name>INPUT_BUFFER_ROWS</name>
  203. </field>
  204. <field>
  205. <id>OUTPUT_BUFFER_ROWS</id>
  206. <enabled>Y</enabled>
  207. <name>OUTPUT_BUFFER_ROWS</name>
  208. </field>
  209. </perf-log-table>
  210. <channel-log-table>
  211. <connection/>
  212. <schema/>
  213. <table/>
  214. <timeout_days/>
  215. <field>
  216. <id>ID_BATCH</id>
  217. <enabled>Y</enabled>
  218. <name>ID_BATCH</name>
  219. </field>
  220. <field>
  221. <id>CHANNEL_ID</id>
  222. <enabled>Y</enabled>
  223. <name>CHANNEL_ID</name>
  224. </field>
  225. <field>
  226. <id>LOG_DATE</id>
  227. <enabled>Y</enabled>
  228. <name>LOG_DATE</name>
  229. </field>
  230. <field>
  231. <id>LOGGING_OBJECT_TYPE</id>
  232. <enabled>Y</enabled>
  233. <name>LOGGING_OBJECT_TYPE</name>
  234. </field>
  235. <field>
  236. <id>OBJECT_NAME</id>
  237. <enabled>Y</enabled>
  238. <name>OBJECT_NAME</name>
  239. </field>
  240. <field>
  241. <id>OBJECT_COPY</id>
  242. <enabled>Y</enabled>
  243. <name>OBJECT_COPY</name>
  244. </field>
  245. <field>
  246. <id>REPOSITORY_DIRECTORY</id>
  247. <enabled>Y</enabled>
  248. <name>REPOSITORY_DIRECTORY</name>
  249. </field>
  250. <field>
  251. <id>FILENAME</id>
  252. <enabled>Y</enabled>
  253. <name>FILENAME</name>
  254. </field>
  255. <field>
  256. <id>OBJECT_ID</id>
  257. <enabled>Y</enabled>
  258. <name>OBJECT_ID</name>
  259. </field>
  260. <field>
  261. <id>OBJECT_REVISION</id>
  262. <enabled>Y</enabled>
  263. <name>OBJECT_REVISION</name>
  264. </field>
  265. <field>
  266. <id>PARENT_CHANNEL_ID</id>
  267. <enabled>Y</enabled>
  268. <name>PARENT_CHANNEL_ID</name>
  269. </field>
  270. <field>
  271. <id>ROOT_CHANNEL_ID</id>
  272. <enabled>Y</enabled>
  273. <name>ROOT_CHANNEL_ID</name>
  274. </field>
  275. </channel-log-table>
  276. <step-log-table>
  277. <connection/>
  278. <schema/>
  279. <table/>
  280. <timeout_days/>
  281. <field>
  282. <id>ID_BATCH</id>
  283. <enabled>Y</enabled>
  284. <name>ID_BATCH</name>
  285. </field>
  286. <field>
  287. <id>CHANNEL_ID</id>
  288. <enabled>Y</enabled>
  289. <name>CHANNEL_ID</name>
  290. </field>
  291. <field>
  292. <id>LOG_DATE</id>
  293. <enabled>Y</enabled>
  294. <name>LOG_DATE</name>
  295. </field>
  296. <field>
  297. <id>TRANSNAME</id>
  298. <enabled>Y</enabled>
  299. <name>TRANSNAME</name>
  300. </field>
  301. <field>
  302. <id>STEPNAME</id>
  303. <enabled>Y</enabled>
  304. <name>STEPNAME</name>
  305. </field>
  306. <field>
  307. <id>STEP_COPY</id>
  308. <enabled>Y</enabled>
  309. <name>STEP_COPY</name>
  310. </field>
  311. <field>
  312. <id>LINES_READ</id>
  313. <enabled>Y</enabled>
  314. <name>LINES_READ</name>
  315. </field>
  316. <field>
  317. <id>LINES_WRITTEN</id>
  318. <enabled>Y</enabled>
  319. <name>LINES_WRITTEN</name>
  320. </field>
  321. <field>
  322. <id>LINES_UPDATED</id>
  323. <enabled>Y</enabled>
  324. <name>LINES_UPDATED</name>
  325. </field>
  326. <field>
  327. <id>LINES_INPUT</id>
  328. <enabled>Y</enabled>
  329. <name>LINES_INPUT</name>
  330. </field>
  331. <field>
  332. <id>LINES_OUTPUT</id>
  333. <enabled>Y</enabled>
  334. <name>LINES_OUTPUT</name>
  335. </field>
  336. <field>
  337. <id>LINES_REJECTED</id>
  338. <enabled>Y</enabled>
  339. <name>LINES_REJECTED</name>
  340. </field>
  341. <field>
  342. <id>ERRORS</id>
  343. <enabled>Y</enabled>
  344. <name>ERRORS</name>
  345. </field>
  346. <field>
  347. <id>LOG_FIELD</id>
  348. <enabled>N</enabled>
  349. <name>LOG_FIELD</name>
  350. </field>
  351. </step-log-table>
  352. <metrics-log-table>
  353. <connection/>
  354. <schema/>
  355. <table/>
  356. <timeout_days/>
  357. <field>
  358. <id>ID_BATCH</id>
  359. <enabled>Y</enabled>
  360. <name>ID_BATCH</name>
  361. </field>
  362. <field>
  363. <id>CHANNEL_ID</id>
  364. <enabled>Y</enabled>
  365. <name>CHANNEL_ID</name>
  366. </field>
  367. <field>
  368. <id>LOG_DATE</id>
  369. <enabled>Y</enabled>
  370. <name>LOG_DATE</name>
  371. </field>
  372. <field>
  373. <id>METRICS_DATE</id>
  374. <enabled>Y</enabled>
  375. <name>METRICS_DATE</name>
  376. </field>
  377. <field>
  378. <id>METRICS_CODE</id>
  379. <enabled>Y</enabled>
  380. <name>METRICS_CODE</name>
  381. </field>
  382. <field>
  383. <id>METRICS_DESCRIPTION</id>
  384. <enabled>Y</enabled>
  385. <name>METRICS_DESCRIPTION</name>
  386. </field>
  387. <field>
  388. <id>METRICS_SUBJECT</id>
  389. <enabled>Y</enabled>
  390. <name>METRICS_SUBJECT</name>
  391. </field>
  392. <field>
  393. <id>METRICS_TYPE</id>
  394. <enabled>Y</enabled>
  395. <name>METRICS_TYPE</name>
  396. </field>
  397. <field>
  398. <id>METRICS_VALUE</id>
  399. <enabled>Y</enabled>
  400. <name>METRICS_VALUE</name>
  401. </field>
  402. </metrics-log-table>
  403. </log>
  404. <maxdate>
  405. <connection/>
  406. <table/>
  407. <field/>
  408. <offset>0.0</offset>
  409. <maxdiff>0.0</maxdiff>
  410. </maxdate>
  411. <size_rowset>2</size_rowset>
  412. <sleep_time_empty>50</sleep_time_empty>
  413. <sleep_time_full>50</sleep_time_full>
  414. <unique_connections>N</unique_connections>
  415. <feedback_shown>Y</feedback_shown>
  416. <feedback_size>50000</feedback_size>
  417. <using_thread_priorities>Y</using_thread_priorities>
  418. <shared_objects_file/>
  419. <capture_step_performance>N</capture_step_performance>
  420. <step_performance_capturing_delay>1000</step_performance_capturing_delay>
  421. <step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
  422. <dependencies>
  423. </dependencies>
  424. <partitionschemas>
  425. </partitionschemas>
  426. <slaveservers>
  427. </slaveservers>
  428. <clusterschemas>
  429. </clusterschemas>
  430. <created_user>-</created_user>
  431. <created_date>2019/05/21 19:06:33.293</created_date>
  432. <modified_user>-</modified_user>
  433. <modified_date>2019/05/21 19:06:33.293</modified_date>
  434. <key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
  435. <is_key_private>N</is_key_private>
  436. </info>
  437. <notepads>
  438. </notepads>
  439. <connection>
  440. <name>MallSqlserver</name>
  441. <server>192.168.50.32</server>
  442. <type>MSSQLNATIVE</type>
  443. <access>Native</access>
  444. <database>Mall</database>
  445. <port>1433</port>
  446. <username>sa</username>
  447. <password>Encrypted 2be98afc819c69e8ea300ff228dd38f99</password>
  448. <servername/>
  449. <data_tablespace/>
  450. <index_tablespace/>
  451. <attributes>
  452. <attribute>
  453. <code>EXTRA_OPTION_MSSQLNATIVE.defaultRowPrefetch</code>
  454. <attribute>200</attribute>
  455. </attribute>
  456. <attribute>
  457. <code>EXTRA_OPTION_MSSQLNATIVE.readTimeout</code>
  458. <attribute>60</attribute>
  459. </attribute>
  460. <attribute>
  461. <code>FORCE_IDENTIFIERS_TO_LOWERCASE</code>
  462. <attribute>N</attribute>
  463. </attribute>
  464. <attribute>
  465. <code>FORCE_IDENTIFIERS_TO_UPPERCASE</code>
  466. <attribute>N</attribute>
  467. </attribute>
  468. <attribute>
  469. <code>INITIAL_POOL_SIZE</code>
  470. <attribute>100</attribute>
  471. </attribute>
  472. <attribute>
  473. <code>IS_CLUSTERED</code>
  474. <attribute>N</attribute>
  475. </attribute>
  476. <attribute>
  477. <code>MAXIMUM_POOL_SIZE</code>
  478. <attribute>300</attribute>
  479. </attribute>
  480. <attribute>
  481. <code>MSSQLUseIntegratedSecurity</code>
  482. <attribute>false</attribute>
  483. </attribute>
  484. <attribute>
  485. <code>MSSQL_DOUBLE_DECIMAL_SEPARATOR</code>
  486. <attribute>N</attribute>
  487. </attribute>
  488. <attribute>
  489. <code>PORT_NUMBER</code>
  490. <attribute>1433</attribute>
  491. </attribute>
  492. <attribute>
  493. <code>PRESERVE_RESERVED_WORD_CASE</code>
  494. <attribute>Y</attribute>
  495. </attribute>
  496. <attribute>
  497. <code>QUOTE_ALL_FIELDS</code>
  498. <attribute>N</attribute>
  499. </attribute>
  500. <attribute>
  501. <code>SUPPORTS_BOOLEAN_DATA_TYPE</code>
  502. <attribute>Y</attribute>
  503. </attribute>
  504. <attribute>
  505. <code>SUPPORTS_TIMESTAMP_DATA_TYPE</code>
  506. <attribute>Y</attribute>
  507. </attribute>
  508. <attribute>
  509. <code>USE_POOLING</code>
  510. <attribute>Y</attribute>
  511. </attribute>
  512. </attributes>
  513. </connection>
  514. <order>
  515. <hop>
  516. <from>表输入 </from>
  517. <to>Java 代码</to>
  518. <enabled>Y</enabled>
  519. </hop>
  520. <hop>
  521. <from>表输入 1</from>
  522. <to>Java 代码 2</to>
  523. <enabled>Y</enabled>
  524. </hop>
  525. <hop>
  526. <from>表输入 3</from>
  527. <to>Java 代码 3</to>
  528. <enabled>Y</enabled>
  529. </hop>
  530. <hop>
  531. <from>Java 代码</from>
  532. <to>Elasticsearch bulk insert 2</to>
  533. <enabled>Y</enabled>
  534. </hop>
  535. <hop>
  536. <from>Java 代码 2</from>
  537. <to>Elasticsearch bulk insert 2</to>
  538. <enabled>Y</enabled>
  539. </hop>
  540. <hop>
  541. <from>Java 代码 3</from>
  542. <to>Elasticsearch bulk insert 2</to>
  543. <enabled>Y</enabled>
  544. </hop>
  545. </order>
  546. <step>
  547. <name>Elasticsearch bulk insert 2</name>
  548. <type>ElasticSearchBulk</type>
  549. <description/>
  550. <distribute>Y</distribute>
  551. <custom_distribution/>
  552. <copies>1</copies>
  553. <partitioning>
  554. <method>none</method>
  555. <schema_name/>
  556. </partitioning>
  557. <general>
  558. <index>crm_memberext</index>
  559. <type>_doc?routing=%{PhoneId}</type>
  560. <batchSize>100</batchSize>
  561. <timeout>100</timeout>
  562. <timeoutUnit>SECONDS</timeoutUnit>
  563. <isJson>N</isJson>
  564. <idField>PhoneId</idField>
  565. <overwriteIfExists>Y</overwriteIfExists>
  566. <useOutput>N</useOutput>
  567. <stopOnError>Y</stopOnError>
  568. </general>
  569. <fields>
  570. <field>
  571. <columnName>Age</columnName>
  572. <targetName>Age</targetName>
  573. </field>
  574. <field>
  575. <columnName>BirthDay</columnName>
  576. <targetName>BirthDay</targetName>
  577. </field>
  578. <field>
  579. <columnName>DeliveryAddress</columnName>
  580. <targetName>DeliveryAddress</targetName>
  581. </field>
  582. <field>
  583. <columnName>DeliveryZipCode</columnName>
  584. <targetName>DeliveryZipCode</targetName>
  585. </field>
  586. <field>
  587. <columnName>Gender</columnName>
  588. <targetName>Gender</targetName>
  589. </field>
  590. <field>
  591. <columnName>IdCardNo</columnName>
  592. <targetName>IdCardNo</targetName>
  593. </field>
  594. <field>
  595. <columnName>MemName</columnName>
  596. <targetName>MemName</targetName>
  597. </field>
  598. <field>
  599. <columnName>MemberId</columnName>
  600. <targetName>MemberId</targetName>
  601. </field>
  602. <field>
  603. <columnName>MemberPhone</columnName>
  604. <targetName>MemberPhone</targetName>
  605. </field>
  606. <field>
  607. <columnName>Nation</columnName>
  608. <targetName>Nation</targetName>
  609. </field>
  610. <field>
  611. <columnName>Phone1</columnName>
  612. <targetName>Phone1</targetName>
  613. </field>
  614. <field>
  615. <columnName>Phone2</columnName>
  616. <targetName>Phone2</targetName>
  617. </field>
  618. <field>
  619. <columnName>Phone3</columnName>
  620. <targetName>Phone3</targetName>
  621. </field>
  622. <field>
  623. <columnName>SignOrgName</columnName>
  624. <targetName>SignOrgName</targetName>
  625. </field>
  626. <field>
  627. <columnName>ValidEndDate</columnName>
  628. <targetName>ValidEndDate</targetName>
  629. </field>
  630. <field>
  631. <columnName>ValidStartDate</columnName>
  632. <targetName>ValidStartDate</targetName>
  633. </field>
  634. </fields>
  635. <servers>
  636. <server>
  637. <address>192.168.50.32</address>
  638. <port>9300</port>
  639. </server>
  640. </servers>
  641. <settings>
  642. <setting>
  643. <name>cluster.name</name>
  644. <value>es</value>
  645. </setting>
  646. <setting>
  647. <name>custom.aliase.source</name>
  648. <value>mem_memberexts</value>
  649. </setting>
  650. <setting>
  651. <name>custom.fields.Birthday</name>
  652. <value>{"type":"date","format":"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"}</value>
  653. </setting>
  654. <setting>
  655. <name>custom.fields.DeliveryAddress</name>
  656. <value>{"type":"text","analyzer": "ik_smart",
  657. "search_analyzer":"ik_smart","index":true}</value>
  658. </setting>
  659. <setting>
  660. <name>custom.fields.HasOrder</name>
  661. <value>{"type":"boolean"}</value>
  662. </setting>
  663. <setting>
  664. <name>custom.index.number_of_replicas</name>
  665. <value>1</value>
  666. </setting>
  667. <setting>
  668. <name>custom.index.number_of_shards</name>
  669. <value>11</value>
  670. </setting>
  671. </settings>
  672. <attributes/>
  673. <cluster_schema/>
  674. <remotesteps>
  675. <input>
  676. </input>
  677. <output>
  678. </output>
  679. </remotesteps>
  680. <GUI>
  681. <xloc>576</xloc>
  682. <yloc>128</yloc>
  683. <draw>Y</draw>
  684. </GUI>
  685. </step>
  686. <step>
  687. <name>表输入 </name>
  688. <type>TableInput</type>
  689. <description/>
  690. <distribute>Y</distribute>
  691. <custom_distribution/>
  692. <copies>1</copies>
  693. <partitioning>
  694. <method>none</method>
  695. <schema_name/>
  696. </partitioning>
  697. <connection>MallSqlserver</connection>
  698. <sql>
  699. SELECT
  700. dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as PhoneId
  701. ,Consignee as MemName
  702. , convert(varchar(19),[Age],120) as [Age]
  703. , Sex as Gender
  704. , null as IdCardNo
  705. , null as Nation
  706. , null as BirthDay
  707. , null as SignOrgName
  708. , null as ValidStartDate
  709. , null as ValidEndDate
  710. , BuyUserId as MemberId
  711. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as MemberPhone
  712. , DeliveryZipCode
  713. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as Phone1
  714. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as Phone2
  715. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as Phone3
  716. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) as DeliveryAddress
  717. FROM Orders a
  718. where 1=1 and (AconsigneePhone1!='')
  719. order by AConsigneePhone1 asc
  720. offset ((1-1)*5000) rows --fetch next (10) rows only</sql>
  721. <limit>0</limit>
  722. <lookup/>
  723. <execute_each_row>N</execute_each_row>
  724. <variables_active>N</variables_active>
  725. <lazy_conversion_active>N</lazy_conversion_active>
  726. <attributes/>
  727. <cluster_schema/>
  728. <remotesteps>
  729. <input>
  730. </input>
  731. <output>
  732. </output>
  733. </remotesteps>
  734. <GUI>
  735. <xloc>240</xloc>
  736. <yloc>48</yloc>
  737. <draw>Y</draw>
  738. </GUI>
  739. </step>
  740. <step>
  741. <name>表输入 1</name>
  742. <type>TableInput</type>
  743. <description/>
  744. <distribute>Y</distribute>
  745. <custom_distribution/>
  746. <copies>1</copies>
  747. <partitioning>
  748. <method>none</method>
  749. <schema_name/>
  750. </partitioning>
  751. <connection>MallSqlserver</connection>
  752. <sql>
  753. SELECT
  754. dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as PhoneId
  755. ,Consignee as MemName
  756. , convert(varchar(19),[Age],120) as [Age]
  757. , Sex as Gender
  758. , null as IdCardNo
  759. , null as Nation
  760. , null as BirthDay
  761. , null as SignOrgName
  762. , null as ValidStartDate
  763. , null as ValidEndDate
  764. , BuyUserId as MemberId
  765. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as MemberPhone
  766. , DeliveryZipCode
  767. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as Phone1
  768. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as Phone2
  769. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as Phone3
  770. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) as DeliveryAddress
  771. FROM Orders a
  772. where 1=1 and (AconsigneePhone2!='')
  773. order by AConsigneePhone2 asc
  774. offset ((1-1)*5000) rows --fetch next (10) rows only</sql>
  775. <limit>0</limit>
  776. <lookup/>
  777. <execute_each_row>N</execute_each_row>
  778. <variables_active>N</variables_active>
  779. <lazy_conversion_active>N</lazy_conversion_active>
  780. <attributes/>
  781. <cluster_schema/>
  782. <remotesteps>
  783. <input>
  784. </input>
  785. <output>
  786. </output>
  787. </remotesteps>
  788. <GUI>
  789. <xloc>176</xloc>
  790. <yloc>112</yloc>
  791. <draw>Y</draw>
  792. </GUI>
  793. </step>
  794. <step>
  795. <name>表输入 3</name>
  796. <type>TableInput</type>
  797. <description/>
  798. <distribute>Y</distribute>
  799. <custom_distribution/>
  800. <copies>1</copies>
  801. <partitioning>
  802. <method>none</method>
  803. <schema_name/>
  804. </partitioning>
  805. <connection>MallSqlserver</connection>
  806. <sql>
  807. SELECT
  808. dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as PhoneId
  809. ,Consignee as MemName
  810. , convert(varchar(19),[Age],120) as [Age]
  811. , Sex as Gender
  812. , null as IdCardNo
  813. , null as Nation
  814. , null as BirthDay
  815. , null as SignOrgName
  816. , null as ValidStartDate
  817. , null as ValidEndDate
  818. , BuyUserId as MemberId
  819. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as MemberPhone
  820. , DeliveryZipCode
  821. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone1) as Phone1
  822. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone2) as Phone2
  823. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',AConsigneePhone3) as Phone3
  824. , dbo.DesDecryptFixKey('123456','123456e10adc3949ba59abbe56e057f20f883e',ADeliveryAddress) as DeliveryAddress
  825. FROM Orders a
  826. where 1=1 and (AconsigneePhone3!='')
  827. order by AConsigneePhone3 asc
  828. offset ((1-1)*5000) rows --fetch next (10) rows only</sql>
  829. <limit>0</limit>
  830. <lookup/>
  831. <execute_each_row>N</execute_each_row>
  832. <variables_active>N</variables_active>
  833. <lazy_conversion_active>N</lazy_conversion_active>
  834. <attributes/>
  835. <cluster_schema/>
  836. <remotesteps>
  837. <input>
  838. </input>
  839. <output>
  840. </output>
  841. </remotesteps>
  842. <GUI>
  843. <xloc>208</xloc>
  844. <yloc>208</yloc>
  845. <draw>Y</draw>
  846. </GUI>
  847. </step>
  848. <step>
  849. <name>Java 代码</name>
  850. <type>UserDefinedJavaClass</type>
  851. <description/>
  852. <distribute>Y</distribute>
  853. <custom_distribution/>
  854. <copies>10</copies>
  855. <partitioning>
  856. <method>none</method>
  857. <schema_name/>
  858. </partitioning>
  859. <definitions>
  860. <definition>
  861. <class_type>TRANSFORM_CLASS</class_type>
  862. <class_name>Processor</class_name>
  863. <class_source>import java.sql.*;
  864. import org.pentaho.di.core.database.*;
  865. import org.apache.http.HttpHost;
  866. import org.elasticsearch.ElasticsearchException;
  867. import org.elasticsearch.action.get.GetRequest;
  868. import org.elasticsearch.action.get.GetResponse;
  869. import org.elasticsearch.client.RestHighLevelClient;
  870. import org.elasticsearch.client.RequestOptions;
  871. import org.elasticsearch.client.RestClient;
  872. import org.elasticsearch.common.Strings;
  873. import org.elasticsearch.rest.RestStatus;
  874. import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
  875. import org.elasticsearch.action.update.UpdateRequest;
  876. import org.elasticsearch.action.update.UpdateResponse;
  877. import org.elasticsearch.common.xcontent.XContentBuilder;
  878. import org.elasticsearch.common.xcontent.XContentFactory;
  879. import org.elasticsearch.script.Script;
  880. import java.lang.reflect.InvocationTargetException;
  881. import java.lang.reflect.Method;
  882. import com.microsoft.sqlserver.jdbc.SQLServerException;
  883. Database database = null;
  884. PreparedStatement stat = null;
  885. PreparedStatement stat1 = null;
  886. RestHighLevelClient client = new RestHighLevelClient(
  887. RestClient.builder(
  888. new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
  889. Integer index = 0;
  890. public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
  891. {
  892. //logBasic("start---");
  893. Object[] r = getRow();
  894. if (r == null) {
  895. try {
  896. if (stat!=null) {
  897. stat.close();
  898. }
  899. if (stat1!=null) {
  900. stat1.close();
  901. }
  902. if (database!=null) {
  903. database.disconnect();
  904. }
  905. if(client!=null){
  906. client.close();
  907. }
  908. }
  909. catch(Exception e) {
  910. throw new KettleException(e);
  911. }
  912. setOutputDone();
  913. return false;
  914. }
  915. synchronized(this) {
  916. r = createOutputRow(r, data.outputRowMeta.size());
  917. //获取数据库名和表名
  918. String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
  919. String tablename = "DataImport_memext";//getInputRowMeta().getString(r, "tablename", null );
  920. String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
  921. String sourceidname = "PhoneId";//getInputRowMeta().getString(r, "sourceidname", null );
  922. String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
  923. if (dbName==null||tablename==null) {
  924. throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
  925. }
  926. //logBasic("table---"+tablename);
  927. if(database == null){
  928. //数据库连接
  929. DatabaseMeta databaseMeta=null;
  930. try {
  931. databaseMeta = getTransMeta().findDatabase(dbName);
  932. if (databaseMeta==null) {
  933. logError("A connection with name "+dbName+" could not be found!");
  934. setErrors(1);
  935. return false;
  936. }
  937. database = new Database(getTrans(), databaseMeta);
  938. database.connect();
  939. //logBasic("success!");
  940. } catch(Exception e) {
  941. logError("Connecting to database "+dbName+" failed.", e);
  942. setErrors(1);
  943. return false;
  944. }
  945. }
  946. //查询表数据
  947. try {
  948. RowMetaInterface idxRowMeta =data.outputRowMeta;
  949. int i=0;
  950. r = createOutputRow(r, data.outputRowMeta.size());
  951. //int index = getInputRowMeta().size();
  952. // Add the index name
  953. //
  954. String Id = idxRowMeta.getString(r, idname, null);
  955. // Add the column name
  956. String DataId = idxRowMeta.getString(r, sourceidname, null);
  957. /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
  958. ResultSet resultSet = null;
  959. resultSet = database.openQuery(sqlSelect);
  960. Object[] idxRow = database.getRow(resultSet);
  961. if (database!=null) {
  962. database.closeQuery(resultSet);
  963. resultSet = null;
  964. }
  965. //if(idxRow != null){
  966. // return true;
  967. //}
  968. */
  969. //logBasic("idxRow--Id"+Id);
  970. //logBasic("idxRow--sourcetablename"+sourcetablename);
  971. //logBasic("idxRow--DataId"+DataId);
  972. GetRequest getRequest = new GetRequest(
  973. "crm_memberberterminal", // Index
  974. "_doc", // /Type
  975. DataId); // Document id
  976. getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
  977. getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
  978. boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
  979. //client.close();
  980. if(exists ){
  981. return true;
  982. }
  983. //if(!exists &amp;&amp; idxRow == null){
  984. // return true;
  985. //}
  986. //3.获得预处理对象
  987. String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
  988. //logBasic("idxRow--database"+ database);
  989. if(stat == null)
  990. stat = database.prepareSQL(sql);
  991. //logBasic("idxRow--database"+ stat);
  992. //stat.addBatch(sql);
  993. //4.SQL语句占位符设置实际参数
  994. stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
  995. stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  996. stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  997. stat.setString(4, "phone1");
  998. //stat.setInt(5, index);
  999. //5.执行SQL语句
  1000. boolean line = stat.execute();
  1001. //int[] line = stat.executeBatch();
  1002. //System.out.println("更新记录数"+ line);
  1003. //6.释放资源
  1004. //stat.close();
  1005. //Integer pn = Integer.parseInt(Id);
  1006. //Integer curpageNum = Integer.parseInt(Id) % pagesize;
  1007. //if(pn > 0 &amp;&amp; curpageNum == 0){
  1008. // setVariable("page",String.valueOf(page));
  1009. //}
  1010. //logBasic("idxRow--getVariable"+getVariable("page"));
  1011. //logBasic("idxRow--curpageNum"+curpageNum);
  1012. //logBasic("idxRow--length"+i);
  1013. }
  1014. catch(SQLServerException e) {
  1015. return true;
  1016. }catch(Exception e) {
  1017. throw new KettleException(e);
  1018. }
  1019. //释放连接
  1020. //if (database!=null) {
  1021. // database.disconnect();
  1022. //}
  1023. // Send the row on to the next step.
  1024. }
  1025. putRow(data.outputRowMeta, r);
  1026. return true;
  1027. }</class_source>
  1028. </definition>
  1029. </definitions>
  1030. <fields>
  1031. </fields>
  1032. <clear_result_fields>N</clear_result_fields>
  1033. <info_steps/>
  1034. <target_steps/>
  1035. <usage_parameters/>
  1036. <attributes/>
  1037. <cluster_schema/>
  1038. <remotesteps>
  1039. <input>
  1040. </input>
  1041. <output>
  1042. </output>
  1043. </remotesteps>
  1044. <GUI>
  1045. <xloc>368</xloc>
  1046. <yloc>48</yloc>
  1047. <draw>Y</draw>
  1048. </GUI>
  1049. </step>
  1050. <step>
  1051. <name>Java 代码 2</name>
  1052. <type>UserDefinedJavaClass</type>
  1053. <description/>
  1054. <distribute>Y</distribute>
  1055. <custom_distribution/>
  1056. <copies>10</copies>
  1057. <partitioning>
  1058. <method>none</method>
  1059. <schema_name/>
  1060. </partitioning>
  1061. <definitions>
  1062. <definition>
  1063. <class_type>TRANSFORM_CLASS</class_type>
  1064. <class_name>Processor</class_name>
  1065. <class_source>import java.sql.*;
  1066. import org.pentaho.di.core.database.*;
  1067. import org.apache.http.HttpHost;
  1068. import org.elasticsearch.ElasticsearchException;
  1069. import org.elasticsearch.action.get.GetRequest;
  1070. import org.elasticsearch.action.get.GetResponse;
  1071. import org.elasticsearch.client.RestHighLevelClient;
  1072. import org.elasticsearch.client.RequestOptions;
  1073. import org.elasticsearch.client.RestClient;
  1074. import org.elasticsearch.common.Strings;
  1075. import org.elasticsearch.rest.RestStatus;
  1076. import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
  1077. import org.elasticsearch.action.update.UpdateRequest;
  1078. import org.elasticsearch.action.update.UpdateResponse;
  1079. import org.elasticsearch.common.xcontent.XContentBuilder;
  1080. import org.elasticsearch.common.xcontent.XContentFactory;
  1081. import org.elasticsearch.script.Script;
  1082. import java.lang.reflect.InvocationTargetException;
  1083. import java.lang.reflect.Method;
  1084. import com.microsoft.sqlserver.jdbc.SQLServerException;
  1085. Database database = null;
  1086. PreparedStatement stat = null;
  1087. PreparedStatement stat1 = null;
  1088. RestHighLevelClient client = new RestHighLevelClient(
  1089. RestClient.builder(
  1090. new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
  1091. Integer index = 0;
  1092. public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
  1093. {
  1094. //logBasic("start---");
  1095. Object[] r = getRow();
  1096. if (r == null) {
  1097. try {
  1098. if (stat!=null) {
  1099. stat.close();
  1100. }
  1101. if (stat1!=null) {
  1102. stat1.close();
  1103. }
  1104. if (database!=null) {
  1105. database.disconnect();
  1106. }
  1107. if(client!=null){
  1108. client.close();
  1109. }
  1110. }
  1111. catch(Exception e) {
  1112. throw new KettleException(e);
  1113. }
  1114. setOutputDone();
  1115. return false;
  1116. }
  1117. synchronized(this) {
  1118. r = createOutputRow(r, data.outputRowMeta.size());
  1119. //获取数据库名和表名
  1120. String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
  1121. String tablename = "DataImport_memext";//getInputRowMeta().getString(r, "tablename", null );
  1122. String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
  1123. String sourceidname = "PhoneId";//getInputRowMeta().getString(r, "sourceidname", null );
  1124. String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
  1125. if (dbName==null||tablename==null) {
  1126. throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
  1127. }
  1128. //logBasic("table---"+tablename);
  1129. if(database == null){
  1130. //数据库连接
  1131. DatabaseMeta databaseMeta=null;
  1132. try {
  1133. databaseMeta = getTransMeta().findDatabase(dbName);
  1134. if (databaseMeta==null) {
  1135. logError("A connection with name "+dbName+" could not be found!");
  1136. setErrors(1);
  1137. return false;
  1138. }
  1139. database = new Database(getTrans(), databaseMeta);
  1140. database.connect();
  1141. //logBasic("success!");
  1142. } catch(Exception e) {
  1143. logError("Connecting to database "+dbName+" failed.", e);
  1144. setErrors(1);
  1145. return false;
  1146. }
  1147. }
  1148. //查询表数据
  1149. try {
  1150. RowMetaInterface idxRowMeta =data.outputRowMeta;
  1151. int i=0;
  1152. r = createOutputRow(r, data.outputRowMeta.size());
  1153. //int index = getInputRowMeta().size();
  1154. // Add the index name
  1155. //
  1156. String Id = idxRowMeta.getString(r, idname, null);
  1157. // Add the column name
  1158. String DataId = idxRowMeta.getString(r, sourceidname, null);
  1159. /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
  1160. ResultSet resultSet = null;
  1161. resultSet = database.openQuery(sqlSelect);
  1162. Object[] idxRow = database.getRow(resultSet);
  1163. if (database!=null) {
  1164. database.closeQuery(resultSet);
  1165. resultSet = null;
  1166. }
  1167. //if(idxRow != null){
  1168. // return true;
  1169. //}
  1170. */
  1171. //logBasic("idxRow--Id"+Id);
  1172. //logBasic("idxRow--sourcetablename"+sourcetablename);
  1173. //logBasic("idxRow--DataId"+DataId);
  1174. GetRequest getRequest = new GetRequest(
  1175. "crm_memberberterminal", // Index
  1176. "_doc", // /Type
  1177. DataId); // Document id
  1178. getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
  1179. getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
  1180. boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
  1181. //client.close();
  1182. if(exists ){
  1183. return true;
  1184. }
  1185. //if(!exists &amp;&amp; idxRow == null){
  1186. // return true;
  1187. //}
  1188. //3.获得预处理对象
  1189. String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
  1190. //logBasic("idxRow--database"+ database);
  1191. if(stat == null)
  1192. stat = database.prepareSQL(sql);
  1193. //logBasic("idxRow--database"+ stat);
  1194. //stat.addBatch(sql);
  1195. //4.SQL语句占位符设置实际参数
  1196. stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
  1197. stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  1198. stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  1199. stat.setString(4, "phone2");
  1200. //stat.setInt(5, index);
  1201. //5.执行SQL语句
  1202. boolean line = stat.execute();
  1203. //int[] line = stat.executeBatch();
  1204. //System.out.println("更新记录数"+ line);
  1205. //6.释放资源
  1206. //stat.close();
  1207. //Integer pn = Integer.parseInt(Id);
  1208. //Integer curpageNum = Integer.parseInt(Id) % pagesize;
  1209. //if(pn > 0 &amp;&amp; curpageNum == 0){
  1210. // setVariable("page",String.valueOf(page));
  1211. //}
  1212. //logBasic("idxRow--getVariable"+getVariable("page"));
  1213. //logBasic("idxRow--curpageNum"+curpageNum);
  1214. //logBasic("idxRow--length"+i);
  1215. }
  1216. catch(SQLServerException e) {
  1217. return true;
  1218. }catch(Exception e) {
  1219. throw new KettleException(e);
  1220. }
  1221. //释放连接
  1222. //if (database!=null) {
  1223. // database.disconnect();
  1224. //}
  1225. // Send the row on to the next step.
  1226. }
  1227. putRow(data.outputRowMeta, r);
  1228. return true;
  1229. }</class_source>
  1230. </definition>
  1231. </definitions>
  1232. <fields>
  1233. </fields>
  1234. <clear_result_fields>N</clear_result_fields>
  1235. <info_steps/>
  1236. <target_steps/>
  1237. <usage_parameters/>
  1238. <attributes/>
  1239. <cluster_schema/>
  1240. <remotesteps>
  1241. <input>
  1242. </input>
  1243. <output>
  1244. </output>
  1245. </remotesteps>
  1246. <GUI>
  1247. <xloc>352</xloc>
  1248. <yloc>128</yloc>
  1249. <draw>Y</draw>
  1250. </GUI>
  1251. </step>
  1252. <step>
  1253. <name>Java 代码 3</name>
  1254. <type>UserDefinedJavaClass</type>
  1255. <description/>
  1256. <distribute>Y</distribute>
  1257. <custom_distribution/>
  1258. <copies>10</copies>
  1259. <partitioning>
  1260. <method>none</method>
  1261. <schema_name/>
  1262. </partitioning>
  1263. <definitions>
  1264. <definition>
  1265. <class_type>TRANSFORM_CLASS</class_type>
  1266. <class_name>Processor</class_name>
  1267. <class_source>import java.sql.*;
  1268. import org.pentaho.di.core.database.*;
  1269. import org.apache.http.HttpHost;
  1270. import org.elasticsearch.ElasticsearchException;
  1271. import org.elasticsearch.action.get.GetRequest;
  1272. import org.elasticsearch.action.get.GetResponse;
  1273. import org.elasticsearch.client.RestHighLevelClient;
  1274. import org.elasticsearch.client.RequestOptions;
  1275. import org.elasticsearch.client.RestClient;
  1276. import org.elasticsearch.common.Strings;
  1277. import org.elasticsearch.rest.RestStatus;
  1278. import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
  1279. import org.elasticsearch.action.update.UpdateRequest;
  1280. import org.elasticsearch.action.update.UpdateResponse;
  1281. import org.elasticsearch.common.xcontent.XContentBuilder;
  1282. import org.elasticsearch.common.xcontent.XContentFactory;
  1283. import org.elasticsearch.script.Script;
  1284. import java.lang.reflect.InvocationTargetException;
  1285. import java.lang.reflect.Method;
  1286. import com.microsoft.sqlserver.jdbc.SQLServerException;
  1287. Database database = null;
  1288. PreparedStatement stat = null;
  1289. PreparedStatement stat1 = null;
  1290. RestHighLevelClient client = new RestHighLevelClient(
  1291. RestClient.builder(
  1292. new HttpHost[]{new HttpHost("192.168.50.32", 9200, "http")}));
  1293. Integer index = 0;
  1294. public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
  1295. {
  1296. //logBasic("start---");
  1297. Object[] r = getRow();
  1298. if (r == null) {
  1299. try {
  1300. if (stat!=null) {
  1301. stat.close();
  1302. }
  1303. if (stat1!=null) {
  1304. stat1.close();
  1305. }
  1306. if (database!=null) {
  1307. database.disconnect();
  1308. }
  1309. if(client!=null){
  1310. client.close();
  1311. }
  1312. }
  1313. catch(Exception e) {
  1314. throw new KettleException(e);
  1315. }
  1316. setOutputDone();
  1317. return false;
  1318. }
  1319. synchronized(this) {
  1320. r = createOutputRow(r, data.outputRowMeta.size());
  1321. //获取数据库名和表名
  1322. String dbName = "MemberSqlServer";//getInputRowMeta().getString(r, "conname", null );
  1323. String tablename = "DataImport_memext";//getInputRowMeta().getString(r, "tablename", null );
  1324. String idname = "MemberPhone";//getInputRowMeta().getString(r, "idname", null );
  1325. String sourceidname = "PhoneId";//getInputRowMeta().getString(r, "sourceidname", null );
  1326. String sourcetablename = "Orders";//getInputRowMeta().getString(r, "sourcetablename", null );
  1327. if (dbName==null||tablename==null) {
  1328. throw new KettleException("Unable to find field with name "+tablename+" in the input row.");
  1329. }
  1330. //logBasic("table---"+tablename);
  1331. if(database == null){
  1332. //数据库连接
  1333. DatabaseMeta databaseMeta=null;
  1334. try {
  1335. databaseMeta = getTransMeta().findDatabase(dbName);
  1336. if (databaseMeta==null) {
  1337. logError("A connection with name "+dbName+" could not be found!");
  1338. setErrors(1);
  1339. return false;
  1340. }
  1341. database = new Database(getTrans(), databaseMeta);
  1342. database.connect();
  1343. //logBasic("success!");
  1344. } catch(Exception e) {
  1345. logError("Connecting to database "+dbName+" failed.", e);
  1346. setErrors(1);
  1347. return false;
  1348. }
  1349. }
  1350. //查询表数据
  1351. try {
  1352. RowMetaInterface idxRowMeta =data.outputRowMeta;
  1353. int i=0;
  1354. r = createOutputRow(r, data.outputRowMeta.size());
  1355. //int index = getInputRowMeta().size();
  1356. // Add the index name
  1357. //
  1358. String Id = idxRowMeta.getString(r, idname, null);
  1359. // Add the column name
  1360. String DataId = idxRowMeta.getString(r, sourceidname, null);
  1361. /*String sqlSelect = "select Id from "+tablename + " where DataId = '"+ DataId +"'";
  1362. ResultSet resultSet = null;
  1363. resultSet = database.openQuery(sqlSelect);
  1364. Object[] idxRow = database.getRow(resultSet);
  1365. if (database!=null) {
  1366. database.closeQuery(resultSet);
  1367. resultSet = null;
  1368. }
  1369. //if(idxRow != null){
  1370. // return true;
  1371. //}
  1372. */
  1373. //logBasic("idxRow--Id"+Id);
  1374. //logBasic("idxRow--sourcetablename"+sourcetablename);
  1375. //logBasic("idxRow--DataId"+DataId);
  1376. GetRequest getRequest = new GetRequest(
  1377. "crm_memberberterminal", // Index
  1378. "_doc", // /Type
  1379. DataId); // Document id
  1380. getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
  1381. getRequest.storedFields(new String[]{"_none_"}); // 禁止存储任何字段
  1382. boolean exists = client.exists(getRequest,RequestOptions.DEFAULT);
  1383. //client.close();
  1384. if(exists ){
  1385. return true;
  1386. }
  1387. //if(!exists &amp;&amp; idxRow == null){
  1388. // return true;
  1389. //}
  1390. //3.获得预处理对象
  1391. String sql="insert into "+tablename+" values (?,?,?,?);";//begin tran t2; commit tran t2
  1392. //logBasic("idxRow--database"+ database);
  1393. if(stat == null)
  1394. stat = database.prepareSQL(sql);
  1395. //logBasic("idxRow--database"+ stat);
  1396. //stat.addBatch(sql);
  1397. //4.SQL语句占位符设置实际参数
  1398. stat.setString(1, Id);//索引参数1代表着sql中的第一个?号,也就是我需要将条件sid所对应的sname数据更新为“儿童玩具测试”
  1399. stat.setString(2, sourcetablename);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  1400. stat.setString(3, DataId);//索引参数2代表着sql中的第二个?号,也就是条件是sid为3
  1401. stat.setString(4, "phone3");
  1402. //stat.setInt(5, index);
  1403. //5.执行SQL语句
  1404. boolean line = stat.execute();
  1405. //int[] line = stat.executeBatch();
  1406. //System.out.println("更新记录数"+ line);
  1407. //6.释放资源
  1408. //stat.close();
  1409. //Integer pn = Integer.parseInt(Id);
  1410. //Integer curpageNum = Integer.parseInt(Id) % pagesize;
  1411. //if(pn > 0 &amp;&amp; curpageNum == 0){
  1412. // setVariable("page",String.valueOf(page));
  1413. //}
  1414. //logBasic("idxRow--getVariable"+getVariable("page"));
  1415. //logBasic("idxRow--curpageNum"+curpageNum);
  1416. //logBasic("idxRow--length"+i);
  1417. }
  1418. catch(SQLServerException e) {
  1419. return true;
  1420. }catch(Exception e) {
  1421. throw new KettleException(e);
  1422. }
  1423. //释放连接
  1424. //if (database!=null) {
  1425. // database.disconnect();
  1426. //}
  1427. // Send the row on to the next step.
  1428. }
  1429. putRow(data.outputRowMeta, r);
  1430. return true;
  1431. }</class_source>
  1432. </definition>
  1433. </definitions>
  1434. <fields>
  1435. </fields>
  1436. <clear_result_fields>N</clear_result_fields>
  1437. <info_steps/>
  1438. <target_steps/>
  1439. <usage_parameters/>
  1440. <attributes/>
  1441. <cluster_schema/>
  1442. <remotesteps>
  1443. <input>
  1444. </input>
  1445. <output>
  1446. </output>
  1447. </remotesteps>
  1448. <GUI>
  1449. <xloc>352</xloc>
  1450. <yloc>208</yloc>
  1451. <draw>Y</draw>
  1452. </GUI>
  1453. </step>
  1454. <step_error_handling>
  1455. </step_error_handling>
  1456. <slave-step-copy-partition-distribution>
  1457. </slave-step-copy-partition-distribution>
  1458. <slave_transformation>N</slave_transformation>
  1459. <attributes/>
  1460. </transformation>