Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -136,12 +137,13 @@ public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

@Test
public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) throws Exception {
final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
startStreams();
startStreams(withHeaders);
produceTopicValues(streamTopic);

final Map<String, String> expected = new HashMap<>();
Expand Down Expand Up @@ -207,12 +209,13 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
);
}

@Test
public void shouldKStreamGlobalKTableJoin() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) throws Exception {
final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
startStreams();
startStreams(withHeaders);
produceTopicValues(streamTopic);

final Map<String, String> expected = new HashMap<>();
Expand Down Expand Up @@ -277,11 +280,12 @@ public void shouldKStreamGlobalKTableJoin() throws Exception {
);
}

@Test
public void shouldRestoreTransactionalMessages() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldRestoreTransactionalMessages(final boolean withHeaders) throws Exception {
produceInitialGlobalTableValues();

startStreams();
startStreams(withHeaders);

final Map<Long, String> expected = new HashMap<>();
expected.put(1L, "A");
Expand Down Expand Up @@ -309,13 +313,14 @@ public void shouldRestoreTransactionalMessages() throws Exception {
);
}

@Test
public void shouldNotRestoreAbortedMessages() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldNotRestoreAbortedMessages(final boolean withHeaders) throws Exception {
produceAbortedMessages();
produceInitialGlobalTableValues();
produceAbortedMessages();

startStreams();
startStreams(withHeaders);

final Map<Long, String> expected = new HashMap<>();
expected.put(1L, "A");
Expand Down Expand Up @@ -350,7 +355,10 @@ private void createTopics(final String safeTestName) throws Exception {
CLUSTER.createTopic(globalTableTopic, 2, 1);
}

private void startStreams() {
private void startStreams(final boolean withHeaders) {
if (withHeaders) {
streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
}
startStreams(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -139,8 +140,10 @@ public void whenShuttingDown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

@Test
public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableLeftJoin(final boolean withHeaders) throws Exception {
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
Expand Down Expand Up @@ -224,8 +227,10 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
"waiting for final values");
}

@Test
public void shouldKStreamGlobalKTableJoin() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldKStreamGlobalKTableJoin(final boolean withHeaders) throws Exception {
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
Expand Down Expand Up @@ -309,8 +314,10 @@ public void shouldKStreamGlobalKTableJoin() throws Exception {
"waiting for final values");
}

@Test
public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldRestoreGlobalInMemoryKTableOnRestart(final boolean withHeaders) throws Exception {
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
Expand Down Expand Up @@ -340,8 +347,10 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}

@Test
public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldGetToRunningWithOnlyGlobalTopology(final boolean withHeaders) throws Exception {
IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);
builder = new StreamsBuilder();
globalTable = builder.globalTable(
globalTableTopic,
Expand Down Expand Up @@ -375,31 +384,37 @@ public void process(final Record<Long, String> record) {
);
}

@Test
public void testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerContinueEnabledRestorationPhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = true;
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, true);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

produceInitialGlobalTableValues();
startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));

assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
}

@Test
public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerFailEnabledRestorationPhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = false;
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, true);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
startStreams();
Expand All @@ -408,15 +423,18 @@ public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws E

}

@Test
public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerDisabledRestorationPhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// disable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = false;
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, false);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

produceInitialGlobalTableValues();
assertThrows(StreamsException.class, () -> {
startStreams();
Expand All @@ -425,15 +443,18 @@ public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exce

}

@Test
public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerContinueEnabledRunTimePhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
TestGlobalProcessingExceptionHandler.shouldResume = true;
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, true);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
produceInitialGlobalTableValues();
Expand All @@ -445,29 +466,35 @@ public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws E
);
}

@Test
public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerFailEnabledRunTimePhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// enable processing exception handler invoked config
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, true);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
produceInitialGlobalTableValues();
waitForApplicationState(singletonList(kafkaStreams), State.ERROR, Duration.ofSeconds(30));
assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
}

@Test
public void testProcessingExceptionHandlerDisabledRunTimePhase() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testProcessingExceptionHandlerDisabledRunTimePhase(final boolean withHeaders) throws Exception {
createBuilderWithFailedProcessor();
// disable processing exception handler invoked config
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, false);
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
TestGlobalProcessingExceptionHandler.class);

IntegrationTestUtils.maybeSetDslStoreFormatHeaders(streamsConfiguration, withHeaders);

startStreams();
waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
produceInitialGlobalTableValues();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -149,22 +149,25 @@ private Admin createAdminClient() {
return Admin.create(adminClientConfig);
}

private void configureStreams(final boolean streamsProtocolEnabled, final String appID) {
private void configureStreams(final boolean streamsProtocolEnabled, final boolean withHeaders, final String appID) {
streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
if (streamsProtocolEnabled) {
streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
}
if (withHeaders) {
streamsProp.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
}
}

/*
* This test just ensures that the assignor does not get stuck during partition number resolution
* for internal repartition topics. See KAFKA-10689
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled) throws Exception {
final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled + "-" + withHeaders;
configureStreams(streamsProtocolEnabled, withHeaders, appID);

final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
Expand All @@ -191,10 +194,10 @@ public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtoc


@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
final String appID = APP_ID + "-compact-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
final String appID = APP_ID + "-compact-" + streamsProtocolEnabled + "-" + withHeaders;
configureStreams(streamsProtocolEnabled, withHeaders, appID);

//
// Step 1: Configure and start a simple word count topology
Expand Down Expand Up @@ -229,10 +232,10 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsP
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled;
configureStreams(streamsProtocolEnabled, appID);
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled, final boolean withHeaders) throws Exception {
final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled + "-" + withHeaders;
configureStreams(streamsProtocolEnabled, withHeaders, appID);

//
// Step 1: Configure and start a simple word count topology
Expand Down
Loading
Loading