DataFrames.scala
package com.sundogsoftware.spark

import org.apache.spark.sql._
import org.apache.log4j._
object DataFrames {

  /** Represents one row of fakefriends.csv: ID, name, age, number of friends. */
  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  /** Parses one comma-separated line of the input file into a Person. */
  def mapper(line: String): Person = {
    val fields = line.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
  }
  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use the new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Works around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .getOrCreate()
    // Convert our csv file to a Dataset, using our Person case
    // class to infer the schema.
    import spark.implicits._
    val lines = spark.sparkContext.textFile("../fakefriends.csv")
    val people = lines.map(mapper).toDS().cache()
    // There are lots of other ways to make a DataFrame.
    // For example, spark.read.json("json file path")
    // or spark.table("Hive table name") -- sqlContext was the pre-2.0 entry point.
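    // A minimal sketch (an addition, not part of the original lesson): the same
    // headerless CSV could be loaded with Spark's built-in reader instead of the
    // manual mapper, reusing the Person case class to supply the schema. Assumes
    // the same "../fakefriends.csv" relative path as above.
    val peopleViaReader = spark.read
      .schema(Encoders.product[Person].schema) // column names and types come from the case class
      .csv("../fakefriends.csv")
      .as[Person]
    peopleViaReader.show(5)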
println("Here is our inferred schema:")
people.printSchema()
println("Let's select the name column:")
people.select("name").show()
println("Filter out anyone over 21:")
people.filter(people("age") < 21).show()
println("Group by age:")
people.groupBy("age").count().show()
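    // A small aside (added here, not in the original): groupBy output order is
    // not guaranteed, so an explicit sort makes the counts easier to scan.
    println("Group by age, sorted by age:")
    people.groupBy("age").count().orderBy("age").show()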
println("Make everyone 10 years older:")
people.select(people("name"), people("age") + 10).show()
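    // Since the app is named "SparkSQL", here is a minimal sketch of running real
    // SQL against the Dataset via a temp view (an illustrative addition; the view
    // name "people" and the teenager query are our choices, not the original's).
    people.createOrReplaceTempView("people")
    println("Teenagers, selected with a SQL query:")
    spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19").show()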
    spark.stop()
  }
}