Skip to content

KAFKA-20417: Add KIP-1035 self-managed offset recovery integration tests#21998

Open
bbejeck wants to merge 3 commits intoapache:trunkfrom
bbejeck:KAFKA-20417-self-managed-offset-recovery-tests
Open

KAFKA-20417: Add KIP-1035 self-managed offset recovery integration tests#21998
bbejeck wants to merge 3 commits intoapache:trunkfrom
bbejeck:KAFKA-20417-self-managed-offset-recovery-tests

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Apr 8, 2026

Adds SelfManagedOffsetRecoveryIntegrationTest and
RocksDBStoreTestingUtils

Tests verify Kafka Streams recovers correctly after unclean shutdowns
when offsets are stored in RocksDB column families (KIP-1035). Covers:

  • Unclean shutdown recovery (ALOS and EOS) - Missing
    offsets triggering task corruption detection
  • Combined status=open + missing offsets
  • EOS exactly-once guarantees preserved after recovery
  • Multi-store partial corruption
  • Standby task recovery
  • KAFKA-19712 regression (standby TaskCorruptedException after state
    wipe)

Reviewers: Matthias J. Sax matthias@confluent.io

@github-actions github-actions bot added streams tests Test fixes (including flaky tests) labels Apr 8, 2026
Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass.

kafkaStreams.close(STREAMS_CLOSE_TIMEOUT);
}

private KafkaStreams startStreams() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reuse IntegrationTestUtils#startApplicationAndWaitUntilRunning instead of adding these helpers?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left startStreams since it handles a couple of operations cleaning up the test some, but it does reuse IntegrationTestUtils.startApplicationAndWaitUntilRunning

// Phase 2: shut down instance 1, wipe its entire state, then restart.
// This simulates the LittleHorse scenario: complete state deletion followed by
// changelog restoration. The standby tasks on this instance will have stores
// that were never initialized with offsets.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The standby tasks on this instance will have stores that were never initialized with offsets.

How do we know that there will be standby task on streams1 ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added assertions to explicity show the standbys assigned to streams1

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Apr 8, 2026

@mjsax addressed comments

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants