Commit c2f545b: Initial commit (0 parents)
4 files changed, +128 -0 lines changed

.gitignore

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
*.class
*.log
.idea
project
target
data

README.md

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
Java Version 1.8
Scala Version 2.12.10
Spark Version 3.0.0-preview2

build.sbt

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
name := "spark-tutorial"
version := "0.1"
scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0-preview2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0-preview2"
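
Note: with this build definition, the tutorial below can presumably be launched from the project root with "sbt run" (assuming the standard sbt source layout); the Spark master is set in code, so no spark-submit is needed for a local run.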

src/main/scala/SparkTutorial.scala

Lines changed: 112 additions & 0 deletions

@@ -0,0 +1,112 @@
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkTutorial {
  def main(args: Array[String]): Unit = {

    // Turn off logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    //--------------------------------------------------------------------------------------------------------
    // Setting up a Spark Session
    //--------------------------------------------------------------------------------------------------------

    // Create a SparkSession to work with Spark
    val sparkBuilder = SparkSession
      .builder()
      .appName("SparkTutorial")
      .master("local[4]") // local, with 4 worker cores
    val spark = sparkBuilder.getOrCreate()

    // Set the default number of shuffle partitions (the default of 200 is too high for a local deployment)
    spark.conf.set("spark.sql.shuffle.partitions", "8")

    // Import implicit encoders for standard library classes and tuples that are used as Dataset types
    import spark.implicits._
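
    // (local[4] runs Spark inside this JVM with 4 worker threads; local[*]
    // would use all available cores instead.)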

    println("-----------------------------------------------------------------------------------------------")

    //--------------------------------------------------------------------------------------------------------
    // Loading data
    //--------------------------------------------------------------------------------------------------------

    // Create a Dataset programmatically
    val numbers = spark.createDataset((0 until 100).toList)

    // Read a Dataset from a file
    val customers = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .option("sep", ";")
      .csv("data/tpch_customer.csv") // also text, json, jdbc, parquet
      .as[(Int, String, String, Int, String, String, String, String)]
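
    // The 8-tuple above presumably mirrors the TPC-H customer columns
    // (custkey, name, address, nationkey, phone, acctbal, mktsegment, comment).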

    println("-----------------------------------------------------------------------------------------------")

    //--------------------------------------------------------------------------------------------------------
    // Basic transformations
    //--------------------------------------------------------------------------------------------------------

    // Basic transformations on datasets return new datasets
    val mapped = numbers.map(i => "This is a number: " + i)
    val filtered = mapped.filter(s => s.contains("1"))
    val sorted = filtered.sort()
    List(numbers, mapped, filtered, sorted).foreach(dataset => println(dataset.getClass))
    sorted.show()
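
    // Note: transformations are lazy; no job runs until an action such as
    // show(), collect(), or reduce() is invoked.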

    println("-----------------------------------------------------------------------------------------------")

    // Basic terminal operations
    val collected = filtered.collect() // collects the entire dataset to the driver process
    val reduced = filtered.reduce((s1, s2) => s1 + "," + s2) // reduces all values successively to one
    filtered.foreach(s => println(s)) // performs an action for each element (take care where the action is evaluated!)
    List(collected, reduced).foreach(result => println(result.getClass))
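
    // (With master local[4], the println output appears in this JVM; on a real
    // cluster, foreach runs in the executors, so the output would end up in the
    // executor logs rather than on the driver's console.)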

    println("-----------------------------------------------------------------------------------------------")

    // DataFrame and Dataset
    val untypedDF = numbers.toDF() // DS to DF
    val stringTypedDS = untypedDF.map(r => r.get(0).toString) // DF to DS via map
    val integerTypedDS = untypedDF.as[Int] // DF to DS via the as() function, which casts columns to concrete types
    List(untypedDF, stringTypedDS, integerTypedDS).foreach(result => println(result.head.getClass))
    List(untypedDF, stringTypedDS, integerTypedDS).foreach(result => println(result.head))
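
    // (as[Int] works here because untypedDF has a single numeric column,
    // which toDF() names "value" by default for a Dataset of primitives.)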

    println("-----------------------------------------------------------------------------------------------")

    // Mapping to tuples
    numbers
      .map(i => (i, "nonce", 3.1415, true))
      .take(10)
      .foreach(println(_))

    println("-----------------------------------------------------------------------------------------------")

    // SQL on DataFrames
    customers.createOrReplaceTempView("customers") // make this Dataset visible as a table
    val sqlResult = spark.sql("SELECT * FROM customers WHERE C_NATIONKEY = 15") // perform an SQL query on the table

    import org.apache.spark.sql.functions._

    sqlResult // DF
      .as[(Int, String, String, Int, String, String, String, String)] // DS
      .sort(desc("C_NATIONKEY")) // desc() is a standard function from the spark.sql.functions package
      .head(10)
      .foreach(println(_))
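
    // For comparison, the same filter can be written without SQL via a column
    // expression (a minimal sketch; apiResult is a name introduced here):
    val apiResult = customers.filter($"C_NATIONKEY" === 15)
    apiResult.show(5)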

    println("-----------------------------------------------------------------------------------------------")

    // Grouping and aggregation for Datasets
    // (the commented example destructures customers as a hypothetical
    // (name, age, salary, company) Dataset, which does not match the
    // 8-column customers Dataset defined above)
    // val topEarners = customers
    //   .groupByKey { case (name, age, salary, company) => company }
    //   .mapGroups { case (key, iterator) =>
    //     val topEarner = iterator.toList.maxBy(t => t._3) // could be problematic: Why?
    //     (key, topEarner._1, topEarner._3)
    //   }
    //   .sort(desc("_3"))
    // topEarners.collect().foreach(t => println(t._1 + "'s top earner is " + t._2 + " with salary " + t._3))
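
    // A runnable variant of the same pattern, adapted to the customers Dataset
    // defined in this file (a minimal sketch; customersPerNation is a name
    // introduced here). Counting via the iterator avoids materializing each
    // group with toList, which is the pitfall the question above hints at.
    val customersPerNation = customers
      .groupByKey { case (_, _, _, nationKey, _, _, _, _) => nationKey }
      .mapGroups { case (nationKey, rows) => (nationKey, rows.size) }
      .sort(desc("_2"))
    customersPerNation.show(5)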

    println("-----------------------------------------------------------------------------------------------")
  }
}
