KAFKA-14648: Do not fail clients if bootstrap servers is not immediately resolvable#21080
KAFKA-14648: Do not fail clients if bootstrap servers is not immediately resolvable#21080frankvicky wants to merge 56 commits intoapache:trunkfrom
Conversation
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
|
|
||
| @Override | ||
| public void bootstrap(List<InetSocketAddress> addresses) { | ||
| // AdminClient handles bootstrap during construction, so this method is not used |
There was a problem hiding this comment.
KIP-909 states "Consumer, Producer, and Admin Clients: The bootstrap code will be changed.", so this comment is not correct
| config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); | ||
| metrics = new Metrics(metricConfig, reporters, time, metricsContext); | ||
|
|
||
| // Use the appropriate bootstrap configuration determined by AdminBootstrapAddresses |
There was a problem hiding this comment.
It seems AdminBootstrapAddresses.fromConfig(config); will resolve the dns already, which is conflict with KIP-909
|
looks like the test failures may be related to the change |
| unresolvedAddresses.add(InetSocketAddress.createUnresolved(host, port)); | ||
| } | ||
| } | ||
| metadataManager.update(Cluster.bootstrap(unresolvedAddresses), time.milliseconds()); |
There was a problem hiding this comment.
The cluster created by Cluster.bootstrap has isBootstrapConfigured=true, so bootstrapCluster will be updated immediately. This makes isBootstrapped() incorrectly return true
There was a problem hiding this comment.
Maybe we should leverage MetadataUpdater#bootstrap to set the resolved bootstrap servers once the DNS resolution succeeds in NetworkClient?
| long remainingBootstrapTimeMs = bootstrapTimer.remainingMs(); | ||
| long sleepTimeMs = Math.min(Math.min(remainingPollTimeMs, remainingBootstrapTimeMs), bootstrapConfiguration.retryBackoffMs); | ||
|
|
||
| if (sleepTimeMs > 0) { |
There was a problem hiding this comment.
What happens if users try to interrupt the poll?
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky Just wondering, did we discuss the blocking DNS resolution in poll before? I'm thinking we could use a separate BootstrapResolver class with an independent thread to handle this. We can pass it to Metadata or AdminMetadataManager so we don't have to change NetworkClient too much. NetworkClient#ensureBootstrapped can just poll the metadataUpdater's state with blocking.
|
|
||
| while (true) { | ||
| // Check if thread has been interrupted | ||
| if (Thread.interrupted()) { |
There was a problem hiding this comment.
Should we propagate the exception directly?
if (Thread.interrupted()) {
throw new InterruptException(new InterruptedException());
}| * @param isBootstrapConfigured whether the cluster is bootstrapped | ||
| * @return a new Cluster instance with the specified bootstrap state | ||
| */ | ||
| public static Cluster withBootstrapFlag(String clusterId, |
There was a problem hiding this comment.
This change seems to be unnecessary. Also, it is a public class so we should not make this change here
There was a problem hiding this comment.
I understand your concern about adding a new public API. However, the Cluster.withBootstrapFlag() method is necessary for the KIP-909 implementation. Here's why:
- The isBootstrapConfigured flag must be preserved across metadata updates:
- After DNS resolution succeeds, we create a cluster with
isBootstrapConfigured=true - When a partial metadata update occurs (via
MetadataSnapshot.mergeWith()), this flag needs to be preserved - This allows
AdminMetadataManager.update()to correctly distinguish between bootstrap cluster and real cluster metadata
- Package structure constraints:
- Cluster is in the
org.apache.kafka.common package MetadataSnapshotis in theorg.apache.kafka.clientspackage- We cannot use package-private methods to access each other
- The only options are: public static method or restructuring the package layout
- Issues with alternative approaches:
- Not preserving the flag: Would break
AdminMetadataManager's ability to correctly identify bootstrap clusters, affecting re-bootstrap logic - Modifying package structure: Too large a change with high risk
- Other designs: All require similar public API additions or more complex refactoring
Alternative consideration:
If adding this public method is a blocker, the alternative would be to:
- Move both Cluster and MetadataSnapshot to the same package, or
- Add a ClusterBuilder class in the clients package that has access to create clusters with specific bootstrap flags
There was a problem hiding this comment.
we had on offline discussion about this one. Another option to consider is to completely remove the isBootstrapConfigured flag, and rely on checking if metadata has nodes instead (having nodes, either bootstrap nodes or from real metadata, means that we can skip DNS resolution, which is what the isBootstrapConfigured was used for).
The changes are in place now, with that, no need for changes on this Cluster class. Thoughts @chia7712 ?
| config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), | ||
| config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), | ||
| adminAddresses.usingBootstrapControllers()); | ||
| metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds()); |
There was a problem hiding this comment.
it seems adminAddresses.addresses() is not used any more. Maybe we could just remove AdminBootstrapAddresses and use a helper method instead?
the while loop in |
|
About the blocking behaviour, agree with removing the while loop within the network client poll, and let the retry happen on the next poll. -- update |
|
|
||
| // DNS resolution failed | ||
| if (bootstrapTimer.isExpired()) { | ||
| throw new BootstrapResolutionException("Failed to resolve bootstrap servers after " + |
There was a problem hiding this comment.
from the KIP I get that we want this to propagate to the consumer.poll right?
If so, then we need to catch this exception and send an error event to the app thread, otherwise this error will be swallowed in the background (this
One option I see is to catch this at the NetworkClientDelegate level, where we have the backgroundEventHandler, and propagate the error there via event (just like we do to propagate metadata errors)
Btw, we really need to have a test at the KafkaConsumerTest level covering this behaviour. E.g, create consumer with invalid host, poll continuously, poll should eventually throw the bootstrap exception (for both consumers). I expect that wouldn't work now for the async.
| } | ||
|
|
||
| @Test | ||
| public void testAsyncConsumerBootstrapResolutionExceptionPropagatedToPoll() throws InterruptedException { |
There was a problem hiding this comment.
this behaviour applies equally to both consumers. Should we better parameterize the test to run it for group.protocol=classic too?
This PR aims to deliver
KIP-909.
Reviewers: Lianet Magrans 98415067+lianetm@users.noreply.github.com