diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index defa1e51e55a7..58e7723551575 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -202,7 +202,10 @@ files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest|RestoreIntegrationTest).java"/> + files="(EosIntegrationTest|KStreamKStreamJoinTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KStreamSlidingWindowAggregateTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBWindowStoreTest|TopologyTestDriverTest).java"/> + + @@ -211,7 +214,7 @@ files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|IQv2StoreIntegrationTest|StreamsConfigTest).java"/> + files="(KStreamKStreamJoinTest|StreamTaskTest|StreamThreadTest|TaskManagerTest|TopologyTestDriverTest).java"/> @@ -219,9 +222,6 @@ - - diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index d567d525dc39a..dfa2a2e2bd0bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -712,12 +712,11 @@ public static StoreBuilder> sessionStoreBuilder( * @return an instance of {@link StoreBuilder} than can build a {@link SessionStoreWithHeaders} */ public static StoreBuilder> sessionStoreWithHeadersBuilder( - final SessionBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde + final SessionBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde ) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } - } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java index 5285b97475918..b4ce0a01cd301 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java @@ -49,29 +49,24 @@ public List stores(final String storeName, final QueryableStoreType qu } if (store instanceof TimestampedKeyValueStoreWithHeaders) { if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueFromHeaders())); + return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueFromHeaders())); } else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedKeyValueStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders())); - } else { - // For custom query types, return the raw store so they can access headers directly - return (List) Collections.singletonList(store); + return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders())); } } else if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store, ValueConverters.extractValue())); + return (List) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store, ValueConverters.extractValue())); } else if (store instanceof TimestampedWindowStoreWithHeaders) { if (queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueFromHeaders())); + return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueFromHeaders())); } else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedWindowStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders())); - } else { - // For custom query types, return the raw store so they can access headers directly - return (List) Collections.singletonList(store); + return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders())); } } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { - return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store, ValueConverters.extractValue())); + return (List) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store, ValueConverters.extractValue())); } else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) { - return (List) Collections.singletonList(new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store)); + return (List) Collections.singletonList(new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store)); } + return (List) Collections.singletonList(store); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index f9dd0250563ab..321b9bf60a2cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -108,22 +108,22 @@ private static T validateAndCastStores(final StateStore store, } if (store instanceof TimestampedKeyValueStoreWithHeaders) { if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { - return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueFromHeaders()); + return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueFromHeaders()); } else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedKeyValueStoreType) { - return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders()); + return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders()); } } else if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { - return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store, ValueConverters.extractValue()); + return (T) new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store, ValueConverters.extractValue()); } else if (store instanceof TimestampedWindowStoreWithHeaders) { if (queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { - return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueFromHeaders()); + return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueFromHeaders()); } else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedWindowStoreType) { - return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders()); + return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders) store, ValueConverters.extractValueAndTimestampFromHeaders()); } } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { - return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store, ValueConverters.extractValue()); + return (T) new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store, ValueConverters.extractValue()); } else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) { - return (T) new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store); + return (T) new ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders) store); } return (T) store; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index ae56c7987765e..b7300f1445477 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -49,13 +49,11 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StandbyUpdateListener.SuspendReason; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.internals.ChangelogRegister; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManager; @@ -111,7 +109,6 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -891,12 +888,13 @@ final boolean isEmpty(final String topic) { * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ public Map getAllStateStores() { final Map allStores = new HashMap<>(); @@ -924,12 +922,13 @@ public Map getAllStateStores() { * @see #getAllStateStores() * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ public StateStore getStateStore(final String name) throws IllegalArgumentException { return getStateStore(name, true); @@ -1006,12 +1005,13 @@ private void throwIfBuiltInStore(final StateStore stateStore) { * @see #getAllStateStores() * @see #getStateStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public KeyValueStore getKeyValueStore(final String name) { @@ -1039,12 +1039,13 @@ public KeyValueStore getKeyValueStore(final String name) { * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public KeyValueStore> getTimestampedKeyValueStore(final String name) { @@ -1056,53 +1057,55 @@ public KeyValueStore> getTimestampedKeyValueStore } /** - * Get the {@link VersionedKeyValueStore} with the given name. + * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name. * The store can be a "regular" or global store. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name + * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStoreWithHeaders} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") - public VersionedKeyValueStore getVersionedKeyValueStore(final String name) { + public KeyValueStore> getTimestampedKeyValueStoreWithHeaders(final String name) { final StateStore store = getStateStore(name, false); - return store instanceof VersionedKeyValueStore ? (VersionedKeyValueStore) store : null; + return store instanceof TimestampedKeyValueStoreWithHeaders ? (TimestampedKeyValueStoreWithHeaders) store : null; } /** - * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name. + * Get the {@link VersionedKeyValueStore} with the given name. * The store can be a "regular" or global store. *

* This is often useful in test cases to pre-populate the store before the test case instructs the topology to * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. * * @param name the name of the store - * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStoreWithHeaders} has been registered with the given name + * @return the key value store, or {@code null} if no {@link VersionedKeyValueStore} has been registered with the given name * @see #getAllStateStores() * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) + * @see #getTimestampedKeyValueStoreWithHeaders(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") - public KeyValueStore> getTimestampedKeyValueStoreWithHeaders(final String name) { + public VersionedKeyValueStore getVersionedKeyValueStore(final String name) { final StateStore store = getStateStore(name, false); - return store instanceof TimestampedKeyValueStoreWithHeaders ? (TimestampedKeyValueStoreWithHeaders) store : null; + return store instanceof VersionedKeyValueStore ? (VersionedKeyValueStore) store : null; } /** @@ -1123,11 +1126,12 @@ public KeyValueStore> getTimestampedKeyValueS * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getTimestampedWindowStore(String) * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public WindowStore getWindowStore(final String name) { @@ -1156,10 +1160,12 @@ public WindowStore getWindowStore(final String name) { * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) + * @see #getTimestampedWindowStoreWithHeaders(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public WindowStore> getTimestampedWindowStore(final String name) { @@ -1183,11 +1189,12 @@ public WindowStore> getTimestampedWindowStore(fin * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) - * @see #getVersionedKeyValueStore(String) * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) * @see #getSessionStore(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public WindowStore> getTimestampedWindowStoreWithHeaders(final String name) { @@ -1208,11 +1215,12 @@ public WindowStore> getTimestampedWindowStore * @see #getStateStore(String) * @see #getKeyValueStore(String) * @see #getTimestampedKeyValueStore(String) + * @see #getTimestampedKeyValueStoreWithHeaders(String) * @see #getVersionedKeyValueStore(String) * @see #getWindowStore(String) * @see #getTimestampedWindowStore(String) - * @see #getTimestampedKeyValueStoreWithHeaders(String) * @see #getTimestampedWindowStoreWithHeaders(String) + * @see #getSessionStoreWithHeaders(String) */ @SuppressWarnings("unchecked") public SessionStore getSessionStore(final String name) { @@ -1223,6 +1231,32 @@ public SessionStore getSessionStore(final String name) { return store instanceof SessionStore ? (SessionStore) store : null; } + /** + * Get the {@link SessionStore} with the given name. + * The store can be a "regular" or global store. + *

