Fix slow partition adjustment issue with multiple streams and dynamic partitions detection #572

Open

wants to merge 45 commits into base: master

Changes from all commits (45 commits)
b2ff154
sync with upstream
Jul 7, 2020
29da83e
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Jul 31, 2020
0dd57b6
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Aug 26, 2020
6544494
Merge branch 'master' of https://github.com/Azure/azure-event-hubs-spark
Sep 16, 2020
01c0af5
Merge branch 'master' of https://github.com/nyaghma/azure-event-hubs-…
Sep 16, 2020
922ab7e
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Oct 13, 2020
f861628
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 7, 2020
59b0ea1
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 10, 2020
02c7e66
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 10, 2020
37b699b
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 12, 2020
1a31a3f
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 12, 2020
4edac2b
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Nov 13, 2020
529e6cd
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Dec 3, 2020
67a2bbe
Merge branch 'master' of github.com:Azure/azure-event-hubs-spark
Dec 8, 2020
0c09744
fix slow partition adj when a new partition added
Dec 8, 2020
7c9e7e3
Spark3 compatibility (#520)
nyaghma Jul 14, 2020
efecbff
Update option keys (#521)
nyaghma Jul 24, 2020
b14632d
Slow partitions adjustment (#512)
nyaghma Jul 24, 2020
74485ca
Do not retry receiver close, log and ignore transient errors. (#523)
JamesBirdsall Jul 30, 2020
c2cf8c7
Update FAQ (#527)
nyaghma Aug 5, 2020
5da86a3
Update version number for new release (2.3.17) (#529)
sjkwak Aug 5, 2020
2251c24
Update documentation for the current release (2.3.17) (#530)
sjkwak Aug 7, 2020
3e251e6
add AAD auth client to connect ehs (#535)
duhuan Sep 30, 2020
03377d1
Multi readers example (#540)
nyaghma Oct 1, 2020
b6a2b38
Slow partitions adjustment doc (#533)
nyaghma Oct 1, 2020
a1e4c67
fix the error when num of partitions increased (#500)
tilumi Oct 6, 2020
5feb961
Fix non streaming rpc endpoint (#542)
nyaghma Oct 6, 2020
865dae8
Dynamic added partitions (#544)
nyaghma Oct 27, 2020
728493a
fixes for pyspark documentation (#551)
alexott Oct 29, 2020
6db45d1
Partition sender fix (#550)
nyaghma Nov 4, 2020
ac93a73
Retry client create (#552)
nyaghma Nov 6, 2020
753d225
Throttling status plugin update (#555)
nyaghma Nov 6, 2020
129fb2b
Serializable contexts (#558)
nyaghma Nov 10, 2020
b841785
Fix logging issue (#559)
nyaghma Nov 10, 2020
3327b4c
Update version number for new release (2.3.18) and client SDK depende…
sjkwak Nov 12, 2020
598b51e
Rpc endpoint for direct stream (#561)
nyaghma Nov 12, 2020
6509ed4
Rpc endpoint recreation (#564)
nyaghma Nov 13, 2020
ddd9e32
Update documentation for the current release (2.3.18) (#565)
sjkwak Nov 17, 2020
7d30724
update scala-maven-plugin
arerlend Dec 5, 2020
f736996
fix test lint
arerlend Dec 5, 2020
3d8e11a
fix slow partition adj when a new partition added
Dec 8, 2020
874fb4f
Merge branch 'fixParitionAdjustmentRateLimitIssue' of github.com:nyag…
Dec 8, 2020
b887f4d
fix slow partition adj issue with multi streams
Dec 11, 2020
a57b7d4
Merge branch 'master' into fixParitionAdjustmentRateLimitIssue
sjkwak Jun 17, 2021
62efcb7
Merge branch 'master' into fixParitionAdjustmentRateLimitIssue
sjkwak Jun 29, 2021
EventHubsUtils.scala:

@@ -62,9 +62,8 @@ object EventHubsUtils extends Logging {
         s"There is an existing partitionPerformanceReceiverRef on the driver, use that one rather than creating a new one")
     } catch {
       case e: Exception =>
-        val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
         val partitionPerformanceReceiver: PartitionPerformanceReceiver =
-          new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker)
+          new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv)
         partitionPerformanceReceiverRef = SparkEnv.get.rpcEnv
           .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)
     }
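A minimal sketch of the get-or-create pattern this hunk relies on: an existing driver-side endpoint is looked up first, and the receiver (now constructed without a tracker, since trackers are registered later per EventHub) is created only when the lookup fails. GetOrCreateDemo and its registry are stand-ins, not Spark's RpcEnv API.

import scala.collection.mutable

object GetOrCreateDemo extends App {
  // Stand-in registry keyed by endpoint name.
  private val endpoints = mutable.Map.empty[String, AnyRef]

  // The factory runs only when nothing is registered yet, mirroring the
  // try/catch above that creates the receiver only when the lookup fails.
  def getOrCreate(name: String)(create: => AnyRef): AnyRef =
    endpoints.getOrElseUpdate(name, create)

  val a = getOrCreate("PartitionPerformanceReceiver")(new Object)
  val b = getOrCreate("PartitionPerformanceReceiver")(new Object)
  assert(a eq b) // the second call reuses the first instance
}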
PartitionPerformanceReceiver.scala:

@@ -23,23 +23,43 @@
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{ RpcEndpoint, RpcEnv }
 import org.apache.spark.SparkContext
 import org.json4s.jackson.Serialization
+import scala.collection.mutable

-private[spark] class PartitionPerformanceReceiver(override val rpcEnv: RpcEnv,
-                                                  val statusTracker: PartitionsStatusTracker)
+private[spark] class PartitionPerformanceReceiver(override val rpcEnv: RpcEnv)
     extends RpcEndpoint
     with Logging {

+  // Keeps track of PartitionsStatusTracker per EventHub source
+  var statusTrackers: mutable.Map[String, PartitionsStatusTracker] =
+    mutable.Map[String, PartitionsStatusTracker]()
+
+  def addStatusTracker(ehName: String, statusTracker: PartitionsStatusTracker): Unit = {
+    statusTrackers(ehName) = statusTracker
+  }
+
+  def getStatusTracker(ehName: String): Option[PartitionsStatusTracker] = {
+    statusTrackers.get(ehName)
+  }
+
   override def onStart(): Unit = {
     logInfo("Start PartitionPerformanceReceiver RPC endpoint")
   }

   override def receive: PartialFunction[Any, Unit] = {
     case ppm: PartitionPerformanceMetric => {
       logDebug(s"Received PartitionPerformanceMetric $ppm")
-      statusTracker.updatePartitionPerformance(ppm.nAndP,
-                                               ppm.requestSeqNo,
-                                               ppm.batchSize,
-                                               ppm.receiveTimeInMillis)
+      val ehStatusTracker = getStatusTracker(ppm.nAndP.ehName)
+      ehStatusTracker match {
+        case Some(statusTracker) =>
+          statusTracker.updatePartitionPerformance(ppm.nAndP,
+                                                   ppm.requestSeqNo,
+                                                   ppm.batchSize,
+                                                   ppm.receiveTimeInMillis)
+        case None =>
+          logError(
+            s"PartitionPerformanceReceiver doesn't have a PartitionsStatusTracker for EventHub ${ppm.nAndP.ehName} " +
+              s"to send the received PartitionPerformanceMetric ${ppm}.")
+      }
     }
     case _ => {
       logError(s"Received an unknown message in PartitionPerformanceReceiver. It's not acceptable!")

Large diffs are not rendered by default.

SimulatedPartitionStatusTracker.scala:

@@ -16,12 +16,27 @@
  */

 package org.apache.spark.eventhubs.utils
+import java.net.URI
+import org.apache.spark.eventhubs.{
+  DefaultMaxAcceptableBatchReceiveTime,
+  NameAndPartition,
+  PartitionContext,
+  PartitionsStatusTracker,
+  SequenceNumber
+}

-import org.apache.spark.eventhubs.{ NameAndPartition, PartitionsStatusTracker, SequenceNumber }
 import scala.collection.breakOut

 private[spark] object SimulatedPartitionStatusTracker {
-  val sourceTracker = PartitionsStatusTracker.getPartitionStatusTracker
+  var sourceTracker = new PartitionsStatusTracker(
+    1,
+    new PartitionContext(new URI("sb://namespace.servicebus.windows.net"), "mockEH"),
+    DefaultMaxAcceptableBatchReceiveTime.toMillis,
+    None)
+
+  def updateSourceTrackerForNewEH(tracker: PartitionsStatusTracker) = {
+    sourceTracker = tracker
+  }

   def updatePartitionPerformance(nAndP: NameAndPartition,
                                  requestSeqNo: SequenceNumber,
EventHubsSource.scala:

@@ -102,19 +102,31 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
   private lazy val throttlingStatusPlugin: Option[ThrottlingStatusPlugin] =
     ehConf.throttlingStatusPlugin()

-  PartitionsStatusTracker.setDefaultValuesInTracker(
+  private var localBatchId = -1
+
+  // Create a partition status tracker for this source and add it to the partitionPerformanceReceiver.
+  // This is used only if slow partition adjustment is on.
+  val partitionsStatusTracker = new PartitionsStatusTracker(
     partitionCount,
     partitionContext,
     ehConf.maxAcceptableBatchReceiveTime.getOrElse(DefaultMaxAcceptableBatchReceiveTime).toMillis,
     throttlingStatusPlugin
   )
+  partitionPerformanceReceiver.addStatusTracker(ehName, partitionsStatusTracker)

   var partitionsThrottleFactor: mutable.Map[NameAndPartition, Double] =
     (for (pid <- 0 until partitionCount) yield (NameAndPartition(ehName, pid), 1.0))(breakOut)

-  val defaultPartitionsPerformancePercentage: Map[NameAndPartition, Double] =
+  var defaultPartitionsPerformancePercentage: Map[NameAndPartition, Double] =
     (for (pid <- 0 until partitionCount) yield (NameAndPartition(ehName, pid), 1.0))(breakOut)

+  private def updatePartitionCountInPartitionsStatusTracker(numberOfPartitions: Int) = {
+    logInfo(s"Update the partitionCount to ${numberOfPartitions} in the PartitionsStatusTracker.")
+    partitionsStatusTracker.updateNumberofPartitionsInTracker(numberOfPartitions)
+    defaultPartitionsPerformancePercentage =
+      (for (pid <- 0 until numberOfPartitions) yield (NameAndPartition(ehName, pid), 1.0))(breakOut)
+  }
+
   private lazy val initialPartitionSeqNos = {
     val metadataLog =
       new HDFSMetadataLog[EventHubsSourceOffset](sqlContext.sparkSession, metadataPath) {
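When a partition-count increase is detected (see the hunk at old line 354 further down), updatePartitionCountInPartitionsStatusTracker rebuilds the per-partition defaults so newly added partitions start at a performance factor of 1.0. A self-contained sketch of that rebuild (defaultPercentages and RebuildDemo are stand-ins; a plain (name, partition) tuple stands in for NameAndPartition):

object RebuildDemo extends App {
  // Rebuild the defaults map for a new partition count; every partition,
  // including newly added ones, starts at a performance factor of 1.0.
  def defaultPercentages(ehName: String, partitionCount: Int): Map[(String, Int), Double] =
    (0 until partitionCount).map(pid => (ehName, pid) -> 1.0).toMap

  val before = defaultPercentages("eh-a", 4) // partitions 0..3
  val after = defaultPercentages("eh-a", 6)  // partitions 0..5 after a scale-out
  assert(after.size == 6 && after(("eh-a", 5)) == 1.0)
}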
@@ -294,18 +306,18 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
       .get(nameAndPartition)
       .map { size =>
         val begin = from.getOrElse(nameAndPartition, fromNew(nameAndPartition))
-        // adjust performance performance pewrcentages to use as much as events possible in the batch
-        val perforamnceFactor: Double = if (slowPartitionAdjustment) {
-          partitionsPerformancePercentage(nameAndPartition)
+        // adjust performance percentages to use as many events as possible in the batch
+        val performanceFactor: Double = if (slowPartitionAdjustment) {
+          partitionsPerformancePercentage.getOrElse(nameAndPartition, 1.0)
         } else 1.0

         if (slowPartitionAdjustment) {
-          partitionsThrottleFactor(nameAndPartition) = perforamnceFactor
+          partitionsThrottleFactor(nameAndPartition) = performanceFactor
           logInfo(
             s"Slow partition adjustment is on, so prorate amount for $nameAndPartition will be adjusted by" +
-              s" the perfromanceFactor = $perforamnceFactor")
+              s" the performanceFactor = $performanceFactor")
         }
-        val prorate = limit * (size / total) * perforamnceFactor
+        val prorate = limit * (size / total) * performanceFactor
         logDebug(s"rateLimit $nameAndPartition prorated amount is $prorate")
         // Don't completely starve small partitions
         val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
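The prorate rule above scales each partition's share of the batch limit by its performance factor, with getOrElse defaulting unknown (e.g. freshly added) partitions to 1.0. A worked, self-contained sketch of the arithmetic (prorate and ProrateDemo are simplifications of the hunk above, not the library's API):

object ProrateDemo extends App {
  // A partition's share of the batch limit, scaled by its performance factor.
  def prorate(limit: Double, size: Double, total: Double, factor: Double): Long = {
    val amount = limit * (size / total) * factor
    // Don't completely starve small partitions: round sub-1 amounts up.
    (if (amount < 1) math.ceil(amount) else math.floor(amount)).toLong
  }

  // 200 of 1000 pending events, batch limit 400, throttled to 75% performance:
  // 400 * 0.2 * 0.75 = 60 events for this partition.
  assert(prorate(limit = 400, size = 200, total = 1000, factor = 0.75) == 60L)

  // A tiny partition still gets at least one event (0.6 rounds up to 1).
  assert(prorate(limit = 400, size = 2, total = 1000, factor = 0.75) == 1L)
}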
@@ -354,6 +366,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
     val startingSeqNos = if (prevOffsets.size < untilSeqNos.size) {
       logInfo(
         s"Number of partitions has increased from ${prevOffsets.size} to ${untilSeqNos.size}")
+      updatePartitionCountInPartitionsStatusTracker(partitionCount)
       val defaultSeqNos = ehClient
         .translate(ehConf, partitionCount)
         .map {
@@ -426,7 +439,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
     localBatchId += 1
     logDebug(
       s"Slow partition adjustment is on, add the current batch $localBatchId to the tracker.")
-    partitionsStatusTracker.addorUpdateBatch(localBatchId, offsetRanges)
+    partitionsStatusTracker.addOrUpdateBatch(localBatchId, offsetRanges)
   }

   /**
@@ -463,7 +476,6 @@ private[eventhubs] object EventHubsSource {
   """.stripMargin

   private[eventhubs] val VERSION = 1
-  private var localBatchId = -1

   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
EventHubsSourceProvider.scala:

@@ -143,9 +143,8 @@ private[sql] class EventHubsSourceProvider

 private[sql] object EventHubsSourceProvider extends Serializable {
   // RPC endpoint for partition performance communication in the driver
-  val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
   val partitionPerformanceReceiver: PartitionPerformanceReceiver =
-    new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker)
+    new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv)
   val partitionPerformanceReceiverRef: RpcEndpointRef = SparkEnv.get.rpcEnv
     .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)