Skip to content
Open
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e5be409
KAFKA-14648: Add boostrap timeout config
frankvicky Dec 4, 2025
62f852a
Merge branch 'trunk' into KAFKA-14648
frankvicky Dec 5, 2025
e1f8466
fix the error test
frankvicky Dec 5, 2025
4a06620
Seperate config and state
frankvicky Dec 15, 2025
b6fe307
add disable instance to BootstrapConfiguration
frankvicky Dec 16, 2025
2a146be
Fix tests
frankvicky Dec 16, 2025
9922711
Merge branch 'trunk' into KAFKA-14648
frankvicky Dec 17, 2025
668f24e
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 2, 2026
9f1afdb
Fix the failed tests
frankvicky Jan 2, 2026
f7adeda
Fix failed tests
frankvicky Jan 5, 2026
cf31b6e
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 5, 2026
3168e32
Fix failed tests
frankvicky Jan 5, 2026
3bf9402
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 5, 2026
71c285f
Fix the failed tests
frankvicky Jan 5, 2026
14474a5
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 8, 2026
f73e995
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 14, 2026
d764844
Merge branch 'trunk' into KAFKA-14648
frankvicky Feb 3, 2026
39ee0b1
fix conflict
frankvicky Feb 3, 2026
0afecf5
fix failed tests
frankvicky Feb 3, 2026
925fcf6
Merge branch 'trunk' into KAFKA-14648
frankvicky Feb 25, 2026
f91a196
address comments
frankvicky Feb 25, 2026
3390a69
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 5, 2026
2422387
address comments
frankvicky Mar 8, 2026
b566622
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 8, 2026
8d36da7
fix failed tests
frankvicky Mar 9, 2026
aa9a72e
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 9, 2026
ef3bb8f
address comments
frankvicky Mar 17, 2026
8d22bd3
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 17, 2026
c06e01c
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
7458b72
address comments
frankvicky Mar 18, 2026
993b42b
address comments
frankvicky Mar 18, 2026
daeea8e
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
9b53482
address comments
frankvicky Mar 18, 2026
6f414e9
fix failed tests
frankvicky Mar 18, 2026
7ca2642
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
2550e57
address comments
frankvicky Mar 18, 2026
4ebbd77
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
4566ef1
fix failed tests
frankvicky Mar 19, 2026
0c15459
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 19, 2026
e1b31b2
address comments
frankvicky Mar 19, 2026
b368e71
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 19, 2026
bfdd24c
address comments
frankvicky Mar 19, 2026
8645123
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 25, 2026
e3159c8
address comments
frankvicky Mar 25, 2026
4e745ab
address comments
frankvicky Mar 25, 2026
0621ede
address comments
frankvicky Mar 27, 2026
9ceba73
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 27, 2026
ff5492e
add interrupt mechanism
frankvicky Mar 27, 2026
3d9c1b2
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 27, 2026
c0ce72a
address comments
frankvicky Mar 27, 2026
faa8054
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 30, 2026
02ca329
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 2, 2026
11e5b3f
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 2, 2026
752d79c
address comments
frankvicky Apr 2, 2026
46eb423
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 8, 2026
f78ddc3
address comments
frankvicky Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 80 additions & 20 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,72 @@ public final class ClientUtils {
private ClientUtils() {
}

/**
* Resolves a single URL to one or more InetSocketAddress based on the DNS lookup strategy.
*
* @param url the original URL string (for logging)
* @param host the hostname extracted from the URL
* @param port the port extracted from the URL
* @param clientDnsLookup the DNS lookup strategy
* @return list of resolved addresses (may be empty if addresses are unresolved)
* @throws UnknownHostException if DNS resolution fails
*/
private static List<InetSocketAddress> resolveAddress(
String url,
String host,
Integer port,
ClientDnsLookup clientDnsLookup) throws UnknownHostException {

List<InetSocketAddress> addresses = new ArrayList<>();

if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InetAddress.getAllByName is a blocking call, which will cause poll loop to block. Did the KIP mention this behavior?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not, we should update the KIP as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (InetAddress inetAddress : inetAddresses) {
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}",
url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
} else {
addresses.add(address);
}
}
} else {
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}",
url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
} else {
addresses.add(address);
}
}

