forked from veeraravi/Spark-notes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark-merge-single-file.txt
46 lines (32 loc) · 1.86 KB
/
spark-merge-single-file.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
If you can fit all the data into RAM on one worker (and thus can use .coalesce(1)), you can use dbfs to find and move the resulting CSV file:
val fileprefix= "/mnt/aws/path/file-prefix"
dataset .coalesce(1)
.write
//.mode("overwrite") // I usually don't use this, but you may want to.
.option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp")
val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(partition_path,fileprefix+".tab")
dbutils.fs.rm(fileprefix+".tmp",recurse=true)
If your file does not fit into RAM on the worker, you may want to consider chaoticequilibrium's suggestion to use FileUtils.copyMerge(). I have not done this, and don't yet know if is possible or not, e.g., on S3.
If you are running Spark with HDFS, I've been solving the problem by writing csv files normally and leveraging HDFS to do the merging. I'm doing that in Spark (1.6) directly:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()