Skip to content

Commit deeb11e

Browse files
Initial Commit
1 parent e6e16f6 commit deeb11e

File tree

10 files changed

+167
-0
lines changed

10 files changed

+167
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,7 @@
22
*/.ipynb*
33
*Untitled*
44

5+
*/.idea*
6+
*/app-logs*
7+
*__pycache__*
8+

01-HelloSpark/HelloSpark.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import sys
2+
from pyspark.sql import *
3+
from lib.logger import Log4j
4+
from lib.utils import *
5+
6+
if __name__ == "__main__":
7+
conf = get_spark_app_config()
8+
9+
spark = SparkSession \
10+
.builder \
11+
.appName("HelloSpark") \
12+
.master("local[2]") \
13+
.getOrCreate()
14+
15+
logger = Log4j(spark)
16+
17+
if len(sys.argv) != 2:
18+
logger.error("Usage: HelloSpark <filename>")
19+
sys.exit(-1)
20+
21+
logger.info("Starting HelloSpark")
22+
23+
survey_raw_df = load_survey_df(spark, sys.argv[1])
24+
partitioned_survey_df = survey_raw_df.repartition(2)
25+
count_df = count_by_country(partitioned_survey_df)
26+
count_df.show()
27+
28+
logger.info("Finished HelloSpark")
29+
spark.stop()

01-HelloSpark/data/sample.csv

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"Timestamp","Age","Gender","Country","state","self_employed","family_history","treatment","work_interfere","no_employees","remote_work","tech_company","benefits","care_options","wellness_program","seek_help","anonymity","leave","mental_health_consequence","phys_health_consequence","coworkers","supervisor","mental_health_interview","phys_health_interview","mental_vs_physical","obs_consequence","comments"
2+
2014-08-27 11:29:31,37,"Female","United States","IL",NA,"No","Yes","Often","6-25","No","Yes","Yes","Not sure","No","Yes","Yes","Somewhat easy","No","No","Some of them","Yes","No","Maybe","Yes","No",NA
3+
2014-08-27 11:29:37,44,"M","United States","IN",NA,"No","No","Rarely","More than 1000","No","No","Don't know","No","Don't know","Don't know","Don't know","Don't know","Maybe","No","No","No","No","No","Don't know","No",NA
4+
2014-08-27 11:29:44,32,"Male","Canada",NA,NA,"No","No","Rarely","6-25","No","Yes","No","No","No","No","Don't know","Somewhat difficult","No","No","Yes","Yes","Yes","Yes","No","No",NA
5+
2014-08-27 11:29:46,31,"Male","United Kingdom",NA,NA,"Yes","Yes","Often","26-100","No","Yes","No","Yes","No","No","No","Somewhat difficult","Yes","Yes","Some of them","No","Maybe","Maybe","No","Yes",NA
6+
2014-08-27 11:30:22,31,"Male","United States","TX",NA,"No","No","Never","100-500","Yes","Yes","Yes","No","Don't know","Don't know","Don't know","Don't know","No","No","Some of them","Yes","Yes","Yes","Don't know","No",NA
7+
2014-08-27 11:31:22,33,"Male","United States","TN",NA,"Yes","No","Sometimes","6-25","No","Yes","Yes","Not sure","No","Don't know","Don't know","Don't know","No","No","Yes","Yes","No","Maybe","Don't know","No",NA
8+
2014-08-27 11:31:50,35,"Female","United States","MI",NA,"Yes","Yes","Sometimes","1-5","Yes","Yes","No","No","No","No","No","Somewhat difficult","Maybe","Maybe","Some of them","No","No","No","Don't know","No",NA
9+
2014-08-27 11:32:05,39,"M","Canada",NA,NA,"No","No","Never","1-5","Yes","Yes","No","Yes","No","No","Yes","Don't know","No","No","No","No","No","No","No","No",NA
10+
2014-08-27 11:32:39,42,"Female","United States","IL",NA,"Yes","Yes","Sometimes","100-500","No","Yes","Yes","Yes","No","No","No","Very difficult","Maybe","No","Yes","Yes","No","Maybe","No","No",NA

01-HelloSpark/lib/__init__.py

Whitespace-only changes.

