--------------------------------------------------------------------------------------------------
How to reduce shuffling in spark join:
--------------------------------------------------------------------------------------------------
https://community.hortonworks.com/questions/107217/how-to-reduce-spark-shuffling-caused-by-join-with.html
1. Set the number of shuffle partitions higher than the default of 200 (e.g. spark.sql.shuffle.partitions=500 or 1000).
2. While loading the Hive ORC tables into DataFrames, use the "CLUSTER BY" clause on the join key. Something like:
df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")
df1.registerTempTable("df1_tbl")
df2.registerTempTable("df2_tbl")
Now join df1_tbl & df2_tbl using joinkey1 & joinkey2.
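A minimal sketch of step 1 plus the final join (the partition count, table names and join keys are just the placeholders used above):
sqlContext.setConf("spark.sql.shuffle.partitions", "500")  // step 1: raise shuffle partitions
val joined = sqlContext.sql(
  "SELECT a.*, b.* FROM df1_tbl a JOIN df2_tbl b ON a.JOINKEY1 = b.JOINKEY2")  // step 2: join the pre-clustered temp tables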
--------Alternative
There are a couple of options available to reduce the shuffle (though not eliminate it in some cases).
Using broadcast variables
By broadcasting the small table, you can eliminate the shuffle of the big table; however, the small data must be copied to all the executors.
This may not be feasible in all cases, e.g. if both tables are big.
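A hedged sketch of a broadcast join (bigDf, smallDf and "joinkey" are placeholder names):
import org.apache.spark.sql.functions.broadcast
// Ship the small table to every executor so the big table does not need to be shuffled.
val joined = bigDf.join(broadcast(smallDf), Seq("joinkey"))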
The other alternative (and a good practice to implement) is predicate pushdown for the Hive data: this filters, at the Hive level, only the data which is required for the computation, so a small amount of data is extracted.
This may not avoid the shuffle completely, but it certainly speeds it up, as the amount of data pulled into memory is reduced significantly (in some cases).
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")  // if you are using ORC files; use spark.sql.parquet.filterPushdown for Parquet files
The last (and not recommended) approach is to pull everything into a single partition with .repartition(1) on the DataFrame; you avoid the join shuffle, but you also lose all parallelism, since a single executor performs the operation.
On another note, the shuffle will be quick if the data is evenly distributed across the key being used to join the tables.
https://www.coursera.org/lecture/scala-spark-big-data/wide-vs-narrow-dependencies-shGAX
https://heather.miller.am/teaching/cs212/slides/week20.pdf
https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies
--------------------------------------------------------------------------------------------------
Apache Spark RDD vs DataFrame vs DataSet:
--------------------------------------------------------------------------------------------------
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
https://stackoverflow.com/questions/31508083/difference-between-dataframe-dataset-and-rdd-in-spark
--------------------------------------------------------------------------------------------------
Spark groupByKey vs reduceByKey vs aggregateByKey
--------------------------------------------------------------------------------------------------
https://learndbigdata.com/2018/07/17/spark-groupbykey-reducebykey-aggregatebykey/
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
https://stackoverflow.com/questions/43364432/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combineb
https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/reduceByKey--vs-groupBykey-vs-aggregateByKey-vs-combineByKey
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-disk problems, as all the values for a key are sent over the network and collected on the reduce workers.
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)
Data is combined within each partition first, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
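For example, when the result type differs from the value type (say, collecting values into a Set), reduceByKey cannot be used directly, but aggregateByKey (described next) can. A hypothetical sketch:
val pairs = sc.parallelize(Seq(("foo", "A"), ("foo", "B"), ("bar", "C")))   // hypothetical input
val valueSets = pairs.aggregateByKey(Set.empty[String])(_ + _, _ ++ _)      // String values -> Set[String]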
aggregateByKey:
Similar to reduceByKey, but it additionally takes an initial (zero) value.
It takes 3 parameters as input: i. the initial (zero) value, ii. the sequence op (merges a value into the accumulator within a partition), iii. the combiner op (merges accumulators across partitions).
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: bar -> 3, foo -> 5 (the per-key counts)
combineByKey:
3 parameters as input:
i. initial value: unlike aggregateByKey, it need not always be a constant; we pass a function which returns a new value (the createCombiner).
ii. merging function: merges a value into the accumulator within a partition (mergeValue).
iii. combine function: merges accumulators across partitions (mergeCombiners).
Example:
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                                              // createCombiner: (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),                           // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)   // mergeCombiners
).map { case (k, v) => (k, v._1 / v._2.toDouble) }                                 // per-key average
result.collect.foreach(println)
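For instance, with a hypothetical pair RDD like the one below, the result is the per-key average:
val rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))   // hypothetical input
// yields (a,2.0) and (b,4.0)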
reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.
Reference: Avoid groupByKey
Then, apart from these 4, we have
foldByKey, which is the same as reduceByKey but with a user-defined zero value (see the sketch after this comparison).
aggregateByKey takes 3 parameters as input and uses 2 functions for merging (one for merging within the same partition and another to merge values across partitions); the first parameter is the zero value,
whereas
reduceByKey takes only 1 parameter, which is a function for merging.
combineByKey takes 3 parameters and all 3 are functions. It is similar to aggregateByKey, except it can take a function for the zero value.
groupByKey takes no parameters and groups everything; it also adds overhead for data transfer across partitions.
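A small foldByKey sketch (reusing the kv pair RDD from the aggregateByKey example above; the zero value 0 is explicit):
val countsByKey = kv.map { case (k, _) => (k, 1) }.foldByKey(0)(_ + _)
countsByKey.collect.foreach(println)   // (bar,3), (foo,5)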
--------------------------------------------------------------------------------------------------------------
Achieve Exactly-Once Semantics in Spark Streaming
--------------------------------------------------------------------------------------------------------------
http://shzhangji.com/blog/2017/07/31/how-to-achieve-exactly-once-semantics-in-spark-streaming/
There are three semantics in stream processing, namely at-most-once, at-least-once, and exactly-once. In a typical Spark Streaming application, there are three processing phases: receive data, do transformations, and push outputs. Each phase takes a different effort to achieve the different semantics.
For receiving data, it largely depends on the data source. For instance, reading files from a fault-tolerant file system like HDFS gives us exactly-once semantics. For upstream queues that support acknowledgement, e.g. RabbitMQ, we can combine them with Spark's write-ahead logs to achieve at-least-once semantics. For unreliable receivers like socketTextStream, there might be data loss due to worker/driver failure, which gives us undefined semantics. Kafka, on the other hand, is offset based, and its direct API can give us exactly-once semantics.
When transforming data with Spark’s RDD, we automatically get exactly-once semantics, for RDD is itself immutable, fault-tolerant and deterministically re-computable. As long as the source data is available, and there’s no side effects during transformation, the result will always be the same.
Output operation by default has at-least-once semantics. The foreachRDD function will execute more than once if there’s worker failure, thus writing same data to external storage multiple times. There’re two approaches to solve this issue, idempotent updates, and transactional updates. They are further discussed in the following sections.
Exactly-once with Idempotent Writes
If multiple writes produce the same data, then the output operation is idempotent. saveAsTextFile is a typical idempotent update; messages with unique keys can be written to a database without duplication. This approach gives us the equivalent of exactly-once semantics. Note, though, that it is usually only suitable for map-only procedures, and it requires some setup on the Kafka DStream:
Set enable.auto.commit to false. By default, the Kafka DStream will commit the consumer offsets right after it receives the data. We want to postpone this action until the batch is fully processed.
Turn on Spark Streaming’s checkpointing to store Kafka offsets. But if the application code changes, checkpointed data is not reusable. This leads to a second option:
Commit Kafka offsets after outputs. Kafka provides a commitAsync API, and the HasOffsetRanges class can be used to extract offsets from the initial RDD:
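A hedged sketch of this pattern with the spark-streaming-kafka-0-10 direct API (ssc is assumed to be an existing StreamingContext; the broker, group id, topic and output action are placeholders):
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "enable.auto.commit" -> (false: java.lang.Boolean))   // step 1: disable auto commit

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  // Extract the offset ranges from the initial (un-transformed) RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... idempotent / transactional write of the batch results goes here ...
  // Step 3: commit the offsets only after the outputs have been written.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}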
--------------------------------------------------------------------------------------------------------------
Backpressure with Kafka
--------------------------------------------------------------------------------------------------------------