Apache Spark Analytical Window Functions
It has been a while since my last post, so here is an interesting one that will help you do some cool things with Spark and window functions. I would also like to thank my colleague Suresh for helping me learn this awesome SQL functionality.
Window functions let us compare the current row with other rows in the same DataFrame, which makes them useful for calculating running totals, sequencing events, sessionizing transactions and so on.
I will cover a couple of examples that demonstrate the usage of window functions. Let's create a simple employee DataFrame to work on with the various analytical and ranking functions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Create Spark Session
val sparkSession = SparkSession.builder.master("local").appName("Window Function").getOrCreate()
import sparkSession.implicits._
// Create Sample Dataframe
val empDF = sparkSession.createDataFrame(Seq(
(7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
(7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
(7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
(7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
(7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
(7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
(7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
(7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
(7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
(7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
(7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
)).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
First we need to define the window we will be working on: we partition by department (deptno) and order by salary (sal) in descending order. Below is the code to do it via the Spark DataFrame API.
val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
Rank salary within each department
//SQL
SELECT empno,deptno,sal,RANK() OVER (partition by deptno ORDER BY sal desc) as rank FROM emp;
//DF API
val rankTest = rank().over(partitionWindow)
empDF.select($"*", rankTest as "rank").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO RANK
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 1
7566 JONES MANAGER 7839 2Apr81 2975 0 20 2
7876 ADAMS CLERK 7788 23May87 1100 0 20 3
7839 KING PRESIDENT 0 17Nov81 5000 0 10 1
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 2
7369 SMITH CLERK 7902 17Dec80 800 20 10 3
7698 BLAKE MANAGER 7839 1May81 2850 0 30 1
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 2
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 3
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 4
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 4
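The SQL form above queries a table called emp, which the snippets so far never register. Below is a minimal sketch of one way to run it from Spark, assuming a local session and a three-row subset of department 10. The names spark, empSubset, rankedSql and the alias sal_rank are made up for this illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("SqlWindowSketch").getOrCreate()
import spark.implicits._

// Three rows of department 10 from the sample data.
val empSubset = Seq(
  (7839, 10, 5000),
  (7782, 10, 2450),
  (7369, 10, 800)
).toDF("empno", "deptno", "sal")

// Register the DataFrame under the table name used by the SQL examples.
empSubset.createOrReplaceTempView("emp")

// Same ranking query as above, with a hypothetical alias sal_rank.
val rankedSql = spark.sql(
  """SELECT empno, deptno, sal,
    |       RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS sal_rank
    |FROM emp""".stripMargin)

rankedSql.show()
```

The same createOrReplaceTempView call on the full empDF should make the other SQL examples in this post runnable through spark.sql as well.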
Dense Rank salary within each department
//SQL
SELECT empno,deptno,sal,DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal desc) as dense_rank FROM emp;
//DF API
val rankTest = dense_rank().over(partitionWindow)
empDF.select($"*", rankTest as "dense_rank").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO DENSE_RANK
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 1
7566 JONES MANAGER 7839 2Apr81 2975 0 20 2
7876 ADAMS CLERK 7788 23May87 1100 0 20 3
7839 KING PRESIDENT 0 17Nov81 5000 0 10 1
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 2
7369 SMITH CLERK 7902 17Dec80 800 20 10 3
7698 BLAKE MANAGER 7839 1May81 2850 0 30 1
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 2
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 3
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 4
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 4
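In the sample data the only tie sits at the bottom of department 30, so the RANK and DENSE_RANK columns come out identical. The two differ only when rows follow a tie: rank leaves a gap, dense_rank does not. Here is a small sketch with made-up rows (tiedDF, bySalDesc and compared are hypothetical names) that puts the tie at the top so the difference is visible.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("RankVsDenseRank").getOrCreate()
import spark.implicits._

// Made-up rows: two employees tie on the highest salary.
val tiedDF = Seq(
  (1, 10, 3000),
  (2, 10, 3000),
  (3, 10, 2000)
).toDF("empno", "deptno", "sal")

val bySalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

// rank skips to 3 after the two-way tie, dense_rank continues with 2.
val compared = tiedDF.select(
  $"empno",
  $"sal",
  rank().over(bySalDesc).as("rank"),
  dense_rank().over(bySalDesc).as("dense_rank"))

compared.show()
```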
Row Number within each department
//SQL
SELECT empno,deptno,sal,ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal desc) as row_num FROM emp;
//DF API
val rowNumberTest = row_number().over(partitionWindow)
empDF.select($"*", rowNumberTest as "row_number").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO ROW_NUMBER
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 1
7566 JONES MANAGER 7839 2Apr81 2975 0 20 2
7876 ADAMS CLERK 7788 23May87 1100 0 20 3
7839 KING PRESIDENT 0 17Nov81 5000 0 10 1
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 2
7369 SMITH CLERK 7902 17Dec80 800 20 10 3
7698 BLAKE MANAGER 7839 1May81 2850 0 30 1
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 2
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 3
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 4
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 5
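Note that WARD and MARTIN have the same salary, so which of them gets row number 4 and which gets 5 is not determined by the ordering on sal alone. One way to make the numbering repeatable is to add a tie-breaker to the ordering. The sketch below assumes empno is an acceptable tie-breaker and uses three rows of department 30. The names dept30 and stableOrder are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("RowNumberTieBreak").getOrCreate()
import spark.implicits._

// Three rows of department 30; WARD and MARTIN tie on salary.
val dept30 = Seq(
  (7654, "MARTIN", 30, 1250),
  (7521, "WARD", 30, 1250),
  (7844, "TURNER", 30, 1500)
).toDF("empno", "ename", "deptno", "sal")

// Order by salary first, then by empno to break ties deterministically.
val stableOrder = Window.partitionBy($"deptno").orderBy($"sal".desc, $"empno".asc)

val numbered = dept30.select($"ename", row_number().over(stableOrder).as("row_number"))

numbered.show()
```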
Running Total (Salary) within each department
//SQL
SELECT empno,deptno,sal,sum(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as running_total FROM emp;
//DF API
val sumTest = sum($"sal").over(partitionWindow)
empDF.select($"*", sumTest as "running_total").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO RUNNING_TOTAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 3000
7566 JONES MANAGER 7839 2Apr81 2975 0 20 5975
7876 ADAMS CLERK 7788 23May87 1100 0 20 7075
7839 KING PRESIDENT 0 17Nov81 5000 0 10 5000
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 7450
7369 SMITH CLERK 7902 17Dec80 800 20 10 8250
7698 BLAKE MANAGER 7839 1May81 2850 0 30 2850
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 4450
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 5950
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 8450
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 8450
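Notice that WARD and MARTIN both show 8450 in the table above. With an ORDER BY and no explicit frame, the sum runs over a RANGE frame, so rows that tie on sal are added together at once. If a strict row-by-row running total is wanted, one option is a ROWS frame plus a tie-breaker. The sketch below assumes empno as the tie-breaker, and the names dept30 and rowFrame are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("RunningTotalRows").getOrCreate()
import spark.implicits._

// Department 30 from the sample data.
val dept30 = Seq(
  (7698, "BLAKE", 30, 2850),
  (7499, "ALLEN", 30, 1600),
  (7844, "TURNER", 30, 1500),
  (7521, "WARD", 30, 1250),
  (7654, "MARTIN", 30, 1250)
).toDF("empno", "ename", "deptno", "sal")

// ROWS frame: everything from the start of the partition up to this physical row.
val rowFrame = Window
  .partitionBy($"deptno")
  .orderBy($"sal".desc, $"empno".asc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val runningTotals = dept30.select($"ename", $"sal", sum($"sal").over(rowFrame).as("running_total"))

runningTotals.show()
```

With this frame WARD gets 7200 and MARTIN gets 8450, instead of both getting 8450.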
The lead function lets us compare the current row with subsequent rows within each partition. The second argument (offset) defaults to 1, i.e. the next row, but you can set it to 2 to compare against the row two positions ahead, and so on. The third argument is the default value to return when no such subsequent row exists; if it is omitted, null is returned.
//SQL
SELECT empno,deptno,sal,lead(sal, 1, 0) OVER (PARTITION BY deptno ORDER BY sal desc) as next_val FROM emp;
//DF API
val leadTest = lead($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", leadTest as "next_val").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO NEXT_VAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 2975
7566 JONES MANAGER 7839 2Apr81 2975 0 20 1100
7876 ADAMS CLERK 7788 23May87 1100 0 20 0
7839 KING PRESIDENT 0 17Nov81 5000 0 10 2450
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 800
7369 SMITH CLERK 7902 17Dec80 800 20 10 0
7698 BLAKE MANAGER 7839 1May81 2850 0 30 1600
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 1500
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 1250
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 1250
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 0
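To illustrate the offset argument, here is a small sketch that looks two rows ahead and leaves out the default value, so rows without a match come back as null rather than 0. It uses department 20 from the sample data. The names dept20, bySalDesc and leadBy2 are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("LeadOffsetSketch").getOrCreate()
import spark.implicits._

// Department 20 from the sample data.
val dept20 = Seq(
  ("SCOTT", 20, 3000),
  ("JONES", 20, 2975),
  ("ADAMS", 20, 1100)
).toDF("ename", "deptno", "sal")

val bySalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

// Offset 2, no default: the last two rows of the partition get null.
val leadBy2 = dept20.select($"ename", $"sal", lead($"sal", 2).over(bySalDesc).as("next_2"))

leadBy2.show()
```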
The lag function lets us compare the current row with preceding rows within each partition. The second argument (offset) defaults to 1, i.e. the previous row, but you can set it to 2 to compare against the row two positions back, and so on. The third argument is the default value to return when no such preceding row exists; if it is omitted, null is returned.
//SQL
SELECT empno,deptno,sal,lag(sal, 1, 0) OVER (PARTITION BY deptno ORDER BY sal desc) as prev_val FROM emp;
//DF API
val lagTest = lag($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", lagTest as "prev_val").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO PREV_VAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 0
7566 JONES MANAGER 7839 2Apr81 2975 0 20 3000
7876 ADAMS CLERK 7788 23May87 1100 0 20 2975
7839 KING PRESIDENT 0 17Nov81 5000 0 10 0
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 5000
7369 SMITH CLERK 7902 17Dec80 800 20 10 2450
7698 BLAKE MANAGER 7839 1May81 2850 0 30 0
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 2850
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 1600
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 1500
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 1250
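A common use of lag is to turn the previous value into a difference. The sketch below computes how far each salary is behind the next higher one in the department, using department 10 from the sample data and no default value, so the top earner gets null. The names dept10, bySalDesc, prevSal and gap_to_prev are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("LagGapSketch").getOrCreate()
import spark.implicits._

// Department 10 from the sample data.
val dept10 = Seq(
  ("KING", 10, 5000),
  ("CLARK", 10, 2450),
  ("SMITH", 10, 800)
).toDF("ename", "deptno", "sal")

val bySalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

// Salary of the row just above the current one (null for the first row).
val prevSal = lag($"sal", 1).over(bySalDesc)

val gaps = dept10.select($"ename", $"sal", (prevSal - $"sal").as("gap_to_prev"))

gaps.show()
```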
The first value within each partition, i.e. the highest salary within each department (we are ordering by salary descending), can be compared against every member of that department.
//SQL
SELECT empno,deptno,sal,first_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as first_val FROM emp;
//DF API
val firstValTest = first($"sal").over(partitionWindow)
empDF.select($"*", firstValTest as "first_val").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO FIRST_VAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 3000
7566 JONES MANAGER 7839 2Apr81 2975 0 20 3000
7876 ADAMS CLERK 7788 23May87 1100 0 20 3000
7839 KING PRESIDENT 0 17Nov81 5000 0 10 5000
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 5000
7369 SMITH CLERK 7902 17Dec80 800 20 10 5000
7698 BLAKE MANAGER 7839 1May81 2850 0 30 2850
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 2850
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 2850
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 2850
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 2850
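As an example of such a comparison, the sketch below subtracts each salary from the department's top salary. It uses three rows of department 30. The names dept30, bySalDesc, topSal and behind_top are made up for this illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("FirstValueGap").getOrCreate()
import spark.implicits._

// Three rows of department 30 from the sample data.
val dept30 = Seq(
  ("BLAKE", 30, 2850),
  ("ALLEN", 30, 1600),
  ("TURNER", 30, 1500)
).toDF("ename", "deptno", "sal")

val bySalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

// Highest salary in the department, repeated on every row of the partition.
val topSal = first($"sal").over(bySalDesc)

val behindTop = dept30.select($"ename", $"sal", (topSal - $"sal").as("behind_top"))

behindTop.show()
```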
The last value within each partition, i.e. the lowest salary within each department (we are ordering by salary descending), can be compared against every member of that department.
//SQL
SELECT empno,deptno,sal,last_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as last_val FROM emp;
//DF API
val lastValTest = last($"sal").over(partitionWindow)
empDF.select($"*", lastValTest as "last_val").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO LAST_VAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 3000
7566 JONES MANAGER 7839 2Apr81 2975 0 20 2975
7876 ADAMS CLERK 7788 23May87 1100 0 20 1100
7839 KING PRESIDENT 0 17Nov81 5000 0 10 5000
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 2450
7369 SMITH CLERK 7902 17Dec80 800 20 10 800
7698 BLAKE MANAGER 7839 1May81 2850 0 30 2850
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 1600
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 1500
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 1250
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 1250
Oops, what happened here? The last_val column has the same value as the sal column, but we were expecting the lowest salary within the department. To see why, we need to understand how the window frame works. There are two types of frames, ROWS and RANGE. The details are explained in the Databricks post on window functions.
This happens because, when the window has an ORDER BY and no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so last_value() never looks beyond the current row (and the rows that tie with it) unless you change the frame.
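To make the default frame visible, the sketch below spells it out with rangeBetween and compares it with the window that has no frame at all. Both columns should match each other and simply repeat sal. It uses department 30 from the sample data. The names dept30, defaultFrame, explicitFrame and framed are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("DefaultFrameSketch").getOrCreate()
import spark.implicits._

// Department 30 from the sample data.
val dept30 = Seq(
  (7698, 30, 2850),
  (7499, 30, 1600),
  (7844, 30, 1500),
  (7521, 30, 1250),
  (7654, 30, 1250)
).toDF("empno", "deptno", "sal")

// Ordered window with no frame given.
val defaultFrame = Window.partitionBy($"deptno").orderBy($"sal".desc)

// The same window with the default frame written out explicitly.
val explicitFrame = defaultFrame.rangeBetween(Window.unboundedPreceding, Window.currentRow)

val framed = dept30.select(
  $"empno",
  $"sal",
  last($"sal").over(defaultFrame).as("last_default"),
  last($"sal").over(explicitFrame).as("last_explicit"))

framed.show()
```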
The last value is fixed by supplying the window frame for last_value() to operate on. We will use the current row as the start of the frame and unbounded following as the end of the frame to get the last value.
//Define new window partition to operate on row frame
val partitionWindowWithUnboundedFollowing = Window.partitionBy($"deptno").orderBy($"sal".desc).rowsBetween(Window.currentRow, Window.unboundedFollowing)
//SQL
SELECT empno,deptno,sal,last_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as last_val FROM emp;
//DF API
val lastValTest2 = last($"sal").over(partitionWindowWithUnboundedFollowing)
empDF.select($"*", lastValTest2 as "last_val").show
Results :
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO LAST_VAL
7788 SCOTT ANALYST 7566 19Apr87 3000 0 20 1100
7566 JONES MANAGER 7839 2Apr81 2975 0 20 1100
7876 ADAMS CLERK 7788 23May87 1100 0 20 1100
7839 KING PRESIDENT 0 17Nov81 5000 0 10 800
7782 CLARK MANAGER 7839 9Jun81 2450 0 10 800
7369 SMITH CLERK 7902 17Dec80 800 20 10 800
7698 BLAKE MANAGER 7839 1May81 2850 0 30 1250
7499 ALLEN SALESMAN 7698 20Feb81 1600 300 30 1250
7844 TURNER SALESMAN 7698 8Sep81 1500 0 30 1250
7521 WARD SALESMAN 7698 22Feb81 1250 500 30 1250
7654 MARTIN SALESMAN 7698 28Sep81 1250 1400 30 1250
As the table above shows, the issue is resolved. To understand this in more detail, please read through the Databricks post.
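There is more than one way to get the same column. As a sketch of two alternatives, the code below uses a frame that spans the whole partition, and also a plain min over an unordered window, which gives the lowest salary without relying on ordering at all. It uses department 10 from the sample data. The names dept10, wholePartition, byDept and lowest are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a throwaway local session just for this sketch.
val spark = SparkSession.builder.master("local[1]").appName("LastValueAlternatives").getOrCreate()
import spark.implicits._

// Department 10 from the sample data.
val dept10 = Seq(
  (7839, 10, 5000),
  (7782, 10, 2450),
  (7369, 10, 800)
).toDF("empno", "deptno", "sal")

// Frame covering every row of the partition, regardless of the current row.
val wholePartition = Window
  .partitionBy($"deptno")
  .orderBy($"sal".desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Unordered window: aggregates see the whole partition by default.
val byDept = Window.partitionBy($"deptno")

val lowest = dept10.select(
  $"empno",
  $"sal",
  last($"sal").over(wholePartition).as("last_val"),
  min($"sal").over(byDept).as("min_sal"))

lowest.show()
```

Which form reads best is a matter of taste. The min version states the intent directly, while the frame version generalizes to columns other than the one being ordered on.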
I really enjoy using window functions. They are very powerful and sometimes solve complex problems with just a single line of SQL.
===================================================================================================================