forked from veeraravi/Spark-notes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataframe.scala
54 lines (48 loc) · 2.31 KB
/
dataframe.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Sample DataFrame with four columns:
//   t - Long, epoch milliseconds (current time plus a per-row offset)
//   u - String key ("user1" / "user2")
//   s - Double value
//   l - Seq[Double]
// The clock is read exactly once: calling System.currentTimeMillis per row would give
// every row a slightly different base instant, so the 1,000,000 ms spacing between
// consecutive rows would not be exact.
val df = {
  val now = System.currentTimeMillis
  Seq(
    (now, "user1", 0.3, Seq(0.1, 0.2)),
    (now + 1000000L, "user1", 0.5, Seq(0.1, 0.2)),
    (now + 2000000L, "user1", 0.2, Seq(0.1, 0.2)),
    (now + 3000000L, "user1", 0.1, Seq(0.1, 0.2)),
    (now + 4000000L, "user1", 1.3, Seq(0.1, 0.2)),
    (now + 5000000L, "user1", 2.3, Seq(0.1, 0.2)),
    (now + 6000000L, "user2", 2.3, Seq(0.1, 0.2))
  ).toDF("t", "u", "s", "l")
}
// UDF that builds a java.sql.Timestamp from a Long (milliseconds since the epoch)
// and returns that timestamp's toString representation.
val get_time = udf { (epochMillis: Long) =>
  val stamp = new java.sql.Timestamp(epochMillis)
  stamp.toString
}
// One row per "u": column "struct" holds the list of (t, s, l) structs of that group,
// with "t" already converted to its string form by get_time.
// Note: collect_list gives no ordering guarantee for the collected elements.
val below = {
  // Overwrite the numeric "t" with its textual timestamp.
  val withReadableTime = df.withColumn("t", get_time($"t"))
  // Pack the three value columns into a single struct column.
  val withStruct = withReadableTime.withColumn("struct", struct($"t", $"s", $"l"))
  withStruct
    .select("u", "struct")
    .groupBy("u")
    .agg(collect_list("struct").as("struct"))
}
// Rows of df with s > 1.0, each annotated with the min and max "t" of its "u" partition
// and joined (on "u") with the per-"u" struct list from `below`.
val res = {
  // Single window spec shared by both aggregates: all rows having the same "u".
  val perU = org.apache.spark.sql.expressions.Window.partitionBy("u")
  // min/max are computed before filtering, so they cover every row of the partition.
  val withBounds = df
    .withColumn("min", min("t").over(perU))
    .withColumn("max", max("t").over(perU))
  withBounds
    .filter("s > 1.0")
    .join(below, Seq("u"))
}
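/*
Sample output for `res`.
NOTE(review): with the sample rows above, the user2 row (s = 2.3) also satisfies `s > 1.0`
and has a match in `below`, so a third row would be expected here; this listing looks
truncated or taken from an earlier data set. Confirm by re-running.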
+-----+-------------+---+----------+-------------+-------------+--------------------+
| u| t| s| l| min| max| struct|
+-----+-------------+---+----------+-------------+-------------+--------------------+
|user1|1501200459653|2.3|[0.1, 0.2]|1501195459653|1501200459653|[[2017-07-28 11:0...|
|user1|1501199459653|1.3|[0.1, 0.2]|1501195459653|1501200459653|[[2017-07-28 11:0...|
+-----+-------------+---+----------+-------------+-------------+--------------------+
*/
// Second approach. Re-declares `below` with the same pipeline as the earlier definition:
// per "u", the list of (t, s, l) structs, where "t" is the string produced by get_time.
val below = (
  df
    .withColumn("t", get_time($"t"))                 // numeric t -> timestamp text
    .withColumn("struct", struct($"t", $"s", $"l"))  // bundle the value columns
    .select("u", "struct")
    .groupBy("u")
    .agg(collect_list("struct").as("struct"))
)
// Rebuilds the individual rows from `below` without a join or window: each struct of
// the list is exploded into its own row (keeping the full list alongside), its fields
// are flattened back into columns l, t, s, and only rows with s > 1.0 are kept.
val res = {
  // One output row per element of "struct"; the element is exposed as "x".
  val exploded = below.select($"u", explode($"struct").as("x"), $"struct")
  exploded
    .select($"u", $"x.l".as("l"), $"x.t".as("t"), $"x.s".as("s"), $"struct")
    .filter($"s" > 1.0)
}
res.show()
/*
+-----+----------+--------------------+---+--------------------+
| u| l| t| s| struct|
+-----+----------+--------------------+---+--------------------+
|user1|[0.1, 0.2]|2017-07-28 12:12:...|2.3|[[2017-07-28 11:0...|
|user1|[0.1, 0.2]|2017-07-28 11:55:...|1.3|[[2017-07-28 11:0...|
|user2|[0.1, 0.2]|2017-07-28 12:29:...|2.3|[[2017-07-28 12:2...|
+-----+----------+--------------------+---+--------------------+
*/