1
2
3
4
5
6
7
8
9
10
11
12
13
| // Inner-joins the unioned diff rows with the 3-field records read from path1 and writes four columns to path2 as tab-delimited CSV.
val diff = diff1.union(diff2)
val raw = spark.read.textFile(path1)
// Keep only lines that split into exactly three tab-separated fields and store them as a DataFrame.
// Each line is split once and destructured instead of being re-split for every field access.
// NOTE: a non-numeric second field still makes `.toLong` throw NumberFormatException (unchanged behavior).
val raw_df = raw.flatMap { (line: String) =>
  line.split("\t") match {
    case Array(first, second, third) => Seq((first, second.toLong, third))
    case _ => Seq.empty[(String, Long, String)]
  }
}.toDF("f1", "f2", "f3")
// Join on (f1, f2) to fetch the matching raw data. The joined columns (diff's first, then raw_df's)
// are renamed positionally to f1..f6, which requires diff to have exactly three columns.
val joined_diff = diff
  .join(raw_df, diff("f1") === raw_df("f1") && diff("f2") === raw_df("f2"), "inner")
  .toDF("f1", "f2", "f3", "f4", "f5", "f6")
// Save the data.
// NOTE(review): after the positional rename, f4 is raw_df's join key f1 (a duplicate of diff's f1),
// not raw_df's third field (f6) — confirm this is the intended column.
val result_df = joined_diff.select($"f1", $"f2", $"f3", $"f4")
// Collapse to one partition before writing; any existing output at path2 is overwritten.
result_df.repartition(1).write.mode("overwrite").option("delimiter", "\t").csv(path2)
|