KAFKA-20194: Add sesssion-header-store support to TDD#21956
KAFKA-20194: Add sesssion-header-store support to TDD#21956mjsax merged 6 commits intoapache:trunkfrom
Conversation
| if (store instanceof TimestampedKeyValueStoreWithHeaders) { | ||
| if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { | ||
| return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders())); | ||
| return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?, ?>) store, ValueConverters.extractValueFromHeaders())); |
There was a problem hiding this comment.
Unrelated side cleanup -- same for StreamThreadStateStoreProvider below.
| * @see #getTimestampedKeyValueStore(String) | ||
| * @see #getVersionedKeyValueStore(String) | ||
| * @see #getTimestampedKeyValueStoreWithHeaders(String) | ||
| * @see #getVersionedKeyValueStore(String) |
There was a problem hiding this comment.
Just fixing the grouping of methods... Similar below.
|
|
||
| /** | ||
| * Get the {@link VersionedKeyValueStore} with the given name. | ||
| * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name. |
There was a problem hiding this comment.
This diff is weird -- I moved method getTimestampedKeyValueStoreWithHeaders before getVersionedKeyValueStore to fix the grouping in the code, too.
| * @see #getTimestampedKeyValueStoreWithHeaders(String) | ||
| * @see #getVersionedKeyValueStore(String) | ||
| * @see #getWindowStore(String) | ||
| * @see #getTimestampedWindowStoreWithHeaders(String) |
There was a problem hiding this comment.
additional fix -- this line was missing
| topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), SOURCE_TOPIC_1); | ||
| topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), SOURCE_TOPIC_2); | ||
| topology.addSource(sourceName1, new LongDeserializer(), new StringDeserializer(), SOURCE_TOPIC_1); | ||
| topology.addSource(sourceName2, new IntegerDeserializer(), new DoubleDeserializer(), SOURCE_TOPIC_2); |
There was a problem hiding this comment.
Side cleanup (similar elsewhere)
| } | ||
| } | ||
|
|
||
| // CAUTION: Do not replace with Lambda; Needs to return a new Processor instance each eimte |
There was a problem hiding this comment.
IntelliJ told me to refactor, and I accepted it, but it broke the test... a lambda is a singleton, which is not allowed for a Supplier. -- So Adding this comment to prevent other to make the same mistake.
There was a problem hiding this comment.
Cool...
I think it might be optimized by JVM if we use lambda.(use the same instance)
| Stores.inMemoryKeyValueStore(keyValueStoreName), | ||
| persistent | ||
| ? Stores.persistentKeyValueStore(keyValueStoreName) | ||
| : Stores.inMemoryKeyValueStore(keyValueStoreName), |
| ), | ||
| "processor"); | ||
| topology.addStateStore( | ||
| persistent ? |
There was a problem hiding this comment.
Side cleanup / simplification. We can move the persistent check one level down, and re-use the same code for the outer "builder" call. (Similar below)
| stateDirectory.clean(); | ||
| } | ||
|
|
||
| static class MockChangelogRegister implements ChangelogRegister { |
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| // CAUTION: Do not replace with Lambda; Needs to return a new Processor instance each eimte |
There was a problem hiding this comment.
| // CAUTION: Do not replace with Lambda; Needs to return a new Processor instance each eimte | |
| // CAUTION: Do not replace with Lambda; Needs to return a new Processor instance each time |
| * @see #getSessionStore(String) | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeader(final String name) { |
There was a problem hiding this comment.
| public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeader(final String name) { | |
| public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeaders(final String name) { |
There was a problem hiding this comment.
If yes, we should change all the java docs as well
There was a problem hiding this comment.
Yes, good catch. IntelliJ "rename" feature takes care of JavaDocs automatically.
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks. LGTM, just some nits. Approved anyway.
| * @see #getSessionStore(String) | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeader(final String name) { |
There was a problem hiding this comment.
If yes, we should change all the java docs as well
frankvicky
left a comment
There was a problem hiding this comment.
LGTM after addressing nits
aeb3e4f to
335ecfe
Compare
335ecfe to
98f234f
Compare
Adds missing method TopologyTestDriver.getSessionStoreWithHeader(), and updates TopologyTestDriverTest for all newly added header store types. Reviewers: TengYao Chi <frankvicky@apache.org>, Alieh Saeedi <asaeedi@confluent.io>
|
Merged to |
Adds missing method TopologyTestDriver.getSessionStoreWithHeader(), and
updates TopologyTestDriverTest for all newly added header store types.
Reviewers: TengYao Chi frankvicky@apache.org, Alieh Saeedi
asaeedi@confluent.io