Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
Expand All @@ -35,7 +36,8 @@
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -56,8 +58,10 @@ public class KStreamKStreamLeftJoinTest {
private static final Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

@SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
@Test
public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinWithSpuriousResultFixDisabledOldApi(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};
Expand Down Expand Up @@ -116,8 +120,10 @@ public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
}

@SuppressWarnings("deprecation") // old join semantics; can be removed when `JoinWindows.of()` is removed
@Test
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -158,8 +164,10 @@ public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
}
}

@Test
public void testLeftJoinDuplicates() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinDuplicates(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -214,8 +222,10 @@ public void testLeftJoinDuplicates() {
}
}

@Test
public void testLeftExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -269,8 +279,10 @@ public void testLeftExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor() {
}
}

@Test
public void testLeftExpiredNonJoinedRecordsAreEmittedByTheRightProcessor() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftExpiredNonJoinedRecordsAreEmittedByTheRightProcessor(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -324,8 +336,10 @@ public void testLeftExpiredNonJoinedRecordsAreEmittedByTheRightProcessor() {
}
}

@Test
public void testRightNonJoinedRecordsAreNeverEmittedByTheLeftProcessor() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testRightNonJoinedRecordsAreNeverEmittedByTheLeftProcessor(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -376,8 +390,10 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheLeftProcessor() {
}
}

@Test
public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -428,8 +444,10 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}

@Test
public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinedRecordsWithZeroAfterAreEmitted(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};
Expand Down Expand Up @@ -606,8 +624,9 @@ public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
}
}

@Test
public void testLeftJoinWithInMemoryCustomSuppliers() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinWithInMemoryCustomSuppliers(final boolean withHeaders) {
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));

final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
Expand All @@ -620,19 +639,22 @@ public void testLeftJoinWithInMemoryCustomSuppliers() {

final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());

runLeftJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
runLeftJoin(streamJoined.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows, withHeaders);
}

@Test
public void testLeftJoinWithDefaultSuppliers() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testLeftJoinWithDefaultSuppliers(final boolean withHeaders) {
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
final StreamJoined<Integer, String, String> streamJoined = StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());

runLeftJoin(streamJoined, joinWindows);
runLeftJoin(streamJoined, joinWindows, withHeaders);
}

public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined,
final JoinWindows joinWindows) {
final JoinWindows joinWindows,
final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final int[] expectedKeys = new int[] {0, 1, 2, 3};
Expand Down Expand Up @@ -744,8 +766,10 @@ public void runLeftJoin(final StreamJoined<Integer, String, String> streamJoined
}
}

@Test
public void testOrdering() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testOrdering(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, String> stream1;
Expand Down Expand Up @@ -795,8 +819,10 @@ public void testOrdering() {
}
}

@Test
public void testGracePeriod() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testGracePeriod(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};

Expand Down Expand Up @@ -867,8 +893,10 @@ public void testGracePeriod() {
}
}

@Test
public void testWindowing() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testWindowing(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};

Expand Down Expand Up @@ -928,8 +956,10 @@ public void testWindowing() {
}
}

@Test
public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void shouldNotEmitLeftJoinResultForAsymmetricWindow(final boolean withHeaders) {
setDslStoreFormat(withHeaders);
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[] {0, 1, 2, 3};

Expand Down Expand Up @@ -1311,4 +1341,19 @@ private void testLowerWindowBound(final int[] expectedKeys,
inputTopic1.pipeInput(0, "dummy", time + 300L);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "dummy+null", 1203L));
}

/**
* Configures the DSL store format to use headers if enabled.
* This is a helper method to reduce boilerplate in parameterized tests that test both
* with and without headers mode.
*
* @param withHeaders Whether to enable headers mode
*/
private void setDslStoreFormat(final boolean withHeaders) {
if (withHeaders) {
PROPS.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
} else {
PROPS.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_DEFAULT);
}
}
}
Loading
Loading