-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-14648: Do not fail clients if bootstrap servers is not immediately resolvable #21080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
frankvicky
wants to merge
56
commits into
apache:trunk
Choose a base branch
from
frankvicky:KAFKA-14648
base: trunk
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
56 commits
Select commit
Hold shift + click to select a range
e5be409
KAFKA-14648: Add boostrap timeout config
frankvicky 62f852a
Merge branch 'trunk' into KAFKA-14648
frankvicky e1f8466
fix the error test
frankvicky 4a06620
Seperate config and state
frankvicky b6fe307
add disable instance to BootstrapConfiguration
frankvicky 2a146be
Fix tests
frankvicky 9922711
Merge branch 'trunk' into KAFKA-14648
frankvicky 668f24e
Merge branch 'trunk' into KAFKA-14648
frankvicky 9f1afdb
Fix the failed tests
frankvicky f7adeda
Fix failed tests
frankvicky cf31b6e
Merge branch 'trunk' into KAFKA-14648
frankvicky 3168e32
Fix failed tests
frankvicky 3bf9402
Merge branch 'trunk' into KAFKA-14648
frankvicky 71c285f
Fix the failed tests
frankvicky 14474a5
Merge branch 'trunk' into KAFKA-14648
frankvicky f73e995
Merge branch 'trunk' into KAFKA-14648
frankvicky d764844
Merge branch 'trunk' into KAFKA-14648
frankvicky 39ee0b1
fix conflict
frankvicky 0afecf5
fix failed tests
frankvicky 925fcf6
Merge branch 'trunk' into KAFKA-14648
frankvicky f91a196
address comments
frankvicky 3390a69
Merge branch 'trunk' into KAFKA-14648
frankvicky 2422387
address comments
frankvicky b566622
Merge branch 'trunk' into KAFKA-14648
frankvicky 8d36da7
fix failed tests
frankvicky aa9a72e
Merge branch 'trunk' into KAFKA-14648
frankvicky ef3bb8f
address comments
frankvicky 8d22bd3
Merge branch 'trunk' into KAFKA-14648
frankvicky c06e01c
Merge branch 'trunk' into KAFKA-14648
frankvicky 7458b72
address comments
frankvicky 993b42b
address comments
frankvicky daeea8e
Merge branch 'trunk' into KAFKA-14648
frankvicky 9b53482
address comments
frankvicky 6f414e9
fix failed tests
frankvicky 7ca2642
Merge branch 'trunk' into KAFKA-14648
frankvicky 2550e57
address comments
frankvicky 4ebbd77
Merge branch 'trunk' into KAFKA-14648
frankvicky 4566ef1
fix failed tests
frankvicky 0c15459
Merge branch 'trunk' into KAFKA-14648
frankvicky e1b31b2
address comments
frankvicky b368e71
Merge branch 'trunk' into KAFKA-14648
frankvicky bfdd24c
address comments
frankvicky 8645123
Merge branch 'trunk' into KAFKA-14648
frankvicky e3159c8
address comments
frankvicky 4e745ab
address comments
frankvicky 0621ede
address comments
frankvicky 9ceba73
Merge branch 'trunk' into KAFKA-14648
frankvicky ff5492e
add interrupt mechanism
frankvicky 3d9c1b2
Merge branch 'trunk' into KAFKA-14648
frankvicky c0ce72a
address comments
frankvicky faa8054
Merge branch 'trunk' into KAFKA-14648
frankvicky 02ca329
Merge branch 'trunk' into KAFKA-14648
frankvicky 11e5b3f
Merge branch 'trunk' into KAFKA-14648
frankvicky 752d79c
address comments
frankvicky 46eb423
Merge branch 'trunk' into KAFKA-14648
frankvicky f78ddc3
address comments
frankvicky File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,72 @@ public final class ClientUtils { | |
| private ClientUtils() { | ||
| } | ||
|
|
||
| /** | ||
| * Resolves a single URL to one or more InetSocketAddress based on the DNS lookup strategy. | ||
| * | ||
| * @param url the original URL string (for logging) | ||
| * @param host the hostname extracted from the URL | ||
| * @param port the port extracted from the URL | ||
| * @param clientDnsLookup the DNS lookup strategy | ||
| * @return list of resolved addresses (may be empty if addresses are unresolved) | ||
| * @throws UnknownHostException if DNS resolution fails | ||
| */ | ||
| private static List<InetSocketAddress> resolveAddress( | ||
| String url, | ||
| String host, | ||
| Integer port, | ||
| ClientDnsLookup clientDnsLookup) throws UnknownHostException { | ||
|
|
||
| List<InetSocketAddress> addresses = new ArrayList<>(); | ||
|
|
||
| if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { | ||
| InetAddress[] inetAddresses = InetAddress.getAllByName(host); | ||
| for (InetAddress inetAddress : inetAddresses) { | ||
| String resolvedCanonicalName = inetAddress.getCanonicalHostName(); | ||
| InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", | ||
| url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| } else { | ||
| InetSocketAddress address = new InetSocketAddress(host, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", | ||
| url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
|
|
||
| return addresses; | ||
| } | ||
|
|
||
| public static List<InetSocketAddress> parseAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) { | ||
| List<InetSocketAddress> addresses = new ArrayList<>(); | ||
| if (urls == null) { | ||
| return addresses; | ||
| } | ||
| urls.forEach(url -> { | ||
| final String host = getHost(url); | ||
| final Integer port = getPort(url); | ||
|
Comment on lines
+105
to
+106
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
| if (host == null || port == null) { | ||
| log.warn("Skipping invalid bootstrap URL: {}", url); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); | ||
| } catch (UnknownHostException e) { | ||
| // Silently ignore - this matches the original behavior | ||
| } | ||
| }); | ||
| return addresses; | ||
| } | ||
|
|
||
| public static List<InetSocketAddress> parseAndValidateAddresses(AbstractConfig config) { | ||
| List<String> urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); | ||
| String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG); | ||
|
|
@@ -73,25 +139,7 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url | |
| if (host == null || port == null) | ||
| throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); | ||
|
|
||
| if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { | ||
| InetAddress[] inetAddresses = InetAddress.getAllByName(host); | ||
| for (InetAddress inetAddress : inetAddresses) { | ||
| String resolvedCanonicalName = inetAddress.getCanonicalHostName(); | ||
| InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| } else { | ||
| InetSocketAddress address = new InetSocketAddress(host, port); | ||
| if (address.isUnresolved()) { | ||
| log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); | ||
| } else { | ||
| addresses.add(address); | ||
| } | ||
| } | ||
| addresses.addAll(resolveAddress(url, host, port, clientDnsLookup)); | ||
|
|
||
| } catch (IllegalArgumentException e) { | ||
| throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); | ||
|
|
@@ -151,6 +199,7 @@ static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) { | |
| } | ||
|
|
||
| public static NetworkClient createNetworkClient(AbstractConfig config, | ||
| List<String> bootstrapServers, | ||
| Metrics metrics, | ||
| String metricsGroupPrefix, | ||
| LogContext logContext, | ||
|
|
@@ -161,6 +210,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| Sensor throttleTimeSensor, | ||
| ClientTelemetrySender clientTelemetrySender) { | ||
| return createNetworkClient(config, | ||
| bootstrapServers, | ||
| config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), | ||
| metrics, | ||
| metricsGroupPrefix, | ||
|
|
@@ -177,6 +227,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| } | ||
|
|
||
| public static NetworkClient createNetworkClient(AbstractConfig config, | ||
| List<String> bootstrapServers, | ||
| String clientId, | ||
| Metrics metrics, | ||
| String metricsGroupPrefix, | ||
|
|
@@ -201,6 +252,14 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| metricsGroupPrefix, | ||
| channelBuilder, | ||
| logContext); | ||
| ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); | ||
|
|
||
| NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled( | ||
| bootstrapServers != null ? bootstrapServers : List.of(), | ||
| dnsLookup, | ||
| config.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG), | ||
| config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)); | ||
|
|
||
| return new NetworkClient(metadataUpdater, | ||
| metadata, | ||
| selector, | ||
|
|
@@ -221,7 +280,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config, | |
| hostResolver, | ||
| clientTelemetrySender, | ||
| config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG), | ||
| MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)) | ||
| MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)), | ||
| bootstrapConfiguration | ||
| ); | ||
| } catch (Throwable t) { | ||
| closeQuietly(selector, "Selector"); | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InetAddress.getAllByNameis a blocking call, which will causepollloop to block. Did the KIP mention this behavior?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not, we should update the KIP as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-909%3A+DNS+Resolution+Failure+Should+Not+Fail+the+Clients