Skip to content

KAFKA-20329: Test headers dsl.store.format further (4/N)#21827

Open
aliehsaeedii wants to merge 5 commits intoapache:trunkfrom
aliehsaeedii:dsl_headers_testing_4
Open

KAFKA-20329: Test headers dsl.store.format further (4/N)#21827
aliehsaeedii wants to merge 5 commits intoapache:trunkfrom
aliehsaeedii:dsl_headers_testing_4

Conversation

@aliehsaeedii
Copy link
Copy Markdown
Contributor

Parametrized 28 DSL and processor test classes to validate functionality with both headers and default store configurations. This ensures backward compatibility and correctness as Kafka Streams transitions to supporting record headers in state stores.

Modified Tests:

  • Materializer Tests
  • Windowed KStream Tests
  • Aggregation Tests
  • Cogroup Tests
  • Join Tests
  • KTable Tests

@github-actions github-actions bot added triage PRs from the community streams tests Test fixes (including flaky tests) labels Mar 19, 2026
@aliehsaeedii aliehsaeedii force-pushed the dsl_headers_testing_4 branch from 6ab5990 to 9488168 Compare March 26, 2026 14:15
@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

setUp(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
Copy link
Copy Markdown
Member

@mjsax mjsax Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we did disable this explicitly as it's disabled by default -- seems we have similar stuff in other tests on this class. Should we also remove it? -- Any reason why you remove it here, but not elsewhere?

Still wondering, why we have this explicit disabling 🤔

setUp(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this?

public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited(final boolean withHeaders) {
setUp(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final Properties props = new Properties();
Copy link
Copy Markdown
Member

@mjsax mjsax Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setUp create it's own local Properties which is passed into TTD. So if we create new one here, that we need to pass into builder.build() below, why do we need to call setUp() at all?

This pattern of calling setUp and using local Properties is on other methods, too.

Copy link
Copy Markdown
Contributor Author

@aliehsaeedii aliehsaeedii Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setUp must be called on every test as it used to be a @beforeEach method


//should not throw an error
builder.build();
builder.build(props);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to pass in props now, but not before? I understand that the "headers" flag is a DSL config, so if the build(...) method needs that we pass props to pick it up, it seems we need to make this change everywhere to actually enable header store? This would also apply to your other PRs (maybe even the already merged N/2? -- If we need to pass in here, all your other changes seems to actually not enable header stores?

public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap(final EmitStrategy.StrategyType inputType, final boolean enableCaching, final boolean withHeaders) {
setup(inputType, enableCaching, withHeaders);
// This test expects caching behavior for accurate result counts
if (!enableCaching && !emitFinal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we run this test only with caching disabled, and emit-on-change, but not with emit-on-final? Seems the original test setup did includ emit-on-final?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original test considers caching as enabled:

public void shouldHandleMultipleSessionsAndMerging(final EmitStrategy.StrategyType inputType, final boolean enableCaching, final boolean withHeaders) {
setup(inputType, enableCaching, withHeaders);
// This test expects caching behavior for accurate result counts
if (!enableCaching && !emitFinal) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

props.putAll(CONFIG);
props.putAll(StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()));
props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need this change at all, given that versions stores don't support headers yet?

@MethodSource("data")
public void testCountOfVersionedStore(final boolean withHeaders) {
final StreamsBuilder builder;
if (withHeaders) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above


@BeforeEach
public void setUp() {
public void setUp(final boolean withHeaders) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to parameterize this test (same question for "sliding" and "windowed" case). This test already has explicit setup to test w/ and w/o header stores, for different test methods.

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Made a pass.

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Apr 7, 2026
@aliehsaeedii aliehsaeedii force-pushed the dsl_headers_testing_4 branch from 9488168 to 162d4ad Compare April 7, 2026 21:29
Copy link
Copy Markdown
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR

private boolean emitFinal;

private void setup(final EmitStrategy.StrategyType inputType, final boolean enableCaching) {
public static Stream<Arguments> data() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have some meaningful name?
for example emitStrategyAndCachingMatrix

@@ -92,14 +92,22 @@ public class KStreamSlidingWindowAggregateTest {

public static Stream<Arguments> data() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, we should rename it.

private boolean emitFinal;

public static Stream<Arguments> getEmitStrategy() {
public static Stream<Arguments> data() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath())));

private StreamsBuilder createStreamBuilderInMemory() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant empty line


@BeforeEach
public void setUp() {
private void setupProps(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
I suppose we could keep the original name.

assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
}

private void setDslStoreFormat(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed there's Javadoc for this method in other test files. I’m fine with or without it—up to you—but the key is to stay consistent.

assertThat(outputTopic.readRecord(), equalTo(new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp)));
}

private void setDslStoreFormat(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

private void setDslStoreFormat(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

private void setDslStoreFormat(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

private void setDslStoreFormat(final boolean withHeaders) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@frankvicky
Copy link
Copy Markdown
Contributor

retrigger CI

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved streams tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants