fix: Add missing files and comments to better match the course content #12

Open · wants to merge 1 commit into base: master

15 changes: 15 additions & 0 deletions .gitignore
@@ -9,3 +9,18 @@
*.crc
*.parquet
*/*/_SUCCESS

# JetBrains IDE
.idea/

# Spark-Programming-In-Python
app-logs/
metastore_db/
spark-warehouse/
derby.log

# 05-DataSinkDemo
05-DataSinkDemo/dataSink/

# 14-GroupingDemo
14-GroupingDemo/output/
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloRDD.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloRDD" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/02-HelloRDD" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/02-HelloRDD/HelloRDD.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloSpark.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloSpark" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/01-HelloSpark" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/01-HelloSpark/HelloSpark.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloSparkSQL.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloSparkSQL" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/03-HelloSparkSQL" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/03-HelloSparkSQL/HelloSparkSQL.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
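Each of these run configurations passes `data/sample.csv` as a script parameter with the working directory set to the lesson folder. A minimal sketch of how a script such as `HelloRDD.py` could consume that argument (hypothetical; the actual course script may differ):

```python
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: HelloRDD <data-file>")
        sys.exit(-1)

    spark = SparkSession.builder \
        .master("local[3]") \
        .appName("HelloRDD") \
        .getOrCreate()

    # The run configuration supplies data/sample.csv relative to the working directory
    lines_rdd = spark.sparkContext.textFile(sys.argv[1])
    print("Record count (including header):", lines_rdd.count())

    spark.stop()
```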
4 changes: 4 additions & 0 deletions 01-HelloSpark/test_utile.py → 01-HelloSpark/test_utils.py
@@ -13,6 +13,10 @@ def setUpClass(cls) -> None:
.appName("HelloSparkTest") \
.getOrCreate()

@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()

def test_datafile_loading(self):
sample_df = load_survey_df(self.spark, "data/sample.csv")
result_count = sample_df.count()
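With the added `tearDownClass`, the complete test module looks roughly like the sketch below. The class and method names come from the diff; the `lib.utils` import path and the assertion are assumptions for illustration.

```python
import unittest
from pyspark.sql import SparkSession
from lib.utils import load_survey_df  # assumed import path for the course helper


class UtilsTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("HelloSparkTest") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls) -> None:
        # Release the local Spark session once all tests in the class have run
        cls.spark.stop()

    def test_datafile_loading(self):
        sample_df = load_survey_df(self.spark, "data/sample.csv")
        self.assertGreater(sample_df.count(), 0)  # illustrative assertion; the course asserts an exact count
```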
16 changes: 16 additions & 0 deletions 04-SparkSchemaDemo/SparkSchemaDemo.py
@@ -34,6 +34,13 @@
ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

# CSV with an inferred schema; this infers the wrong type (string) for FL_DATE
# flightTimeCsvDF = spark.read \
# .format("csv") \
# .option("header", "true") \
# .option("inferSchema", "true") \
# .load("data/flight*.csv")

flightTimeCsvDF = spark.read \
.format("csv") \
.option("header", "true") \
@@ -45,6 +52,11 @@
flightTimeCsvDF.show(5)
logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())

# JSON with an inferred schema; this infers the wrong type (string) for FL_DATE
# flightTimeJsonDF = spark.read \
# .format("json") \
# .load("data/flight*.json")

flightTimeJsonDF = spark.read \
.format("json") \
.schema(flightSchemaDDL) \
@@ -54,9 +66,13 @@
flightTimeJsonDF.show(5)
logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())

# Parquet files include the schema, so no need to specify it
# There is a PyCharm plugin named "Avro and Parquet Viewer" you can install to view the parquet file schema/data.
flightTimeParquetDF = spark.read \
.format("parquet") \
.load("data/flight*.parquet")

flightTimeParquetDF.show(5)
logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())

spark.stop()
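To see why the explicit schema matters, here is a self-contained sketch: with an explicit `StructType` (equivalent to a shortened version of the DDL string above), `FL_DATE` is parsed as a date instead of a string. The tiny CSV written to a temp directory is illustrative, not course data.

```python
import os
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

spark = SparkSession.builder.master("local[3]").appName("SchemaSketch").getOrCreate()

# A tiny stand-in for the flight CSV files used in the lesson
csv_dir = tempfile.mkdtemp()
with open(os.path.join(csv_dir, "flight_sample.csv"), "w") as f:
    f.write("FL_DATE,OP_CARRIER,ORIGIN,CANCELLED\n2000-01-01,DL,ATL,0\n")

# Programmatic equivalent of a shortened version of the DDL schema above
flight_schema_struct = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType()),
    StructField("ORIGIN", StringType()),
    StructField("CANCELLED", IntegerType()),
])

df = (spark.read
      .format("csv")
      .option("header", "true")
      .schema(flight_schema_struct)   # explicit schema: FL_DATE becomes a date, not a string
      .option("mode", "FAILFAST")     # fail fast instead of silently nulling malformed rows
      .load(csv_dir))

df.printSchema()  # FL_DATE: date
df.show()
spark.stop()
```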
24 changes: 17 additions & 7 deletions 05-DataSinkDemo/DataSinkDemo.py
@@ -19,6 +19,13 @@
logger.info("Num Partitions before: " + str(flightTimeParquetDF.rdd.getNumPartitions()))
flightTimeParquetDF.groupBy(spark_partition_id()).count().show()

# Writes only 1 file because all the records are in partition 0 (even though there are 2 partitions)
# flightTimeParquetDF.write \
# .format("avro") \
# .mode("overwrite") \
# .option("path", "dataSink/avro/") \
# .save()

partitionedDF = flightTimeParquetDF.repartition(5)
logger.info("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))
partitionedDF.groupBy(spark_partition_id()).count().show()
@@ -29,10 +36,13 @@
.option("path", "dataSink/avro/") \
.save()

flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.option("maxRecordsPerFile", 10000) \
.save()
# This will take some time to complete
(flightTimeParquetDF.write
.format("json")
.mode("overwrite")
.option("path", "dataSink/json/")
.partitionBy("OP_CARRIER", "ORIGIN")
.option("maxRecordsPerFile", 10000) # Use this to control the number of records per file and file size
.save())

spark.stop()
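For reference, here is a self-contained sketch of how `partitionBy` and `maxRecordsPerFile` shape the output layout, using a toy DataFrame and a temp directory rather than the course data:

```python
import os
import tempfile
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[3]").appName("SinkSketch").getOrCreate()

df = spark.createDataFrame(
    [("DL", "ATL"), ("DL", "BOS"), ("AA", "ATL"), ("AA", "DFW")],
    ["OP_CARRIER", "ORIGIN"])

out_dir = os.path.join(tempfile.mkdtemp(), "json")
(df.write
 .format("json")
 .mode("overwrite")
 .partitionBy("OP_CARRIER", "ORIGIN")  # one directory tree per carrier/origin combination
 .option("maxRecordsPerFile", 10000)   # caps the records per file within each partition directory
 .save(out_dir))

# Each unique (OP_CARRIER, ORIGIN) pair becomes an OP_CARRIER=<x>/ORIGIN=<y>/ sub-directory
for root, dirs, files in os.walk(out_dir):
    print(root)

spark.stop()
```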
26 changes: 16 additions & 10 deletions 06-SparkSQLTableDemo/SparkSQLTableDemo.py
@@ -3,12 +3,12 @@
from lib.logger import Log4j

if __name__ == "__main__":
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSQLTableDemo") \
.enableHiveSupport() \
.getOrCreate()
spark = (SparkSession
.builder
.master("local[3]")
.appName("SparkSQLTableDemo")
.enableHiveSupport() # Needed to enable connectivity to a persistent Hive metastore
.getOrCreate())

logger = Log4j(spark)

@@ -17,10 +17,16 @@
.load("dataSource/")

spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB") # If not set, Spark uses the default database

flightTimeParquetDF.write \
.mode("overwrite") \
.saveAsTable("flight_data_tbl")
(flightTimeParquetDF.write
# .format("csv") # Uncomment if you would like to inspect the records, or use the Parquet plugin for PyCharm
.mode("overwrite")
# .partitionBy("ORIGIN", "OP_CARRIER") # You do not want to partition on a column with too many unique values
.bucketBy(5, "OP_CARRIER", "ORIGIN") # Bucketing is a way to distribute data across a fixed number of files
.sortBy("OP_CARRIER", "ORIGIN") # Companion to bucketBy; it allows the bucketed files to be read efficiently by certain operations
.saveAsTable("flight_data_tbl")) # Alternatively you can use saveAsTable("AIRLINE_DB.flight_data_tbl")

logger.info(spark.catalog.listTables("AIRLINE_DB"))

spark.stop()
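Once the bucketed table is saved, it can be queried with SQL and its bucketing metadata inspected through the catalog. A sketch, assuming `SparkSQLTableDemo.py` has already been run from the same working directory (so the same Derby metastore and warehouse are used):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[3]")
         .appName("TableInspectSketch")
         .enableHiveSupport()  # same persistent metastore as SparkSQLTableDemo
         .getOrCreate())

spark.sql("USE AIRLINE_DB")

# Query the managed table created by saveAsTable above
spark.sql("SELECT OP_CARRIER, ORIGIN, count(*) AS flights "
          "FROM flight_data_tbl GROUP BY OP_CARRIER, ORIGIN").show(5)

# DESCRIBE EXTENDED shows the bucketing and sort columns recorded in the metastore
spark.sql("DESCRIBE EXTENDED flight_data_tbl").show(50, truncate=False)

spark.stop()
```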
103 changes: 102 additions & 1 deletion 07-Notebook/MyPythonNotebook.ipynb
@@ -1 +1,102 @@
{"cells":[{"cell_type":"code","source":["from pyspark.sql import *\nfrom pyspark.sql.functions import *\nfrom pyspark.sql.types import *\n\ndef to_date_df(df, fmt, fld):\n return df.withColumn(fld, to_date(col(fld), fmt))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":1},{"cell_type":"code","source":["my_schema = StructType([\n StructField(\"ID\", StringType()),\n StructField(\"EventDate\", StringType())])\n\nmy_rows = [Row(\"123\", \"04/05/2020\"), Row(\"124\", \"4/5/2020\"), Row(\"125\", \"04/5/2020\"), Row(\"126\", \"4/05/2020\")]\nmy_rdd = spark.sparkContext.parallelize(my_rows, 2)\nmy_df = spark.createDataFrame(my_rdd, my_schema)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":2},{"cell_type":"code","source":["my_df.printSchema()\nmy_df.show()\nnew_df = to_date_df(my_df, \"M/d/y\", \"EventDate\")\nnew_df.printSchema()\nnew_df.show() "],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\">root\n-- ID: string (nullable = true)\n-- EventDate: string (nullable = true)\n\n+---+----------+\n ID| EventDate|\n+---+----------+\n123|04/05/2020|\n124| 4/5/2020|\n125| 04/5/2020|\n126| 4/05/2020|\n+---+----------+\n\nroot\n-- ID: string (nullable = true)\n-- EventDate: date (nullable = true)\n\n+---+----------+\n ID| EventDate|\n+---+----------+\n123|2020-04-05|\n124|2020-04-05|\n125|2020-04-05|\n126|2020-04-05|\n+---+----------+\n\n</div>"]}}],"execution_count":3}],"metadata":{"name":"MyPythonNotebook","notebookId":2858727166054233},"nbformat":4,"nbformat_minor":0}
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "cc1e9686-728e-460d-83c7-570ed284664d",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"from pyspark.sql import *\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
"\n",
"def to_date_df(df, fmt, fld):\n",
" return df.withColumn(fld, to_date(col(fld), fmt))"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "1f7e7d9c-53fc-452d-af00-f49c38616c76",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"my_schema = StructType([\n",
" StructField(\"ID\", StringType()),\n",
" StructField(\"EventDate\", StringType())])\n",
"\n",
"my_rows = [Row(\"123\", \"04/05/2020\"), Row(\"124\", \"4/5/2020\"), Row(\"125\", \"04/5/2020\"), Row(\"126\", \"4/05/2020\")]\n",
"my_rdd = spark.sparkContext.parallelize(my_rows, 2)\n",
"my_df = spark.createDataFrame(my_rdd, my_schema)"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "61a01972-dea8-41d2-a05b-82311b420bd8",
"showTitle": false,
"title": ""
}
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"root\n |-- ID: string (nullable = true)\n |-- EventDate: string (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|04/05/2020|\n|124| 4/5/2020|\n|125| 04/5/2020|\n|126| 4/05/2020|\n+---+----------+\n\nroot\n |-- ID: string (nullable = true)\n |-- EventDate: date (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|2020-04-05|\n|124|2020-04-05|\n|125|2020-04-05|\n|126|2020-04-05|\n+---+----------+\n\n"
]
}
],
"source": [
"my_df.printSchema()\n",
"my_df.show()\n",
"new_df = to_date_df(my_df, \"M/d/y\", \"EventDate\")\n",
"new_df.printSchema()\n",
"new_df.show() "
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"environmentMetadata": null,
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 4
},
"notebookName": "MyPythonNotebook",
"widgets": {}
}
},
"nbformat": 4,
"nbformat_minor": 0
}
2 changes: 2 additions & 0 deletions 08-RowDemo/RowDemo.py
@@ -31,3 +31,5 @@ def to_date_df(df, fmt, fld):
new_df = to_date_df(my_df, "M/d/y", "EventDate")
new_df.printSchema()
new_df.show()

spark.stop()
4 changes: 4 additions & 0 deletions 08-RowDemo/RowDemo_Test.py
@@ -24,6 +24,10 @@ def setUpClass(cls) -> None:
my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()

def test_data_type(self):
rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
for row in rows:
9 changes: 9 additions & 0 deletions 09-LogFileDemo/LogFileDemo.py
@@ -18,9 +18,18 @@
regexp_extract('value', log_reg, 6).alias('request'),
regexp_extract('value', log_reg, 10).alias('referrer'))

logs_df.printSchema()

# Pre-transformation where hosts are not properly grouped
# logs_df.groupBy("referrer") \
# .count() \
# .show(100, truncate=False)

logs_df \
.where("trim(referrer) != '-' ") \
.withColumn("referrer", substring_index("referrer", "/", 3)) \
.groupBy("referrer") \
.count() \
.show(100, truncate=False)

spark.stop()
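The key step is `substring_index("referrer", "/", 3)`, which keeps everything up to the third `/` and therefore reduces a full referrer URL to its scheme and host so that grouping works. A tiny self-contained sketch with made-up referrer values:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring_index

spark = SparkSession.builder.master("local[3]").appName("ReferrerSketch").getOrCreate()

df = spark.createDataFrame(
    [("http://www.example.com/page/one",),
     ("http://www.example.com/another/page",),
     ("-",)],
    ["referrer"])

# substring_index(col, "/", 3) keeps the text before the third "/", i.e. "http://www.example.com",
# so both example.com rows end up in the same group
df.where("trim(referrer) != '-'") \
    .withColumn("referrer", substring_index("referrer", "/", 3)) \
    .groupBy("referrer") \
    .count() \
    .show(truncate=False)

spark.stop()
```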
177 changes: 176 additions & 1 deletion 10-ExploringColumns/ColumnsNotebook.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions 11-UDFDemo/UDFDemo.py
@@ -46,3 +46,5 @@ def parse_gender(gender):

survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
survey_df3.show(10)

spark.stop()
2 changes: 2 additions & 0 deletions 13-AggDemo/AggDemo.py
@@ -49,3 +49,5 @@
)

summary_df.show()

spark.stop()
2 changes: 2 additions & 0 deletions 14-GroupingDemo/GroupingDemo.py
@@ -36,3 +36,5 @@
.save("output")

exSummary_df.sort("Country", "WeekNumber").show()

spark.stop()
9 changes: 6 additions & 3 deletions 15-WindowingDemo/WindowingDemo.py
@@ -14,10 +14,13 @@

summary_df = spark.read.parquet("data/summary.parquet")

running_total_window = Window.partitionBy("Country") \
.orderBy("WeekNumber") \
.rowsBetween(-2, Window.currentRow)
running_total_window = (Window.partitionBy("Country")
.orderBy("WeekNumber")
# .rowsBetween(Window.unboundedPreceding, Window.currentRow)) # All rows from the start of the partition - a true running total
.rowsBetween(-2, Window.currentRow)) # Current row plus the 2 preceding rows - a 3-week sliding window

summary_df.withColumn("RunningTotal",
f.sum("InvoiceValue").over(running_total_window)) \
.show()

spark.stop()
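To make the two window frames in the comments concrete, here is a small self-contained sketch comparing the running total (`unboundedPreceding`) with the 3-row sliding window (`-2`), using toy data rather than `summary.parquet`:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[3]").appName("WindowSketch").getOrCreate()

df = spark.createDataFrame(
    [("IN", 1, 10.0), ("IN", 2, 20.0), ("IN", 3, 30.0), ("IN", 4, 40.0)],
    ["Country", "WeekNumber", "InvoiceValue"])

running = Window.partitionBy("Country").orderBy("WeekNumber") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)  # true running total
sliding = Window.partitionBy("Country").orderBy("WeekNumber") \
    .rowsBetween(-2, Window.currentRow)                         # current row plus 2 preceding rows

df.withColumn("RunningTotal", f.sum("InvoiceValue").over(running)) \
    .withColumn("ThreeWeekTotal", f.sum("InvoiceValue").over(sliding)) \
    .show()

spark.stop()
```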
2 changes: 2 additions & 0 deletions 16-RankingDemo/RankingDemo.py
@@ -24,3 +24,5 @@
.where(f.col("Rank") == 1) \
.sort("Country", "WeekNumber") \
.show()

spark.stop()
2 changes: 2 additions & 0 deletions 17-SparkJoinDemo/SparkJoinDemo.py
@@ -45,3 +45,5 @@
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.show()

spark.stop()
10 changes: 9 additions & 1 deletion 18-OuterJoinDemo/OuterJoinDemo.py
@@ -41,10 +41,18 @@

product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

order_df.join(product_renamed_df, join_expr, "outer") \
.drop(product_renamed_df.prod_id) \
.select("*") \
.sort("order_id") \
.show()

order_df.join(product_renamed_df, join_expr, "left") \
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
.withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
.sort("order_id") \
.show()

spark.stop()
12 changes: 11 additions & 1 deletion 19-ShuffleJoinDemo/SuffleJoinDemo.py
@@ -1,4 +1,5 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

from lib.logger import Log4j

@@ -14,10 +15,19 @@
flight_time_df1 = spark.read.json("data/d1/")
flight_time_df2 = spark.read.json("data/d2/")

# Ensure we get 3 partitions after the shuffle
spark.conf.set("spark.sql.shuffle.partitions", 3)

join_expr = flight_time_df1.id == flight_time_df2.id
# Shuffle Join
join_df = flight_time_df1.join(flight_time_df2, join_expr, "inner")

join_df.collect()
input("press a key to stop...")

# Broadcast Join
broadcast_df = flight_time_df1.join(broadcast(flight_time_df2), join_expr, "inner")

broadcast_df.count()
input("http://localhost:4040/jobs/ - http://localhost:4040/SQL/: press a key to stop...")

spark.stop()
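Besides watching the Spark UI, the chosen join strategy can be confirmed from the query plan. A sketch using synthetic `spark.range` DataFrames instead of the course JSON files: the first `explain()` should show a SortMergeJoin with an Exchange (shuffle) on both sides, the second a BroadcastHashJoin.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.master("local[3]").appName("JoinPlanSketch").getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # suppress automatic broadcasts

df1 = spark.range(0, 1000)
df2 = spark.range(500, 1500)
join_expr = df1["id"] == df2["id"]

# Expect SortMergeJoin preceded by an Exchange (shuffle) on both sides
df1.join(df2, join_expr, "inner").explain()

# Expect BroadcastHashJoin: the hinted side is shipped to every executor, no shuffle of df1
df1.join(broadcast(df2), join_expr, "inner").explain()

spark.stop()
```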
42 changes: 16 additions & 26 deletions 20-BucketJoinDemo/BucketJoinDemo.py
@@ -15,43 +15,33 @@
df2 = spark.read.json("data/d2/")
# df1.show()
# df2.show()
'''

spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
spark.sql("USE MY_DB")

df1.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data1")
if not spark.catalog.tableExists("MY_DB.flight_data1"):
logger.info("MY_DB.flight_data1 table does not exist, creating it")
df1.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data1")

df2.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data2")
'''
if not spark.catalog.tableExists("MY_DB.flight_data2"):
logger.info("MY_DB.flight_data2 table does not exist, creating it")
df2.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data2")

df3 = spark.read.table("MY_DB.flight_data1")
df4 = spark.read.table("MY_DB.flight_data2")

# Disable broadcast joins; we want a SortMergeJoin without a shuffle
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
join_expr = df3.id == df4.id
join_df = df3.join(df4, join_expr, "inner")

join_df.collect()
input("press a key to stop...")

input("http://localhost:4040/SQL/: press a key to stop...")

spark.stop()
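When both tables are bucketed into the same number of buckets on the join key and automatic broadcasting is disabled, the plan should contain no Exchange (shuffle) above the table scans. A sketch that verifies this, assuming the two bucketed tables created by `BucketJoinDemo.py` already exist in the local metastore:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[3]")
         .appName("BucketJoinPlanSketch")
         .enableHiveSupport()
         .getOrCreate())

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Assumes MY_DB.flight_data1 and MY_DB.flight_data2 were created by BucketJoinDemo.py
df3 = spark.read.table("MY_DB.flight_data1")
df4 = spark.read.table("MY_DB.flight_data2")

join_df = df3.join(df4, df3.id == df4.id, "inner")

# With matching bucket counts on the join key, the plan should show SortMergeJoin
# reading the buckets directly, with no Exchange (shuffle) above the scans
join_df.explain()

spark.stop()
```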
28 changes: 9 additions & 19 deletions README.md
@@ -1,27 +1,17 @@
# Apache Spark 3 - Spark Programming in Python for Beginners
This is the central repository for all the materials related to <em>Apache Spark 3 - Spark Programming in Python for Beginners</em> <br>Course by Prashant Pandey.
<br> You can get the full course at <a href="https://www.udemy.com/course/draft/3184584/?referralCode=77E18B4F800479A263D5">
Apache Spark Course @ Udemy.
</a>
This is the central repository for all the materials related to *Apache Spark 3 - Spark Programming in Python for Beginners*
Course by Prashant Pandey.

<div>
<a href="https://www.udemy.com/course/draft/3184584/?referralCode=77E18B4F800479A263D5">
<img src="https://www.learningjournal.guru/_resources/img/jpg-5x/spark-beginners-course.jpg" alt="Apache Spark 3 - Spark Programming in Python for Beginners" width="300" align="left">
</a>
You can get the full course at [Apache Spark Course @ Udemy](https://www.udemy.com/course/draft/3184584/?referralCode=77E18B4F800479A263D5).

<h2> Description </h2>
<p align="justify">
I am creating <em>Apache Spark 3 - Spark Programming in Python for Beginners </em>course to help you understand the Spark programming and apply that knowledge to build data engineering solutions. This course is example-driven and follows a working session like approach. We will be taking a live coding approach and explain all the needed concepts along the way.
</p>
![Apache Spark 3 - Spark Programming in Python for Beginners](https://www.learningjournal.guru/_resources/img/jpg-5x/spark-beginners-course.jpg)

<h3>Who should take this Course?</h3>
<p align="justify">
## Description
I am creating *Apache Spark 3 - Spark Programming in Python for Beginners* course to help you understand the Spark programming and apply that knowledge to build data engineering solutions. This course is example-driven and follows a working session like approach. We will be taking a live coding approach and explain all the needed concepts along the way.

### Who should take this Course?
I designed this course for software engineers willing to develop a Data Engineering pipeline and application using Apache Spark. I am also creating this course for data architects and data engineers who are responsible for designing and building the organization’s data-centric infrastructure. Another group of people is the managers and architects who do not directly work with Spark implementation. Still, they work with the people who implement Apache Spark at the ground level.
</p>

<h3>Spark and source code version</h3>
<p align="justify">
### Spark and source code version
This course uses Apache Spark 3.x. I have tested all the source code and examples used in this course on the Apache Spark 3.0.0 open-source distribution.
</p>

</div>
66 changes: 66 additions & 0 deletions SETUP.md
@@ -0,0 +1,66 @@
# Setup
## macOS
### Homebrew
Homebrew is a package manager for macOS that makes it easy to install and manage software. Follow these steps to install Homebrew:

1. Open the Terminal.
2. Paste the following command and press Enter:
```shell
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
```

### Installing Dependencies
Install Anaconda
```shell
brew install --cask anaconda
```

Install JDK 11 (the PATH setup below assumes the Homebrew `openjdk@11` formula)
```shell
brew install openjdk@11
```

Install Apache Spark
```shell
brew install apache-spark
```

Determine the Spark and Scala versions
```shell
pyspark  # The Spark version is shown in the startup banner
```
Then, inside the pyspark shell:
```python
print(spark.version)  # This will print <your-spark-version>
print(".".join(spark.sparkContext._jvm.scala.util.Properties.versionNumberString().split(".")[:2]))  # This will print <your-scala-version>
exit()
```

Configure the Spark application logs (Log4j). Be sure to change `<your-scala-version>` and `<your-spark-version>` to the versions you found above
```shell
cd /opt/homebrew/Cellar/apache-spark/<your-spark-version>/libexec/conf
cp spark-defaults.conf.template spark-defaults.conf
echo "spark.driver.extraJavaOptions -Dlog4j.configuration=file:log4j.properties -Dspark.yarn.app.container.log.dir=app-logs -Dlogfile.name=hello-spark" >> spark-defaults.conf
echo "spark.jars.packages org.apache.spark:spark-avro_<your-scala-version>:<your-spark-version>" >> spark-defaults.conf
```
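
Once `spark.jars.packages` points at the matching `spark-avro` artifact, a quick way to confirm it is picked up is to write and read a tiny Avro file. A minimal sketch (the script name and data are illustrative; it assumes the `spark-defaults.conf` above is in effect):

```python
# avro_check.py (illustrative name) - run with spark-submit so spark-defaults.conf is applied
import tempfile
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[3]").appName("AvroCheck").getOrCreate()

path = tempfile.mkdtemp() + "/avro_check"
spark.createDataFrame([(1, "ok")], ["id", "status"]) \
    .write.format("avro").mode("overwrite").save(path)

spark.read.format("avro").load(path).show()
spark.stop()
```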

Add Java and Spark to the PATH (bash_profile or zshrc)
```shell
# JDK
export PATH="/opt/homebrew/opt/openjdk@11/bin:$PATH"
# Spark
export SPARK_HOME=/opt/homebrew/Cellar/apache-spark/<your-spark-version>/libexec
export PATH=$SPARK_HOME/bin:$PATH
```

If you are using conda, the interpreter will use the SPARK_HOME configured here. You will need to **restart PyCharm** for the
environment variables to take effect; otherwise the Log4j configuration above is not picked up and application logs will not appear as expected.

Set your Python interpreter to a conda environment
1. PyCharm -> Settings... -> Project -> Python Interpreter
2. Add Interpreter -> Add Local Interpreter...
3. Select `Conda Environment` on the left panel -> `Create new environment` -> OK

Install pyspark
1. Open the Python Packages tool window (the stacked-rectangles icon at the bottom left of PyCharm).
2. Search for "pyspark".
3. Click `Install`, select the same version as `<your-spark-version>` from above, and wait for it to finish. A quick smoke test for the finished setup is sketched below.
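
A minimal smoke test (nothing course-specific, just a sanity check that `pyspark` and `SPARK_HOME` are wired up):

```python
# Minimal smoke test for the PyCharm/conda setup described above
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[3]").appName("SetupCheck").getOrCreate()

print("Spark version:", spark.version)
print("SPARK_HOME:", os.environ.get("SPARK_HOME", "<not set>"))

spark.range(5).show()  # trivial job to confirm the local session works
spark.stop()
```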