diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index cf98c0226bf8b..11167c84fc1f0 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -48,9 +48,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; @@ -136,12 +137,13 @@ public void after() throws Exception { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } - @Test - public void shouldKStreamGlobalKTableLeftJoin() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) throws Exception { final KStream streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner); streamTableJoin.foreach(foreachAction); produceInitialGlobalTableValues(); - startStreams(); + startStreams(withHeaders); produceTopicValues(streamTopic); final Map expected = new HashMap<>(); @@ -207,12 +209,13 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { ); } - @Test - public void shouldKStreamGlobalKTableJoin() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) throws Exception { final KStream streamTableJoin = stream.join(globalTable, keyMapper, joiner); streamTableJoin.foreach(foreachAction); produceInitialGlobalTableValues(); - startStreams(); + startStreams(withHeaders); produceTopicValues(streamTopic); final Map expected = new HashMap<>(); @@ -277,11 +280,12 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { ); } - @Test - public void shouldRestoreTransactionalMessages() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldRestoreTransactionalMessages(final boolean withHeaders) throws Exception { produceInitialGlobalTableValues(); - startStreams(); + startStreams(withHeaders); final Map expected = new HashMap<>(); expected.put(1L, "A"); @@ -309,13 +313,14 @@ public void shouldRestoreTransactionalMessages() throws Exception { ); } - @Test - public void shouldNotRestoreAbortedMessages() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldNotRestoreAbortedMessages(final boolean withHeaders) throws Exception { produceAbortedMessages(); produceInitialGlobalTableValues(); produceAbortedMessages(); - startStreams(); + startStreams(withHeaders); final Map expected = new HashMap<>(); expected.put(1L, "A"); @@ -350,7 +355,10 @@ private void createTopics(final String safeTestName) throws Exception { CLUSTER.createTopic(globalTableTopic, 2, 1); } - private void startStreams() { + private void startStreams(final boolean withHeaders) { + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } startStreams(null); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 9730eff0490ae..4fc0cf85eab41 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -54,9 +54,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; @@ -139,8 +140,10 @@ public void whenShuttingDown() throws Exception { IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } - @Test - public void shouldKStreamGlobalKTableLeftJoin() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) throws Exception { + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); final KStream streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner); streamTableJoin.process(supplier); produceInitialGlobalTableValues(); @@ -224,8 +227,10 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { "waiting for final values"); } - @Test - public void shouldKStreamGlobalKTableJoin() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) throws Exception { + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); final KStream streamTableJoin = stream.join(globalTable, keyMapper, joiner); streamTableJoin.process(supplier); produceInitialGlobalTableValues(); @@ -309,8 +314,10 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { "waiting for final values"); } - @Test - public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean withHeaders) throws Exception { + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); builder = new StreamsBuilder(); globalTable = builder.globalTable( globalTableTopic, @@ -340,8 +347,10 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { assertThat(timestampedStore.approximateNumEntries(), equalTo(4L)); } - @Test - public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldGetToRunningWithOnlyGlobalTopology(final boolean withHeaders) throws Exception { + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); builder = new StreamsBuilder(); globalTable = builder.globalTable( globalTableTopic, @@ -375,8 +384,9 @@ public void process(final Record record) { ); } - @Test - public void testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerContinueEnabledRestorationPhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // enable processing exception handler invoked config TestGlobalProcessingExceptionHandler.shouldResume = true; @@ -384,6 +394,8 @@ public void testProcessingExceptionHandlerContinueEnabledRestorationPhase() thro streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + produceInitialGlobalTableValues(); startStreams(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30)); @@ -391,8 +403,9 @@ public void testProcessingExceptionHandlerContinueEnabledRestorationPhase() thro assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get()); } - @Test - public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerFailEnabledRestorationPhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // enable processing exception handler invoked config TestGlobalProcessingExceptionHandler.shouldResume = false; @@ -400,6 +413,8 @@ public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws E streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + produceInitialGlobalTableValues(); assertThrows(StreamsException.class, () -> { startStreams(); @@ -408,8 +423,9 @@ public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws E } - @Test - public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerDisabledRestorationPhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // disable processing exception handler invoked config TestGlobalProcessingExceptionHandler.shouldResume = false; @@ -417,6 +433,8 @@ public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exce streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + produceInitialGlobalTableValues(); assertThrows(StreamsException.class, () -> { startStreams(); @@ -425,8 +443,9 @@ public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exce } - @Test - public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerContinueEnabledRunTimePhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // enable processing exception handler invoked config TestGlobalProcessingExceptionHandler.shouldResume = true; @@ -434,6 +453,8 @@ public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws E streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + startStreams(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30)); produceInitialGlobalTableValues(); @@ -445,14 +466,17 @@ public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws E ); } - @Test - public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerFailEnabledRunTimePhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // enable processing exception handler invoked config streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, true); streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + startStreams(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30)); produceInitialGlobalTableValues(); @@ -460,14 +484,17 @@ public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws Excep assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get()); } - @Test - public void testProcessingExceptionHandlerDisabledRunTimePhase() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testProcessingExceptionHandlerDisabledRunTimePhase(final boolean withHeaders) throws Exception { createBuilderWithFailedProcessor(); // disable processing exception handler invoked config streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, false); streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, TestGlobalProcessingExceptionHandler.class); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders); + startStreams(); waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30)); produceInitialGlobalTableValues(); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 31e39a4c8b7d6..b197aa7365033 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -51,7 +51,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.time.Duration; @@ -149,11 +149,14 @@ private Admin createAdminClient() { return Admin.create(adminClientConfig); } - private void configureStreams(final boolean streamsProtocolEnabled, final String appID) { + private void configureStreams(final boolean streamsProtocolEnabled, final boolean withHeaders, final String appID) { streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); if (streamsProtocolEnabled) { streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } + if (withHeaders) { + streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } } /* @@ -161,10 +164,10 @@ private void configureStreams(final boolean streamsProtocolEnabled, final String * for internal repartition topics. See KAFKA-10689 */ @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled) throws Exception { - final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled; - configureStreams(streamsProtocolEnabled, appID); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { + final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled + "-" + withHeaders; + configureStreams(streamsProtocolEnabled, withHeaders, appID); final StreamsBuilder streamsBuilder = new StreamsBuilder(); final KStream inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC); @@ -191,10 +194,10 @@ public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtoc @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception { - final String appID = APP_ID + "-compact-" + streamsProtocolEnabled; - configureStreams(streamsProtocolEnabled, appID); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { + final String appID = APP_ID + "-compact-" + streamsProtocolEnabled + "-" + withHeaders; + configureStreams(streamsProtocolEnabled, withHeaders, appID); // // Step 1: Configure and start a simple word count topology @@ -229,10 +232,10 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsP } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception { - final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled; - configureStreams(streamsProtocolEnabled, appID); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { + final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled + "-" + withHeaders; + configureStreams(streamsProtocolEnabled, withHeaders, appID); // // Step 1: Configure and start a simple word count topology diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index b31cee581b019..738e5cb5aa383 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -52,7 +52,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.time.Duration; @@ -343,11 +343,14 @@ private void closeApplication() throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldAddMetricsOnAllLevels(final boolean streamsProtocolEnabled) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldAddMetricsOnAllLevels(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String())); @@ -382,11 +385,14 @@ public void shouldAddMetricsOnAllLevels(final boolean streamsProtocolEnabled) th } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final boolean streamsProtocolEnabled) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } final Duration windowSize = Duration.ofMillis(50); builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) @@ -415,11 +421,14 @@ public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final boolean str } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldAddMetricsForSessionStore(final boolean streamsProtocolEnabled) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldAddMetricsForSessionStore(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } final Duration inactivityGap = Duration.ofMillis(50); builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index b58dd655a2db9..1681a7cceb5c1 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -76,7 +76,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -199,8 +199,8 @@ public void shutdown() throws Exception { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRestoreNullRecord(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRestoreNullRecord(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final String applicationId = appId; @@ -220,6 +220,9 @@ public void shouldRestoreNullRecord(final boolean useNewProtocol) throws Excepti if (useNewProtocol) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } CLUSTER.createTopics(inputTopic); CLUSTER.createTopics(outputTopic); @@ -269,8 +272,8 @@ public void shouldRestoreNullRecord(final boolean useNewProtocol) throws Excepti } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final Topology topology = new Topology(); @@ -278,6 +281,9 @@ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useN if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions final int offsetLimitDelta = 1000; @@ -330,8 +336,8 @@ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean useN } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); @@ -340,6 +346,9 @@ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNew if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions final int offsetLimitDelta = 1000; @@ -395,8 +404,8 @@ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean useNew } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final String changelog = appId + "-store-changelog"; CLUSTER.createTopic(changelog, 2, 1); @@ -408,6 +417,9 @@ public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol) t if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions final int offsetCheckpointed = 1000; @@ -446,8 +458,8 @@ public void shouldRestoreStateFromChangelogTopic(final boolean useNewProtocol) t } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProtocol) throws InterruptedException { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProtocol, final boolean withHeaders) { final StreamsBuilder builder = new StreamsBuilder(); final KStream stream = builder.stream(inputStream); @@ -461,6 +473,9 @@ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProto if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } kafkaStreams = new KafkaStreams(builder.build(), props); try { startApplicationAndWaitUntilRunning(kafkaStreams); @@ -470,8 +485,8 @@ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean useNewProto } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewProtocol) throws InterruptedException { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewProtocol, final boolean withHeaders) throws InterruptedException { IntegrationTestUtils.produceKeyValuesSynchronously(inputStream, asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), @@ -504,6 +519,9 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewP if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } kafkaStreams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); @@ -520,8 +538,8 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean useNewP } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); builder.table( inputStream, @@ -539,6 +557,9 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f if (useNewProtocol) { props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props1.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } purgeLocalStreamsState(props1); final KafkaStreams streams1 = new KafkaStreams(builder.build(), props1); @@ -548,6 +569,9 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f if (useNewProtocol) { props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props2.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } purgeLocalStreamsState(props2); final KafkaStreams streams2 = new KafkaStreams(builder.build(), props2); @@ -603,8 +627,8 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final String inputTopic = "inputTopic"; final String outputTopic = "outputTopic"; CLUSTER.createTopic(inputTopic, 5, 1); @@ -633,7 +657,7 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useN sendEvents(inputTopic, sampleData); - kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration, useNewProtocol); + kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration, useNewProtocol, withHeaders); validateReceivedMessages(sampleData, outputTopic); @@ -642,7 +666,7 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useN IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations); final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY); - kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol); + kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol, withHeaders); // Ensure all the restoring tasks are in active state before starting the new instance. // Otherwise, the tasks which assigned to first kafka streams won't encounter "restoring suspend" after being reassigned to the second instance. @@ -659,7 +683,8 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useN try (final KafkaStreams kafkaStreams2 = startKafkaStreams(builder, kafkaStreams2StateRestoreListener, kafkaStreams2Configuration, - useNewProtocol)) { + useNewProtocol, + withHeaders)) { waitForCondition(() -> State.RUNNING == kafkaStreams2.state(), 90_000, @@ -675,8 +700,8 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useN } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldRecordRestoreMetrics(final boolean useNewProtocol, final boolean withHeaders) throws Exception { final AtomicInteger numReceived = new AtomicInteger(0); final StreamsBuilder builder = new StreamsBuilder(); @@ -685,6 +710,9 @@ public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exce if (useNewProtocol) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); @@ -745,11 +773,15 @@ private void validateReceivedMessages(final List> exp private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder, final StateRestoreListener stateRestoreListener, final Map extraConfiguration, - final boolean useNewProtocol) { + final boolean useNewProtocol, + final boolean withHeaders) { final Properties streamsConfiguration = props(mkObjectProperties(extraConfiguration)); if (useNewProtocol) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration); kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index d77355831bc00..338b1b6529cdf 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -46,7 +46,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.time.Duration; @@ -146,12 +146,15 @@ private interface MetricsVerifier { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean streamsProtocolEnabled, final TestInfo testInfo) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean streamsProtocolEnabled, final boolean withHeaders, final TestInfo testInfo) throws Exception { final Properties streamsConfiguration = streamsConfig(testInfo); if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); final StreamsBuilder builder = builderForStateStores(); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index 8c8ef3dae9c7d..c83dfe6d10dcd 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -42,7 +42,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.time.Duration; @@ -90,7 +90,7 @@ public void after() { client2.close(Duration.ofSeconds(60)); } - private Properties streamsConfiguration(final boolean streamsProtocolEnabled) { + private Properties streamsConfiguration(final boolean streamsProtocolEnabled, final boolean withHeaders) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -103,12 +103,15 @@ private Properties streamsConfiguration(final boolean streamsProtocolEnabled) { } else { streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); } + if (withHeaders) { + streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } return streamsConfiguration; } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean streamsProtocolEnabled) throws Exception { + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final String stateStoreName = "myTransformState"; final StoreBuilder> keyValueStoreBuilder = @@ -127,7 +130,7 @@ public void process(final Record record) { final Topology topology = builder.build(); - createClients(topology, streamsConfiguration(streamsProtocolEnabled), topology, streamsConfiguration(streamsProtocolEnabled)); + createClients(topology, streamsConfiguration(streamsProtocolEnabled, withHeaders), topology, streamsConfiguration(streamsProtocolEnabled, withHeaders)); setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty()); @@ -139,11 +142,11 @@ public void process(final Record record) { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean streamsProtocolEnabled) throws Exception { - final Properties streamsConfiguration1 = streamsConfiguration(streamsProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception { + final Properties streamsConfiguration1 = streamsConfiguration(streamsProtocolEnabled, withHeaders); streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); - final Properties streamsConfiguration2 = streamsConfiguration(streamsProtocolEnabled); + final Properties streamsConfiguration2 = streamsConfiguration(streamsProtocolEnabled, withHeaders); streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 6f29b7b81f515..d588f607263fc 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -56,7 +56,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; import java.time.Duration; @@ -114,14 +114,14 @@ public static void closeCluster() { private Properties properties; - private Properties basicProps(final boolean streamsRebalanceProtocolEnabled) { + private Properties basicProps(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) { final String protocol; if (streamsRebalanceProtocolEnabled) { protocol = GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()); } else { protocol = GroupProtocol.CLASSIC.name().toLowerCase(Locale.getDefault()); } - return mkObjectProperties( + final Properties props = mkObjectProperties( mkMap( mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), @@ -133,6 +133,10 @@ private Properties basicProps(final boolean streamsRebalanceProtocolEnabled) { mkEntry(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol) ) ); + if (withHeaders) { + props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } + return props; } @BeforeEach @@ -154,9 +158,9 @@ public void teardown() throws IOException { } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldShutdownClient(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldShutdownClient(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT); @@ -171,38 +175,38 @@ public void shouldShutdownClient(final boolean streamsRebalanceProtocolEnabled) } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReplaceThreads(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldReplaceThreads(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); testReplaceThreads(2); } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReplaceThreadsWithoutJavaHandler(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldReplaceThreadsWithoutJavaHandler(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception thrown")); testReplaceThreads(2); } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldReplaceSingleThread(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldReplaceSingleThread(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); testReplaceThreads(1); } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldShutdownMultipleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldShutdownMultipleThreadApplication(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); testShutdownApplication(2); } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldShutdownSingleThreadApplication(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldShutdownSingleThreadApplication(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); testShutdownApplication(1); } @@ -234,9 +238,9 @@ public void process(final Record record) { } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); builder.addGlobalStore( new KeyValueStoreBuilder<>( Stores.persistentKeyValueStore("globalStore"), @@ -263,9 +267,9 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread(final b } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void shouldEmitSameRecordAfterFailover(final boolean streamsRebalanceProtocolEnabled) throws Exception { - properties = basicProps(streamsRebalanceProtocolEnabled); + @CsvSource({"false, false", "false, true", "true, false", "true, true"}) + public void shouldEmitSameRecordAfterFailover(final boolean streamsRebalanceProtocolEnabled, final boolean withHeaders) throws Exception { + properties = basicProps(streamsRebalanceProtocolEnabled, withHeaders); properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L); properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 7cb8108a2de39..dbb703b35e5e8 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -47,9 +47,10 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,8 +102,9 @@ public static void closeCluster() { private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final long COMMIT_INTERVAL = 100L; - @Test - public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldRecoverBufferAfterShutdown(final boolean withHeaders, final TestInfo testInfo) { final String testId = safeUniqueTestName(testInfo); final String appId = "appId_" + testId; final String input = "input" + testId; @@ -149,6 +151,7 @@ public void shouldRecoverBufferAfterShutdown(final TestInfo testInfo) { )); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfig, withHeaders); KafkaStreams driver = getStartedStreams(streamsConfig, builder, true); try { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 3d7c141129f7c..8c4000b900290 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -48,8 +48,9 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Instant; @@ -113,8 +114,9 @@ private static KTable buildCountsTable(final String input, final S .count(Materialized.>as("counts").withCachingDisabled()); } - @Test - public void shouldUseDefaultSerdes() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldUseDefaultSerdes(final boolean withHeaders) { final String testId = "-shouldInheritSerdes"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -140,7 +142,7 @@ public void shouldUseDefaultSerdes() { .toStream() .to(outputRaw); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -165,8 +167,9 @@ public void shouldUseDefaultSerdes() { } } - @Test - public void shouldInheritSerdes() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldInheritSerdes(final boolean withHeaders) { final String testId = "-shouldInheritSerdes"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -193,7 +196,7 @@ public void shouldInheritSerdes() { .toStream() .to(outputRaw); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -246,8 +249,9 @@ private static boolean waitForAnyRecord(final String topic) { } } - @Test - public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldShutdownWhenRecordConstraintIsViolated(final boolean withHeaders) throws InterruptedException { final String testId = "-shouldShutdownWhenRecordConstraintIsViolated"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -268,7 +272,8 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( @@ -287,8 +292,9 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc } } - @Test - public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldShutdownWhenBytesConstraintIsViolated(final boolean withHeaders) throws InterruptedException { final String testId = "-shouldShutdownWhenBytesConstraintIsViolated"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -310,7 +316,8 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( @@ -329,8 +336,9 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce } } - @Test - public void shouldAllowOverridingChangelogConfig() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldAllowOverridingChangelogConfig(final boolean withHeaders) { final String testId = "-shouldAllowOverridingChangelogConfig"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -360,7 +368,7 @@ public void shouldAllowOverridingChangelogConfig() { .toStream() .to(outputRaw); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -389,8 +397,9 @@ public void shouldAllowOverridingChangelogConfig() { } } - @Test - public void shouldCreateChangelogByDefault() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldCreateChangelogByDefault(final boolean withHeaders) { final String testId = "-shouldCreateChangelogByDefault"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -418,7 +427,7 @@ public void shouldCreateChangelogByDefault() { .toStream() .to(outputRaw); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -445,8 +454,9 @@ public void shouldCreateChangelogByDefault() { } } - @Test - public void shouldAllowDisablingChangelog() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldAllowDisablingChangelog(final boolean withHeaders) { final String testId = "-shouldAllowDisablingChangelog"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; @@ -474,7 +484,7 @@ public void shouldAllowDisablingChangelog() { .toStream() .to(outputRaw); - final Properties streamsConfig = getStreamsConfig(appId); + final Properties streamsConfig = getStreamsConfig(appId, withHeaders); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); @@ -506,8 +516,8 @@ public void shouldAllowDisablingChangelog() { } } - private static Properties getStreamsConfig(final String appId) { - return mkProperties(mkMap( + private static Properties getStreamsConfig(final String appId, final boolean withHeaders) { + final Properties props = mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), @@ -515,6 +525,8 @@ private static Properties getStreamsConfig(final String appId) { mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); + IntegrationTestUtils.maybeSetDslStoreFormatHeaders(props, withHeaders); + return props; } /** diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 832e8eb1a0691..079565ea97dca 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -280,6 +280,20 @@ public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster } } + /** + * Configures the DSL store format to use headers if enabled. + * This is a helper method to reduce boilerplate in parameterized tests that test both + * with and without headers mode. + * + * @param streamsConfig The streams configuration properties to modify + * @param withHeaders Whether to enable headers mode + */ + public static void maybeSetDslStoreFormatHeaders(final Properties streamsConfig, final boolean withHeaders) { + if (withHeaders) { + streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS); + } + } + /** * @param topic Kafka topic to write the data records to * @param records Data records to write to Kafka