forked from veeraravi/Spark-notes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson-process.txt
61 lines (49 loc) · 1.7 KB
/
json-process.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Sample visit records: each is the raw JSON text of one visitor, with a
// "visitorId" and a "products" array of {"id", "interest"} objects.
// stripMargin removes the leading "    |" so every line starts at column 0.
val rec1: String =
  """{
    |"visitorId": "v1",
    |"products": [{
    |"id": "i1",
    |"interest": 0.68
    |}, {
    |"id": "i2",
    |"interest": 0.42
    |}]
    |}""".stripMargin
val rec2: String =
  """{
    |"visitorId": "v2",
    |"products": [{
    |"id": "i1",
    |"interest": 0.78
    |}, {
    |"id": "i3",
    |"interest": 0.11
    |}]
    |}""".stripMargin
// All raw records, in order; this is what gets parallelized and parsed.
val visitsData: Seq[String] = rec1 :: rec2 :: Nil
// Lookup table from product id to human-readable product name.
val productIdToNameMap: Map[String, String] =
  Map(("i1", "Nike Shoes"), ("i2", "Umbrella"), ("i3", "Jeans"))
/*
 * Alternative (unused): turn the lookup map into a DataFrame so it can be
 * joined with a regular DataFrame join instead of a broadcast map lookup.
val mprdd = sc.parallelize(productIdToNameMap.toSeq)
val lookupDF = mprdd.toDF("id","product")
*/
// Parse the raw JSON records. DataFrameReader.json(RDD[String]) is deprecated
// since Spark 2.2, so the RDD is wrapped in a Dataset[String] first.
val rdd = sc.parallelize(visitsData)
val df = spark.read.json(spark.createDataset(rdd)(org.apache.spark.sql.Encoders.STRING))
// One row per (visitor, product) pair. Note: explode emits no rows for a
// visitor whose "products" array is null or empty.
val prods = df.select(df("visitorId"), explode(df("products")).alias("products"))
// Lift the struct fields to top-level columns (visitorId, id, interest).
// interest is cast to double so consumers can rely on row.getDouble even if
// every input value were integral (Spark would then infer a long column).
val allCols = prods.select(
  prods("visitorId"),
  prods("products.id").alias("id"),
  prods("products.interest").cast("double").alias("interest"))
// Ship the small product-id -> name table to executors once; the "join"
// below is then a per-record map lookup (narrow map, no shuffle).
val broadcast = sc.broadcast(productIdToNameMap)
// Flatten each (visitorId, id, interest) row to a "visitorId,id,interest" line.
// NOTE(review): these lines are re-split on ',' below, so a visitor id or
// product id containing a comma would be mis-parsed; the sample data has none.
val productsAsRdd = allCols.rdd.map { row =>
  s"${row.getString(0)},${row.getString(1)},${row.getDouble(2)}"
}
// Key each line by its product id for the lookup.
val productsToJoin = productsAsRdd.map(line => (line.split(",")(1), line))
// Pair each line with its product name. Ids missing from the lookup table are
// dropped (inner-join semantics) instead of failing the whole job with
// NoSuchElementException, which is what Map.apply would do.
val joined = productsToJoin.flatMap { case (id, line) =>
  broadcast.value.get(id).map(name => (line, name)).toList
}
// Render each joined record as one valid JSON object, e.g.
// {"visitorId":"v1","id":"i1","interest":0.68,"name":"Nike Shoes"}
val finalRdd = joined.map { case (line, name) =>
  // Quote a value as a JSON string literal, escaping quotes, backslashes and
  // control characters; a null reference becomes the JSON literal null.
  def quote(s: String): String =
    if (s == null) "null"
    else {
      val sb = new StringBuilder("\"")
      s.foreach { c =>
        c match {
          case '"'  => sb.append("\\\"")
          case '\\' => sb.append("\\\\")
          case '\n' => sb.append("\\n")
          case '\r' => sb.append("\\r")
          case '\t' => sb.append("\\t")
          case _ if c < ' ' => sb.append("\\u%04x".format(c.toInt))
          case _ => sb.append(c)
        }
      }
      sb.append('"').toString
    }
  val fields = line.split(",")
  val visitorId = fields(0)
  val id = fields(1)
  // Keep full Double precision (the original narrowed to Float).
  val interest = fields(2).toDouble
  // JSON has no NaN/Infinity literals, so those are emitted as null.
  val interestJson =
    if (interest.isNaN || interest.isInfinite) "null" else interest.toString
  s"""{"visitorId":${quote(visitorId)},"id":${quote(id)},"interest":$interestJson,"name":${quote(name)}}"""
}