// manish-conytolTec.txt
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.avro._
object controlTec {
  // Note: these object-level vars are shadowed by locals below and are never read.
  var vin = ""
  var source = ""
  var utc_time = ""
  var inputPath = ""
  //var counter = 1;
  //var rowCounter = 1;
  def main(args: Array[String]) = {
    println("application starting---------------------------------------------------------------------------------")
    val config = new SparkConf().setAppName("controltec") //.setMaster("*");
    // config.set("spark.default.parallelism","600");
    val sparkSession: SparkSession = SparkSession.builder.config(config).enableHiveSupport.getOrCreate
    val sqlContext = sparkSession.sqlContext
    val sc = sparkSession.sparkContext
    // path where the spark program will get data from
    val inputPath = "/user/hive/warehouse/controltec_primary/"
    // fixed: dropped the stray "http://" prefix; this is an HDFS path
    val outputPath = "/user/hive/warehouse/naveng.db/controltec_temp/"
    println("spark contexts initialized---------------------------------------------------------------------------------")
    val files = inputPath + "*.csv"
    // wholeTextFiles returns (path, content) pairs; only the paths are needed here
    val textFile = sc.wholeTextFiles(files).map(x => x._1).collect() //.foreach(x => processFile(x,"1HTMSAARXEH775884","Trip-Detail",sqlContext))
    textFile.foreach(x => processFile(x, sparkSession, outputPath))
  }
  // Splits a delimited line into a Row. Note: String.split treats the delimiter as a regex.
  def convertToRow(row: String, deli: String): Row = {
    val cell = row.split(deli)
    Row.fromSeq(cell)
  }
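  // processFile below derives vin and utc_time from the file name, which embeds the source
  // token between them. As an illustrative sketch (the exact naming convention is an
  // assumption; the VIN is taken from the commented-out call in main), a path such as
  //   /user/hive/warehouse/controltec_primary/1HTMSAARXEH775884_Trip-Detail_20190401T120000.csv
  // would parse to vin = "1HTMSAARXEH775884", source = "Trip-Detail",
  // utc_time = "20190401T120000".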
  def processFile(path: String, sparkSession: SparkSession, outputPath: String) = {
    // path is the HDFS path to a single CSV file
    println(s"path: $path ---------------------------------")
    var source = ""
    val key = path.toUpperCase()
    if (key.contains("TRIP-DETAIL")) {
      source = "Trip-Detail"
    }
    if (key.contains("RVD")) {
      source = "RVD"
    }
    // derive vin and utc_time from the file name by splitting on the source token
    val splitedFilePath = path.split("/")
    val fileName = splitedFilePath(splitedFilePath.length - 1)
    val dotIndex = fileName.lastIndexOf(".")
    val splitString = "_" + source + "_"
    val splitedFileName = fileName.substring(0, dotIndex).split(splitString)
    val vin = splitedFileName(0)
    val utc_time = splitedFileName(splitedFileName.length - 1)
    val df = sparkSession.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)
    // note: collect() pulls the whole file to the driver, so the wide-to-long pivot below
    // runs on the driver before being re-parallelized
    val longTableDF = df.collect().map(x => printColumn(x, vin, source, utc_time))
      .flatMap(line => line)
      .map(attr => attr.split("\u0001"))
      .map(attr => returnRow(attr))
    //println(s"$counter , ${attr(0).trim}, ${attr(1).trim}, ${attr(2).trim}, ${attr(3).trim}, ${attr(4).trim}, ${attr(5)}");
    val schemaString = "vin,source,utc_time,time_stamp,attribute,value"
    val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName.trim, StringType, true)))
    val d = sparkSession.sparkContext.parallelize(longTableDF, 1000)
    val longTableDataFrame = sparkSession.createDataFrame(d, schema)
    //longTableDataFrame.coalesce(1).write.mode("append").format("avro").save(outputPath)
    //longTableDataFrame.show();
    println(s"outputPath: $outputPath")
    //longTableDataFrame.write.mode(SaveMode.Append).parquet(outputPath+"text.avro");
    //newDataFrame.write.mode("append").partitionBy("utc_year","utc_month","model_year").parquet("/user/hive/warehouse/naveng.db/controltec_parq")
    //longTableDataFrame.write.mode("append").format("com.databricks.spark.avro").save("/user/hive/warehouse/naveng.db/controltec_aws/");
    //longTableDataFrame.select("vin").coalesce(1).write.mode("append").format("csv").save(outputPath);
    // fixed: write to the outputPath parameter instead of a hard-coded copy of it
    longTableDataFrame.write.mode("append").format("com.databricks.spark.avro").save(outputPath)
    // unpersist is a no-op here since the DataFrame was never cached
    longTableDataFrame.unpersist()
  }
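  // A minimal read-back sketch for verifying the appended output, assuming the same
  // spark-avro package is on the classpath (not part of the original job):
  //   val check = sparkSession.read.format("com.databricks.spark.avro")
  //     .load("/user/hive/warehouse/naveng.db/controltec_temp/")
  //   check.groupBy("vin", "source").count().show()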
  // Maps the six \u0001-separated fields onto a Row matching the long-table schema
  def returnRow(attr: Array[String]): Row = {
    Row(attr(0).trim, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5))
  }
  // Unused helper: extracts the path from a (path, content) pair produced by wholeTextFiles
  def getFilePaths(file: (String, String)): String = {
    file._1
  }
  /* def transpose(data: String) = {
    // note: references sc and df that are not in scope here; kept as-is from the original draft
    val stringList = data.split("\n")
    val headerString = stringList(0)
    val rdd = sc.parallelize(stringList).map(x => convertToRow(x, ","))
    val sqlContext = new SQLContext(sc)
    val schemaString = "a,b,c,d"
    val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName.trim, StringType, true)))
    val longTableDataFrame = sqlContext.createDataFrame(rdd, schema)
    longTableDataFrame.write.mode("append").format("com.databricks.spark.avro").save("/user/hive/warehouse/navemg.db/controltec_aws")
    df.show()
  }*/
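  // A worked example of the wide-to-long pivot printColumn performs: a wide input row such as
  //   time_stamp=100, rpm=1500, speed=62   (column names here are illustrative)
  // yields one delimited string per column, e.g.
  //   <vin>\u0001<source>\u0001<utc_time>\u0001100\u0001rpm\u00011500
  //   <vin>\u0001<source>\u0001<utc_time>\u0001100\u0001speed\u000162
  // and the time_stamp column itself also appears once as an attribute/value pair.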
  // Pivots one wide CSV row into long format: one \u0001-delimited string per column,
  // each carrying (vin, source, utc_time, time_stamp, attribute, value).
  def printColumn(dfRow: Row, vin: String, source: String, utc_time: String): Array[String] = {
    var colIndex = 0
    //counter += 1;
    // initialize the output array; subtract 1 when the last column is an empty
    // (trailing-comma) column so it is dropped from the output
    val longTableArray =
      if (dfRow.schema.fieldNames(dfRow.schema.fieldNames.length - 1).trim().isEmpty()) {
        new Array[String](dfRow.schema.length - 1)
      } else {
        new Array[String](dfRow.schema.length)
      }
    // for each attribute; dfRow.getAs("time_stamp") supplies the shared timestamp column
    for (colName <- dfRow.schema.fieldNames) {
      // an empty column name marks the trailing-comma column: stop and return what we have
      if (colName.trim().isEmpty()) {
        return longTableArray
      }
      var cell = ""
      if (dfRow.get(colIndex) == null) {
        cell = "null"
      } else if (dfRow.get(colIndex).toString().trim().isEmpty()) {
        cell = "null"
      } else {
        cell = dfRow.get(colIndex).toString
      }
      longTableArray(colIndex) = vin + "\u0001" + source + "\u0001" + utc_time + "\u0001" + dfRow.getAs("time_stamp") + "\u0001" + colName + "\u0001" + cell
      colIndex += 1
    }
    longTableArray
  }
  /*def merge(sc: SparkContext, srcPath: String, desPath: String) = {
    // requires: import java.net.URI and org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    val srcFileSystem = FileSystem.get(new URI(srcPath), sc.hadoopConfiguration)
    val desFileSystem = FileSystem.get(new URI(desPath), sc.hadoopConfiguration)
    desFileSystem.delete(new Path(desPath), true)
    FileUtil.copyMerge(srcFileSystem, new Path(srcPath), desFileSystem, new Path(desPath), true, sc.hadoopConfiguration, null)
  }*/
}
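// A hedged submission sketch (jar name and package coordinates are assumptions; adjust to
// the actual build):
//   spark-submit --class controlTec \
//     --packages com.databricks:spark-avro_2.11:4.0.0 \
//     controltec-assembly.jar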