return addresses;
}

public static List<InetSocketAddress> parseAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
List<InetSocketAddress> addresses = new ArrayList<>();
if (urls == null) {
return addresses;
}
urls.forEach(url -> {
final String host = getHost(url);
final Integer port = getPort(url);
Comment on lines +105 to +106
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getHost and getPort may return null. Should we check it?


if (host == null || port == null) {
log.warn("Skipping invalid bootstrap URL: {}", url);
return;
}

try {
addresses.addAll(resolveAddress(url, host, port, clientDnsLookup));
} catch (UnknownHostException e) {
// Silently ignore - this matches the original behavior
}
});
return addresses;
}

public static List<InetSocketAddress> parseAndValidateAddresses(AbstractConfig config) {
List<String> urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG);
Expand All @@ -73,25 +139,7 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
if (host == null || port == null)
throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);

if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
for (InetAddress inetAddress : inetAddresses) {
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
} else {
addresses.add(address);
}
}
} else {
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
} else {
addresses.add(address);
}
}
addresses.addAll(resolveAddress(url, host, port, clientDnsLookup));

} catch (IllegalArgumentException e) {
throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
Expand Down Expand Up @@ -151,6 +199,7 @@ static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
}

public static NetworkClient createNetworkClient(AbstractConfig config,
List<String> bootstrapServers,
Metrics metrics,
String metricsGroupPrefix,
LogContext logContext,
Expand All @@ -161,6 +210,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
Sensor throttleTimeSensor,
ClientTelemetrySender clientTelemetrySender) {
return createNetworkClient(config,
bootstrapServers,
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
metrics,
metricsGroupPrefix,
Expand All @@ -177,6 +227,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
}

public static NetworkClient createNetworkClient(AbstractConfig config,
List<String> bootstrapServers,
String clientId,
Metrics metrics,
String metricsGroupPrefix,
Expand All @@ -201,6 +252,14 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
metricsGroupPrefix,
channelBuilder,
logContext);
ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));

NetworkClient.BootstrapConfiguration bootstrapConfiguration = NetworkClient.BootstrapConfiguration.enabled(
bootstrapServers != null ? bootstrapServers : List.of(),
dnsLookup,
config.getLong(CommonClientConfigs.BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG),
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG));

return new NetworkClient(metadataUpdater,
metadata,
selector,
Expand All @@ -221,7 +280,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
hostResolver,
clientTelemetrySender,
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)),
bootstrapConfiguration
);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public class CommonClientConfigs {
+ "resolve each bootstrap address into a list of canonical names. After "
+ "the bootstrap phase, this behaves the same as <code>use_all_dns_ips</code>.";

public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = "bootstrap.resolve.timeout.ms";
public static final long DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS = 2 * 60 * 1000L;
public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC = "Maximum amount of time clients can spend trying to" +
" resolve for the bootstrap server address. If the resolution cannot be completed within this timeframe, a " +
"<code>BootstrapResolutionException</code> will be thrown.";

public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -81,6 +82,19 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
// Do nothing
}

@Override
public boolean isBootstrapped() {
// ManualMetadataUpdater is designed for cases where nodes are manually set,
// so we consider it bootstrapped if nodes have been provided
return !nodes.isEmpty();
}

@Override
public void bootstrap(List<InetSocketAddress> addresses) {
// ManualMetadataUpdater doesn't use NetworkClient's bootstrap mechanism
// Nodes should be set manually via constructor or setNodes()
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ private void computeClusterView() {
.stream()
.map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes))
.collect(Collectors.toList());
this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics,
invalidTopics, internalTopics, controller, topicIds);
this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos,
unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds);
}

static MetadataSnapshot bootstrap(List<InetSocketAddress> addresses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.requests.RequestHeader;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -102,6 +103,17 @@ default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
*/
default void rebootstrap(long now) {}

/**
* Returns true if the metadata has been bootstrapped.
*/
boolean isBootstrapped();

/**
* Bootstrap the metadata cache with the given addresses.
* @param addresses list of addresses for the bootstrap servers
*/
void bootstrap(List<InetSocketAddress> addresses);

/**
* Close this updater.
*/
Expand Down
Loading
Loading