diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 31a9f6b945c61..168b692645066 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -86,6 +86,7 @@ import org.slf4j.Logger; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -1113,17 +1114,17 @@ private Future doSend(ProducerRecord record, Callback call nowMs += clusterAndWaitTime.waitedOnMetadataMs; long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; - byte[] serializedKey; + ByteBuffer serializedKey; try { - serializedKey = keySerializerPlugin.get().serialize(record.topic(), record.headers(), record.key()); + serializedKey = keySerializerPlugin.get().serializeToByteBuffer(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } - byte[] serializedValue; + ByteBuffer serializedValue; try { - serializedValue = valueSerializerPlugin.get().serialize(record.topic(), record.headers(), record.value()); + serializedValue = valueSerializerPlugin.get().serializeToByteBuffer(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + @@ -1585,7 +1586,7 @@ private void close(Duration timeout, boolean swallowException) { * can be used (the partition is then calculated by built-in * partitioning logic). */ - private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { + private int partition(ProducerRecord record, ByteBuffer serializedKey, ByteBuffer serializedValue, Cluster cluster) { if (record.partition() != null) return record.partition(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index eb89f88f7d913..b5d267ef804f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; @@ -642,8 +643,8 @@ private int partition(ProducerRecord record, Cluster cluster) { + "]."); return partition; } - byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key()); - byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value()); + ByteBuffer keyBytes = keySerializer.serializeToByteBuffer(topic, record.headers(), record.key()); + ByteBuffer valueBytes = valueSerializer.serializeToByteBuffer(topic, record.headers(), record.value()); if (partitioner == null) { return this.cluster.partitionsForTopic(record.topic()).get(0).partition(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java index d1d1ad3ac55f1..726a082626c60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java @@ -18,8 +18,10 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; +import java.nio.ByteBuffer; /** * Partitioner Interface @@ -41,6 +43,23 @@ public interface Partitioner extends Configurable, Closeable { */ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); + /** + * Compute the partition for the given record. + * + *

The default implementation converts the {@link ByteBuffer} parameters to byte arrays and delegates to + * {@link #partition(String, Object, byte[], Object, byte[], Cluster)}. + * + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param keyBytes The serialized key as a ByteBuffer to partition on (or null if no key) + * @param value The value to partition on or null + * @param valueBytes The serialized value as a ByteBuffer to partition on or null + * @param cluster The current cluster metadata + */ + default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) { + return partition(topic, key, Utils.toNullableArrayZeroCopy(keyBytes), value, Utils.toNullableArrayZeroCopy(valueBytes), cluster); + } + /** * This is called when partitioner is closed. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java index 98386324f7bda..9219fd650101d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -327,6 +328,13 @@ public int partition() { * Default hashing function to choose a partition from the serialized key bytes */ public static int partitionForKey(final byte[] serializedKey, final int numPartitions) { + return partitionForKey(ByteBuffer.wrap(serializedKey), numPartitions); + } + + /* + * Default hashing function to choose a partition from the serialized key ByteBuffer + */ + public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) { return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 14d0d6ef6334d..48948e7cb23e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +146,15 @@ boolean hasLeaderChangedForTheOngoingRetry() { * @return The RecordSend corresponding to this record or null if there isn't sufficient room. */ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { + return tryAppend(timestamp, Utils.wrapNullable(key), Utils.wrapNullable(value), headers, callback, now); + } + + /** + * Append the record to the current record set and return the relative offset within that record set + * + * @return The RecordSend corresponding to this record or null if there isn't sufficient room. + */ + public FutureRecordMetadata tryAppend(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Callback callback, long now) { if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) { return null; } else { @@ -154,8 +164,8 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, - key == null ? -1 : key.length, - value == null ? -1 : value.length, + key == null ? -1 : key.remaining(), + value == null ? -1 : value.remaining(), Time.SYSTEM); // we have to keep every future returned to the users in case the batch needs to be // split to several new batches and resent. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ca3ab153d98b4..fce01f4d10cc5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -258,7 +259,6 @@ private boolean partitionChanged(String topic, * Add a record to the accumulator, return the append result *

* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created - *

* * @param topic The topic to which this record is being sent * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION @@ -282,6 +282,37 @@ public RecordAppendResult append(String topic, long maxTimeToBlock, long nowMs, Cluster cluster) throws InterruptedException { + return append(topic, partition, timestamp, Utils.wrapNullable(key), Utils.wrapNullable(value), + headers, callbacks, maxTimeToBlock, nowMs, cluster); + } + + /** + * Add a record to the accumulator, return the append result + *

+ * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created + * + * @param topic The topic to which this record is being sent + * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION + * if any partition could be used + * @param timestamp The timestamp of the record + * @param key The key for the record as a ByteBuffer + * @param value The value for the record as a ByteBuffer + * @param headers the Headers for the record + * @param callbacks The callbacks to execute + * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available + * @param nowMs The current time, in milliseconds + * @param cluster The cluster metadata + */ + public RecordAppendResult append(String topic, + int partition, + long timestamp, + ByteBuffer key, + ByteBuffer value, + Header[] headers, + AppendCallbacks callbacks, + long maxTimeToBlock, + long nowMs, + Cluster cluster) throws InterruptedException { TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize))); // We keep track of the number of appending thread to make sure we do not miss batches in @@ -376,8 +407,8 @@ private RecordAppendResult appendNewBatch(String topic, int partition, Deque dq, long timestamp, - byte[] key, - byte[] value, + ByteBuffer key, + ByteBuffer value, Header[] headers, AppendCallbacks callbacks, ByteBuffer buffer, @@ -422,7 +453,7 @@ private boolean allBatchesFull(Deque deque) { * and memory records built) in one of the following cases (whichever comes first): right before send, * if it is expired, or when the producer is closed. */ - private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, + private RecordAppendResult tryAppend(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Callback callback, Deque deque, long nowMs) { if (closed) throw new KafkaException("Producer closed while send in progress"); diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java index ad4722a8d8074..8a8cfc0079d0d 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class BooleanSerializer implements Serializer { private static final byte TRUE = 0x01; @@ -30,4 +34,15 @@ public byte[] serialize(final String topic, final Boolean data) { data ? TRUE : FALSE }; } + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Boolean data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(1); + buffer.put(data ? TRUE : FALSE); + buffer.flip(); + return buffer; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java index 6bebaa6531fc1..9d14250b359c8 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java @@ -16,9 +16,19 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; + public class ByteArraySerializer implements Serializer { @Override public byte[] serialize(String topic, byte[] data) { return data; } + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, byte[] data) { + return Utils.wrapNullable(data); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index 43b3d6e38a76e..79f0bb6653988 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.Utils; + import java.nio.ByteBuffer; /** @@ -33,17 +36,15 @@ public byte[] serialize(String topic, ByteBuffer data) { return null; data.rewind(); + return Utils.toNullableArrayZeroCopy(data); + } - if (data.hasArray()) { - byte[] arr = data.array(); - if (data.arrayOffset() == 0 && arr.length == data.remaining()) { - return arr; - } - } + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, ByteBuffer data) { + if (data == null) + return null; - byte[] ret = new byte[data.remaining()]; - data.get(ret, 0, ret.length); data.rewind(); - return ret; + return data; } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java index f19305da63713..ca23e3b883e78 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java @@ -16,8 +16,11 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; +import java.nio.ByteBuffer; + public class BytesSerializer implements Serializer { public byte[] serialize(String topic, Bytes data) { if (data == null) @@ -25,4 +28,12 @@ public byte[] serialize(String topic, Bytes data) { return data.get(); } + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Bytes data) { + if (data == null) + return null; + + return ByteBuffer.wrap(data.get()); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java index 99781b53d0e01..7751490184dbf 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class DoubleSerializer implements Serializer { @Override public byte[] serialize(String topic, Double data) { @@ -34,4 +38,15 @@ public byte[] serialize(String topic, Double data) { (byte) bits }; } -} \ No newline at end of file + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Double data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putDouble(data); + buffer.flip(); + return buffer; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java index 6f74a573a5f10..54f232e87e8aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class FloatSerializer implements Serializer { @Override public byte[] serialize(final String topic, final Float data) { @@ -30,4 +34,15 @@ public byte[] serialize(final String topic, final Float data) { (byte) bits }; } -} \ No newline at end of file + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Float data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putFloat(data); + buffer.flip(); + return buffer; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java index 8ab531046006a..8f113e79cf091 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class IntegerSerializer implements Serializer { public byte[] serialize(String topic, Integer data) { if (data == null) @@ -28,4 +32,15 @@ public byte[] serialize(String topic, Integer data) { data.byteValue() }; } -} \ No newline at end of file + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Integer data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(data); + buffer.flip(); + return buffer; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java index 436f0e01095ca..9a82c48a7e775 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class LongSerializer implements Serializer { public byte[] serialize(String topic, Long data) { if (data == null) @@ -32,4 +36,15 @@ public byte[] serialize(String topic, Long data) { data.byteValue() }; } -} \ No newline at end of file + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Long data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putLong(data); + buffer.flip(); + return buffer; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index 0730b71bcade1..4255163e41613 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -17,8 +17,10 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; +import java.nio.ByteBuffer; import java.util.Map; /** @@ -83,6 +85,34 @@ default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); } + /** + * Convert {@code data} into a {@link ByteBuffer}. + * + *

It is recommended to serialize {@code null} data to a {@code null} {@link ByteBuffer}. + * + *

The caller cannot make any assumptions about the returned {@link ByteBuffer} like the + * position, limit, capacity, etc., or if it is backed by {@link ByteBuffer#hasArray() an array or not}. + * + *

Similarly, if this method is overridden, the implementation cannot make any assumptions about + * how the returned {@link ByteBuffer} will be used. + * + *

Note that the passed in {@link Headers} may be empty, but never {@code null}. + * The implementation is allowed to modify the passed in headers, as a side effect of serialization. + * It is considered best practice to not delete or modify existing headers, but rather only add new ones. + * + * @param topic + * topic associated with data + * @param headers + * headers associated with the record + * @param data + * typed data; may be {@code null} + * + * @return serialized {@link ByteBuffer}; may be {@code null} + */ + default ByteBuffer serializeToByteBuffer(String topic, Headers headers, T data) { + return Utils.wrapNullable(serialize(topic, headers, data)); + } + /** * Close this serializer. * diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java index e54354b4dea05..4448445b6c8c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class ShortSerializer implements Serializer { public byte[] serialize(String topic, Short data) { if (data == null) @@ -26,4 +30,15 @@ public byte[] serialize(String topic, Short data) { data.byteValue() }; } -} \ No newline at end of file + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Short data) { + if (data == null) + return null; + + ByteBuffer buffer = ByteBuffer.allocate(2); + buffer.putShort(data); + buffer.flip(); + return buffer; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/VoidSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/VoidSerializer.java index f1f2c6071a832..3db37f7574b7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/VoidSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/VoidSerializer.java @@ -16,9 +16,18 @@ */ package org.apache.kafka.common.serialization; +import org.apache.kafka.common.header.Headers; + +import java.nio.ByteBuffer; + public class VoidSerializer implements Serializer { @Override public byte[] serialize(String topic, Void data) { return null; } + + @Override + public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Void data) { + return null; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f8b8456d5f8d7..97a768bc65a87 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -33,8 +33,6 @@ import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.lang.management.ManagementFactory; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -117,9 +115,6 @@ private Utils() {} private static final Logger log = LoggerFactory.getLogger(Utils.class); - private static final VarHandle INT_HANDLE = - MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN); - /** * Get a sorted list representation of a collection. * @param collection The collection to sort @@ -284,6 +279,28 @@ public static byte[] toNullableArray(ByteBuffer buffer) { return buffer == null ? null : toArray(buffer); } + /** + * Convert a ByteBuffer to a nullable byte array, returning the backing array directly + * when it exactly represents the buffer's contents (zero offset, full length) to avoid + * allocation. Falls back to {@link #toArray(ByteBuffer)} otherwise. + * + * @param buffer The buffer to convert + * @return The resulting array or null if the buffer is null + */ + public static byte[] toNullableArrayZeroCopy(ByteBuffer buffer) { + if (buffer == null) + return null; + + if (buffer.hasArray()) { + byte[] arr = buffer.array(); + if (buffer.arrayOffset() == 0 && arr.length == buffer.remaining()) { + return arr; + } + } + + return toArray(buffer); + } + /** * Wrap an array as a nullable ByteBuffer. * @param array The nullable array to wrap @@ -495,12 +512,23 @@ public static T newParameterizedInstance(String className, Object... params) /** * Generates 32 bit murmur2 hash from byte array + * * @param data byte array to hash * @return 32 bit hash of the given array */ - @SuppressWarnings("fallthrough") public static int murmur2(final byte[] data) { - int length = data.length; + return murmur2(ByteBuffer.wrap(data)); + } + + /** + * Generates 32 bit murmur2 hash from a ByteBuffer. + * + * @param data The input ByteBuffer. Only bytes between position and limit are hashed. + * The buffer's position is not modified. + */ + @SuppressWarnings("fallthrough") + public static int murmur2(final ByteBuffer data) { + int length = data.remaining(); int seed = 0x9747b28c; // 'm' and 'r' are mixing constants generated offline. // They're not really 'magic', they just happen to work well. @@ -510,10 +538,13 @@ public static int murmur2(final byte[] data) { // Initialize the hash to a random value int h = seed ^ length; int length4 = length >> 2; + int pos = data.position(); for (int i = 0; i < length4; i++) { - final int i4 = i << 2; - int k = (int) INT_HANDLE.get(data, i4); + int k = data.getInt(pos + (i << 2)); + if (data.order() != java.nio.ByteOrder.LITTLE_ENDIAN) { + k = Integer.reverseBytes(k); + } k *= m; k ^= k >>> r; k *= m; @@ -525,11 +556,11 @@ public static int murmur2(final byte[] data) { int index = length4 << 2; switch (length - index) { case 3: - h ^= (data[index + 2] & 0xff) << 16; + h ^= (data.get(pos + index + 2) & 0xff) << 16; case 2: - h ^= (data[index + 1] & 0xff) << 8; + h ^= (data.get(pos + index + 1) & 0xff) << 8; case 1: - h ^= data[index] & 0xff; + h ^= data.get(pos + index) & 0xff; h *= m; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 98ffd66270c5a..df8b504b20fa3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -115,6 +115,7 @@ import org.mockito.internal.stubbing.answers.CallsRealMethods; import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -1115,10 +1116,10 @@ private > void doTestHeaders(Class serializerCla KafkaProducer producer = kafkaProducer(configs, keySerializer, valueSerializer, metadata, null, null, Time.SYSTEM); - when(keySerializer.serialize(any(), any(), any())).then(invocation -> - invocation.getArgument(2).getBytes()); - when(valueSerializer.serialize(any(), any(), any())).then(invocation -> - invocation.getArgument(2).getBytes()); + when(keySerializer.serializeToByteBuffer(any(), any(), any())).then(invocation -> + ByteBuffer.wrap(invocation.getArgument(2).getBytes())); + when(valueSerializer.serializeToByteBuffer(any(), any(), any())).then(invocation -> + ByteBuffer.wrap(invocation.getArgument(2).getBytes())); String value = "value"; String key = "key"; @@ -1136,8 +1137,8 @@ private > void doTestHeaders(Class serializerCla //ensure existing headers are not changed, and last header for key is still original value assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes()); - verify(valueSerializer).serialize(topic, record.headers(), value); - verify(keySerializer).serialize(topic, record.headers(), key); + verify(valueSerializer).serializeToByteBuffer(topic, record.headers(), value); + verify(keySerializer).serializeToByteBuffer(topic, record.headers(), key); producer.close(Duration.ofMillis(0)); } @@ -1516,8 +1517,8 @@ public void testSendNotAllowedInPreparedTransactionState() throws Exception { eq(topic), anyInt(), anyLong(), - any(), - any(), + any(byte[].class), + any(byte[].class), any(), any(), anyLong(), @@ -2754,8 +2755,8 @@ private FutureRecordMetadata expectAppend( TopicPartition initialSelectedPartition, Cluster cluster ) throws InterruptedException { - byte[] serializedKey = ctx.serializer.serialize(topic, record.key()); - byte[] serializedValue = ctx.serializer.serialize(topic, record.value()); + ByteBuffer serializedKey = ctx.serializer.serializeToByteBuffer(topic, new RecordHeaders(), record.key()); + ByteBuffer serializedValue = ctx.serializer.serializeToByteBuffer(topic, new RecordHeaders(), record.value()); long timestamp = record.timestamp() == null ? ctx.time.milliseconds() : record.timestamp(); ProduceRequestResult requestResult = new ProduceRequestResult(initialSelectedPartition); @@ -2763,26 +2764,26 @@ private FutureRecordMetadata expectAppend( requestResult, 5, timestamp, - serializedKey.length, - serializedValue.length, + serializedKey == null ? -1 : serializedKey.remaining(), + serializedValue == null ? -1 : serializedValue.remaining(), ctx.time ); when(ctx.partitioner.partition( - initialSelectedPartition.topic(), - record.key(), - serializedKey, - record.value(), - serializedValue, - cluster + eq(initialSelectedPartition.topic()), + eq(record.key()), + any(ByteBuffer.class), + eq(record.value()), + any(ByteBuffer.class), + eq(cluster) )).thenReturn(initialSelectedPartition.partition()); when(ctx.accumulator.append( eq(initialSelectedPartition.topic()), // 0 eq(initialSelectedPartition.partition()), // 1 eq(timestamp), // 2 - eq(serializedKey), // 3 - eq(serializedValue), // 4 + any(ByteBuffer.class), // 3 + any(ByteBuffer.class), // 4 eq(Record.EMPTY_HEADERS), // 5 any(RecordAccumulator.AppendCallbacks.class), // 6 <-- anyLong(), diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java new file mode 100644 index 0000000000000..253b3a357ebb6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.Cluster; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.Mockito.mock; + +public class PartitionerTest { + + private static final Cluster CLUSTER = mock(Cluster.class); + + private static Partitioner capturingPartitioner(byte[][] capture) { + return new Partitioner() { + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + capture[0] = keyBytes; + capture[1] = valueBytes; + return 0; + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + } + }; + } + + @Test + public void testByteBufferPartitionPassesBackingArrayDirectly() { + final byte[][] captured = new byte[2][]; + + try (Partitioner partitioner = capturingPartitioner(captured)) { + byte[] keyArray = "key".getBytes(); + byte[] valueArray = "value".getBytes(); + + partitioner.partition("test", null, ByteBuffer.wrap(keyArray), null, ByteBuffer.wrap(valueArray), CLUSTER); + assertSame(keyArray, captured[0], + "When key ByteBuffer wraps an exact array, the backing array should be passed directly without copying"); + assertSame(valueArray, captured[1], + "When value ByteBuffer wraps an exact array, the backing array should be passed directly without copying"); + } + } + + @Test + public void testByteBufferPartitionCopiesWhenNotExactArray() { + final byte[][] captured = new byte[2][]; + + try (Partitioner partitioner = capturingPartitioner(captured)) { + // A slice of a larger buffer - hasArray() is true but arrayOffset/length don't match + byte[] keyBackingArray = "prefixkey".getBytes(); + ByteBuffer keySlice = ByteBuffer.wrap(keyBackingArray, 6, 3).slice(); + byte[] valueBackingArray = "prefixvalue".getBytes(); + ByteBuffer valueSlice = ByteBuffer.wrap(valueBackingArray, 6, 5).slice(); + + partitioner.partition("test", null, keySlice, null, valueSlice, CLUSTER); + assertNotSame(keyBackingArray, captured[0], + "When key ByteBuffer is a slice, a new array should be allocated"); + assertArrayEquals("key".getBytes(), captured[0], + "The copied key array should contain the correct bytes"); + assertNotSame(valueBackingArray, captured[1], + "When value ByteBuffer is a slice, a new array should be allocated"); + assertArrayEquals("value".getBytes(), captured[1], + "The copied value array should contain the correct bytes"); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java index 8c6d0a33d21b3..52eaea02cd6d7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java @@ -55,7 +55,7 @@ public void testRoundRobinWithUnavailablePartitions() { Cluster cluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), partitions, Collections.emptySet(), Collections.emptySet()); for (int i = 1; i <= 100; i++) { - int part = partitioner.partition("test", null, null, null, null, cluster); + int part = partitioner.partition("test", null, null, null, (byte[]) null, cluster); assertTrue(part == 0 || part == 2, "We should never choose a leader-less node in round robin"); if (part == 0) countForPart0++; diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 521a0f19415ad..177c11670bea2 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; public class SerializationTest { @@ -103,6 +105,42 @@ public void allSerdesShouldSupportNull() { } } + @SuppressWarnings("unchecked") + @Test + public void allSerializersByteBufferShouldMatchByteArray() { + for (Map.Entry, List> test : testData.entrySet()) { + try (Serde serde = Serdes.serdeFrom((Class) test.getKey())) { + for (Object value : test.getValue()) { + byte[] serialized = serde.serializer().serialize(topic, value); + ByteBuffer serializedBuffer = serde.serializer().serializeToByteBuffer(topic, null, value); + + assertArrayEquals(serialized, Utils.toNullableArray(serializedBuffer), + "serializeToByteBuffer should produce same bytes as serialize for " + test.getKey().getSimpleName()); + } + } + } + } + + @Test + public void allSerializersByteBufferShouldSupportNull() { + for (Class cls : testData.keySet()) { + try (Serde serde = Serdes.serdeFrom(cls)) { + assertNull(serde.serializer().serializeToByteBuffer(topic, null, null), + "serializeToByteBuffer should support null in " + cls.getSimpleName()); + } + } + } + + @Test + public void byteBufferSerializerShouldReturnSameInstance() { + try (ByteBufferSerializer serializer = new ByteBufferSerializer()) { + ByteBuffer input = ByteBuffer.allocate(4).putInt(42); + ByteBuffer result = serializer.serializeToByteBuffer(topic, null, input); + assertSame(input, result, "serializeToByteBuffer should return the same ByteBuffer instance"); + assertEquals(0, result.position(), "Buffer should be rewound to position 0"); + } + } + @Test public void testSerdeFromUnknown() { assertThrows(IllegalArgumentException.class, () -> Serdes.serdeFrom(DummyClass.class));