-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-14648: Do not fail clients if bootstrap servers is not immediately resolvable #21080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 26 commits
e5be409
62f852a
e1f8466
4a06620
b6fe307
2a146be
9922711
668f24e
9f1afdb
f7adeda
cf31b6e
3168e32
3bf9402
71c285f
14474a5
f73e995
d764844
39ee0b1
0afecf5
925fcf6
f91a196
3390a69
2422387
b566622
8d36da7
aa9a72e
ef3bb8f
8d22bd3
c06e01c
7458b72
993b42b
daeea8e
9b53482
6f414e9
7ca2642
2550e57
4ebbd77
4566ef1
0c15459
e1b31b2
b368e71
bfdd24c
8645123
e3159c8
4e745ab
0621ede
9ceba73
ff5492e
3d9c1b2
c0ce72a
faa8054
02ca329
11e5b3f
752d79c
46eb423
f78ddc3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,67 @@ public final class ClientUtils { | |
| private ClientUtils() { | ||
| } | ||
|
|
||
| /** | ||
| * Resolves a single URL to one or more InetSocketAddress based on the DNS lookup strategy. | ||
| * | ||
| * @param url the original URL string (for logging) | ||
| * @param host the hostname extracted from the URL | ||
| * @param port the port extracted from the URL | ||
| * @param clientDnsLookup the DNS lookup strategy | ||
| * @return list of resolved addresses (may be empty if addresses are unresolved) | ||
| * @throws UnknownHostException if DNS resolution fails | ||
| */ | ||
| private static List<InetSocketAddress> resolveAddress( | ||
| String url, | ||
| String host, | ||
| Integer port, | ||
| ClientDnsLookup clientDnsLookup) throws UnknownHostException { | ||
|
|
||
| List<InetSocketAddress> addresses = new ArrayList<>(); | ||
|
|
||
| if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { | ||
| InetAddress[] inetAddresses = InetAddress.getAllByName(host); | ||
| for (InetAddress inetAddress : inetAddresses) { | ||
| String resolvedCanonicalName = inetAddress.getCanonicalHostName(); | ||
| InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", | ||
| url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| } else { | ||
| InetSocketAddress address = new InetSocketAddress(host, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", | ||
| url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
|
|
||
| return addresses; | ||
| } | ||
|
|
||
| public static List<InetSocketAddress> validateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) { | ||
|
||
| List<InetSocketAddress> addresses = new ArrayList<>(); | ||
| if (urls == null) { | ||
| return addresses; | ||
| } | ||
| urls.forEach(url -> { | ||
| final String host = getHost(url); | ||
| final Integer port = getPort(url); | ||
|
Comment on lines
+105
to
+106
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
| try { | ||
| addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); | ||
| } catch (UnknownHostException e) { | ||
| // Silently ignore - this matches the original behavior | ||
| } | ||
| }); | ||
| return addresses; | ||
| } | ||
|
|
||
| public static List<InetSocketAddress> parseAndValidateAddresses(AbstractConfig config) { | ||
| List<String> urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); | ||
| String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG); | ||
|
|
@@ -73,25 +134,7 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url | |
| if (host == null || port == null) | ||
| throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); | ||
|
|
||
| if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { | ||
| InetAddress[] inetAddresses = InetAddress.getAllByName(host); | ||
| for (InetAddress inetAddress : inetAddresses) { | ||
| String resolvedCanonicalName = inetAddress.getCanonicalHostName(); | ||
| InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| } else { | ||
| InetSocketAddress address = new InetSocketAddress(host, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); | ||
|
|
||
| } catch (IllegalArgumentException e) { | ||
| throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); | ||
|
|
@@ -219,6 +262,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| ClientTelemetrySender clientTelemetrySender) { | ||
| ChannelBuilder channelBuilder = null; | ||
| Selector selector = null; | ||
| NetworkClient.BootstrapConfiguration bootstrapConfiguration; | ||
|
|
||
| try { | ||
| channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); | ||
|
|
@@ -228,6 +272,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| metricsGroupPrefix, | ||
| channelBuilder, | ||
| logContext); | ||
| List<String> bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); | ||
| ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); | ||
|
|
||
| // Try to get bootstrap.controllers if it exists (only defined in AdminClientConfig) | ||
| List<String> bootstrapControllers = null; | ||
| try { | ||
| bootstrapControllers = config.getList("bootstrap.controllers"); | ||
|
||
| } catch (ConfigException e) { | ||
| // bootstrap.controllers is not defined in this config (e.g., ProducerConfig, ConsumerConfig) | ||
| // This is expected and not an error | ||
| } | ||
|
|
||
| // Determine which bootstrap addresses to use | ||
| List<String> bootstrapAddresses; | ||
| if (bootstrapServers != null && !bootstrapServers.isEmpty()) { | ||
| bootstrapAddresses = bootstrapServers; | ||
| // Only validate if bootstrap servers are provided (non-empty list) | ||
| // This allows configurations that don't use bootstrap (e.g., broker-to-broker) to skip validation | ||
| parseAndValidateAddresses(bootstrapServers, dnsLookup); | ||
| } else if (bootstrapControllers != null && !bootstrapControllers.isEmpty()) { | ||
| bootstrapAddresses = bootstrapControllers; | ||
| parseAndValidateAddresses(bootstrapControllers, dnsLookup); | ||
| } else { | ||
| bootstrapAddresses = List.of(); | ||
| } | ||
|
|
||
| bootstrapConfiguration = new NetworkClient.BootstrapConfiguration( | ||
|
||
| bootstrapAddresses, | ||
| dnsLookup, | ||
| CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS | ||
| ); | ||
| return new NetworkClient(metadataUpdater, | ||
| metadata, | ||
| selector, | ||
|
|
@@ -248,7 +323,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| hostResolver, | ||
| clientTelemetrySender, | ||
| config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG), | ||
| MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) | ||
| MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)), | ||
| bootstrapConfiguration | ||
| ); | ||
| } catch (Throwable t) { | ||
| closeQuietly(selector, "Selector"); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -212,5 +212,5 @@ ClientRequest newClientRequest(String nodeId, | |
| * was invoked for this client. | ||
| */ | ||
| boolean active(); | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unnecessary formatting change in a file with no other changes.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we put this file back to the original state? |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import org.apache.kafka.common.requests.MetadataResponse; | ||
| import org.apache.kafka.common.requests.RequestHeader; | ||
|
|
||
| import java.net.InetSocketAddress; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
|
|
@@ -81,6 +82,16 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta | |
| // Do nothing | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isBootstrapped() { | ||
| return false; | ||
|
||
| } | ||
|
|
||
| @Override | ||
| public void bootstrap(List<InetSocketAddress> addresses) { | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InetAddress.getAllByNameis a blocking call, which will causepollloop to block. Did the KIP mention this behavior?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not, we should update the KIP as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-909%3A+DNS+Resolution+Failure+Should+Not+Fail+the+Clients