Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e5be409
KAFKA-14648: Add boostrap timeout config
frankvicky Dec 4, 2025
62f852a
Merge branch 'trunk' into KAFKA-14648
frankvicky Dec 5, 2025
e1f8466
fix the error test
frankvicky Dec 5, 2025
4a06620
Seperate config and state
frankvicky Dec 15, 2025
b6fe307
add disable instance to BootstrapConfiguration
frankvicky Dec 16, 2025
2a146be
Fix tests
frankvicky Dec 16, 2025
9922711
Merge branch 'trunk' into KAFKA-14648
frankvicky Dec 17, 2025
668f24e
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 2, 2026
9f1afdb
Fix the failed tests
frankvicky Jan 2, 2026
f7adeda
Fix failed tests
frankvicky Jan 5, 2026
cf31b6e
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 5, 2026
3168e32
Fix failed tests
frankvicky Jan 5, 2026
3bf9402
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 5, 2026
71c285f
Fix the failed tests
frankvicky Jan 5, 2026
14474a5
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 8, 2026
f73e995
Merge branch 'trunk' into KAFKA-14648
frankvicky Jan 14, 2026
d764844
Merge branch 'trunk' into KAFKA-14648
frankvicky Feb 3, 2026
39ee0b1
fix conflict
frankvicky Feb 3, 2026
0afecf5
fix failed tests
frankvicky Feb 3, 2026
925fcf6
Merge branch 'trunk' into KAFKA-14648
frankvicky Feb 25, 2026
f91a196
address comments
frankvicky Feb 25, 2026
3390a69
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 5, 2026
2422387
address comments
frankvicky Mar 8, 2026
b566622
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 8, 2026
8d36da7
fix failed tests
frankvicky Mar 9, 2026
aa9a72e
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 9, 2026
ef3bb8f
address comments
frankvicky Mar 17, 2026
8d22bd3
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 17, 2026
c06e01c
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
7458b72
address comments
frankvicky Mar 18, 2026
993b42b
address comments
frankvicky Mar 18, 2026
daeea8e
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
9b53482
address comments
frankvicky Mar 18, 2026
6f414e9
fix failed tests
frankvicky Mar 18, 2026
7ca2642
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
2550e57
address comments
frankvicky Mar 18, 2026
4ebbd77
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 18, 2026
4566ef1
fix failed tests
frankvicky Mar 19, 2026
0c15459
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 19, 2026
e1b31b2
address comments
frankvicky Mar 19, 2026
b368e71
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 19, 2026
bfdd24c
address comments
frankvicky Mar 19, 2026
8645123
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 25, 2026
e3159c8
address comments
frankvicky Mar 25, 2026
4e745ab
address comments
frankvicky Mar 25, 2026
0621ede
address comments
frankvicky Mar 27, 2026
9ceba73
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 27, 2026
ff5492e
add interrupt mechanism
frankvicky Mar 27, 2026
3d9c1b2
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 27, 2026
c0ce72a
address comments
frankvicky Mar 27, 2026
faa8054
Merge branch 'trunk' into KAFKA-14648
frankvicky Mar 30, 2026
02ca329
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 2, 2026
11e5b3f
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 2, 2026
752d79c
address comments
frankvicky Apr 2, 2026
46eb423
Merge branch 'trunk' into KAFKA-14648
frankvicky Apr 8, 2026
f78ddc3
address comments
frankvicky Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,44 @@ public final class ClientUtils {
private ClientUtils() {
}

public static List<InetSocketAddress> validateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the old one is called parseAndValidateAddresses, this one is probably closer to parseAddresses, as we omit the validation part, not the parsing part

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky Can we change this to parseAddresses as @Nikita-Shupletsov suggested?

List<InetSocketAddress> addresses = new ArrayList<>();
if (urls == null) {
return addresses;
}
urls.forEach(url -> {
final String host = getHost(url);
final Integer port = getPort(url);
Comment on lines +105 to +106
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getHost and getPort may return null. Should we check it?


if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
InetAddress[] inetAddresses;
try {
inetAddresses = InetAddress.getAllByName(host);
} catch (UnknownHostException e) {
inetAddresses = new InetAddress[0];
}

for (InetAddress inetAddress : inetAddresses) {
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
} else {
addresses.add(address);
}
}
} else {
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
} else {
addresses.add(address);
}
}
});
return addresses;
}

public static List<InetSocketAddress> parseAndValidateAddresses(AbstractConfig config) {
List<String> urls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG);
Expand Down Expand Up @@ -219,6 +257,7 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
ClientTelemetrySender clientTelemetrySender) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
NetworkClient.BootstrapConfiguration bootstrapConfiguration;

try {
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
Expand All @@ -228,6 +267,20 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
metricsGroupPrefix,
channelBuilder,
logContext);
List<String> bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));

// Only validate if bootstrap servers are provided (non-empty list)
// This allows configurations that don't use bootstrap (e.g., broker-to-broker) to skip validation
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why to resolve the dns for now? IIRC, KIP-909 tries to handle the resolution error in the poll loop, right?

if (bootstrapServers != null && !bootstrapServers.isEmpty()) {
parseAndValidateAddresses(bootstrapServers, dnsLookup);
}

