forked from veeraravi/Spark-notes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark-df-example.txt
113 lines (28 loc) · 2.63 KB
/
spark-df-example.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Launch an interactive Spark shell on YARN with 2 executors; the Scala statements below are entered in it.
# (The option must be written with two ASCII hyphens: "--master", not an en-dash.)
spark-shell --master yarn --num-executors 2
import java.util.Calendar
import org.apache.spark.sql.SparkSession
import spark.implicits._
// Run date rendered as dd-MM-yyyy (e.g. 21-12-2016); appended to the output path further down.
val format = new java.text.SimpleDateFormat("dd-MM-yyyy")
val date = format.format(Calendar.getInstance().getTime)

// Security-master snapshot, exposed to SQL as the temp view "security_master_table".
// NOTE(review): the input folder date is hard-coded rather than derived from `date` — confirm intended.
val security_data = {
  val snapshot = spark.read.parquet("s3://rfi-staging/21-12-2016/*")
  snapshot.createOrReplaceTempView("security_master_table")
  snapshot
}

// Lazy projection of ticker and asset class; no Spark action is triggered here.
val sqlDF_security_master = spark.sql("SELECT TICKER,ASSET_CLASS_CODE FROM security_master_table")
// Country reference data, exposed to SQL as the temp view "country_table".
val country = spark.read.parquet("s3://rfi-staging/reference-files/Country/*")
country.createOrReplaceTempView("country_table")
// Row count of the country table (lazy; the .show below is intentionally left disabled).
val sqlDF_country = spark.sql("SELECT count(*) from country_table")
//sqlDF_country.show
// Securities matched to their country of issue via the ISO-2 code. The explicit ON clause states
// the join key directly instead of an unconditioned JOIN filtered in WHERE.
val country_join = spark.sql("SELECT * from security_master_table s JOIN country_table c ON s.Country_Issue = c.COUNTRY_ISO_2")
// DataFrame.saveAsParquetFile was removed in Spark 2.0; DataFrameWriter.parquet is its replacement
// (default save mode is still "error if the path already exists").
// NOTE(review): there is no "/" between "good-records" and the date, so this writes to
// s3://rfi-processed/good-records<dd-MM-yyyy> — confirm whether good-records/<date> was intended.
// NOTE(review): SELECT * keeps every column of both tables; if they share a column name the
// Parquet write will fail on duplicate columns — confirm the schemas.
country_join.write.parquet(s"s3://rfi-processed/good-records$date")
// Currency reference data, exposed to SQL as the temp view "currency_table".
val currency = spark.read.parquet("s3://rfi-staging/reference-files/Currency/*")
currency.createOrReplaceTempView("currency_table")
// Row count of the currency table, printed to the console.
val sqlDF_currency = spark.sql("SELECT count(*) from currency_table")
sqlDF_currency.show
// Securities matched to their traded currency. The explicit ON clause states the join key
// directly instead of an unconditioned JOIN filtered in WHERE (same inner-join result).
val currency_join = spark.sql("SELECT * from security_master_table s JOIN currency_table cur ON s.CURRENCY_TRADED = cur.ISO_CCY_CODE")
currency_join.show
// Issuer reference data, exposed to SQL as the temp view "issue_master_table".
val issue_master = spark.read.parquet("s3://rfi-staging/reference-files/Issue_Master/*")
issue_master.createOrReplaceTempView("issue_master_table")
// Row count of the issuer table, printed to the console.
val sqlDF_issue_master = spark.sql("SELECT count(*) from issue_master_table")
sqlDF_issue_master.show
// Securities matched to their issuer, with an explicit ON clause (same inner-join result).
// NOTE(review): this joins on ISSUER, while the final_gooddata query joins the same table on
// CADIS_ID_ISSUER — confirm which key is authoritative.
val issue_master_join = spark.sql("SELECT * from security_master_table s JOIN issue_master_table im ON s.ISSUER = im.ISSUER")
issue_master_join.show
// Security master enriched with country, currency and issuer descriptions. All joins are inner,
// so securities without a match in every reference table are dropped.
// - `CUSIP/ISIN` is backtick-quoted: unquoted, sm.CUSIP/ISIN parses as CUSIP divided by ISIN.
// - The currency description is aliased CURRENCY_TRADED_DESC (parallel to COUNTRY_ISSUE_DESC);
//   aliasing it CURRENCY_TRADED produced two output columns with the same name.
// NOTE(review): this DataFrame is built but never shown or written in this script.
val final_gooddata = spark.sql(
  """SELECT sm.CADIS_ID, sm.ASSET_CLASS_CODE, sm.BB_EXCH_CODE, sm.CADIS_ID_ISSUER,
    |       sm.COUNTRY_INCORPORATION, sm.COUNTRY_ISSUE, cou.COUNTRY_NAME AS COUNTRY_ISSUE_DESC,
    |       sm.CURRENCY_TRADED, cur.ISO_DESCRIPTION AS CURRENCY_TRADED_DESC,
    |       sm.CUSIP, sm.`CUSIP/ISIN`, sm.DATA_QUALITY_IND, sm.INDUSTRY_CODE, sm.ISIN, sm.ISSUE_DATE,
    |       iss.LONG_NAME AS ISSUER_LONG_NAME, iss.SHORT_NAME AS ISSUER_SHORT_NAME,
    |       sm.PRIMARY_EXCHANGE, sm.PRIMARY_SECURITY_TICKER, sm.SEC_TYP_BB, sm.SEC_TYP2_BB,
    |       sm.SEDOL, sm.TICKER, sm.TICKER_BB, sm.CURRENCY_SETTLEMENT
    |FROM security_master_table sm
    |JOIN country_table cou ON cou.COUNTRY_ISO_2 = sm.COUNTRY_ISSUE
    |JOIN currency_table cur ON sm.CURRENCY_TRADED = cur.ISO_CCY_CODE
    |JOIN issue_master_table iss ON iss.CADIS_ID_ISSUER = sm.CADIS_ID_ISSUER""".stripMargin)