123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- import org.apache.spark.sql.SparkSession
/**
 * Spark job that reads comma-separated lines from `danwei2.txt`, rewrites each
 * well-formed two-field line according to the unit marker found in its second
 * field, and writes the result to the `outPut1` directory.
 *
 * Output per input line:
 *  - not exactly two fields: the line is logged and written through unchanged;
 *  - second field contains a known marker: `marker/firstField`
 *    (the marker `套装` is written alone, with no suffix);
 *  - no known marker: the second field is logged and the first field is written.
 */
object Danwei {

  /** Marker that is emitted on its own, without the `/firstField` suffix. */
  private val BareMarker = "套装"

  /**
   * Unit markers in matching priority order; the first marker contained in the
   * second field wins. The order matters because some markers are substrings
   * of others (e.g. "mg" must be tried before "g", "cm" before "m",
   * "套装" before "套"), so do not sort or deduplicate this list casually.
   */
  private val UnitMarkers: Seq[String] = Seq(
    "贴", "枚", "袋", "粒", "片", "丸", "套装", "支", "ml", "mg", "g", "条",
    "张", "L", "瓶", "只", "包", "个", "给", "扎", "吸", "mm", "人份", "卷",
    "把", "球", "根", "双", "件", "抽", "cm", "m", "单盘", "块", "克", "幅",
    "公分", "头", "对", "G", "s", "副", "台", "帖", "米", "套", "毫升"
  )

  /**
   * Converts one raw input line into its output form.
   *
   * @param line one line of the input file, expected to be `first,second`
   * @return the rewritten line as described on [[Danwei]]; always a `String`,
   *         so every record is written as readable text
   */
  private def normalize(line: String): String = {
    val fields = line.split(",")
    if (fields.length != 2) {
      // Malformed row: report it and pass the raw text through so it can be
      // located in the output (previously the array object itself was emitted,
      // which was written as an opaque JVM reference).
      println("错误" + fields.toBuffer)
      line
    } else {
      val Array(first, second) = fields
      UnitMarkers.find(marker => second.contains(marker)) match {
        case Some(BareMarker) => BareMarker
        case Some(marker)     => s"$marker/$first"
        case None =>
          // Unknown unit: log the second field so the marker list can be extended.
          println(second)
          first
      }
    }
  }

  /**
   * Entry point: runs the job on a local Spark master and prints the number of
   * records processed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Hard-coded Hadoop location (Windows path) kept as in the original setup.
    System.setProperty("hadoop.home.dir", "E:\\Scala\\hadoop-2.6.5")
    val spark = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
    try {
      val sc = spark.sparkContext

      // Counts every record that passes through the map below. It is updated
      // inside a transformation, so a re-executed task may add its records
      // again; read it on the driver only, after the action has finished.
      val processed = sc.longAccumulator("name")

      val normalized = sc
        .textFile("./src/main/scala/data/danwei2.txt", 1)
        .map { line =>
          processed.add(1)
          normalize(line)
        }

      // Single action: the lineage is evaluated once, so diagnostics are not
      // printed twice and nothing is collected into the driver.
      normalized.saveAsTextFile("./src/main/scala/data/outPut1")
      println(processed.value)
    } finally {
      spark.stop()
    }
  }
}
|