bootstrapConfiguration = new NetworkClient.BootstrapConfiguration(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line#275 could be merged into this one.

bootstrapServers,
dnsLookup,
CommonClientConfigs.DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS
);
return new NetworkClient(metadataUpdater,
metadata,
selector,
Expand All @@ -248,7 +301,8 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
hostResolver,
clientTelemetrySender,
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG)),
bootstrapConfiguration
);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public class CommonClientConfigs {
+ "resolve each bootstrap address into a list of canonical names. After "
+ "the bootstrap phase, this behaves the same as <code>use_all_dns_ips</code>.";

public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_CONFIG = "bootstrap.resolve.timeout.ms";
public static final long DEFAULT_BOOTSTRAP_RESOLVE_TIMEOUT_MS = 2 * 60 * 1000L;
public static final String BOOTSTRAP_RESOLVE_TIMEOUT_MS_DOC = "Maximum amount of time clients can spend trying to" +
" resolve for the bootstrap server address. If the resolution cannot be completed within this timeframe, a " +
"<code>BootstrapResolutionException</code> will be thrown.";

public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,5 +212,4 @@ ClientRequest newClientRequest(String nodeId,
* was invoked for this client.
*/
boolean active();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary formatting change in a file with no other changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this file back to the original state?

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -81,6 +82,16 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
// Do nothing
}

@Override
public boolean isBootstrapped() {
return false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use default implementation to reduce the changes in code base

}

@Override
public void bootstrap(List<InetSocketAddress> addresses) {

}

@Override
public void close() {
}
Expand Down
8 changes: 6 additions & 2 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,13 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
return this.metadataSnapshot.mergeWith(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds,
(topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
else
else {
// Preserve the bootstrap flag from the current snapshot when creating a new one
boolean isBootstrapConfigured = this.metadataSnapshot.cluster().isBootstrapConfigured();
return new MetadataSnapshot(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds);
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds,
isBootstrapConfigured, null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class MetadataSnapshot {
private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
private final Map<String, Uuid> topicIds;
private final Map<Uuid, String> topicNames;
private final boolean isBootstrapConfigured;
private Cluster clusterInstance;

public MetadataSnapshot(String clusterId,
Expand All @@ -64,7 +65,7 @@ public MetadataSnapshot(String clusterId,
Set<String> internalTopics,
Node controller,
Map<String, Uuid> topicIds) {
this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds, null);
this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds, false, null);
}

// Visible for testing
Expand All @@ -76,6 +77,7 @@ public MetadataSnapshot(String clusterId,
Set<String> internalTopics,
Node controller,
Map<String, Uuid> topicIds,
boolean isBootstrapConfigured,
Cluster clusterInstance) {
this.clusterId = clusterId;
this.nodes = Collections.unmodifiableMap(nodes);
Expand All @@ -87,6 +89,7 @@ public MetadataSnapshot(String clusterId,
this.topicNames = Collections.unmodifiableMap(
topicIds.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey))
);
this.isBootstrapConfigured = isBootstrapConfigured;

Map<TopicPartition, PartitionMetadata> tmpMetadataByPartition = new HashMap<>(partitions.size());
for (PartitionMetadata p : partitions) {
Expand Down Expand Up @@ -199,8 +202,9 @@ MetadataSnapshot mergeWith(String newClusterId,
Set<String> newInvalidTopics = fillSet(addInvalidTopics, invalidTopics, shouldRetainTopic);
Set<String> newInternalTopics = fillSet(addInternalTopics, internalTopics, shouldRetainTopic);

// Preserve the bootstrap flag from the current snapshot during merge
return new MetadataSnapshot(newClusterId, newNodes, newMetadataByPartition.values(), newUnauthorizedTopics,
newInvalidTopics, newInternalTopics, newController, newTopicIds);
newInvalidTopics, newInternalTopics, newController, newTopicIds, this.isBootstrapConfigured, null);
}

/**
Expand All @@ -227,8 +231,9 @@ private void computeClusterView() {
.stream()
.map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes))
.collect(Collectors.toList());
this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics,
invalidTopics, internalTopics, controller, topicIds);
// Use the factory method that preserves the bootstrap state
this.clusterInstance = Cluster.withBootstrapFlag(clusterId, nodes.values(), partitionInfos,
unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds, isBootstrapConfigured);
}

static MetadataSnapshot bootstrap(List<InetSocketAddress> addresses) {
Expand All @@ -240,12 +245,12 @@ static MetadataSnapshot bootstrap(List<InetSocketAddress> addresses) {
}
return new MetadataSnapshot(null, nodes, Collections.emptyList(),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
null, Collections.emptyMap(), Cluster.bootstrap(addresses));
null, Collections.emptyMap(), true, Cluster.bootstrap(addresses));
}

static MetadataSnapshot empty() {
return new MetadataSnapshot(null, Collections.emptyMap(), Collections.emptyList(),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap(), Cluster.empty());
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap(), false, Cluster.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.requests.RequestHeader;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -102,6 +103,17 @@ default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
*/
default void rebootstrap(long now) {}

/**
* Returns true if the metadata has been bootstrapped.
*/
boolean isBootstrapped();

/**
* Bootstrap the metadata cache with the given addresses.
* @param addresses list of addresses for the bootstrap servers
*/
void bootstrap(List<InetSocketAddress> addresses);

/**
* Close this updater.
*/
Expand Down
Loading
Loading