@@ -29,7 +29,6 @@ import kafka.network.Processor._
29
29
import kafka .network .RequestChannel .{CloseConnectionResponse , EndThrottlingResponse , NoOpResponse , SendResponse , StartThrottlingResponse }
30
30
import kafka .network .SocketServer ._
31
31
import kafka .server .{BrokerReconfigurable , KafkaConfig }
32
- import org .apache .kafka .network .EndPoint
33
32
import org .apache .kafka .common .message .ApiMessageType .ListenerType
34
33
import kafka .utils ._
35
34
import org .apache .kafka .common .config .ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
96
95
memoryPoolSensor.add(new Meter (TimeUnit .MILLISECONDS , memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
97
96
private val memoryPool = if (config.queuedMaxBytes > 0 ) new SimpleMemoryPool (config.queuedMaxBytes, config.socketRequestMaxBytes, false , memoryPoolSensor) else MemoryPool .NONE
98
97
// data-plane
99
- private [network] val dataPlaneAcceptors = new ConcurrentHashMap [EndPoint , DataPlaneAcceptor ]()
98
+ private [network] val dataPlaneAcceptors = new ConcurrentHashMap [Endpoint , DataPlaneAcceptor ]()
100
99
val dataPlaneRequestChannel = new RequestChannel (maxQueuedRequests, time, apiVersionManager.newRequestMetrics)
101
100
102
101
private [this ] val nextProcessorId : AtomicInteger = new AtomicInteger (0 )
@@ -161,8 +160,8 @@ class SocketServer(
161
160
* Therefore, we do not know that any particular request processor will be running by the end of
162
161
* this function -- just that it might be running.
163
162
*
164
- * @param authorizerFutures Future per [[EndPoint ]] used to wait before starting the
165
- * processor corresponding to the [[EndPoint ]]. Any endpoint
163
+ * @param authorizerFutures Future per [[Endpoint ]] used to wait before starting the
164
+ * processor corresponding to the [[Endpoint ]]. Any endpoint
166
165
* that does not appear in this map will be started once all
167
166
* authorizerFutures are complete.
168
167
*
@@ -181,7 +180,7 @@ class SocketServer(
181
180
// Because of ephemeral ports, we need to match acceptors to futures by looking at
182
181
// the listener name, rather than the endpoint object.
183
182
val authorizerFuture = authorizerFutures.find {
184
- case (endpoint, _) => acceptor.endPoint.listenerName.value(). equals(endpoint.listenerName().get ())
183
+ case (endpoint, _) => acceptor.endPoint.listener. equals(endpoint.listener ())
185
184
} match {
186
185
case None => allAuthorizerFuturesComplete
187
186
case Some ((_, future)) => future
@@ -210,23 +209,24 @@ class SocketServer(
210
209
enableFuture
211
210
}
212
211
213
- private def createDataPlaneAcceptorAndProcessors (endpoint : EndPoint ): Unit = synchronized {
212
+ private def createDataPlaneAcceptorAndProcessors (endpoint : Endpoint ): Unit = synchronized {
214
213
if (stopped) {
215
214
throw new RuntimeException (" Can't create new data plane acceptor and processors: SocketServer is stopped." )
216
215
}
217
- val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
218
- connectionQuotas.addListener(config, endpoint.listenerName)
219
- val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
216
+ val listenerName = ListenerName .normalised(endpoint.listener)
217
+ val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
218
+ connectionQuotas.addListener(config, listenerName)
219
+ val isPrivilegedListener = config.interBrokerListenerName == listenerName
220
220
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
221
221
config.addReconfigurable(dataPlaneAcceptor)
222
222
dataPlaneAcceptor.configure(parsedConfigs)
223
223
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
224
- info(s " Created data-plane acceptor and processors for endpoint : ${endpoint. listenerName}" )
224
+ info(s " Created data-plane acceptor and processors for endpoint : ${listenerName}" )
225
225
}
226
226
227
- private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
227
+ private def endpoints = config.listeners.map(l => ListenerName .normalised(l.listener) -> l).toMap
228
228
229
- protected def createDataPlaneAcceptor (endPoint : EndPoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ): DataPlaneAcceptor = {
229
+ protected def createDataPlaneAcceptor (endPoint : Endpoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ): DataPlaneAcceptor = {
230
230
new DataPlaneAcceptor (this , endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
231
231
}
232
232
@@ -277,7 +277,7 @@ class SocketServer(
277
277
/**
278
278
* This method is called to dynamically add listeners.
279
279
*/
280
- def addListeners (listenersAdded : Seq [EndPoint ]): Unit = synchronized {
280
+ def addListeners (listenersAdded : Seq [Endpoint ]): Unit = synchronized {
281
281
if (stopped) {
282
282
throw new RuntimeException (" can't add new listeners: SocketServer is stopped." )
283
283
}
@@ -297,10 +297,10 @@ class SocketServer(
297
297
}
298
298
}
299
299
300
- def removeListeners (listenersRemoved : Seq [EndPoint ]): Unit = synchronized {
300
+ def removeListeners (listenersRemoved : Seq [Endpoint ]): Unit = synchronized {
301
301
info(s " Removing data-plane listeners for endpoints $listenersRemoved" )
302
302
listenersRemoved.foreach { endpoint =>
303
- connectionQuotas.removeListener(config, endpoint.listenerName )
303
+ connectionQuotas.removeListener(config, ListenerName .normalised( endpoint.listener) )
304
304
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
305
305
acceptor.beginShutdown()
306
306
acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
345
345
// For test usage
346
346
def dataPlaneAcceptor (listenerName : String ): Option [DataPlaneAcceptor ] = {
347
347
dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
348
- if (endPoint.listenerName.value() == listenerName)
348
+ if (endPoint.listener == listenerName)
349
349
return Some (acceptor)
350
350
}
351
351
None
@@ -374,7 +374,7 @@ object DataPlaneAcceptor {
374
374
}
375
375
376
376
class DataPlaneAcceptor (socketServer : SocketServer ,
377
- endPoint : EndPoint ,
377
+ endPoint : Endpoint ,
378
378
config : KafkaConfig ,
379
379
nodeId : Int ,
380
380
connectionQuotas : ConnectionQuotas ,
@@ -404,7 +404,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
404
404
* Returns the listener name associated with this reconfigurable. Listener-specific
405
405
* configs corresponding to this listener name are provided for reconfiguration.
406
406
*/
407
- override def listenerName (): ListenerName = endPoint.listenerName
407
+ override def listenerName (): ListenerName = ListenerName .normalised( endPoint.listener)
408
408
409
409
/**
410
410
* Returns the names of configs that may be reconfigured.
@@ -451,7 +451,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
451
451
val newNumNetworkThreads = configs.get(SocketServerConfigs .NUM_NETWORK_THREADS_CONFIG ).asInstanceOf [Int ]
452
452
453
453
if (newNumNetworkThreads != processors.length) {
454
- info(s " Resizing network thread pool size for ${endPoint.listenerName } listener from ${processors.length} to $newNumNetworkThreads" )
454
+ info(s " Resizing network thread pool size for ${endPoint.listener } listener from ${processors.length} to $newNumNetworkThreads" )
455
455
if (newNumNetworkThreads > processors.length) {
456
456
addProcessors(newNumNetworkThreads - processors.length)
457
457
} else if (newNumNetworkThreads < processors.length) {
@@ -472,7 +472,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
472
472
* Thread that accepts and configures new connections. There is one of these per endpoint.
473
473
*/
474
474
private [kafka] abstract class Acceptor (val socketServer : SocketServer ,
475
- val endPoint : EndPoint ,
475
+ val endPoint : Endpoint ,
476
476
var config : KafkaConfig ,
477
477
nodeId : Int ,
478
478
val connectionQuotas : ConnectionQuotas ,
@@ -515,15 +515,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
515
515
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup (" kafka.network" , " Acceptor" )
516
516
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
517
517
" AcceptorBlockedPercent" ,
518
- Map (ListenerMetricTag -> endPoint.listenerName.value ).asJava)
518
+ Map (ListenerMetricTag -> endPoint.listener ).asJava)
519
519
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName," blocked time" , TimeUnit .NANOSECONDS )
520
520
private var currentProcessorIndex = 0
521
521
private [network] val throttledSockets = new mutable.PriorityQueue [DelayedCloseSocket ]()
522
522
private val started = new AtomicBoolean ()
523
523
private [network] val startedFuture = new CompletableFuture [Void ]()
524
524
525
525
val thread : KafkaThread = KafkaThread .nonDaemon(
526
- s " data-plane-kafka-socket-acceptor- ${endPoint.listenerName }- ${endPoint.securityProtocol}- ${endPoint.port}" ,
526
+ s " data-plane-kafka-socket-acceptor- ${endPoint.listener }- ${endPoint.securityProtocol}- ${endPoint.port}" ,
527
527
this )
528
528
529
529
def start (): Unit = synchronized {
@@ -535,19 +535,19 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
535
535
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
536
536
debug(s " Opened endpoint ${endPoint.host}: ${endPoint.port}" )
537
537
}
538
- debug(s " Starting processors for listener ${endPoint.listenerName }" )
538
+ debug(s " Starting processors for listener ${endPoint.listener }" )
539
539
processors.foreach(_.start())
540
- debug(s " Starting acceptor thread for listener ${endPoint.listenerName }" )
540
+ debug(s " Starting acceptor thread for listener ${endPoint.listener }" )
541
541
thread.start()
542
542
startedFuture.complete(null )
543
543
started.set(true )
544
544
} catch {
545
545
case e : ClosedChannelException =>
546
- debug(s " Refusing to start acceptor for ${endPoint.listenerName } since the acceptor has already been shut down. " )
546
+ debug(s " Refusing to start acceptor for ${endPoint.listener } since the acceptor has already been shut down. " )
547
547
startedFuture.completeExceptionally(e)
548
548
case t : Throwable =>
549
- error(s " Unable to start acceptor for ${endPoint.listenerName }" , t)
550
- startedFuture.completeExceptionally(new RuntimeException (s " Unable to start acceptor for ${endPoint.listenerName }" , t))
549
+ error(s " Unable to start acceptor for ${endPoint.listener }" , t)
550
+ startedFuture.completeExceptionally(new RuntimeException (s " Unable to start acceptor for ${endPoint.listener }" , t))
551
551
}
552
552
}
553
553
@@ -628,7 +628,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
628
628
new InetSocketAddress (host, port)
629
629
}
630
630
val serverChannel = socketServer.socketFactory.openServerSocket(
631
- endPoint.listenerName.value() ,
631
+ endPoint.listener ,
632
632
socketAddress,
633
633
listenBacklogSize,
634
634
recvBufferSize)
@@ -682,14 +682,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
682
682
private def accept (key : SelectionKey ): Option [SocketChannel ] = {
683
683
val serverSocketChannel = key.channel().asInstanceOf [ServerSocketChannel ]
684
684
val socketChannel = serverSocketChannel.accept()
685
+ val listenerName = ListenerName .normalised(endPoint.listener)
685
686
try {
686
- connectionQuotas.inc(endPoint. listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
687
+ connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
687
688
configureAcceptedSocketChannel(socketChannel)
688
689
Some (socketChannel)
689
690
} catch {
690
691
case e : TooManyConnectionsException =>
691
692
info(s " Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections. " )
692
- connectionQuotas.closeChannel(this , endPoint. listenerName, socketChannel)
693
+ connectionQuotas.closeChannel(this , listenerName, socketChannel)
693
694
None
694
695
case e : ConnectionThrottledException =>
695
696
val ip = socketChannel.socket.getInetAddress
@@ -699,7 +700,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
699
700
None
700
701
case e : IOException =>
701
702
error(s " Encountered an error while configuring the connection, closing it. " , e)
702
- connectionQuotas.closeChannel(this , endPoint. listenerName, socketChannel)
703
+ connectionQuotas.closeChannel(this , listenerName, socketChannel)
703
704
None
704
705
}
705
706
}
@@ -741,7 +742,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
741
742
def wakeup (): Unit = nioSelector.wakeup()
742
743
743
744
def addProcessors (toCreate : Int ): Unit = synchronized {
744
- val listenerName = endPoint.listenerName
745
+ val listenerName = ListenerName .normalised( endPoint.listener)
745
746
val securityProtocol = endPoint.securityProtocol
746
747
val listenerProcessors = new ArrayBuffer [Processor ]()
747
748
@@ -761,7 +762,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
761
762
listenerName : ListenerName ,
762
763
securityProtocol : SecurityProtocol ,
763
764
connectionDisconnectListeners : Seq [ConnectionDisconnectListener ]): Processor = {
764
- val name = s " data-plane-kafka-network-thread- $nodeId- ${endPoint.listenerName }- ${endPoint.securityProtocol}- $id"
765
+ val name = s " data-plane-kafka-network-thread- $nodeId- ${endPoint.listener }- ${endPoint.securityProtocol}- $id"
765
766
new Processor (id,
766
767
time,
767
768
config.socketRequestMaxBytes,
0 commit comments