diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 3965701667179..71948830b9a11 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -53,6 +53,72 @@ public final class ClientUtils { private ClientUtils() { } + /** + * Resolves a single URL to one or more InetSocketAddress based on the DNS lookup strategy. + * + * @param url the original URL string (for logging) + * @param host the hostname extracted from the URL + * @param port the port extracted from the URL + * @param clientDnsLookup the DNS lookup strategy + * @return list of resolved addresses (may be empty if addresses are unresolved) + * @throws UnknownHostException if DNS resolution fails + */ + private static List resolveAddress( + String url, + String host, + Integer port, + ClientDnsLookup clientDnsLookup) throws UnknownHostException { + + List addresses = new ArrayList<>(); + + if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { + InetAddress[] inetAddresses = InetAddress.getAllByName(host); + for (InetAddress inetAddress : inetAddresses) { + String resolvedCanonicalName = inetAddress.getCanonicalHostName(); + InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); + if (address.isUnresolved()) { + log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", + url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + } else { + addresses.add(address); + } + } + } else { + InetSocketAddress address = new InetSocketAddress(host, port); + if (address.isUnresolved()) { + log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", + url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + } else { + addresses.add(address); + } + } + + return addresses; + } + + public static List parseAddresses(List urls, ClientDnsLookup clientDnsLookup) { + List addresses = new ArrayList<>(); + if (urls == null) { + return addresses; + } + urls.forEach(url -> { + final String host = getHost(url); + final Integer port = getPort(url); + + if (host == null || port == null) { + log.warn("Skipping invalid bootstrap URL: {}", url); + return; + } + + try { + addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); + } catch (UnknownHostException e) { + // Silently ignore - this matches the original behavior + } + }); + return addresses; + } + public static List parseAndValidateAddresses(AbstractConfig config) { List urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG); @@ -73,25 +139,7 @@ public static List parseAndValidateAddresses(List url if (host == null || port == null) throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { - InetAddress[] inetAddresses = InetAddress.getAllByName(host); - for (InetAddress inetAddress : inetAddresses) { - String resolvedCanonicalName = inetAddress.getCanonicalHostName(); - InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); - if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); - } else { - addresses.add(address); - } - } - } else { - InetSocketAddress address = new InetSocketAddress(host, port); - if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); - } else { - addresses.add(address); - } - } + addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); } catch (IllegalArgumentException e) { throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); @@ -151,6 +199,7 @@ static List filterPreferredAddresses(InetAddress[] allAddresses) { } public static NetworkClient createNetworkClient(AbstractConfig config, + List bootstrapServers, Metrics metrics, String metricsGroupPrefix, LogContext logContext, @@ -161,6 +210,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, Sensor throttleTimeSensor, ClientTelemetrySender clientTelemetrySender) { return createNetworkClient(config, + bootstrapServers, config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), metrics, metricsGroupPrefix, @@ -177,6 +227,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, } public static NetworkClient createNetworkClient(AbstractConfig config, + List bootstrapServers, String clientId, Metrics metrics, String metricsGroupPrefix, @@ -201,6 +252,14 @@ public static NetworkClient createNetworkClient(AbstractConfig config, metricsGroupPrefix, channelBuilder, logContext); + ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); + + NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + bootstrapServers != null ? bootstrapServers : List.of(), + dnsLookup, + config.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), + config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); + return new NetworkClient(metadataUpdater, metadata, selector, @@ -221,7 +280,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, hostResolver, clientTelemetrySender, config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG), - MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) + MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)), + bootstrapConfiguration ); } catch (Throwable t) { closeQuietly(selector, "Selector"); diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 33b936b378d57..30a406531ebf2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -60,6 +60,12 @@ public class CommonClientConfigs { + "resolve each bootstrap address into a list of canonical names. After " + "the bootstrap phase, this behaves the same as use_all_dns_ips."; + public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = "bootstrap.resolve.timeout.ms"; + public static final long DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS = 2 * 60 * 1000L; + public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC = "Maximum amount of time clients can spend trying to" + + " resolve for the bootstrap server address. If the resolution cannot be completed within this timeframe, a " + + "BootstrapResolutionException will be thrown."; + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."; diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index 3d5154994fcda..7217bdc63e530 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -81,6 +82,19 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta // Do nothing } + @Override + public boolean isBootstrapped() { + // ManualMetadataUpdater is designed for cases where nodes are manually set, + // so we consider it bootstrapped if nodes have been provided + return !nodes.isEmpty(); + } + + @Override + public void bootstrap(List addresses) { + // ManualMetadataUpdater doesn't use NetworkClient's bootstrap mechanism + // Nodes should be set manually via constructor or setNodes() + } + @Override public void close() { } diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java index 424ecb0d29196..aa0caaef239c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java @@ -227,8 +227,8 @@ private void computeClusterView() { .stream() .map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes)) .collect(Collectors.toList()); - this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics, - invalidTopics, internalTopics, controller, topicIds); + this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, + unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds); } static MetadataSnapshot bootstrap(List addresses) { diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 825d2e67f70d7..add46cb7fcbca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.requests.RequestHeader; import java.io.Closeable; +import java.net.InetSocketAddress; import java.util.List; import java.util.Optional; @@ -102,6 +103,17 @@ default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) { */ default void rebootstrap(long now) {} + /** + * Returns true if the metadata has been bootstrapped. + */ + boolean isBootstrapped(); + + /** + * Bootstrap the metadata cache with the given addresses. + * @param addresses list of addresses for the bootstrap servers + */ + void bootstrap(List addresses); + /** * Close this updater. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 692847a8b1553..108b22238f085 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -21,7 +21,9 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.BootstrapResolutionException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; import org.apache.kafka.common.metrics.Sensor; @@ -47,6 +49,7 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -137,6 +140,10 @@ private enum State { private final AtomicReference state; + private final BootstrapConfiguration bootstrapConfiguration; + + private final Timer bootstrapTimer; + private final TelemetrySender telemetrySender; public NetworkClient(Selectable selector, @@ -154,7 +161,8 @@ public NetworkClient(Selectable selector, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext, - MetadataRecoveryStrategy metadataRecoveryStrategy) { + MetadataRecoveryStrategy metadataRecoveryStrategy, + BootstrapConfiguration bootstrapConfiguration) { this(selector, metadata, clientId, @@ -171,7 +179,8 @@ public NetworkClient(Selectable selector, apiVersions, logContext, Long.MAX_VALUE, - metadataRecoveryStrategy); + metadataRecoveryStrategy, + bootstrapConfiguration); } public NetworkClient(Selectable selector, @@ -190,7 +199,8 @@ public NetworkClient(Selectable selector, ApiVersions apiVersions, LogContext logContext, long rebootstrapTriggerMs, - MetadataRecoveryStrategy metadataRecoveryStrategy) { + MetadataRecoveryStrategy metadataRecoveryStrategy, + BootstrapConfiguration bootstrapConfiguration) { this(null, metadata, selector, @@ -211,7 +221,8 @@ public NetworkClient(Selectable selector, new DefaultHostResolver(), null, rebootstrapTriggerMs, - metadataRecoveryStrategy); + metadataRecoveryStrategy, + bootstrapConfiguration); } public NetworkClient(Selectable selector, @@ -230,7 +241,8 @@ public NetworkClient(Selectable selector, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext, - MetadataRecoveryStrategy metadataRecoveryStrategy) { + MetadataRecoveryStrategy metadataRecoveryStrategy, + BootstrapConfiguration bootstrapConfiguration) { this(null, metadata, selector, @@ -251,7 +263,8 @@ public NetworkClient(Selectable selector, new DefaultHostResolver(), null, Long.MAX_VALUE, - metadataRecoveryStrategy); + metadataRecoveryStrategy, + bootstrapConfiguration); } public NetworkClient(Selectable selector, @@ -269,7 +282,8 @@ public NetworkClient(Selectable selector, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext, - MetadataRecoveryStrategy metadataRecoveryStrategy) { + MetadataRecoveryStrategy metadataRecoveryStrategy, + BootstrapConfiguration bootstrapConfiguration) { this(metadataUpdater, null, selector, @@ -290,7 +304,8 @@ public NetworkClient(Selectable selector, new DefaultHostResolver(), null, Long.MAX_VALUE, - metadataRecoveryStrategy); + metadataRecoveryStrategy, + bootstrapConfiguration); } public NetworkClient(MetadataUpdater metadataUpdater, @@ -313,7 +328,8 @@ public NetworkClient(MetadataUpdater metadataUpdater, HostResolver hostResolver, ClientTelemetrySender clientTelemetrySender, long rebootstrapTriggerMs, - MetadataRecoveryStrategy metadataRecoveryStrategy) { + MetadataRecoveryStrategy metadataRecoveryStrategy, + BootstrapConfiguration bootstrapConfiguration) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the * super constructor is invoked. @@ -346,6 +362,8 @@ public NetworkClient(MetadataUpdater metadataUpdater, this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null; this.rebootstrapTriggerMs = rebootstrapTriggerMs; this.metadataRecoveryStrategy = metadataRecoveryStrategy; + this.bootstrapConfiguration = bootstrapConfiguration; + this.bootstrapTimer = time.timer(bootstrapConfiguration.bootstrapResolveTimeoutMs); } /** @@ -629,6 +647,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long @Override public List poll(long timeout, long now) { ensureActive(); + ensureBootstrapped(now); if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, @@ -758,8 +777,9 @@ public void close() { @Override public LeastLoadedNode leastLoadedNode(long now) { List nodes = this.metadataUpdater.fetchNodes(); - if (nodes.isEmpty()) - throw new IllegalStateException("There are no nodes in the Kafka cluster"); + if (nodes.isEmpty()) { + return handleEmptyNodeList(); + } int inflight = Integer.MAX_VALUE; Node foundConnecting = null; @@ -821,6 +841,19 @@ public LeastLoadedNode leastLoadedNode(long now) { } } + /** + * Handle the case when there are no nodes available. + * During bootstrap phase, return null node to allow DNS resolution to continue. + * After bootstrap, throw IllegalStateException. + */ + private LeastLoadedNode handleEmptyNodeList() { + if (!metadataUpdater.isBootstrapped()) { + log.debug("No nodes available yet, still in bootstrap phase"); + return new LeastLoadedNode(null, false); + } + throw new IllegalStateException("There are no nodes in the Kafka cluster"); + } + public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { try { return AbstractResponse.parseResponse(responseBuffer, requestHeader); @@ -1034,7 +1067,7 @@ private void handleApiVersionsResponse(List responses, // the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned. // If not provided, the client falls back to version 0. short maxApiVersion = 0; - if (apiVersionsResponse.data().apiKeys().size() > 0) { + if (!apiVersionsResponse.data().apiKeys().isEmpty()) { ApiVersion apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id); if (apiVersion != null) { maxApiVersion = apiVersion.maxVersion(); @@ -1171,6 +1204,80 @@ private boolean isTelemetryApi(ApiKeys apiKey) { return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY; } + public static class BootstrapConfiguration { + public final List bootstrapServers; + public final ClientDnsLookup clientDnsLookup; + public final long bootstrapResolveTimeoutMs; + public final long retryBackoffMs; + private final boolean isBootstrapDisabled; + + private BootstrapConfiguration(final List bootstrapServers, + final ClientDnsLookup clientDnsLookup, + final long bootstrapResolveTimeoutMs, + final long retryBackoffMs, + final boolean isBootstrapDisabled) { + this.bootstrapServers = bootstrapServers; + this.clientDnsLookup = clientDnsLookup; + this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs; + this.retryBackoffMs = retryBackoffMs; + this.isBootstrapDisabled = isBootstrapDisabled; + } + + public static BootstrapConfiguration enabled(final List bootstrapServers, + final ClientDnsLookup clientDnsLookup, + final long bootstrapResolveTimeoutMs, + final long retryBackoffMs) { + return new BootstrapConfiguration(bootstrapServers, clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs, false); + } + + public static BootstrapConfiguration disabled() { + return new BootstrapConfiguration(List.of(), null, 0, 0, true); + } + } + + /** + * Attempts to resolve bootstrap server addresses via DNS and create an initial bootstrap cluster. + * This method is called from {@link #poll(long, long)} and uses a non-blocking approach. + * + *

On each invocation, this method will attempt DNS resolution once. If resolution fails, + * it returns immediately and will be retried on the next poll invocation. This ensures the + * event loop remains responsive and doesn't block on DNS lookups. + * + * @param currentTimeMs The current time in milliseconds + * @throws BootstrapResolutionException if the bootstrap timeout expires before DNS resolution succeeds + */ + void ensureBootstrapped(final long currentTimeMs) { + if (bootstrapConfiguration.isBootstrapDisabled || metadataUpdater.isBootstrapped()) + return; + + if (Thread.interrupted()) { + throw new InterruptException(new InterruptedException()); + } + + // Timer is already initialized in constructor, just update it with current time + bootstrapTimer.update(currentTimeMs); + + // Attempt DNS resolution (single attempt per poll, typically fast) + List servers = ClientUtils.parseAddresses(bootstrapConfiguration.bootstrapServers, bootstrapConfiguration.clientDnsLookup); + + if (!servers.isEmpty()) { + log.debug("Bootstrap DNS resolution succeeded, {} servers resolved", servers.size()); + metadataUpdater.bootstrap(servers); + return; + } + + // DNS resolution failed + if (bootstrapTimer.isExpired()) { + throw new BootstrapResolutionException("Failed to resolve bootstrap servers after " + + bootstrapConfiguration.bootstrapResolveTimeoutMs + "ms. " + + "Please check your bootstrap.servers configuration and DNS settings."); + } + + // DNS failed but timeout not reached, log and will retry on next poll + log.warn("Failed to resolve bootstrap servers, will retry on next poll. Remaining time: {}ms", + bootstrapTimer.remainingMs()); + } + class DefaultMetadataUpdater implements MetadataUpdater { /* the current cluster metadata */ @@ -1195,6 +1302,7 @@ class DefaultMetadataUpdater implements MetadataUpdater { @Override public List fetchNodes() { + ensureBootstrapped(time.milliseconds()); return metadata.fetch().nodes(); } @@ -1226,7 +1334,10 @@ public long maybeUpdate(long now) { LeastLoadedNode leastLoadedNode = leastLoadedNode(now); // Rebootstrap if needed and configured. + // Only rebootstrap if we've already completed initial bootstrap - otherwise we're still + // in the initial DNS resolution phase and should let ensureBootstrapped() handle it. if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP + && isBootstrapped() && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { rebootstrap(now); @@ -1321,6 +1432,17 @@ public void rebootstrap(long now) { metadataAttemptStartMs = Optional.of(now); } + @Override + public boolean isBootstrapped() { + // We are bootstrapped if we have any nodes available (either from DNS resolution or metadata response) + return !metadata.fetch().nodes().isEmpty(); + } + + @Override + public void bootstrap(List addresses) { + metadata.bootstrap(addresses); + } + @Override public void close() { this.metadata.close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index 3908688615657..82c9a75a4a2df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -87,6 +87,12 @@ public class AdminClientConfig extends AbstractConfig { public static final String RETRY_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG; private static final String RETRY_BACKOFF_MAX_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC; + /** + * bootstrap.resolve.timeout.ms + */ + public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG; + private static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC = CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC; + /** * enable.metrics.push */ @@ -196,6 +202,12 @@ public class AdminClientConfig extends AbstractConfig { atLeast(0L), Importance.LOW, RETRY_BACKOFF_MAX_MS_DOC) + .define(BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, + Type.LONG, + CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, + atLeast(0L), + Importance.MEDIUM, + BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC) .define(ENABLE_METRICS_PUSH_CONFIG, Type.BOOLEAN, false, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8d2ea1982cbc0..35e839ab4905b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -40,7 +40,6 @@ import org.apache.kafka.clients.admin.internals.AdminApiFuture; import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture; import org.apache.kafka.clients.admin.internals.AdminApiHandler; -import org.apache.kafka.clients.admin.internals.AdminBootstrapAddresses; import org.apache.kafka.clients.admin.internals.AdminFetchMetricsManager; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; import org.apache.kafka.clients.admin.internals.AllBrokersStrategy; @@ -513,6 +512,43 @@ static String prettyPrintException(Throwable throwable) { return throwable.getClass().getSimpleName(); } + /** + * Determines which bootstrap configuration to use based on the provided config. + * Validates that exactly one of bootstrap.servers or bootstrap.controllers is configured. + * + * @param config The admin client configuration + * @return true if using bootstrap.controllers, false if using bootstrap.servers + * @throws ConfigException if both or neither bootstrap configurations are set + */ + static boolean determineBootstrapType(AdminClientConfig config) { + List bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + if (bootstrapServers == null) { + bootstrapServers = Collections.emptyList(); + } + List controllerServers = config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); + if (controllerServers == null) { + controllerServers = Collections.emptyList(); + } + + if (bootstrapServers.isEmpty()) { + if (controllerServers.isEmpty()) { + throw new ConfigException("You must set either " + + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " or " + + AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); + } else { + return true; // Using bootstrap.controllers + } + } else { + if (controllerServers.isEmpty()) { + return false; // Using bootstrap.servers + } else { + throw new ConfigException("You cannot set both " + + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " and " + + AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); + } + } + } + static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) { return createInternal(config, timeoutProcessorFactory, null); } @@ -533,12 +569,19 @@ static KafkaAdminClient createInternal( try { // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it // simplifies communication with older brokers) - AdminBootstrapAddresses adminAddresses = AdminBootstrapAddresses.fromConfig(config); + boolean usingBootstrapControllers = determineBootstrapType(config); AdminMetadataManager metadataManager = new AdminMetadataManager(logContext, config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), - adminAddresses.usingBootstrapControllers()); - metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds()); + usingBootstrapControllers); + + // Get the appropriate bootstrap configuration + List bootstrapAddressesToUse = usingBootstrapControllers + ? config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG) + : config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + + // Don't create bootstrap cluster here - let NetworkClient.ensureBootstrapped() handle it + // during the first poll after DNS resolution succeeds List reporters = CommonClientConfigs.metricsReporters(clientId, config); clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); clientTelemetryReporter.ifPresent(reporters::add); @@ -550,7 +593,9 @@ static KafkaAdminClient createInternal( MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); metrics = new Metrics(metricConfig, reporters, time, metricsContext); + networkClient = ClientUtils.createNetworkClient(config, + bootstrapAddressesToUse, clientId, metrics, "admin-client", diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java deleted file mode 100644 index 77a491a320180..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin.internals; - -import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigException; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -public final class AdminBootstrapAddresses { - private final boolean usingBootstrapControllers; - private final List addresses; - - AdminBootstrapAddresses( - boolean usingBootstrapControllers, - List addresses - ) { - this.usingBootstrapControllers = usingBootstrapControllers; - this.addresses = addresses; - } - - public boolean usingBootstrapControllers() { - return usingBootstrapControllers; - } - - public List addresses() { - return addresses; - } - - public static AdminBootstrapAddresses fromConfig(AbstractConfig config) { - List bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - if (bootstrapServers == null) { - bootstrapServers = Collections.emptyList(); - } - List controllerServers = config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); - if (controllerServers == null) { - controllerServers = Collections.emptyList(); - } - String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG); - if (bootstrapServers.isEmpty()) { - if (controllerServers.isEmpty()) { - throw new ConfigException("You must set either " + - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " or " + - AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); - } else { - return new AdminBootstrapAddresses(true, - ClientUtils.parseAndValidateAddresses(controllerServers, clientDnsLookupConfig)); - } - } else { - if (controllerServers.isEmpty()) { - return new AdminBootstrapAddresses(false, - ClientUtils.parseAndValidateAddresses(bootstrapServers, clientDnsLookupConfig)); - } else { - throw new ConfigException("You cannot set both " + - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " and " + - AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG); - } - } - } - - @Override - public int hashCode() { - return Objects.hash(usingBootstrapControllers, addresses); - } - - @Override - public boolean equals(Object o) { - if (o == null || (!o.getClass().equals(AdminBootstrapAddresses.class))) return false; - AdminBootstrapAddresses other = (AdminBootstrapAddresses) o; - return usingBootstrapControllers == other.usingBootstrapControllers && - addresses.equals(other.addresses); - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("AdminBootstrapAddresses"); - bld.append("(usingBootstrapControllers=").append(usingBootstrapControllers); - bld.append(", addresses=["); - String prefix = ""; - for (InetSocketAddress address : addresses) { - bld.append(prefix).append(address); - prefix = ", "; - } - bld.append("])"); - return bld.toString(); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index 0ac5419991e1b..764aa440f81c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -148,6 +149,17 @@ public void rebootstrap(long now) { AdminMetadataManager.this.rebootstrap(now); } + @Override + public boolean isBootstrapped() { + return bootstrapCluster != null; + } + + @Override + public void bootstrap(List addresses) { + // Called by NetworkClient.ensureBootstrapped() after DNS resolution succeeds + AdminMetadataManager.this.update(Cluster.bootstrap(addresses), 0); + } + @Override public void close() { } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 4dd6002ece595..d1745f497d556 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -133,6 +133,11 @@ public class ConsumerConfig extends AbstractConfig { /** client.dns.lookup */ public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG; + /** + * bootstrap.resolve.timeout.ms + */ + public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG; + /** * enable.auto.commit */ @@ -426,6 +431,11 @@ public class ConsumerConfig extends AbstractConfig { ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), Importance.MEDIUM, CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) + .define(BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, + Type.LONG, + CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, + Importance.HIGH, + CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC) .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC) .define(GROUP_INSTANCE_ID_CONFIG, Type.STRING, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 8adc71b904b7d..21fd0e7f1ca27 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -114,7 +114,6 @@ import org.slf4j.Logger; import org.slf4j.event.Level; -import java.net.InetSocketAddress; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -493,8 +492,6 @@ public AsyncKafkaConsumer(final ConsumerConfig config, interceptorList, Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer())); this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners); - final List addresses = ClientUtils.parseAndValidateAddresses(config); - metadata.bootstrap(addresses); this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); @@ -2353,7 +2350,6 @@ boolean processBackgroundEvents() { * * Each iteration gives the application thread an opportunity to process background events, which may be * necessary to complete the overall processing. - * *

* * As an example, take {@link #unsubscribe()}. To start unsubscribing, the application thread enqueues an diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 4851e54d8ac5f..44cd954324ae2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -63,7 +63,6 @@ import org.slf4j.Logger; import org.slf4j.event.Level; -import java.net.InetSocketAddress; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -190,8 +189,6 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { interceptorList, Arrays.asList(this.deserializers.keyDeserializer(), this.deserializers.valueDeserializer())); this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners); - List addresses = ClientUtils.parseAndValidateAddresses(config); - this.metadata.bootstrap(addresses); this.fetchMetricsManager = createFetchMetricsManager(metrics); FetchConfig fetchConfig = new FetchConfig(config); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index 2f715e206cc07..10be5854d021c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -89,6 +89,7 @@ public static ConsumerNetworkClient createConsumerNetworkClient(ConsumerConfig c long retryBackoffMs, ClientTelemetrySender clientTelemetrySender) { NetworkClient netClient = ClientUtils.createNetworkClient(config, + config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), metrics, CONSUMER_METRIC_GROUP_PREFIX, logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 42c1c73acb565..fba2808075d24 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.BootstrapResolutionException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; @@ -157,7 +158,12 @@ public void poll(final long timeoutMs, final long currentTimeMs) { * @param onClose True when the network thread is closing. */ public void poll(final long timeoutMs, final long currentTimeMs, boolean onClose) { - trySend(currentTimeMs); + try { + trySend(currentTimeMs); + } catch (BootstrapResolutionException e) { + // Bootstrap DNS resolution timeout - propagate to app thread + backgroundEventHandler.add(new ErrorEvent(e)); + } long pollTimeoutMs = timeoutMs; if (!unsentRequests.isEmpty()) { @@ -476,6 +482,7 @@ public static Supplier supplier(final Time time, @Override protected NetworkClientDelegate create() { KafkaClient client = ClientUtils.createNetworkClient(config, + config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), metrics, CONSUMER_METRIC_GROUP_PREFIX, logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 2702ad52921e8..6e0138e4c9f3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -85,7 +85,6 @@ import org.slf4j.Logger; -import java.net.InetSocketAddress; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -452,7 +451,6 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali transactionManager, new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME)); - List addresses = ClientUtils.parseAndValidateAddresses(config); if (metadata != null) { this.metadata = metadata; } else { @@ -463,7 +461,6 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali logContext, clusterResourceListeners, Time.SYSTEM); - this.metadata.bootstrap(addresses); } this.errors = this.metrics.sensor("errors"); this.sender = newSender(logContext, kafkaClient, this.metadata); @@ -528,6 +525,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(producerConfig, + producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), this.metrics, "producer", logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 3ecc9e1e2cbb1..d6cf75033816e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -224,6 +224,9 @@ public class ProducerConfig extends AbstractConfig { /** retry.backoff.max.ms */ public static final String RETRY_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG; + /** bootstrap.resolve.timeout.ms */ + public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG; + /** * enable.metrics.push */ @@ -427,6 +430,12 @@ public class ProducerConfig extends AbstractConfig { atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC) + .define(BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, + Type.LONG, + CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, + atLeast(0L), + Importance.MEDIUM, + CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC) .define(ENABLE_METRICS_PUSH_CONFIG, Type.BOOLEAN, true, diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BootstrapResolutionException.java b/clients/src/main/java/org/apache/kafka/common/errors/BootstrapResolutionException.java new file mode 100644 index 0000000000000..899562af01f92 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/BootstrapResolutionException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * Indicates that the {@link org.apache.kafka.clients.NetworkClient} was unable to resolve a DNS address within + * the time specified by {@link org.apache.kafka.clients.CommonClientConfigs#BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG} + * + * @see org.apache.kafka.clients.CommonClientConfigs + */ +public class BootstrapResolutionException extends KafkaException { + public BootstrapResolutionException(String message) { + super(message); + } + + public BootstrapResolutionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index d3f52a0f57571..82909e45342f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.BootstrapResolutionException; import org.apache.kafka.common.errors.RebootstrapRequiredException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -91,26 +92,32 @@ import static org.mockito.Mockito.when; public class NetworkClientTest { + protected final MockTime time = new MockTime(); + private static final List BOOTSTRAP_ADDRESSES = new ArrayList<>(List.of( + "127.0.0.1:8000", + "127.0.0.2:8000")); + private static List initialAddresses; + private static List newAddresses; + private static NetworkClient.BootstrapConfiguration bootstrapConfiguration = + NetworkClient.BootstrapConfiguration.enabled( + BOOTSTRAP_ADDRESSES, + ClientDnsLookup.USE_ALL_DNS_IPS, + 10 * 1000, + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); protected final int defaultRequestTimeoutMs = 1000; - protected final MockTime time = new MockTime(); protected final MockSelector selector = new MockSelector(time); protected final Node node = TestUtils.singletonCluster().nodes().iterator().next(); protected final long reconnectBackoffMsTest = 10 * 1000; protected final long reconnectBackoffMaxMsTest = 10 * 10000; protected final long connectionSetupTimeoutMsTest = 5 * 1000; protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000; - private final int reconnectBackoffExpBase = ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE; - private final double reconnectBackoffJitter = ClusterConnectionStates.RECONNECT_BACKOFF_JITTER; private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node)); private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest); private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest); private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes(); private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery(); - private static ArrayList initialAddresses; - private static ArrayList newAddresses; - static { try { initialAddresses = new ArrayList<>(Arrays.asList( @@ -129,10 +136,11 @@ public class NetworkClientTest { } private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) { + bootstrapMetadataUpdater(metadataUpdater); return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection( @@ -140,7 +148,7 @@ private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection( return new NetworkClient(selector, metadataUpdater, "mock", maxInFlightRequestsPerConnection, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) { @@ -149,34 +157,43 @@ private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoff return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } private NetworkClient createNetworkClientWithStaticNodes() { return new NetworkClient(selector, metadataUpdater, "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } - private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) { + private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata, boolean disableBootstrap) { + if (disableBootstrap) { + bootstrapConfiguration = NetworkClient.BootstrapConfiguration.disabled(); + } return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } private NetworkClient createNetworkClientWithNoVersionDiscovery() { + bootstrapMetadataUpdater(metadataUpdater); return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); } @BeforeEach public void setup() { selector.reset(); + bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + BOOTSTRAP_ADDRESSES, + ClientDnsLookup.USE_ALL_DNS_IPS, + CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); } @Test @@ -259,7 +276,7 @@ public synchronized void rebootstrap() { reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(), rebootstrapTriggerMs, - MetadataRecoveryStrategy.REBOOTSTRAP); + MetadataRecoveryStrategy.REBOOTSTRAP, bootstrapConfiguration); MetadataUpdater metadataUpdater = TestUtils.fieldValue(client, NetworkClient.class, "metadataUpdater"); metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9999))); @@ -313,7 +330,8 @@ public synchronized void rebootstrap() { NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(), - rebootstrapTriggerMs, MetadataRecoveryStrategy.REBOOTSTRAP); + rebootstrapTriggerMs, MetadataRecoveryStrategy.REBOOTSTRAP, bootstrapConfiguration); + client.poll(0, time.milliseconds()); MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); metadata.updateWithCurrentRequestVersion(metadataResponse, false, time.milliseconds()); @@ -598,7 +616,6 @@ public void testDefaultRequestTimeout() { * second produce call is intentionally made to emulate a request timeout. In the case that a timeout occurs * during a request, we want to ensure that we {@link Metadata#requestUpdate(boolean) request a metadata update} so that * on a subsequent invocation of {@link NetworkClient#poll(long, long) poll}, the metadata request will be sent. - * *

* * The {@link MetadataUpdater} has a specific method to handle @@ -613,10 +630,12 @@ private void testRequestTimeout(int requestTimeoutMs) { MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); metadata.updateWithCurrentRequestVersion(metadataResponse, false, time.milliseconds()); - NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata); + NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata, true); + client.poll(0, time.milliseconds()); // Send first produce without any timeout. ClientResponse clientResponse = produce(client, requestTimeoutMs, false); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, time.milliseconds()); assertEquals(node.idString(), clientResponse.destination()); assertFalse(clientResponse.wasDisconnected(), "Expected response to succeed and not disconnect"); assertFalse(clientResponse.wasTimedOut(), "Expected response to succeed and not time out"); @@ -860,7 +879,7 @@ public void testAuthenticationFailureWithInFlightMetadataRequest() { Node node1 = cluster.nodes().get(0); Node node2 = cluster.nodes().get(1); - NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata); + NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata, true); awaitReady(client, node1); @@ -1019,6 +1038,7 @@ public void testDisconnectDuringUserMetadataRequest() { @Test public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception { final long numIterations = 5; + int reconnectBackoffExpBase = ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE; double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest / (double) Math.max(reconnectBackoffMsTest, 1)) / Math.log(reconnectBackoffExpBase); for (int i = 0; i < numIterations; i++) { @@ -1034,6 +1054,7 @@ public void testServerDisconnectAfterInternalApiVersionRequest() throws Exceptio long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) * reconnectBackoffMsTest); long delay = client.connectionDelay(node, time.milliseconds()); + double reconnectBackoffJitter = ClusterConnectionStates.RECONNECT_BACKOFF_JITTER; assertEquals(expectedBackoff, delay, reconnectBackoffJitter * expectedBackoff); if (i == numIterations - 1) { break; @@ -1134,7 +1155,7 @@ public void testReconnectAfterAddressChange() { initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0])); AtomicInteger initialAddressConns = new AtomicInteger(); AtomicInteger newAddressConns = new AtomicInteger(); - MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + MockSelector selector = new MockSelector(time, inetSocketAddress -> { InetAddress inetAddress = inetSocketAddress.getAddress(); if (initialAddresses.contains(inetAddress)) { initialAddressConns.incrementAndGet(); @@ -1151,8 +1172,8 @@ public void testReconnectAfterAddressChange() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, - Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + Long.MAX_VALUE, MetadataRecoveryStrategy.NONE, bootstrapConfiguration); // Connect to one the initial addresses, then change the addresses and disconnect client.ready(node, time.milliseconds()); @@ -1212,8 +1233,8 @@ public void testFailedConnectionToFirstAddress() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, - Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + Long.MAX_VALUE, MetadataRecoveryStrategy.NONE, bootstrapConfiguration); // First connection attempt should fail client.ready(node, time.milliseconds()); @@ -1248,7 +1269,7 @@ public void testFailedConnectionToFirstAddressAfterReconnect() { initialAddresses.toArray(new InetAddress[0]), newAddresses.toArray(new InetAddress[0])); AtomicInteger initialAddressConns = new AtomicInteger(); AtomicInteger newAddressConns = new AtomicInteger(); - MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + MockSelector selector = new MockSelector(time, inetSocketAddress -> { InetAddress inetAddress = inetSocketAddress.getAddress(); if (initialAddresses.contains(inetAddress)) { initialAddressConns.incrementAndGet(); @@ -1265,8 +1286,8 @@ public void testFailedConnectionToFirstAddressAfterReconnect() { NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, - time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, - Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); + time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender, + Long.MAX_VALUE, MetadataRecoveryStrategy.NONE, bootstrapConfiguration); // Connect to one the initial addresses, then change the addresses and disconnect client.ready(node, time.milliseconds()); @@ -1375,7 +1396,7 @@ public void testTelemetryRequest() { reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender, - Long.MAX_VALUE, MetadataRecoveryStrategy.NONE); + Long.MAX_VALUE, MetadataRecoveryStrategy.NONE, bootstrapConfiguration); // Send the ApiVersionsRequest client.ready(node, time.milliseconds()); @@ -1479,4 +1500,126 @@ public KafkaException getAndClearFailure() { return failure; } } + + private void bootstrapMetadataWithNodes(Metadata metadata, List nodes) { + List serverAddresses = new ArrayList<>(); + nodes.forEach(node -> serverAddresses.add(new InetSocketAddress(node.host(), node.port()))); + metadata.bootstrap(serverAddresses); + } + + private void bootstrapMetadata(Metadata metadata) { + List serverAddresses = new ArrayList<>(List.of( + new InetSocketAddress("localhost0", 8000), + new InetSocketAddress("localhost1", 8000) + )); + metadata.bootstrap(serverAddresses); + } + + private void bootstrapMetadataUpdater(final MetadataUpdater metadataUpdater) { + List serverAddresses = new ArrayList<>(List.of( + new InetSocketAddress("localhost0", 8000), + new InetSocketAddress("localhost1", 8000) + )); + metadataUpdater.bootstrap(serverAddresses); + System.out.println("Bootstraping metadata------:" + metadataUpdater.isBootstrapped()); + } + + @Test + public void testEnsureBootstrappedSuccess() { + Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new ClusterResourceListeners()); + NetworkClient.BootstrapConfiguration config = NetworkClient.BootstrapConfiguration.enabled( + BOOTSTRAP_ADDRESSES, + ClientDnsLookup.USE_ALL_DNS_IPS, + 5000, + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + time, false, new ApiVersions(), new LogContext(), + MetadataRecoveryStrategy.NONE, config); + + // First poll should succeed in bootstrapping + client.poll(1000, time.milliseconds()); + + // Verify bootstrap succeeded + MetadataUpdater metadataUpdater = TestUtils.fieldValue(client, NetworkClient.class, "metadataUpdater"); + assertTrue(metadataUpdater.isBootstrapped()); + } + + @Test + public void testEnsureBootstrappedTimeoutThrowsException() { + Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new ClusterResourceListeners()); + // Use invalid addresses that cannot be resolved + List invalidAddresses = List.of("invalid.host.that.does.not.exist:9092"); + NetworkClient.BootstrapConfiguration config = NetworkClient.BootstrapConfiguration.enabled( + invalidAddresses, + ClientDnsLookup.USE_ALL_DNS_IPS, + 100, // Short timeout + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + time, false, new ApiVersions(), new LogContext(), + MetadataRecoveryStrategy.NONE, config); + + // First poll to initialize bootstrap timer + client.poll(10, time.milliseconds()); + + // Advance time past bootstrap timeout + time.sleep(150); + + // Should throw BootstrapResolutionException on next poll + assertThrows(BootstrapResolutionException.class, () -> client.poll(1000, time.milliseconds())); + } + + @Test + public void testEnsureBootstrappedPollTimeoutReturnsWithoutError() { + Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new ClusterResourceListeners()); + // Use invalid addresses that cannot be resolved + List invalidAddresses = List.of("invalid.host.that.does.not.exist:9092"); + NetworkClient.BootstrapConfiguration config = NetworkClient.BootstrapConfiguration.enabled( + invalidAddresses, + ClientDnsLookup.USE_ALL_DNS_IPS, + 5000, // Long bootstrap timeout + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + time, false, new ApiVersions(), new LogContext(), + MetadataRecoveryStrategy.NONE, config); + + // Directly call ensureBootstrapped + // Should return without error even though bootstrap hasn't succeeded (will retry on next poll) + // DNS resolution will fail but timeout hasn't been reached yet + client.ensureBootstrapped(time.milliseconds()); + + // Verify that no exception was thrown and metadata is still empty + assertEquals(0, metadata.fetch().nodes().size(), "Metadata should have no nodes after failed DNS resolution"); + } + + @Test + public void testEnsureBootstrappedRetryUntilSuccess() { + Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new ClusterResourceListeners()); + NetworkClient.BootstrapConfiguration config = NetworkClient.BootstrapConfiguration.enabled( + BOOTSTRAP_ADDRESSES, + ClientDnsLookup.USE_ALL_DNS_IPS, + 5000, + CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + time, false, new ApiVersions(), new LogContext(), + MetadataRecoveryStrategy.NONE, config); + + // First poll should succeed in bootstrapping + client.poll(1000, time.milliseconds()); + + // Verify bootstrap succeeded + MetadataUpdater metadataUpdater = TestUtils.fieldValue(client, NetworkClient.class, "metadataUpdater"); + assertTrue(metadataUpdater.isBootstrapped()); + + // Subsequent polls should not fail even if already bootstrapped + client.poll(1000, time.milliseconds()); + assertTrue(metadataUpdater.isBootstrapped()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index ca1c5e9be30d6..0ee9b24861e31 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -16,9 +16,7 @@ */ package org.apache.kafka.clients.admin; -import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; -import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.MockClient; @@ -529,8 +527,7 @@ private static Cluster mockCluster(int numNodes, int controllerIndex) { } private static Cluster mockBootstrapCluster() { - return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses( - singletonList("localhost:8121"), ClientDnsLookup.USE_ALL_DNS_IPS)); + return Cluster.bootstrap(singletonList(InetSocketAddress.createUnresolved("localhost", 8121))); } private static AdminClientUnitTestEnv mockClientEnv(String... configVals) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java deleted file mode 100644 index 0581d672fb8a0..0000000000000 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin.internals; - -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.common.config.ConfigException; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class AdminBootstrapAddressesTest { - - @Test - public void testNoBootstrapSet() { - Map map = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "", - AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "" - ); - AdminClientConfig config = new AdminClientConfig(map); - assertEquals("You must set either bootstrap.servers or bootstrap.controllers", - assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)). - getMessage()); - } - - @Test - public void testTwoBootstrapsSet() { - Map map = new HashMap<>(); - map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "localhost:9092"); - map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - AdminClientConfig config = new AdminClientConfig(map); - assertEquals("You cannot set both bootstrap.servers and bootstrap.controllers", - assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)). - getMessage()); - } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testFromConfig(boolean usingBootstrapControllers) { - Map map = new HashMap<>(); - String connectString = "localhost:9092,localhost:9093,localhost:9094"; - if (usingBootstrapControllers) { - map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, connectString); - } else { - map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectString); - } - AdminClientConfig config = new AdminClientConfig(map); - AdminBootstrapAddresses addresses = AdminBootstrapAddresses.fromConfig(config); - assertEquals(usingBootstrapControllers, addresses.usingBootstrapControllers()); - assertEquals(Arrays.asList( - new InetSocketAddress("localhost", 9092), - new InetSocketAddress("localhost", 9093), - new InetSocketAddress("localhost", 9094)), - addresses.addresses()); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index da4a799fa5e70..f3885f020cd4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.BootstrapResolutionException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidGroupIdException; @@ -589,8 +590,10 @@ public void testConstructorClose(GroupProtocol groupProtocol) { Properties props = new Properties(); props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "invalid-23-8409-adsfsdj"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + // Use an invalid interceptor class to trigger constructor failure + props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "invalid.interceptor.class"); final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); @@ -4194,6 +4197,46 @@ public void testConstructorFailsOnNetworkClientConstructorFailure(GroupProtocol assertEquals("No LoginModule found for org.example.InvalidLoginModule", cause.getMessage()); } + @ParameterizedTest + @EnumSource(value = GroupProtocol.class) + public void testConsumerBootstrapResolutionExceptionPropagatedToPoll(GroupProtocol protocol) throws InterruptedException { + // Use an invalid hostname that will fail DNS resolution + String invalidHost = "invalid-host-that-does-not-exist-12345.example.com:9092"; + + Map configs = Map.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, invalidHost, + // Set a short bootstrap timeout so the test doesn't take too long + CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, "3000", + ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol.name(), + ConsumerConfig.GROUP_ID_CONFIG, "test-group" + ); + + try (KafkaConsumer consumer = new KafkaConsumer<>(configs)) { + // Subscribe to a topic to trigger metadata fetch + consumer.subscribe(Set.of("test-topic")); + + // Poll continuously until we get the BootstrapResolutionException + // The exception should be thrown after bootstrap.resolve.timeout.ms expires + BootstrapResolutionException exception = + assertThrows(BootstrapResolutionException.class, () -> { + long startTime = System.currentTimeMillis(); + long maxWaitTime = 15000; // 15 seconds max to prevent test hanging + + while (System.currentTimeMillis() - startTime < maxWaitTime) { + consumer.poll(Duration.ofMillis(100)); + } + fail("Expected BootstrapResolutionException to be thrown within " + maxWaitTime + "ms"); + }); + + // Verify the exception message contains information about DNS resolution failure + assertTrue(exception.getMessage().contains("Failed to resolve bootstrap servers") || + exception.getMessage().contains("DNS resolution"), + "Exception message should mention DNS resolution failure: " + exception.getMessage()); + } + } + private MetricName expectedMetricName(String clientId, String config, Class clazz) { Map expectedTags = new LinkedHashMap<>(); expectedTags.put("client-id", clientId); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index e6efb4ab74066..ccc99ba7b6954 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -200,6 +200,7 @@ public class FetchRequestManagerTest { private MemoryRecords nextRecords; private MemoryRecords emptyRecords; private MemoryRecords partialRecords; + private NetworkClient.BootstrapConfiguration bootstrapConfiguration; @BeforeEach public void setup() { @@ -208,6 +209,7 @@ public void setup() { emptyRecords = buildRecords(0L, 0, 0); partialRecords = buildRecords(4L, 1, 0); partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + bootstrapConfiguration = NetworkClient.BootstrapConfiguration.disabled(); } private void assignFromUser(Set partitions) { @@ -1906,7 +1908,7 @@ public void testQuotaMetrics() { NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfiguration); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( 400, ApiMessageType.ListenerType.BROKER); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a77101e519c63..3efe35e02ce40 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -197,6 +197,7 @@ public class FetcherTest { private MemoryRecords emptyRecords; private MemoryRecords partialRecords; private ExecutorService executorService; + private NetworkClient.BootstrapConfiguration mockBootstrapConfig; @BeforeEach public void setup() { @@ -206,6 +207,7 @@ public void setup() { emptyRecords = buildRecords(0L, 0, 0); partialRecords = buildRecords(4L, 1, 0); partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + mockBootstrapConfig = NetworkClient.BootstrapConfiguration.disabled(); } private void assignFromUser(Set partitions) { @@ -1893,7 +1895,7 @@ public void testQuotaMetrics() { NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(), - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, mockBootstrapConfig); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( 400, ApiMessageType.ListenerType.BROKER); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 98ffd66270c5a..4087cd1c7f91a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; +import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.LeastLoadedNode; @@ -563,8 +565,10 @@ public void testNoSerializerProvided() { public void testConstructorFailureCloseResource() { Properties props = new Properties(); props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999"); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + // Use invalid interceptor class to cause constructor failure after metrics initialization + props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "non.existent.Interceptor"); final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); @@ -1107,6 +1111,9 @@ private > void doTestHeaders(Class serializerCla long nowMs = Time.SYSTEM.milliseconds(); String topic = "topic"; ProducerMetadata metadata = newMetadata(0, 0, 90000); + // Bootstrap the metadata to mark it as configured (required for lazy bootstrapping) + metadata.bootstrap(ClientUtils.parseAndValidateAddresses( + Collections.singletonList("localhost:9999"), ClientDnsLookup.USE_ALL_DNS_IPS)); metadata.add(topic, nowMs); MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1)); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index ffcb4510190fb..b710f0e34b291 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -179,6 +179,7 @@ public class SenderTest { private Sender sender = null; private SenderMetricsRegistry senderMetricsRegistry = null; private final LogContext logContext = new LogContext(); + private final NetworkClient.BootstrapConfiguration bootstrapConfig = NetworkClient.BootstrapConfiguration.disabled(); @BeforeEach public void setup() { @@ -241,7 +242,7 @@ public void testQuotaMetrics() { NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, time, true, new ApiVersions(), throttleTimeSensor, logContext, - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, bootstrapConfig); ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse( 400, ApiMessageType.ListenerType.BROKER); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index 9ff90e7a516cd..c6e86c67682f9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -397,6 +397,12 @@ private static ConfigDef config(Crypto crypto) { atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC) + .define(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, + ConfigDef.Type.LONG, + CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, + atLeast(0L), + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC) .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, Math.toIntExact(TimeUnit.SECONDS.toMillis(40)), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index a283ce7cf3878..e37b34bf12b64 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime.distributed; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; @@ -106,6 +107,11 @@ public WorkerGroupMember(DistributedConfig config, metadata.bootstrap(addresses); String metricGrpPrefix = "connect"; ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); + NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + config.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), + config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); NetworkClient netClient = new NetworkClient( new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext), metadata, @@ -123,7 +129,8 @@ public WorkerGroupMember(DistributedConfig config, new ApiVersions(), logContext, config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG), - MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) + MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)), + bootstrapConfiguration ); this.client = new ConsumerNetworkClient( logContext, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index d46d76c3606ef..a80d495c2c016 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -72,7 +72,6 @@ import jakarta.ws.rs.core.Response; import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; -import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG; import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG; import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG; @@ -80,7 +79,6 @@ import static org.apache.kafka.connect.integration.BlockingConnectorTest.TASK_STOP; import static org.apache.kafka.connect.integration.TestableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; -import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX; import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG; @@ -211,9 +209,9 @@ public void testRestartFailedTask() throws Exception { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); - // Properties for the source connector. The task should fail at startup due to the bad broker address. + // Properties for the source connector. The task should fail at startup due to injected error. props.put(TASKS_MAX_CONFIG, Objects.toString(numTasks)); - props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress"); + props.put("task-" + CONNECTOR_NAME + "-0.start.inject.error", "true"); // Try to start the connector and its single task. connect.configureConnector(CONNECTOR_NAME, props); @@ -221,8 +219,8 @@ public void testRestartFailedTask() throws Exception { connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, numTasks, "Connector tasks did not fail in time"); - // Reconfigure the connector without the bad broker address. - props.remove(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG); + // Reconfigure the connector without the injected error. + props.remove("task-" + CONNECTOR_NAME + "-0.start.inject.error"); connect.configureConnector(CONNECTOR_NAME, props); // Restart the failed task diff --git a/core/src/main/java/kafka/server/NetworkUtils.java b/core/src/main/java/kafka/server/NetworkUtils.java index 5f084bfd98904..b789bb30f02f7 100644 --- a/core/src/main/java/kafka/server/NetworkUtils.java +++ b/core/src/main/java/kafka/server/NetworkUtils.java @@ -85,7 +85,8 @@ public static NetworkClient buildNetworkClient(String prefix, true, new ApiVersions(), logContext, - MetadataRecoveryStrategy.NONE + MetadataRecoveryStrategy.NONE, + NetworkClient.BootstrapConfiguration.disabled() ); } } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 91daeedf81536..4b9d8720ae157 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -98,7 +98,8 @@ object TransactionMarkerChannelManager { true, new ApiVersions, logContext, - MetadataRecoveryStrategy.NONE + MetadataRecoveryStrategy.NONE, + NetworkClient.BootstrapConfiguration.disabled ) new TransactionMarkerChannelManager(config, diff --git a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala index 1d1d72767dbbc..2500d945b4496 100644 --- a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala +++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala @@ -236,7 +236,6 @@ class KafkaRaftManager[T]( val reconnectBackoffMs = 50 val reconnectBackoffMsMs = 500 val discoverBrokerVersions = true - val networkClient = new NetworkClient( selector, new ManualMetadataUpdater(), @@ -253,7 +252,8 @@ class KafkaRaftManager[T]( discoverBrokerVersions, apiVersions, logContext, - MetadataRecoveryStrategy.NONE + MetadataRecoveryStrategy.NONE, + NetworkClient.BootstrapConfiguration.disabled ) (controllerListenerName, networkClient) diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala index b4de360d532f6..cc684866050b8 100644 --- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala +++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala @@ -78,6 +78,7 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint, channelBuilder, logContext ) + val networkClient = new NetworkClient( selector, new ManualMetadataUpdater(), @@ -94,7 +95,8 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint, false, new ApiVersions, logContext, - MetadataRecoveryStrategy.NONE + MetadataRecoveryStrategy.NONE, + NetworkClient.BootstrapConfiguration.disabled ) (networkClient, reconfigurableChannelBuilder) } diff --git a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java index c54fb97a65bdc..6a1763bbeab56 100644 --- a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java +++ b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManagerImpl.java @@ -113,6 +113,7 @@ private KafkaClient buildNetworkClient(ControllerInformation controllerInfo) { channelBuilder, logContext ); + return new NetworkClient( selector, manualMetadataUpdater, @@ -129,7 +130,8 @@ private KafkaClient buildNetworkClient(ControllerInformation controllerInfo) { true, apiVersions, logContext, - MetadataRecoveryStrategy.NONE + MetadataRecoveryStrategy.NONE, + NetworkClient.BootstrapConfiguration.disabled() ); } diff --git a/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java index 9090fe46e4756..651d77394f718 100644 --- a/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/BrokerApiVersionsCommand.java @@ -151,6 +151,7 @@ protected static class AdminClient implements AutoCloseable { .define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC) .define(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, ConfigDef.Type.LONG, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC) .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_RETRY_BACKOFF_MS, ConfigDef.Importance.MEDIUM, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) + .define(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS, ConfigDef.Importance.MEDIUM, CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC) .withClientSslSupport() .withClientSaslSupport(); @@ -182,6 +183,11 @@ static AdminClient create(AbstractConfig config) { "admin", ClientUtils.createChannelBuilder(config, time, logContext), logContext); + NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + config.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), + config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); NetworkClient networkClient = new NetworkClient( selector, metadata, @@ -198,7 +204,8 @@ static AdminClient create(AbstractConfig config) { true, new ApiVersions(), logContext, - MetadataRecoveryStrategy.NONE); + MetadataRecoveryStrategy.NONE, + bootstrapConfiguration); ConsumerNetworkClient highLevelClient = new ConsumerNetworkClient( logContext, networkClient, diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java index 1304a4ce596dd..451582a0852f6 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java @@ -17,6 +17,7 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientUtils; @@ -664,7 +665,7 @@ private static class ReplicaFetcherBlockingSend { metrics, time, "replica-fetcher", - new HashMap() {{ + new HashMap<>() {{ put("broker-id", sourceNode.idString()); put("fetcher-id", String.valueOf(fetcherId)); }}, @@ -672,6 +673,11 @@ private static class ReplicaFetcherBlockingSend { channelBuilder, logContext ); + NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + consumerConfig.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + ClientDnsLookup.forConfig(consumerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + consumerConfig.getLong(ConsumerConfig.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), + consumerConfig.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); this.networkClient = new NetworkClient( selector, new ManualMetadataUpdater(), @@ -688,7 +694,8 @@ private static class ReplicaFetcherBlockingSend { false, new ApiVersions(), logContext, - MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) + MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)), + bootstrapConfiguration ); } diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index 9452c03ec9937..e0392e76c68ef 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -18,7 +18,9 @@ package org.apache.kafka.trogdor.workload; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.ManualMetadataUpdater; import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.NetworkClient; @@ -164,6 +166,11 @@ public boolean tryConnect() { try (Metrics metrics = new Metrics()) { try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, Time.SYSTEM, "", channelBuilder, logContext)) { + NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( + conf.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + conf.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), + conf.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); try (NetworkClient client = new NetworkClient(selector, updater, "ConnectionStressWorker", @@ -179,7 +186,8 @@ public boolean tryConnect() { false, new ApiVersions(), logContext, - MetadataRecoveryStrategy.NONE)) { + MetadataRecoveryStrategy.NONE, + bootstrapConfiguration)) { NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 500); } }