akhila-spark-watchdog-driver.txt
package com.mmm.speckdata.process.driver
import org.apache.log4j.Level
import org.apache.log4j.Logger
import java.util.Properties
import java.io.IOException
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.sql.Connection
import java.sql.DriverManager
import java.time.Instant
import com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement
import org.apache.spark.sql.DataFrame
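/*
 * WatchDogDataProcessDriver
 * - Reads runtime settings from /filtrete/WatchDogStream.props on HDFS.
 * - Records job start/end in the SQL Server IndoorJobControl table.
 * - Loads DSTCalendar, IndoorJobControl and IndoorAirQualityDevice reference data over JDBC.
 * - Picks up WatchDog/Speck readings created since the last successful run from the watchdogdata Hive table,
 *   then computes hourly, daily and monthly aggregates into Hive tables.
 * - Pushes the most recent hourly (24 h), daily (30 d) and monthly (12 m) aggregates back to SQL Server
 *   through upsert stored procedures.
 */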
object WatchDogDataProcessDriver {
def main(args: Array[String]): Unit = {
val logger = Logger.getRootLogger
logger.setLevel(Level.ERROR)
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
Logger.getLogger("com").setLevel(Level.OFF)
val properties = new Properties()
try {
properties.load(FileSystem.get(new Configuration()).open(new Path("/filtrete/WatchDogStream.props")))
} catch {
case ex: IOException => {
System.err.println("Unable to fetch Configuration details...")
logger.error("Unable to fetch Configuration details ...")
//System.exit(1)
}
}
try{
val shufflePartition = args(0).toInt // properties.getProperty("shufflePartition_process").toInt
val hivemaxpartitions = args(1).toInt
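// Build the SQL Server JDBC connection string from the properties file and open a connection for job-control bookkeeping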
val sqlDbConnStr = properties.get("dbServer") + ";" +
"database=" + properties.get("database") + ";" +
"user=" + properties.get("dbUser") + ";" +
"password=" + properties.get("dbPassword")
val sqlConn: Connection = getSqlJdbcConnection(sqlDbConnStr)
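// Hive-enabled Spark session; dynamic partitioning is required for the partitioned INSERT OVERWRITE statements below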
val spark = SparkSession
.builder()
.appName("WatchDogDataProcessDriver")
.config("hive.exec.dynamic.partition", true)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions",hivemaxpartitions)
.config("hive.exec.parallel", true)
.config("hive.enforce.bucketing", true)
.config("spark.sql.shuffle.partitions",s"$shufflePartition")
.config("spark.ui.showConsoleProgress",false)
.enableHiveSupport()
.getOrCreate()
val processStartTime = Instant.now.getEpochSecond
println(s"--------------- Indoor Speck Data Process Start: $processStartTime")
// Process Start Entry into IndoorJobControl table
val IndoorJobControlInsertQuery = s"INSERT INTO IndoorJobControl (processstartdt,processstartUTS,status) values (getUTCdate(),$processStartTime,'Inprocess')"
updateDB(sqlConn,IndoorJobControlInsertQuery)
sqlConn.close()
// Load DSTCalendar
val DSTOptions = new java.util.HashMap[String, String]()
DSTOptions.put("url", sqlDbConnStr)
DSTOptions.put("dbtable", "DSTCalendar")
val DST_DF = spark.read.format("jdbc").options(DSTOptions).load().select("year", "dst_start_unixTS","dst_end_unixTS")
DST_DF.createOrReplaceTempView("DSTCalendarTmp")
// Load Indoor Job Control table
val IndoorJobControlOptions = new java.util.HashMap[String, String]()
IndoorJobControlOptions.put("url", sqlDbConnStr)
IndoorJobControlOptions.put("dbtable", "IndoorJobControl")
val IndoorJobControl_DF = spark.read.format("jdbc").options(IndoorJobControlOptions).load().select("status","processstartUTS","processstartdt","processenddt","processendUTS").filter("status='Processed'")
IndoorJobControl_DF.createOrReplaceTempView("IndoorJobControl")
// Load IndoorAirQualityDevice
val indoorDeviceOptions = new java.util.HashMap[String, String]()
indoorDeviceOptions.put("url", sqlDbConnStr)
indoorDeviceOptions.put("dbtable", "IndoorAirQualityDevice")
val IAQD_DF = spark.read.format("jdbc").options(indoorDeviceOptions).load()
val IAQD_ACTIVE_DF = IAQD_DF.select("indoorDeviceID","accountID","indoorDeviceToken","gmtOffset","dstOffset").filter("activeFlag=1")
IAQD_ACTIVE_DF.createOrReplaceTempView("IndoorAirQualityDevice")
// Get latest successfully processed Unix timestamp from the IndoorJobControl table
val maxProcessStartUTS = spark.sql("select nvl(max(processstartUTS),0) maxUTS from IndoorJobControl").select("maxUTS").collectAsList().get(0).getLong(0)
println(s"Process data between.. $maxProcessStartUTS and $processStartTime")
//Added on 3/6/2018
//Get data from IndoorSpeckDeviceDataTmp
val IndoorSpeckDeviceDataTmp_query = s"select indoordeviceid,WDDataTS,speckdatahr,temperature,humidity,Ultrafine, PM2_5,AQI,createdTS from watchdogdata where createdTS > $maxProcessStartUTS and createdTS <= $processStartTime"
val IndoorSpeckDeviceDataTmp_DF = spark.sql(IndoorSpeckDeviceDataTmp_query).persist()
IndoorSpeckDeviceDataTmp_DF.createOrReplaceTempView("WatchDogDeviceDataTmp_Spark")
//Get data from Hive IndoorSpeckDeviceDataTmp table into Spark for processing
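// Stamp each new reading with UTC buckets plus DST-aware local hour/date/month, using the device offsets and the DSTCalendar window for its year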
val IndoorSpeckDataTmp_Query = s"select distinct B.speckDataHr, cast(FROM_UNIXTIME(B.WDDataTS,'yyyy') as string) as utcYear, cast(FROM_UNIXTIME(B.WDDataTS,'yyyyMM') as string) as utcMonth, cast(FROM_UNIXTIME(B.WDDataTS,'yyyyMMdd') as string) as utcDate, "+
s"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600), 'yyyyMMddHH') else DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600), 'yyyyMMddHH') end as localHour, "+
s"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600), 'yyyyMMdd') else DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600), 'yyyyMMdd') end as localDate, "+
s"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600), 'yyyyMM') else DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600), 'yyyyMM') end as localMonth "+
s"from IndoorAirQualityDevice A JOIN WatchDogDeviceDataTmp_Spark B on A.indoorDeviceID = B.indoorDeviceId JOIN DSTCalendarTmp C on cast(FROM_UNIXTIME(B.WDDataTS,'yyyy') as string) = C.year "+
s"where B.createduts > $maxProcessStartUTS and B.createduts <= $processStartTime"
val IndoorSpeckDataTmp_DF = spark.sql(IndoorSpeckDataTmp_Query).persist()
IndoorSpeckDataTmp_DF.createOrReplaceTempView("WDSpeckDataTS")
IndoorSpeckDataTmp_DF.show()
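// Aggregate only when new raw records arrived since the last successful run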
if(IndoorSpeckDataTmp_DF.count()>0){
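// Local-time boundaries of the new data; they limit how much history is re-aggregated below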
val minLocalDate = spark.sql("select min(localDate) minlocalDate from WDSpeckDataTS").select("minlocalDate").collectAsList().get(0).getString(0)
val maxLocalDate = spark.sql("select max(localDate) maxlocalDate from WDSpeckDataTS").select("maxlocalDate").collectAsList().get(0).getString(0)
val minLocalMonth = spark.sql("select min(localMonth) minlocalMonth from WDSpeckDataTS").select("minlocalMonth").collectAsList().get(0).getString(0)
val maxLocalMonth = spark.sql("select max(localMonth) maxlocalMonth from WDSpeckDataTS").select("maxlocalMonth").collectAsList().get(0).getString(0)
val minSpeckDataHr = spark.sql("select min(speckDataHr) minSpeckDataHr from WDSpeckDataTS").select("minSpeckDataHr").collectAsList().get(0).getString(0)
val maxSpeckDataHr = spark.sql("select max(speckDataHr) maxSpeckDataHr from WDSpeckDataTS").select("maxSpeckDataHr").collectAsList().get(0).getString(0)
//Added on 3/6/2018
//Get data from IndoorSpeckDeviceData
/*val IndoorSpeckDeviceData_query = "select indoorDeviceId,WDDataTS, particleConcentration, temperature,humidity, cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear "+
"from IndoorSpeckDeviceDataNew where Hr in (select distinct speckDataHr from WDSpeckDataTS)"*/
val IndoorSpeckDeviceData_query = "select indoorDeviceId,WDDataTS, temperature,humidity, Ultrafine, PM2_5,AQI,cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear "+
s"from IndoorSpeckDeviceDataNew where hr >= \'$minSpeckDataHr\' and hr <= \'$maxSpeckDataHr\'"
val IndoorSpeckDeviceData_DF = spark.sql(IndoorSpeckDeviceData_query)
IndoorSpeckDeviceData_DF.createOrReplaceTempView("WatchDogDeviceData_Spark")
/*
* Hourly Aggregate Processing
*/
//Get data for all devices from IndoorSpeckDeviceData for distinct hours in IndoorSpeckDataTmp for Hourly processing UNION with data in IndoorSpeckDeviceDataTmp table
/*val IndoorSpeckDataHourly_Query = s"(select indoorDeviceId,WDDataTS,particleConcentration, cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear from IndoorSpeckDeviceDataTmp where createduts > $maxProcessStartUTS and createduts <= $processStartTime) "+
"UNION "+
"(select indoorDeviceId,WDDataTS, particleConcentration, cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear from IndoorSpeckDeviceData "+
"where Hr in (select distinct speckDataHr from WDSpeckDataTS))"*/
val IndoorSpeckDataHourly_Query = "select indoorDeviceId,WDDataTS,temperature,humidity, Ultrafine, PM2_5,AQI, cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear from WatchDogDeviceDataTmp_Spark "+
"UNION "+
"select indoorDeviceId,WDDataTS,temperature,humidity, Ultrafine, PM2_5,AQI, utcYear from WatchDogDeviceData_Spark"
val IndoorSpeckDataHourly_Query_DF = spark.sql(IndoorSpeckDataHourly_Query)
IndoorSpeckDataHourly_Query_DF.createOrReplaceTempView("WDDataHourly")
// Convert WDDataTS to local date and local hour
val IndoorHourlyLocal_Query = "SELECT A.indoorDeviceID, B.utcYear, B.WDDataTS, "+
"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then (cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600) else (cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600) end as localUTS, "+
"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600) else FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600) end as speckDataLocalTS, "+
"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600), 'yyyyMMdd') else DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600), 'yyyyMMdd') end as localDate, "+
"case when B.WDDataTS between C.dst_start_unixTS and C.dst_end_unixTS then DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.dstOffset as bigint)*3600), 'HH') else DATE_FORMAT(FROM_UNIXTIME(cast(B.WDDataTS as bigint) + cast(A.gmtOffset as bigint)*3600), 'HH') end as localHour "+
"FROM IndoorAirQualityDevice A JOIN WDDataHourly B on A.indoorDeviceID = B.indoorDeviceId JOIN DSTCalendarTmp C on B.utcYear = C.year"
val IndoorHourlyLocal_DF = spark.sql(IndoorHourlyLocal_Query)
IndoorHourlyLocal_DF.createOrReplaceTempView("WDDataHourlyLocal")
// Compute hourly aggregation
val IndoorHourlyAvg_Query = "SELECT indoorDeviceID, localHour, localDate,concat(localDate,localHour) as localDtHr, round(avg(Ultrafine),2) as avgUltrafine,round(avg(PM2_5),2) as avgPM2_5,round(avg(AQI),2) as avgAQI,count(*) as recordCount from WDDataHourlyLocal group by indoorDeviceID, localHour, localDate"
val IndoorHoulryAvg_DF = spark.sql(IndoorHourlyAvg_Query)
IndoorHoulryAvg_DF.createOrReplaceTempView("WDHoulryAvg")
//Insert Into IndoorAirQuality_Hourly Hive table
val IndoorAirQualityHourly_Query = "INSERT OVERWRITE TABLE WatchDogData_hourly PARTITION(localdate,localDtHr) "+
"SELECT indoorDeviceId, avgUltrafine,avgPM2_5,avgAQI, recordCount, localHour, FROM_UNIXTIME(UNIX_TIMESTAMP()) as createdTS, localDate, localDtHr from WDHoulryAvg"
spark.sql(IndoorAirQualityHourly_Query)
/*
* Daily Aggregate processing
*/
/*val IndoorSpeckDataDaily_Query = s"select indoordeviceId, (particleConcentration * recordcount) as particleSum,recordcount, localHour, localDate, substr(localDate,0,6) as month, localDtHr from IndoorAirQuality_Hourly "+
s"where localDate in (select distinct localdate from WDSpeckDataTS)"*/
val IndoorSpeckDataDaily_Query = s"select indoordeviceId, (ultrafine * recordcount) as ultrafineSum,(PM2_5 * recordcount) as pm2_5Sum,(AQI * recordcount) as aqiSum,recordcount, localHour, localDate, substr(localDate,0,6) as month, localDtHr from IndoorAirQuality_Hourly "+
s"where localdate >= $minLocalDate and localdate <= $maxLocalDate"
val IndoorSpeckDataDaily_DF = spark.sql(IndoorSpeckDataDaily_Query)
IndoorSpeckDataDaily_DF.createOrReplaceTempView("IndoorSpeckDataDailyLocal")
val IndoorSpeckDailyAvg_Query = "select indoordeviceId, round((sum(particleSum) / sum(recordcount)),2) as avgParticleConcentration,sum(recordcount) as recordcount,localDate,month from IndoorSpeckDataDailyLocal group by month,localDate,indoordeviceId "
val IndoorSpeckDailyAvg_DF = spark.sql(IndoorSpeckDailyAvg_Query)
IndoorSpeckDailyAvg_DF.createOrReplaceTempView("IndoorSpeckDataDailyAvg")
//Insert into IndoorAirQuality_Daily Hive table
val IndoorAirQualityDaily_Query = "INSERT OVERWRITE TABLE indoorairquality_daily PARTITION(month,localdate) "+
"SELECT indoorDeviceId, avgParticleConcentration,recordcount, FROM_UNIXTIME(UNIX_TIMESTAMP()) as createdTS, month,localDate from IndoorSpeckDataDailyAvg"
spark.sql(IndoorAirQualityDaily_Query)
/*
* Monthly Aggregation Processing
*/
/*val IndoorSpeckDataMonthly_Query = "select indoordeviceId, (particleConcentration * recordcount) as particleSum,recordcount, substr(month,0,4) as year, month, localdate from IndoorAirQuality_Daily "+
"where month in (select distinct localmonth from WDSpeckDataTS)"*/
val IndoorSpeckDataMonthly_Query = "select indoordeviceId, (particleConcentration * recordcount) as particleSum,recordcount, substr(month,0,4) as year, month, localdate from IndoorAirQuality_Daily "+
s"where month >= $minLocalMonth and month <= $maxLocalMonth"
val IndoorSpeckDataMonthly_DF = spark.sql(IndoorSpeckDataMonthly_Query)
IndoorSpeckDataMonthly_DF.createOrReplaceTempView("IndoorSpeckDataMonthlyLocal")
val IndoorSpeckMonthlyAvg_Query = "select indoordeviceId,round((sum(particleSum) / sum(recordcount)),2) as avgParticleConcentration,sum(recordcount) as recordcount, year, month from IndoorSpeckDataMonthlyLocal group by year, month, indoordeviceId"
val IndoorSpeckMonthlyAvg_DF = spark.sql(IndoorSpeckMonthlyAvg_Query)
IndoorSpeckMonthlyAvg_DF.createOrReplaceTempView("IndoorSpeckDataMonthlyAvg")
//Insert into IndoorAirQuality_Monthly Hive table
val IndoorAirQualityMonthly_Query = "INSERT OVERWRITE TABLE indoorairquality_monthly PARTITION(month) "+
"SELECT indoorDeviceId, avgParticleConcentration,recordcount, year, FROM_UNIXTIME(UNIX_TIMESTAMP()) as createdTS, month from IndoorSpeckDataMonthlyAvg"
spark.sql(IndoorAirQualityMonthly_Query)
/*
* Writing to SQL tables
*/
// Push Indoor Hourly Average for past 24 Hours from current date to SQL Table
val IndoorHourlySQL_Query = "select cast(CONCAT(indoorDeviceId,'_',localDate,'_',localHour) as string) as indoorDeviceKey, cast(indoorDeviceId as string) as indoorDeviceId, cast(localDate as string) as date, "+
"cast(localHour as string) as hour, particleconcentration,0, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as createdTS, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as modifiedTS "+
"from indoorairquality_hourly where localdthr >= FROM_UNIXTIME(UNIX_TIMESTAMP()-(24*60*60),'yyyyMMddHH')"
val IndoorHourlySQL_DF = spark.sql(IndoorHourlySQL_Query)
//IndoorHourlySQL_DF.show()
updateDFToSqlDB(spark,sqlDbConnStr,IndoorHourlySQL_DF,"usp_IndoorAirQuality_WD_Hourly_Upsert","IndoorAirQualityWDHourlyUpsert")
// Push Indoor Daily Average for past 30 days from current date to SQL Table
val IndoorDailySQL_Query = "select cast(indoorDeviceId as string) as indoorDeviceId, month, localdate, particleconcentration, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as createdTS, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as modifiedTS "+
"from indoorairquality_daily where localdate >= FROM_UNIXTIME(UNIX_TIMESTAMP()-(30*24*60*60),'yyyyMMdd')"
val IndoorDailySQL_DF = spark.sql(IndoorDailySQL_Query)
//IndoorDailySQL_DF.show()
updateDFToSqlDB(spark,sqlDbConnStr,IndoorDailySQL_DF,"usp_IndoorAirQuality_WD_Daily_Upsert","IndoorAirQualityWDDailyUpsert")
// Push Indoor Monthly Average for past 12 months from current month to SQL Table
val IndoorMonthlySQL_Query = "select cast(indoorDeviceId as string) as indoorDeviceId, year, month, particleconcentration, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as createdTS, cast(FROM_UNIXTIME(UNIX_TIMESTAMP()) as string) as modifiedTS "+
"from indoorairquality_monthly where month >= FROM_UNIXTIME(UNIX_TIMESTAMP()-(365*24*60*60),'yyyyMMdd')"
val IndoorMonthlySQL_DF = spark.sql(IndoorMonthlySQL_Query)
//IndoorMonthlySQL_DF.show()
updateDFToSqlDB(spark,sqlDbConnStr,IndoorMonthlySQL_DF,"usp_IndoorAirQuality_WD_Monthly_Upsert","IndoorAirQualityWDMonthlyUpsert")
// Insert into IndoorSpeckDeviceData from IndoorSpeckDataTmp after pushing data to SQL
/*
val IndoorSpeckDeviceData_InsertQuery = s"INSERT INTO TABLE IndoorSpeckDeviceData PARTITION(Yr,Month,Dt,Hr) SELECT indoorDeviceId,WDDataTS,particleConcentration,temperature,humidity, "+
s"FROM_UNIXTIME(UNIX_TIMESTAMP()) as createdTS,FROM_UNIXTIME(UNIX_TIMESTAMP()) as modifiedTS, cast(FROM_UNIXTIME(WDDataTS,'yyyy') as string) as utcYear, "+
s"cast(FROM_UNIXTIME(WDDataTS,'yyyyMM') as string) as utcMonth,cast(FROM_UNIXTIME(WDDataTS,'yyyyMMdd') as string) as utcDate, speckDataHr from IndoorSpeckDeviceDataTmp where createduts > $maxProcessStartUTS and createduts <= $processStartTime"
*/
// Metric columns selected as Ultrafine/PM2_5/AQI since WatchDogDeviceDataTmp_Spark has no particleConcentration column (assumed to match IndoorSpeckDeviceDataNew's column order)
val IndoorSpeckDeviceData_InsertQuery = s"INSERT INTO TABLE IndoorSpeckDeviceDataNew PARTITION(Hr) SELECT indoorDeviceId,WDDataTS,temperature,humidity,Ultrafine,PM2_5,AQI, "+
s"FROM_UNIXTIME(UNIX_TIMESTAMP()) as createdTS,FROM_UNIXTIME(UNIX_TIMESTAMP()) as modifiedTS, speckDataHr from WatchDogDeviceDataTmp_Spark"
spark.sql(IndoorSpeckDeviceData_InsertQuery)
}
//Update IndoorJobControl status after successful process
val curEndTime = Instant.now.getEpochSecond
val sqlConn1: Connection = getSqlJdbcConnection(sqlDbConnStr)
val IndoorJobControlUpdateQuery = s"UPDATE IndoorJobControl SET status='Processed', processenddt = getUTCdate(), processendUTS = $curEndTime where status = 'Inprocess'"
updateDB(sqlConn1,IndoorJobControlUpdateQuery)
sqlConn1.close()
println(s"--------------- Indoor Speck Data Process End: $curEndTime")
} catch {
case ex: Exception => {
ex.printStackTrace()
}
}
}
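// Opens a JDBC connection to SQL Server using the Microsoft driver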
def getSqlJdbcConnection(sqlDatabaseConnectionString : String): Connection ={
var con:Connection = null
try{
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
con = DriverManager.getConnection(sqlDatabaseConnectionString)
}catch{
case e: Exception => println("Exception while creating connection: " + e.getMessage)
}
con
}
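// Executes a single DML statement (insert/update) against SQL Server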
def updateDB(sqlConn: Connection, updateQuery: String) = {
sqlConn.createStatement().executeUpdate(updateQuery)
}
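// Collects the DataFrame to the driver and passes the rows to a SQL Server stored procedure as a
// table-valued parameter. TableData is assumed to be a helper class defined elsewhere in this project
// that wraps the schema and rows into the structured value expected by setStructured.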
def updateDFToSqlDB(spark: SparkSession, sqlConnStr: String, localDF: DataFrame, dbProcName: String, tableType : String) = {
val records = localDF.collect()
val schema = localDF.schema
val length = records.length
val sqlConn = getSqlJdbcConnection(sqlConnStr)
if(length > 0){
val tableData = new TableData(localDF.schema,records)
val pStmt: SQLServerPreparedStatement = sqlConn.prepareStatement(s"EXECUTE $dbProcName ?").asInstanceOf[SQLServerPreparedStatement]
pStmt.setStructured(1,tableType , tableData)
pStmt.execute()
}
sqlConn.close()
}
}
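/*
 * A minimal spark-submit sketch (the jar name and resource settings are illustrative assumptions, not from this file);
 * args(0) = spark.sql.shuffle.partitions, args(1) = hive.exec.max.dynamic.partitions:
 *
 *   spark-submit \
 *     --class com.mmm.speckdata.process.driver.WatchDogDataProcessDriver \
 *     --master yarn --deploy-mode cluster \
 *     watchdog-data-process.jar 200 10000
 */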