diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 590be14e2ddc6..a3917fdb18aa5 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1385,6 +1385,11 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp) } + // Visible for testing + private[network] def maxConnectionsPerIpOverrideForIp(ip: InetAddress): Option[Int] = { + maxConnectionsPerIpOverrides.get(ip) + } + private[network] def addListener(config: KafkaConfig, listenerName: ListenerName): Unit = { counts.synchronized { if (!maxConnectionsPerListener.contains(listenerName)) { diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala index e19bc9a0238f8..140bc3bcd6c72 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -29,7 +29,6 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.test.api.Flaky import org.apache.kafka.common.{KafkaException, Uuid, requests} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.config.QuotaConfig @@ -81,7 +80,6 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } } - @Flaky("KAFKA-17999") @Test def testDynamicConnectionQuota(): Unit = { val maxConnectionsPerIP = 5 @@ -105,6 +103,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { val maxConnectionsPerIPOverride = 7 props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride") reconfigureServers(props, perBrokerConfig = false, (SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$maxConnectionsPerIPOverride")) + waitForMaxConnectionsOverrideApplied("localhost", maxConnectionsPerIPOverride) verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify) } @@ -416,4 +415,15 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { }.asJavaCollection adminClient.alterClientQuotas(entries) } + + private def waitForMaxConnectionsOverrideApplied(ip: String, expected: Int): Unit = { + val addr = InetAddress.getByName(ip) + val quotas = brokers.head.socketServer.connectionQuotas + + TestUtils.waitUntilTrue( + () => quotas.maxConnectionsPerIpOverrideForIp(addr).contains(expected), + s"maxConnectionsPerIpOverrides not applied yet for ip=$ip (expected=$expected, current=${quotas.maxConnectionsPerIpOverrideForIp(addr)})", + 50000 + ) + } }