+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to + * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to check the store afterward. + * + * @param name the name of the store + * @return the key value store, or {@code null} if no {@link SessionStore} has been registered with the given name + * @see #getAllStateStores() + * @see #getStateStore(String) + * @see #getKeyValueStore(String) + * @see #getTimestampedKeyValueStore(String) + * @see #getTimestampedKeyValueStoreWithHeaders(String) + * @see #getVersionedKeyValueStore(String) + * @see #getWindowStore(String) + * @see #getTimestampedWindowStore(String) + * @see #getTimestampedWindowStoreWithHeaders(String) + * @see #getSessionStore(String) + */ + @SuppressWarnings("unchecked") + public SessionStoreWithHeaders getSessionStoreWithHeaders(final String name) { + final StateStore store = getStateStore(name, false); + return store instanceof SessionStoreWithHeaders ? (SessionStoreWithHeaders) store : null; + } + /** * Close the driver, its topology, and all processors. */ @@ -1251,21 +1285,6 @@ public void close() { stateDirectory.clean(); } - static class MockChangelogRegister implements ChangelogRegister { - @Override - public void register(final TopicPartition partition, final ProcessorStateManager stateManager) { } - - @Override - public void register(final Set changelogPartitions, final ProcessorStateManager stateManager) { } - - @Override - public void unregister(final Collection partitions) { } - - @Override - public void unregister(final Collection partitions, - final SuspendReason reason) { } - } - static class MockTime implements Time { private final AtomicLong timeMs; private final AtomicLong highResTimeNs; diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 2dc0089990e69..7555c0783305c 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -24,6 +24,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -386,7 +390,6 @@ private Topology setupGlobalStoreTopology(final String... sourceTopicNames) { () -> new Processor() { KeyValueStore store; - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { store = context.getStateStore(sourceTopicName + "-globalStore"); @@ -602,6 +605,7 @@ record = processedRecords2.get(0); assertThat(record, equalTo(expectedResult)); } + @SuppressWarnings("resource") @Test public void shouldUseSourceSpecificDeserializers() { final Topology topology = new Topology(); @@ -610,23 +614,23 @@ public void shouldUseSourceSpecificDeserializers() { final String sourceName2 = "source-2"; final String processor = "processor"; - topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1); - topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2); + topology.addSource(sourceName1, new LongDeserializer(), new StringDeserializer(), SOURCE_TOPIC_1); + topology.addSource(sourceName2, new IntegerDeserializer(), new DoubleDeserializer(), SOURCE_TOPIC_2); topology.addProcessor(processor, new MockProcessorSupplier(), sourceName1, sourceName2); topology.addSink( "sink", SINK_TOPIC_1, (topic, data) -> { if (data instanceof Long) { - return Serdes.Long().serializer().serialize(topic, (Long) data); + return new LongSerializer().serialize(topic, (Long) data); } - return Serdes.Integer().serializer().serialize(topic, (Integer) data); + return new IntegerSerializer().serialize(topic, (Integer) data); }, (topic, data) -> { if (data instanceof String) { - return Serdes.String().serializer().serialize(topic, (String) data); + return new StringSerializer().serialize(topic, (String) data); } - return Serdes.Double().serializer().serialize(topic, (Double) data); + return new DoubleSerializer().serialize(topic, (Double) data); }, processor); @@ -642,21 +646,21 @@ public void shouldUseSourceSpecificDeserializers() { testDriver.pipeRecord(SOURCE_TOPIC_1, consumerRecord1, - Serdes.Long().serializer(), - Serdes.String().serializer(), + new LongSerializer(), + new StringSerializer(), Instant.now()); final TestRecord result1 = - testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()); + testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(), new StringDeserializer()); assertThat(result1.getKey(), equalTo(source1Key)); assertThat(result1.getValue(), equalTo(source1Value)); testDriver.pipeRecord(SOURCE_TOPIC_2, consumerRecord2, - Serdes.Integer().serializer(), - Serdes.Double().serializer(), + new IntegerSerializer(), + new DoubleSerializer(), Instant.now()); final TestRecord result2 = - testDriver.readRecord(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer()); + testDriver.readRecord(SINK_TOPIC_1, new IntegerDeserializer(), new DoubleDeserializer()); assertThat(result2.getKey(), equalTo(source2Key)); assertThat(result2.getValue(), equalTo(source2Value)); } @@ -718,10 +722,10 @@ public void shouldUseSinkSpecificSerializers() { final String sourceName1 = "source-1"; final String sourceName2 = "source-2"; - topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1); - topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2); - topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), sourceName1); - topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2); + topology.addSource(sourceName1, new LongDeserializer(), new StringDeserializer(), SOURCE_TOPIC_1); + topology.addSource(sourceName2, new IntegerDeserializer(), new DoubleDeserializer(), SOURCE_TOPIC_2); + topology.addSink("sink-1", SINK_TOPIC_1, new LongSerializer(), new StringSerializer(), sourceName1); + topology.addSink("sink-2", SINK_TOPIC_2, new IntegerSerializer(), new DoubleSerializer(), sourceName2); testDriver = new TopologyTestDriver(topology); @@ -735,21 +739,21 @@ public void shouldUseSinkSpecificSerializers() { testDriver.pipeRecord(SOURCE_TOPIC_1, consumerRecord1, - Serdes.Long().serializer(), - Serdes.String().serializer(), + new LongSerializer(), + new StringSerializer(), Instant.now()); final TestRecord result1 = - testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()); + testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(), new StringDeserializer()); assertThat(result1.getKey(), equalTo(source1Key)); assertThat(result1.getValue(), equalTo(source1Value)); testDriver.pipeRecord(SOURCE_TOPIC_2, consumerRecord2, - Serdes.Integer().serializer(), - Serdes.Double().serializer(), + new IntegerSerializer(), + new DoubleSerializer(), Instant.now()); final TestRecord result2 = - testDriver.readRecord(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer()); + testDriver.readRecord(SINK_TOPIC_2, new IntegerDeserializer(), new DoubleDeserializer()); assertThat(result2.getKey(), equalTo(source2Key)); assertThat(result2.getValue(), equalTo(source2Value)); } @@ -880,8 +884,8 @@ public void shouldReturnAllStores() { Serdes.ByteArray(), Time.SYSTEM).withLoggingDisabled(), "sourceProcessorName", - Serdes.ByteArray().deserializer(), - Serdes.ByteArray().deserializer(), + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), "globalTopicName", "globalProcessorName", voidProcessorSupplier); @@ -911,12 +915,16 @@ public void shouldReturnCorrectInMemoryStoreTypeOnly() { private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { final String keyValueStoreName = "keyValueStore"; final String timestampedKeyValueStoreName = "keyValueTimestampStore"; + final String timestampedKeyValueStoreWithHeaderName = "keyValueTimestampStoreWithHeaders"; final String versionedKeyValueStoreName = "keyValueVersionedStore"; final String windowStoreName = "windowStore"; final String timestampedWindowStoreName = "windowTimestampStore"; + final String timestampedWindowStoreWithHeadersName = "windowTimestampStoreWithHeaders"; final String sessionStoreName = "sessionStore"; + final String sessionStoreWithHeadersName = "sessionStoreWithHeaders"; final String globalKeyValueStoreName = "globalKeyValueStore"; final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore"; + final String globalTimestampedKeyValueStoreWithHeadersName = "globalKeyValueTimestampStoreWithHeaders"; final String globalVersionedKeyValueStoreName = "globalKeyValueVersionedStore"; final Topology topology = setupSingleProcessorTopology(); @@ -925,12 +933,16 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { persistent, keyValueStoreName, timestampedKeyValueStoreName, + timestampedKeyValueStoreWithHeaderName, versionedKeyValueStoreName, windowStoreName, timestampedWindowStoreName, + timestampedWindowStoreWithHeadersName, sessionStoreName, + sessionStoreWithHeadersName, globalKeyValueStoreName, globalTimestampedKeyValueStoreName, + globalTimestampedKeyValueStoreWithHeadersName, globalVersionedKeyValueStoreName); @@ -939,70 +951,137 @@ private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) { // verify state stores assertNotNull(testDriver.getKeyValueStore(keyValueStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(keyValueStoreName)); assertNull(testDriver.getVersionedKeyValueStore(keyValueStoreName)); assertNull(testDriver.getWindowStore(keyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(keyValueStoreName)); assertNull(testDriver.getSessionStore(keyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(keyValueStoreName)); assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName)); assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreName)); assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreName)); assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreName)); assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreName)); if (persistent) { // versioned stores do not offer an in-memory version yet, so nothing to test/verify unless persistent assertNull(testDriver.getKeyValueStore(versionedKeyValueStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(versionedKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(versionedKeyValueStoreName)); assertNotNull(testDriver.getVersionedKeyValueStore(versionedKeyValueStoreName)); assertNull(testDriver.getWindowStore(versionedKeyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(versionedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(versionedKeyValueStoreName)); assertNull(testDriver.getSessionStore(versionedKeyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(versionedKeyValueStoreName)); } + assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeaderName)); + assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeaderName)); + assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeaderName)); + assertNull(testDriver.getKeyValueStore(windowStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(windowStoreName)); assertNull(testDriver.getVersionedKeyValueStore(windowStoreName)); assertNotNull(testDriver.getWindowStore(windowStoreName)); assertNull(testDriver.getTimestampedWindowStore(windowStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(windowStoreName)); assertNull(testDriver.getSessionStore(windowStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(windowStoreName)); assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreName)); assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreName)); assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName)); assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreName)); assertNull(testDriver.getSessionStore(timestampedWindowStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreName)); + + assertNull(testDriver.getKeyValueStore(timestampedWindowStoreWithHeadersName)); + assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreWithHeadersName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreWithHeadersName)); + assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreWithHeadersName)); + assertNotNull(testDriver.getWindowStore(timestampedWindowStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName)); + assertNull(testDriver.getSessionStore(timestampedWindowStoreWithHeadersName)); + assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreWithHeadersName)); assertNull(testDriver.getKeyValueStore(sessionStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreName)); assertNull(testDriver.getVersionedKeyValueStore(sessionStoreName)); assertNull(testDriver.getWindowStore(sessionStoreName)); assertNull(testDriver.getTimestampedWindowStore(sessionStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreName)); assertNotNull(testDriver.getSessionStore(sessionStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(sessionStoreName)); + + assertNull(testDriver.getKeyValueStore(sessionStoreWithHeadersName)); + assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreWithHeadersName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreWithHeadersName)); + assertNull(testDriver.getVersionedKeyValueStore(sessionStoreWithHeadersName)); + assertNull(testDriver.getWindowStore(sessionStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStore(sessionStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreWithHeadersName)); + assertNotNull(testDriver.getSessionStore(sessionStoreWithHeadersName)); + assertNotNull(testDriver.getSessionStoreWithHeaders(sessionStoreWithHeadersName)); // verify global stores assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalKeyValueStoreName)); assertNull(testDriver.getVersionedKeyValueStore(globalKeyValueStoreName)); assertNull(testDriver.getWindowStore(globalKeyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalKeyValueStoreName)); assertNull(testDriver.getSessionStore(globalKeyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(globalKeyValueStoreName)); assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName)); assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreName)); assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreName)); assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreName)); assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreName)); + + assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreWithHeadersName)); + assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName)); if (persistent) { // versioned stores do not offer an in-memory version yet, so nothing to test/verify unless persistent assertNull(testDriver.getKeyValueStore(globalVersionedKeyValueStoreName)); assertNull(testDriver.getTimestampedKeyValueStore(globalVersionedKeyValueStoreName)); + assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalVersionedKeyValueStoreName)); assertNotNull(testDriver.getVersionedKeyValueStore(globalVersionedKeyValueStoreName)); assertNull(testDriver.getWindowStore(globalVersionedKeyValueStoreName)); assertNull(testDriver.getTimestampedWindowStore(globalVersionedKeyValueStoreName)); + assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalVersionedKeyValueStoreName)); assertNull(testDriver.getSessionStore(globalVersionedKeyValueStoreName)); + assertNull(testDriver.getSessionStoreWithHeaders(globalVersionedKeyValueStoreName)); } } @@ -1019,12 +1098,16 @@ public void shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod() { private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean persistent) { final String keyValueStoreName = "keyValueStore"; final String timestampedKeyValueStoreName = "keyValueTimestampStore"; + final String timestampedKeyValueStoreWithHeadersName = "keyValueTimestampStoreWithHeaders"; final String versionedKeyValueStoreName = "keyValueVersionedStore"; final String windowStoreName = "windowStore"; final String timestampedWindowStoreName = "windowTimestampStore"; + final String timestampedWindowStoreWithHeadersName = "windowTimestampStoreWithHeaders"; final String sessionStoreName = "sessionStore"; + final String sessionStoreWithHeadersName = "sessionStoreWithHeaders"; final String globalKeyValueStoreName = "globalKeyValueStore"; final String globalTimestampedKeyValueStoreName = "globalKeyValueTimestampStore"; + final String globalTimestampedKeyValueStoreWithHeadersName = "globalKeyValueTimestampStoreWithHeaders"; final String globalVersionedKeyValueStoreName = "globalKeyValueVersionedStore"; final Topology topology = setupSingleProcessorTopology(); @@ -1033,12 +1116,16 @@ private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean persistent, keyValueStoreName, timestampedKeyValueStoreName, + timestampedKeyValueStoreWithHeadersName, versionedKeyValueStoreName, windowStoreName, timestampedWindowStoreName, + timestampedWindowStoreWithHeadersName, sessionStoreName, + sessionStoreWithHeadersName, globalKeyValueStoreName, globalTimestampedKeyValueStoreName, + globalTimestampedKeyValueStoreWithHeadersName, globalVersionedKeyValueStoreName); @@ -1127,6 +1214,7 @@ private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final boolean } } + // CAUTION: Do not replace with Lambda; Needs to return a new Processor instance each time final ProcessorSupplier voidProcessorSupplier = () -> new Processor() { @Override public void process(final Record record) { @@ -1137,29 +1225,42 @@ private void addStoresToTopology(final Topology topology, final boolean persistent, final String keyValueStoreName, final String timestampedKeyValueStoreName, + final String timestampedKeyValueStoreWithHeadersName, final String versionedKeyValueStoreName, final String windowStoreName, final String timestampedWindowStoreName, + final String timestampedWindowStoreWithHeadersName, final String sessionStoreName, + final String sessionStoreWithHeadersName, final String globalKeyValueStoreName, final String globalTimestampedKeyValueStoreName, + final String globalTimestampedKeyValueStoreWithHeadersName, final String globalVersionedKeyValueStoreName) { // add state stores topology.addStateStore( Stores.keyValueStoreBuilder( - persistent ? - Stores.persistentKeyValueStore(keyValueStoreName) : - Stores.inMemoryKeyValueStore(keyValueStoreName), + persistent + ? Stores.persistentKeyValueStore(keyValueStoreName) + : Stores.inMemoryKeyValueStore(keyValueStoreName), Serdes.ByteArray(), Serdes.ByteArray() ), "processor"); topology.addStateStore( Stores.timestampedKeyValueStoreBuilder( - persistent ? - Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) : - Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName), + persistent + ? Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) + : Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.timestampedKeyValueStoreWithHeadersBuilder( + persistent + ? Stores.persistentTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeadersName) + : Stores.inMemoryKeyValueStore(timestampedKeyValueStoreWithHeadersName), Serdes.ByteArray(), Serdes.ByteArray() ), @@ -1175,70 +1276,92 @@ private void addStoresToTopology(final Topology topology, } topology.addStateStore( Stores.windowStoreBuilder( - persistent ? - Stores.persistentWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) : - Stores.inMemoryWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), + persistent + ? Stores.persistentWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) + : Stores.inMemoryWindowStore(windowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), Serdes.ByteArray(), Serdes.ByteArray() ), "processor"); topology.addStateStore( Stores.timestampedWindowStoreBuilder( - persistent ? - Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) : - Stores.inMemoryWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), + persistent + ? Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) + : Stores.inMemoryWindowStore(timestampedWindowStoreName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), Serdes.ByteArray(), Serdes.ByteArray() ), "processor"); topology.addStateStore( - persistent ? - Stores.sessionStoreBuilder( - Stores.persistentSessionStore(sessionStoreName, Duration.ofMillis(1000L)), - Serdes.ByteArray(), - Serdes.ByteArray()) : - Stores.sessionStoreBuilder( - Stores.inMemorySessionStore(sessionStoreName, Duration.ofMillis(1000L)), - Serdes.ByteArray(), - Serdes.ByteArray()), + Stores.timestampedWindowStoreWithHeadersBuilder( + persistent + ? Stores.persistentTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false) + : Stores.inMemoryWindowStore(timestampedWindowStoreWithHeadersName, Duration.ofMillis(1000L), Duration.ofMillis(100L), false), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.sessionStoreBuilder( + persistent + ? Stores.persistentSessionStore(sessionStoreName, Duration.ofMillis(1000L)) + : Stores.inMemorySessionStore(sessionStoreName, Duration.ofMillis(1000L)), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "processor"); + topology.addStateStore( + Stores.sessionStoreWithHeadersBuilder( + persistent + ? Stores.persistentSessionStoreWithHeaders(sessionStoreWithHeadersName, Duration.ofMillis(1000L)) + : Stores.inMemorySessionStore(sessionStoreWithHeadersName, Duration.ofMillis(1000L)), + Serdes.ByteArray(), + Serdes.ByteArray() + ), "processor"); // add global stores topology.addGlobalStore( - persistent ? - Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore(globalKeyValueStoreName), - Serdes.ByteArray(), - Serdes.ByteArray() - ).withLoggingDisabled() : - Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore(globalKeyValueStoreName), - Serdes.ByteArray(), - Serdes.ByteArray() - ).withLoggingDisabled(), + Stores.keyValueStoreBuilder( + persistent + ? Stores.persistentKeyValueStore(globalKeyValueStoreName) + : Stores.inMemoryKeyValueStore(globalKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled(), "sourceDummy1", - Serdes.ByteArray().deserializer(), - Serdes.ByteArray().deserializer(), + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), "topicDummy1", "processorDummy1", voidProcessorSupplier); topology.addGlobalStore( - persistent ? - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName), - Serdes.ByteArray(), - Serdes.ByteArray() - ).withLoggingDisabled() : - Stores.timestampedKeyValueStoreBuilder( - Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName), - Serdes.ByteArray(), - Serdes.ByteArray() - ).withLoggingDisabled(), + Stores.timestampedKeyValueStoreBuilder( + persistent + ? Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName) + : Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled(), "sourceDummy2", - Serdes.ByteArray().deserializer(), - Serdes.ByteArray().deserializer(), + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), "topicDummy2", "processorDummy2", voidProcessorSupplier); + topology.addGlobalStore( + Stores.timestampedKeyValueStoreWithHeadersBuilder( + persistent + ? Stores.persistentTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName) + : Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName), + Serdes.ByteArray(), + Serdes.ByteArray() + ).withLoggingDisabled(), + "sourceDummy3", + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), + "topicDummy3", + "processorDummy3", + voidProcessorSupplier); if (persistent) { // versioned stores do not offer an in-memory version yet topology.addGlobalStore( Stores.versionedKeyValueStoreBuilder( @@ -1246,11 +1369,11 @@ private void addStoresToTopology(final Topology topology, Serdes.ByteArray(), Serdes.ByteArray() ).withLoggingDisabled(), - "sourceDummy3", - Serdes.ByteArray().deserializer(), - Serdes.ByteArray().deserializer(), - "topicDummy3", - "processorDummy3", + "sourceDummy4", + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), + "topicDummy4", + "processorDummy4", voidProcessorSupplier); } } @@ -1271,8 +1394,8 @@ public void shouldReturnAllStoresNames() { Serdes.ByteArray(), Time.SYSTEM).withLoggingDisabled(), "sourceProcessorName", - Serdes.ByteArray().deserializer(), - Serdes.ByteArray().deserializer(), + new ByteArrayDeserializer(), + new ByteArrayDeserializer(), "globalTopicName", "globalProcessorName", voidProcessorSupplier); @@ -1300,8 +1423,8 @@ private void setup(final KeyValueBytesStoreSupplier storeSupplier) { "aggregator"); topology.addSink("sinkProcessor", "result-topic", "aggregator"); - config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); testDriver = new TopologyTestDriver(topology, config); store = testDriver.getKeyValueStore("aggStore"); @@ -1439,7 +1562,7 @@ public void shouldCleanUpPersistentStateStoresOnClose() { new ProcessorSupplier() { @Override public Processor get() { - return new Processor() { + return new Processor<>() { private KeyValueStore store; @Override @@ -1463,8 +1586,8 @@ public void process(final Record record) { final Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver-cleanup"); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) { assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a")); @@ -1674,7 +1797,7 @@ public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() { new StringDeserializer(), "global-topic", "globalProcessor", - () -> new Processor() { + () -> new Processor<>() { private KeyValueStore stateStore; @Override