
Commit 27e8bb5

Spark 3.3 support (#1194)
Support running spark.net on Spark 3.3.0, 3.3.1, 3.3.2, 3.3.3, and 3.3.4.
1 parent 4a6f038 commit 27e8bb5

35 files changed: +2936 -23 lines

azure-pipelines-e2e-tests-template.yml (+29 -6)
@@ -121,15 +121,38 @@ stages:
           targetType: inline
           script: |
             echo "Download Hadoop utils for Windows."
-            curl -k -L -o hadoop.zip https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip
+            $hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
+            # The Spark 3.3.3 binary is built against Hadoop 3.
+            if ("3.3.3" -contains "${{ test.version }}") {
+              $hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
+            }
+            curl -k -L -o hadoop.zip $hadoopBinaryUrl
             Expand-Archive -Path hadoop.zip -Destination .
             New-Item -ItemType Directory -Force -Path hadoop\bin
-            cp hadoop-2.8.1\winutils.exe hadoop\bin
+            if ("3.3.3" -contains "${{ test.version }}") {
+              cp hadoop-3.3.5\winutils.exe hadoop\bin
+              # Hadoop 3.3 also needs hadoop.dll on the PATH to avoid an UnsatisfiedLinkError.
+              cp hadoop-3.3.5\hadoop.dll hadoop\bin
+              cp hadoop-3.3.5\hadoop.dll C:\Windows\System32
+              [System.Environment]::SetEnvironmentVariable("PATH", $Env:Path + ";$(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop", [System.EnvironmentVariableTarget]::Machine)
+            } else {
+              cp hadoop-2.8.1\winutils.exe hadoop\bin
+            }
 
       - pwsh: |
           echo "Downloading Spark ${{ test.version }}"
-          curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/spark-${{ test.version }}-bin-hadoop2.7.tgz
+          $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
+          # In Spark 3.3.0, 3.3.1, 3.3.2, and 3.3.4, the hadoop2 binary is named spark-${{ test.version }}-bin-hadoop2.tgz.
+          if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
+            $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
+          }
+          # Spark 3.3.3 provides no hadoop2 binary, so we use the hadoop3 one.
+          if ("3.3.3" -contains "${{ test.version }}") {
+            $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
+          }
+          curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
           tar xzvf spark-${{ test.version }}.tgz
+          move $sparkBinaryName spark-${{ test.version }}-bin-hadoop
        displayName: 'Download Spark Distro ${{ test.version }}'
        workingDirectory: $(Build.BinariesDirectory)
 
@@ -142,7 +165,7 @@ stages:
        workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
        env:
          HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
          DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)
 
      - pwsh: |
@@ -167,7 +190,7 @@ stages:
        workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
        env:
          HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
          DOTNET_WORKER_DIR: $(BACKWARD_COMPATIBLE_DOTNET_WORKER_DIR)
 
      - checkout: forwardCompatibleRelease
@@ -189,5 +212,5 @@ stages:
        workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark-${{ parameters.forwardCompatibleRelease }}
        env:
          HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
-         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+         SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
          DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)

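The archive-name logic above is the subtle part of this change. As a quick reference, here is a minimal C# sketch of the same rule; the `SparkArchive` class and `GetBinaryName` method are hypothetical names used for illustration, not code from this commit:

```csharp
using System;

// Hypothetical helper mirroring the PowerShell archive-name selection above.
internal static class SparkArchive
{
    internal static string GetBinaryName(string sparkVersion) => sparkVersion switch
    {
        // These 3.3 releases publish their hadoop2 build without the ".7" suffix.
        "3.3.0" or "3.3.1" or "3.3.2" or "3.3.4" => $"spark-{sparkVersion}-bin-hadoop2",
        // 3.3.3 ships no hadoop2 build at all, so the hadoop3 build is used.
        "3.3.3" => $"spark-{sparkVersion}-bin-hadoop3",
        // Every earlier version keeps the classic hadoop2.7 naming.
        _ => $"spark-{sparkVersion}-bin-hadoop2.7",
    };
}
```

Whichever archive is downloaded, the pipeline then renames the extracted folder to the uniform spark-${{ test.version }}-bin-hadoop, which is why the SPARK_HOME values in the hunks above lose their hadoop2.7 suffix.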
azure-pipelines-pr.yml (+10)

@@ -37,6 +37,11 @@ variables:
   backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
   forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
 
+  backwardCompatibleTestOptions_Windows_3_3: "--filter FullyQualifiedName=NONE"
+  forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+  backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+  forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+
   # Azure DevOps variables are transformed into environment variables, with these variables we
   # avoid the first time experience and telemetry to speed up the build.
   DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -63,6 +68,11 @@ parameters:
   - '3.2.1'
   - '3.2.2'
   - '3.2.3'
+  - '3.3.0'
+  - '3.3.1'
+  - '3.3.2'
+  - '3.3.3'
+  - '3.3.4'
   # List of OS types to run E2E tests, run each test in both 'Windows' and 'Linux' environments
   - name: listOfE2ETestsPoolTypes
     type: object

src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs (+10 -5)

@@ -16,12 +16,17 @@ public class DeltaFixture
         public DeltaFixture()
         {
             Version sparkVersion = SparkSettings.Version;
-            string deltaVersion = (sparkVersion.Major, sparkVersion.Minor) switch
+            string deltaVersion = (sparkVersion.Major, sparkVersion.Minor, sparkVersion.Build) switch
             {
-                (2, _) => "delta-core_2.11:0.6.1",
-                (3, 0) => "delta-core_2.12:0.8.0",
-                (3, 1) => "delta-core_2.12:1.0.0",
-                (3, 2) => "delta-core_2.12:1.1.0",
+                (2, _, _) => "delta-core_2.11:0.6.1",
+                (3, 0, _) => "delta-core_2.12:0.8.0",
+                (3, 1, _) => "delta-core_2.12:1.0.0",
+                (3, 2, _) => "delta-core_2.12:1.1.0",
+                (3, 3, 0) => "delta-core_2.12:2.1.0",
+                (3, 3, 1) => "delta-core_2.12:2.1.0",
+                (3, 3, 2) => "delta-core_2.12:2.3.0",
+                (3, 3, 3) => "delta-core_2.12:2.3.0",
+                (3, 3, 4) => "delta-core_2.12:2.3.0",
                 _ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
             };
 

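The coordinate chosen above is ultimately handed to spark-submit through `--packages`, Spark's standard mechanism for resolving Maven artifacts at session startup. The exact wiring inside the fixture is not part of this diff, so the following is only a hedged sketch; the environment-variable name is an assumption for illustration:

```csharp
using System;

// A hedged sketch, not code from this diff: pass the chosen Delta coordinate
// to spark-submit so the jars are resolved when the test session starts.
string deltaVersion = "delta-core_2.12:2.3.0"; // e.g. the (3, 3, 4) arm above
Environment.SetEnvironmentVariable(
    "DOTNET_SPARK_EXTRA_SPARK_SUBMIT_ARGS",    // assumed variable name
    $"--packages io.delta:{deltaVersion}");
```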
src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs (+45)

@@ -112,6 +112,45 @@ public void Write(Stream stream, TaskContext taskContext)
         }
     }
 
+    /// <summary>
+    /// TaskContextWriter for version 3.3.*.
+    /// </summary>
+    internal sealed class TaskContextWriterV3_3_X : ITaskContextWriter
+    {
+        public void Write(Stream stream, TaskContext taskContext)
+        {
+            SerDe.Write(stream, taskContext.IsBarrier);
+            SerDe.Write(stream, taskContext.Port);
+            SerDe.Write(stream, taskContext.Secret);
+
+            SerDe.Write(stream, taskContext.StageId);
+            SerDe.Write(stream, taskContext.PartitionId);
+            SerDe.Write(stream, taskContext.AttemptNumber);
+            SerDe.Write(stream, taskContext.AttemptId);
+            // Write the CPUs field, which is new in Spark 3.3.x.
+            SerDe.Write(stream, taskContext.CPUs);
+
+            SerDe.Write(stream, taskContext.Resources.Count());
+            foreach (TaskContext.Resource resource in taskContext.Resources)
+            {
+                SerDe.Write(stream, resource.Key);
+                SerDe.Write(stream, resource.Value);
+                SerDe.Write(stream, resource.Addresses.Count());
+                foreach (string address in resource.Addresses)
+                {
+                    SerDe.Write(stream, address);
+                }
+            }
+
+            SerDe.Write(stream, taskContext.LocalProperties.Count);
+            foreach (KeyValuePair<string, string> kv in taskContext.LocalProperties)
+            {
+                SerDe.Write(stream, kv.Key);
+                SerDe.Write(stream, kv.Value);
+            }
+        }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // BroadcastVariable writer for different Spark versions.
     ///////////////////////////////////////////////////////////////////////////
@@ -311,6 +350,12 @@ internal PayloadWriter Create(Version version = null)
                         new TaskContextWriterV3_0_X(),
                         new BroadcastVariableWriterV2_4_X(),
                         new CommandWriterV2_4_X());
+                case Versions.V3_3_0:
+                    return new PayloadWriter(
+                        version,
+                        new TaskContextWriterV3_3_X(),
+                        new BroadcastVariableWriterV2_4_X(),
+                        new CommandWriterV2_4_X());
                 default:
                     throw new NotSupportedException($"Spark {version} is not supported.");
             }

src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs (+1)

@@ -19,6 +19,7 @@ public static IEnumerable<object[]> VersionData() =>
             new object[] { Versions.V2_4_0 },
             new object[] { Versions.V3_0_0 },
             new object[] { Versions.V3_2_0 },
+            new object[] { Versions.V3_3_0 },
         };
 
     internal static Payload GetDefaultPayload()

src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs (+35 -12)

@@ -22,21 +22,31 @@ internal TaskContext Process(Stream stream)
         return (_version.Major, _version.Minor) switch
         {
             (2, 4) => TaskContextProcessorV2_4_X.Process(stream),
-            (3, _) => TaskContextProcessorV3_0_X.Process(stream),
+            (3, _) when _version.Minor < 3 => TaskContextProcessorV3_0_X.Process(stream),
+            (3, _) => TaskContextProcessorV3_3_X.Process(stream),
             _ => throw new NotSupportedException($"Spark {_version} not supported.")
         };
     }
 
-    private static TaskContext ReadTaskContext(Stream stream)
+    private static TaskContext ReadTaskContext_2_x(Stream stream)
+        => new()
         {
-            return new TaskContext
-            {
-                StageId = SerDe.ReadInt32(stream),
-                PartitionId = SerDe.ReadInt32(stream),
-                AttemptNumber = SerDe.ReadInt32(stream),
-                AttemptId = SerDe.ReadInt64(stream)
-            };
-        }
+            StageId = SerDe.ReadInt32(stream),
+            PartitionId = SerDe.ReadInt32(stream),
+            AttemptNumber = SerDe.ReadInt32(stream),
+            AttemptId = SerDe.ReadInt64(stream),
+        };
+
+    private static TaskContext ReadTaskContext_3_3(Stream stream)
+        => new()
+        {
+            StageId = SerDe.ReadInt32(stream),
+            PartitionId = SerDe.ReadInt32(stream),
+            AttemptNumber = SerDe.ReadInt32(stream),
+            AttemptId = SerDe.ReadInt64(stream),
+            // The CPUs field was added to TaskContext in Spark 3.3.0: https://issues.apache.org/jira/browse/SPARK-36173
+            CPUs = SerDe.ReadInt32(stream)
+        };
 
     private static void ReadBarrierInfo(Stream stream)
     {
@@ -78,7 +88,7 @@ private static class TaskContextProcessorV2_4_X
         internal static TaskContext Process(Stream stream)
         {
             ReadBarrierInfo(stream);
-            TaskContext taskContext = ReadTaskContext(stream);
+            TaskContext taskContext = ReadTaskContext_2_x(stream);
             ReadTaskContextProperties(stream, taskContext);
 
             return taskContext;
@@ -90,7 +100,20 @@ private static class TaskContextProcessorV3_0_X
         internal static TaskContext Process(Stream stream)
         {
             ReadBarrierInfo(stream);
-            TaskContext taskContext = ReadTaskContext(stream);
+            TaskContext taskContext = ReadTaskContext_2_x(stream);
+            ReadTaskContextResources(stream);
+            ReadTaskContextProperties(stream, taskContext);
+
+            return taskContext;
+        }
+    }
+
+    private static class TaskContextProcessorV3_3_X
+    {
+        internal static TaskContext Process(Stream stream)
+        {
+            ReadBarrierInfo(stream);
+            TaskContext taskContext = ReadTaskContext_3_3(stream);
             ReadTaskContextResources(stream);
             ReadTaskContextProperties(stream, taskContext);
 

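Taken together, the test-side TaskContextWriterV3_3_X and the worker-side TaskContextProcessorV3_3_X should round-trip the new field. Below is a hedged sketch of such a test, assuming the commit's internal types are visible to the test project (as they are for the existing worker unit tests), that TaskContextProcessor exposes a Version-taking constructor, and that Resources and LocalProperties default to empty collections:

```csharp
using System;
using System.IO;
using Microsoft.Spark;                   // TaskContext, Versions
using Microsoft.Spark.Worker.Processor;  // TaskContextProcessor (assumed namespace)
using Microsoft.Spark.Worker.UnitTest;   // TaskContextWriterV3_3_X
using Xunit;

public class TaskContextCpusRoundTripTest
{
    [Fact]
    public void CpusFieldRoundTrips()
    {
        // Field values are arbitrary; Secret must be non-null because the
        // writer serializes it unconditionally.
        var expected = new TaskContext
        {
            StageId = 1,
            PartitionId = 2,
            AttemptNumber = 0,
            AttemptId = 100L,
            CPUs = 4,
            Secret = "secret",
        };

        using var stream = new MemoryStream();
        new TaskContextWriterV3_3_X().Write(stream, expected);
        stream.Position = 0;

        TaskContext actual =
            new TaskContextProcessor(new Version(Versions.V3_3_0)).Process(stream);

        Assert.Equal(expected.CPUs, actual.CPUs);
    }
}
```

The Versions.V3_3_0 entry added to TestData.VersionData() above drives the same path through the repo's own payload round-trip tests.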
src/csharp/Microsoft.Spark/TaskContext.cs (+2)

@@ -21,6 +21,8 @@ internal class TaskContext
 
         internal long AttemptId { get; set; }
 
+        internal int CPUs { get; set; }
+
         internal bool IsBarrier { get; set; }
 
         internal int Port { get; set; }

src/csharp/Microsoft.Spark/Versions.cs (+1)

@@ -12,5 +12,6 @@ internal static class Versions
         internal const string V3_1_0 = "3.1.0";
         internal const string V3_1_1 = "3.1.1";
         internal const string V3_2_0 = "3.2.0";
+        internal const string V3_3_0 = "3.3.0";
     }
 }

src/scala/microsoft-spark-3-3/pom.xml (+83, new file)

@@ -0,0 +1,83 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.microsoft.scala</groupId>
+        <artifactId>microsoft-spark</artifactId>
+        <version>${microsoft-spark.version}</version>
+    </parent>
+    <artifactId>microsoft-spark-3-3_2.12</artifactId>
+    <inceptionYear>2019</inceptionYear>
+    <properties>
+        <encoding>UTF-8</encoding>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.3.0</spark.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.specs</groupId>
+            <artifactId>specs</artifactId>
+            <version>1.2.5</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.8</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