01-HelloSpark/lib/logger.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
class Log4j(object):
    """Thin wrapper around the JVM-side log4j logger of a Spark session.

    The logger is named "<prefix>.<spark app name>" so log4j.properties
    can route application messages separately from Spark's own logging.
    """

    def __init__(self, spark):
        # Namespace prefix expected by the log4j.properties configuration.
        prefix = "guru.learningjournal.spark.examples"
        app_name = spark.sparkContext.getConf().get("spark.app.name")
        # Reach into the JVM (via py4j) for the log4j LogManager.
        jvm_log4j = spark._jvm.org.apache.log4j
        self.logger = jvm_log4j.LogManager.getLogger(".".join([prefix, app_name]))

    def warn(self, message):
        """Log *message* at WARN level."""
        self.logger.warn(message)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def error(self, message):
        """Log *message* at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)

01-HelloSpark/lib/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import configparser
2+
3+
from pyspark import SparkConf
4+
5+
6+
def load_survey_df(spark, data_file):
    """Read the survey CSV file at *data_file* into a DataFrame.

    The first line is treated as a header; column types are inferred
    from the data (this costs an extra pass over the file).
    """
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(data_file)
11+
12+
13+
def count_by_country(survey_df, max_age=40):
    """Count survey respondents younger than *max_age*, grouped by country.

    :param survey_df: survey DataFrame with at least the Age, Gender,
        Country and state columns
    :param max_age: exclusive upper bound on Age; defaults to 40, the
        cutoff that was previously hard-coded, so existing callers are
        unaffected
    :return: DataFrame with Country and count columns
    """
    return survey_df.filter("Age < {}".format(max_age)) \
        .select("Age", "Gender", "Country", "state") \
        .groupBy("Country") \
        .count()
18+
19+
20+
def get_spark_app_config(conf_file="spark.conf", section="SPARK_APP_CONFIGS"):
    """Build a SparkConf from an ini-style configuration file.

    :param conf_file: path to the configuration file; defaults to
        "spark.conf" (the previously hard-coded name), resolved relative
        to the current working directory
    :param section: section whose key/value pairs become Spark settings
    :return: SparkConf populated with every entry of *section*
    :raises configparser.NoSectionError: if *section* is absent — note
        that ConfigParser.read silently skips a missing/unreadable file,
        so a missing file also surfaces as this error
    """
    spark_conf = SparkConf()
    config = configparser.ConfigParser()
    config.read(conf_file)

    for key, val in config.items(section):
        spark_conf.set(key, val)
    return spark_conf

01-HelloSpark/log4j.properties

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Set everything to be logged to the console
2+
log4j.rootCategory=WARN, console
3+
4+
# define console appender
5+
log4j.appender.console=org.apache.log4j.ConsoleAppender
6+
log4j.appender.console.target=System.out
7+
log4j.appender.console.layout=org.apache.log4j.PatternLayout
8+
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
9+
10+
#application log
11+
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
12+
log4j.additivity.guru.learningjournal.spark.examples=false
13+
14+
#define rolling file appender
15+
log4j.appender.file=org.apache.log4j.RollingFileAppender
16+
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
17+
#log4j.appender.file.File=app-logs/hello-spark.log
18+
#define following in Java System
19+
# -Dlog4j.configuration=file:log4j.properties
20+
# -Dlogfile.name=hello-spark
21+
# -Dspark.yarn.app.container.log.dir=app-logs
22+
log4j.appender.file.ImmediateFlush=true
23+
log4j.appender.file.Append=false
24+
log4j.appender.file.MaxFileSize=500MB
25+
log4j.appender.file.MaxBackupIndex=2
26+
log4j.appender.file.layout=org.apache.log4j.PatternLayout
27+
log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
28+
29+
30+
# Recommendations from Spark template
31+
log4j.logger.org.apache.spark.repl.Main=WARN
32+
log4j.logger.org.spark_project.jetty=WARN
33+
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
34+
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
35+
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
36+
log4j.logger.org.apache.parquet=ERROR
37+
log4j.logger.parquet=ERROR
38+
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
39+
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
40+

01-HelloSpark/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pyspark==2.4.5

01-HelloSpark/spark.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[SPARK_APP_CONFIGS]
2+
spark.app.name = HelloSpark
3+
spark.master = local[3]
4+
spark.sql.shuffle.partitions = 2

01-HelloSpark/test_utile.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from unittest import TestCase
2+
from pyspark.sql import SparkSession
3+
from lib.utils import load_survey_df, count_by_country
4+
5+
6+
class UtilsTestCase(TestCase):
    """Integration tests for lib.utils, run against data/sample.csv."""

    # Shared SparkSession, created once in setUpClass.
    spark = None

    @classmethod
    def setUpClass(cls) -> None:
        # One session for the whole class: building a SparkSession per
        # test would dominate the suite's runtime.
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("HelloSparkTest") \
            .getOrCreate()

    def test_datafile_loading(self):
        # sample.csv ships with a header plus nine data rows.
        sample_df = load_survey_df(self.spark, "data/sample.csv")
        result_count = sample_df.count()
        self.assertEqual(result_count, 9, "Record count should be 9")

    def test_country_count(self):
        sample_df = load_survey_df(self.spark, "data/sample.csv")
        count_list = count_by_country(sample_df).collect()
        count_dict = dict()
        for row in count_list:
            count_dict[row["Country"]] = row["count"]
        self.assertEqual(count_dict["United States"], 4, "Count for United States should be 4")
        self.assertEqual(count_dict["Canada"], 2, "Count for Canada should be 2")
        # Fixed typo in the failure message ("Unites" -> "United").
        self.assertEqual(count_dict["United Kingdom"], 1, "Count for United Kingdom should be 1")

    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()

0 commit comments

Comments
 (0)