Commit 7b9224a

Spark 3.5.x support (#1178)
* Copied the Scala implementation for 3.5 from the 3.2 one.
* Implemented support for Spark 3.5.1; the fixes are relevant for 3.5.0, 3.4.x, and 3.3.4+ as well.
* Fixed a crash on Databricks when using zip deployment, and exceptions in the console after a successful run on Windows; added logging to help with troubleshooting.
* Updated the CI and Nightly pipelines with Spark 3.5 and fixed incompatible tests.
1 parent 888f7d8 commit 7b9224a

41 files changed: +2920 -33 lines

azure-pipelines-e2e-tests-template.yml (+5 -9)
@@ -122,14 +122,14 @@ stages:
       script: |
         echo "Download Hadoop utils for Windows."
         $hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
-        # Spark 3.3.3 version binary use Hadoop3 dependency
-        if ("3.3.3" -contains "${{ test.version }}") {
+        # Spark 3.3.0+ version binary uses Hadoop3 dependency
+        if ([version]"3.3.0" -le [version]"${{ test.version }}") {
           $hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
         }
         curl -k -L -o hadoop.zip $hadoopBinaryUrl
         Expand-Archive -Path hadoop.zip -Destination .
         New-Item -ItemType Directory -Force -Path hadoop\bin
-        if ("3.3.3" -contains "${{ test.version }}") {
+        if ([version]"3.3.0" -le [version]"${{ test.version }}") {
           cp hadoop-3.3.5\winutils.exe hadoop\bin
           # Hadoop 3.3 needs hadoop.dll added to the environment variables to avoid UnsatisfiedLinkError
           cp hadoop-3.3.5\hadoop.dll hadoop\bin
@@ -142,12 +142,8 @@ stages:
     - pwsh: |
         echo "Downloading Spark ${{ test.version }}"
         $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
-        # In Spark 3.3.0, 3.3.1, 3.3.2 and 3.3.4, the hadoop2 binary name changed to spark-${{ test.version }}-bin-hadoop2.tgz
-        if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
-          $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
-        }
-        # Spark 3.3.3 does not provide a hadoop2 binary, so use the hadoop3 version
-        if ("3.3.3" -contains "${{ test.version }}") {
+        # Spark 3.3.0+ uses Hadoop3
+        if ([version]"3.3.0" -le [version]"${{ test.version }}") {
           $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
         }
         curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
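The key fix in this template is replacing string `-contains` checks with PowerShell `[version]` casts: version strings do not order correctly as plain text, so a range check like "3.3.0 or newer" needs component-wise comparison. The same idea in C# with System.Version (a minimal illustrative sketch, not part of the commit):

using System;

class VersionCompareDemo
{
    static void Main()
    {
        // Lexicographic string comparison misorders versions once a
        // component reaches two digits: "3.10.0" sorts before "3.3.0".
        Console.WriteLine(string.CompareOrdinal("3.10.0", "3.3.0") < 0); // True (wrong order)

        // System.Version compares component-wise, like PowerShell's [version] cast.
        Console.WriteLine(new Version("3.10.0") >= new Version("3.3.0")); // True
        Console.WriteLine(new Version("3.5.1") >= new Version("3.3.0"));  // True, so 3.5.x picks Hadoop3
    }
}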

azure-pipelines-pr.yml (+11 -1)
@@ -31,7 +31,7 @@ variables:
   backwardCompatibleTestOptions_Linux_3_1: ""
   forwardCompatibleTestOptions_Linux_3_1: ""

-  # Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
+  # Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
   backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
   forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
   backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
@@ -41,6 +41,11 @@ variables:
   forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
   backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
   forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+
+  backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
+  forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
+  backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
+  forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)

   # Azure DevOps variables are transformed into environment variables, with these variables we
   # avoid the first time experience and telemetry to speed up the build.
@@ -73,6 +78,11 @@ parameters:
   - '3.3.2'
   - '3.3.3'
   - '3.3.4'
+  - '3.5.0'
+  - '3.5.1'
+  - '3.5.2'
+  - '3.5.3'
+
 # List of OS types to run E2E tests, run each test in both 'Windows' and 'Linux' environments
 - name: listOfE2ETestsPoolTypes
   type: object
azure-pipelines.yml (+54 -1)
@@ -31,12 +31,17 @@ variables:
   backwardCompatibleTestOptions_Linux_3_1: ""
   forwardCompatibleTestOptions_Linux_3_1: ""

-  # Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
+  # Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
   backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
   forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
   backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
   forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)

+  backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
+  forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
+  backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
+  forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
+
   # Azure DevOps variables are transformed into environment variables, with these variables we
   # avoid the first time experience and telemetry to speed up the build.
   DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -413,3 +418,51 @@ stages:
       testOptions: ""
       backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_2)
       forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_2)
+  - version: '3.5.0'
+    enableForwardCompatibleTests: false
+    enableBackwardCompatibleTests: false
+    jobOptions:
+      - pool: 'Windows'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
+      - pool: 'Linux'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
+  - version: '3.5.1'
+    enableForwardCompatibleTests: false
+    enableBackwardCompatibleTests: false
+    jobOptions:
+      - pool: 'Windows'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
+      - pool: 'Linux'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
+  - version: '3.5.2'
+    enableForwardCompatibleTests: false
+    enableBackwardCompatibleTests: false
+    jobOptions:
+      - pool: 'Windows'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
+      - pool: 'Linux'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
+  - version: '3.5.3'
+    enableForwardCompatibleTests: false
+    enableBackwardCompatibleTests: false
+    jobOptions:
+      - pool: 'Windows'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
+      - pool: 'Linux'
+        testOptions: ""
+        backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
+        forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)

src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs (+1)
@@ -27,6 +27,7 @@ public DeltaFixture()
             (3, 3, 2) => "delta-core_2.12:2.3.0",
             (3, 3, 3) => "delta-core_2.12:2.3.0",
             (3, 3, 4) => "delta-core_2.12:2.3.0",
+            (3, 5, _) => "delta-spark_2.12:3.2.0",
             _ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
         };
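Two details worth noting here: the Maven artifact was renamed from delta-core to delta-spark starting with Delta Lake 3.x, and the `(3, 5, _)` arm uses a discard so every 3.5 patch release maps to the same package. A minimal sketch of that tuple-pattern dispatch (standalone demo, not the repo's code):

using System;

class DeltaPackageDemo
{
    // Maps a (major, minor, patch) Spark version to a Delta package.
    // The discard `_` in (3, 5, _) matches any 3.5.x patch release.
    static string DeltaPackage((int Major, int Minor, int Patch) v) =>
        v switch
        {
            (3, 3, 3) => "delta-core_2.12:2.3.0",
            (3, 5, _) => "delta-spark_2.12:3.2.0",
            _ => throw new NotSupportedException($"Spark {v} not supported.")
        };

    static void Main()
    {
        Console.WriteLine(DeltaPackage((3, 5, 0))); // delta-spark_2.12:3.2.0
        Console.WriteLine(DeltaPackage((3, 5, 3))); // same package for any 3.5 patch
    }
}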

src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs (+8 -2)
@@ -57,16 +57,22 @@ public void TestSignaturesV2_4_X()

         /// <summary>
         /// Test signatures for APIs introduced in Spark 3.1.*.
+        /// In Spark 3.5, Spark throws an exception when trying to delete
+        /// archive.zip from the temp folder, which causes failures in other tests.
         /// </summary>
-        [SkipIfSparkVersionIsLessThan(Versions.V3_1_0)]
+        [SkipIfSparkVersionIsNotInRange(Versions.V3_1_0, Versions.V3_3_0)]
         public void TestSignaturesV3_1_X()
         {
             SparkContext sc = SparkContext.GetOrCreate(new SparkConf());

             string archivePath = $"{TestEnvironment.ResourceDirectory}archive.zip";
+
             sc.AddArchive(archivePath);

-            Assert.IsType<string[]>(sc.ListArchives().ToArray());
+            var archives = sc.ListArchives().ToArray();
+
+            Assert.IsType<string[]>(archives);
+            Assert.NotEmpty(archives.Where(a => a.EndsWith("archive.zip")));
         }
     }
 }

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/CatalogTests.cs (-1)
@@ -59,7 +59,6 @@ public void TestSignaturesV2_4_X()
             Assert.IsType<bool>(catalog.FunctionExists("functionname"));
             Assert.IsType<Database>(catalog.GetDatabase("default"));
             Assert.IsType<Function>(catalog.GetFunction("abs"));
-            Assert.IsType<Function>(catalog.GetFunction(null, "abs"));
             Assert.IsType<Table>(catalog.GetTable("users"));
             Assert.IsType<Table>(catalog.GetTable("default", "users"));
             Assert.IsType<bool>(catalog.IsCached("users"));

src/csharp/Microsoft.Spark.UnitTest/TypeConverterTests.cs (+1)
@@ -20,6 +20,7 @@ public void TestBaseCase()
             Assert.Equal((short)1, TypeConverter.ConvertTo<short>((short)1));
             Assert.Equal((ushort)1, TypeConverter.ConvertTo<ushort>((ushort)1));
             Assert.Equal(1, TypeConverter.ConvertTo<int>(1));
+            Assert.Equal(1L, TypeConverter.ConvertTo<long>(1));
             Assert.Equal(1u, TypeConverter.ConvertTo<uint>(1u));
             Assert.Equal(1L, TypeConverter.ConvertTo<long>(1L));
             Assert.Equal(1ul, TypeConverter.ConvertTo<ulong>(1ul));

src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs (+1)
@@ -351,6 +351,7 @@ internal PayloadWriter Create(Version version = null)
                     new BroadcastVariableWriterV2_4_X(),
                     new CommandWriterV2_4_X());
             case Versions.V3_3_0:
+            case Versions.V3_5_1:
                 return new PayloadWriter(
                     version,
                     new TaskContextWriterV3_3_X(),
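Stacking `case Versions.V3_5_1:` under `case Versions.V3_3_0:` means 3.5.1 reuses the 3.3 writers rather than getting new ones, i.e. the test harness treats the two payload formats as identical. A minimal sketch of the pattern (illustrative names, not the repo's API):

using System;

class PayloadLayoutDemo
{
    // Stacked case labels fall through to one body, so several
    // versions share a single payload layout.
    static string LayoutFor(string version)
    {
        switch (version)
        {
            case "3.3.0":
            case "3.5.1": // Spark 3.5 kept the 3.3 task-context wire format
                return "TaskContextWriterV3_3_X";
            default:
                throw new NotSupportedException(version);
        }
    }

    static void Main() => Console.WriteLine(LayoutFor("3.5.1"));
}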

src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs (+1)
@@ -20,6 +20,7 @@ public static IEnumerable<object[]> VersionData() =>
             new object[] { Versions.V3_0_0 },
             new object[] { Versions.V3_2_0 },
             new object[] { Versions.V3_3_0 },
+            new object[] { Versions.V3_5_1 },
         };

         internal static Payload GetDefaultPayload()

src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs (+10 -12)
@@ -31,31 +31,32 @@ internal TaskContext Process(Stream stream)
         private static TaskContext ReadTaskContext_2_x(Stream stream)
             => new()
             {
+                IsBarrier = SerDe.ReadBool(stream),
+                Port = SerDe.ReadInt32(stream),
+                Secret = SerDe.ReadString(stream),
+
                 StageId = SerDe.ReadInt32(stream),
                 PartitionId = SerDe.ReadInt32(stream),
                 AttemptNumber = SerDe.ReadInt32(stream),
                 AttemptId = SerDe.ReadInt64(stream),
             };

+        // Needed for 3.3.0+
+        // https://issues.apache.org/jira/browse/SPARK-36173
         private static TaskContext ReadTaskContext_3_3(Stream stream)
             => new()
             {
+                IsBarrier = SerDe.ReadBool(stream),
+                Port = SerDe.ReadInt32(stream),
+                Secret = SerDe.ReadString(stream),
+
                 StageId = SerDe.ReadInt32(stream),
                 PartitionId = SerDe.ReadInt32(stream),
                 AttemptNumber = SerDe.ReadInt32(stream),
                 AttemptId = SerDe.ReadInt64(stream),
-                // CPUs field is added into TaskContext from 3.3.0 https://issues.apache.org/jira/browse/SPARK-36173
                 CPUs = SerDe.ReadInt32(stream)
             };

-        private static void ReadBarrierInfo(Stream stream)
-        {
-            // Read barrier-related payload. Note that barrier is currently not supported.
-            SerDe.ReadBool(stream); // IsBarrier
-            SerDe.ReadInt32(stream); // BoundPort
-            SerDe.ReadString(stream); // Secret
-        }
-
         private static void ReadTaskContextProperties(Stream stream, TaskContext taskContext)
         {
             int numProperties = SerDe.ReadInt32(stream);
@@ -87,7 +88,6 @@ private static class TaskContextProcessorV2_4_X
         {
             internal static TaskContext Process(Stream stream)
             {
-                ReadBarrierInfo(stream);
                 TaskContext taskContext = ReadTaskContext_2_x(stream);
                 ReadTaskContextProperties(stream, taskContext);

@@ -99,7 +99,6 @@ private static class TaskContextProcessorV3_0_X
         {
             internal static TaskContext Process(Stream stream)
             {
-                ReadBarrierInfo(stream);
                 TaskContext taskContext = ReadTaskContext_2_x(stream);
                 ReadTaskContextResources(stream);
                 ReadTaskContextProperties(stream, taskContext);
@@ -112,7 +111,6 @@ private static class TaskContextProcessorV3_3_X
         {
             internal static TaskContext Process(Stream stream)
             {
-                ReadBarrierInfo(stream);
                 TaskContext taskContext = ReadTaskContext_3_3(stream);
                 ReadTaskContextResources(stream);
                 ReadTaskContextProperties(stream, taskContext);
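The refactor folds the former ReadBarrierInfo reads (IsBarrier, Port, Secret) into the TaskContext itself: the values are now kept instead of discarded, but the read order over the socket is unchanged. Order is the whole contract here; the JVM writes fields in a fixed sequence and the worker must consume them in exactly that sequence. A self-contained sketch of that idea, with BinaryWriter/BinaryReader as stand-ins for the repo's SerDe helpers:

using System;
using System.IO;

class WireOrderDemo
{
    static void Main()
    {
        var ms = new MemoryStream();

        // "JVM side": write fields in the protocol's fixed order.
        var w = new BinaryWriter(ms);
        w.Write(false);    // IsBarrier
        w.Write(12345);    // Port
        w.Write("secret"); // Secret
        w.Write(7);        // StageId
        w.Flush();
        ms.Position = 0;

        // "Worker side": reads must mirror the write order exactly;
        // skipping or reordering one field corrupts everything after it.
        var r = new BinaryReader(ms);
        bool isBarrier = r.ReadBoolean();
        int port = r.ReadInt32();
        string secret = r.ReadString();
        int stageId = r.ReadInt32();

        Console.WriteLine($"IsBarrier={isBarrier}, Port={port}, StageId={stageId}");
    }
}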

src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs (+9 -1)
@@ -8,6 +8,7 @@
 using System.Diagnostics;
 using System.IO;
 using System.Net;
+using System.Net.Sockets;
 using System.Text;
 using System.Threading;
 using Microsoft.Spark.Network;
@@ -184,7 +185,7 @@ private object CallJavaMethod(
         ISocketWrapper socket = null;

         try
-        {
+        {
             // Limit the number of connections to the JVM backend. Netty is configured
             // to use a set number of threads to process incoming connections. Each
             // new connection is delegated to these threads in a round robin fashion.
@@ -299,6 +300,13 @@ private object CallJavaMethod(
             }
             else
             {
+                if (e.InnerException is SocketException)
+                {
+                    _logger.LogError(
+                        "Scala worker abandoned the connection, likely fatal crash on Java side. \n" +
+                        "Ensure Spark runs with sufficient memory.");
+                }
+
                 // In rare cases we may hit the Netty connection thread deadlock.
                 // If max backend threads is 10 and we are currently using 10 active
                 // connections (0 in the _sockets queue). When we hit this exception,
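The new logging keys off the exception's inner cause: when the JVM side dies mid-call, the failure surfaces on the .NET side as a wrapper exception with a SocketException inside, so pattern-matching on InnerException distinguishes "the peer crashed" from other failures. A minimal, self-contained sketch of that detection pattern (the message text is illustrative, not the library's):

using System;
using System.IO;
using System.Net.Sockets;

class InnerExceptionDemo
{
    static void Main()
    {
        try
        {
            // Simulate a dropped connection surfacing as an IOException
            // that wraps a SocketException (connection reset).
            throw new IOException(
                "Unable to read data from the transport connection.",
                new SocketException((int)SocketError.ConnectionReset));
        }
        catch (Exception e)
        {
            // Pattern-match on the inner cause to log a targeted hint
            // instead of a generic stack trace.
            if (e.InnerException is SocketException se)
            {
                Console.Error.WriteLine(
                    $"Peer abandoned the connection ({se.SocketErrorCode}); " +
                    "the remote process may have crashed.");
            }
        }
    }
}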

src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs (+8 -6)
@@ -248,20 +248,22 @@ public Database GetDatabase(string dbName) =>
             new Database((JvmObjectReference)Reference.Invoke("getDatabase", dbName));

         /// <summary>
-        /// Get the function with the specified name. If you are trying to get an in-built
-        /// function then use the unqualified name.
+        /// Get the function with the specified name. This can be a temporary function
+        /// or a persistent function.
         /// </summary>
         /// <param name="functionName">Is either a qualified or unqualified name that designates a
-        /// function. If no database identifier is provided, it refers to a temporary function or
-        /// a function in the current database.</param>
+        /// function. It follows the same resolution rule as SQL: built-in/temp functions are
+        /// searched first, then functions in the current database (namespace).</param>
         /// <returns>`Function` object which includes the class name, database, description,
         /// whether it is temporary and the name of the function.</returns>
         public Function GetFunction(string functionName) =>
             new Function((JvmObjectReference)Reference.Invoke("getFunction", functionName));

         /// <summary>
-        /// Get the function with the specified name. If you are trying to get an in-built function
-        /// then pass null as the dbName.
+        /// Get the function with the specified name in the specified database under the Hive
+        /// Metastore.
+        /// To get built-in functions, or functions in other catalogs, please use
+        /// `GetFunction(functionName)` with a qualified function name instead.
         /// </summary>
         /// <param name="dbName">Is a name that designates a database. Built-in functions will be
         /// in database null rather than default.</param>
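The updated docs split the two overloads along resolution rules: the one-argument form resolves like SQL (built-in and temp functions first, then the current database), while the two-argument form looks only in a specific Hive Metastore database. A short usage sketch (assumes a running session; myUdf is a hypothetical persistent function):

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Catalog;

class GetFunctionDemo
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Unqualified name: built-in/temp functions are searched first.
        Function abs = spark.Catalog.GetFunction("abs");

        // Database-qualified lookup in the Hive Metastore; built-ins are
        // not found this way (they live in database null, not "default").
        Function mine = spark.Catalog.GetFunction("default", "myUdf");
    }
}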

src/csharp/Microsoft.Spark/Utils/TypeConverter.cs (+5)
@@ -33,6 +33,11 @@ private static object Convert(object obj, Type toType)
             {
                 return ConvertToDictionary(hashtable, toType);
             }
+            // Fails to convert int to long otherwise
+            else if (toType.IsPrimitive)
+            {
+                return System.Convert.ChangeType(obj, toType);
+            }

             return obj;
         }
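The underlying issue is a boxing quirk: a boxed int can only be unboxed as int, so casting an object-wrapped 1 to long throws InvalidCastException even though int to long is a widening conversion. Convert.ChangeType performs the numeric conversion explicitly, which is what the new branch relies on. A self-contained sketch:

using System;

class BoxedConversionDemo
{
    static void Main()
    {
        object boxed = 1; // a boxed int

        // Unboxing must target the exact runtime type:
        // (long)boxed throws InvalidCastException.
        try
        {
            long bad = (long)boxed;
        }
        catch (InvalidCastException)
        {
            Console.WriteLine("(long) on a boxed int throws InvalidCastException");
        }

        // Convert.ChangeType does a real numeric conversion instead.
        long good = (long)Convert.ChangeType(boxed, typeof(long));
        Console.WriteLine(good); // 1
    }
}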

src/csharp/Microsoft.Spark/Versions.cs (+1)
@@ -13,5 +13,6 @@ internal static class Versions
         internal const string V3_1_1 = "3.1.1";
         internal const string V3_2_0 = "3.2.0";
         internal const string V3_3_0 = "3.3.0";
+        internal const string V3_5_1 = "3.5.1";
     }
 }
