Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.slf4j.Logger;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1113,17 +1114,17 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
ByteBuffer serializedKey;
try {
serializedKey = keySerializerPlugin.get().serialize(record.topic(), record.headers(), record.key());
serializedKey = keySerializerPlugin.get().serializeToByteBuffer(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
ByteBuffer serializedValue;
try {
serializedValue = valueSerializerPlugin.get().serialize(record.topic(), record.headers(), record.value());
serializedValue = valueSerializerPlugin.get().serializeToByteBuffer(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
Expand Down Expand Up @@ -1585,7 +1586,7 @@ private void close(Duration timeout, boolean swallowException) {
* can be used (the partition is then calculated by built-in
* partitioning logic).
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
private int partition(ProducerRecord<K, V> record, ByteBuffer serializedKey, ByteBuffer serializedValue, Cluster cluster) {
if (record.partition() != null)
return record.partition();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -642,8 +643,8 @@ private int partition(ProducerRecord<K, V> record, Cluster cluster) {
+ "].");
return partition;
}
byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());
ByteBuffer keyBytes = keySerializer.serializeToByteBuffer(topic, record.headers(), record.key());
ByteBuffer valueBytes = valueSerializer.serializeToByteBuffer(topic, record.headers(), record.value());
if (partitioner == null) {
return this.cluster.partitionsForTopic(record.topic()).get(0).partition();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.utils.Utils;

import java.io.Closeable;
import java.nio.ByteBuffer;

/**
* Partitioner Interface
Expand All @@ -41,6 +43,23 @@ public interface Partitioner extends Configurable, Closeable {
*/
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

/**
 * Compute the partition for the given record using {@link ByteBuffer} views of the
 * serialized key and value.
 *
 * <p>The default implementation materializes the buffers as byte arrays and delegates to
 * {@link #partition(String, Object, byte[], Object, byte[], Cluster)}, so existing
 * partitioner implementations keep working unchanged.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes The serialized key as a ByteBuffer to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes The serialized value as a ByteBuffer to partition on or null
 * @param cluster The current cluster metadata
 * @return The chosen partition for the record
 */
default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) {
    // Null buffers map to null arrays, preserving the byte[]-based contract.
    byte[] keyArray = Utils.toNullableArrayZeroCopy(keyBytes);
    byte[] valueArray = Utils.toNullableArrayZeroCopy(valueBytes);
    return partition(topic, key, keyArray, value, valueArray, cluster);
}

/**
* This is called when partitioner is closed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -327,6 +328,13 @@ public int partition() {
* Default hashing function to choose a partition from the serialized key bytes
*/
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
    // Zero-copy wrap, then delegate to the ByteBuffer-based hashing overload.
    final ByteBuffer keyBuffer = ByteBuffer.wrap(serializedKey);
    return partitionForKey(keyBuffer, numPartitions);
}

/*
 * Default hashing function to choose a partition from the serialized key ByteBuffer
 */
public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
    // murmur2 hash, forced non-negative before taking the modulus.
    final int positiveHash = Utils.toPositive(Utils.murmur2(serializedKey));
    return positiveHash % numPartitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -145,6 +146,15 @@ boolean hasLeaderChangedForTheOngoingRetry() {
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // Adapt the legacy byte[] signature to the ByteBuffer-based path; null stays null.
    final ByteBuffer keyBuffer = Utils.wrapNullable(key);
    final ByteBuffer valueBuffer = Utils.wrapNullable(value);
    return tryAppend(timestamp, keyBuffer, valueBuffer, headers, callback, now);
}

/**
* Append the record to the current record set and return the relative offset within that record set
*
* @return The RecordSend corresponding to this record or null if there isn't sufficient room.
*/
public FutureRecordMetadata tryAppend(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
Expand All @@ -154,8 +164,8 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
key == null ? -1 : key.remaining(),
value == null ? -1 : value.remaining(),
Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;

Expand Down Expand Up @@ -258,7 +259,6 @@ private boolean partitionChanged(String topic,
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param topic The topic to which this record is being sent
* @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
Expand All @@ -282,6 +282,37 @@ public RecordAppendResult append(String topic,
long maxTimeToBlock,
long nowMs,
Cluster cluster) throws InterruptedException {
return append(topic, partition, timestamp, Utils.wrapNullable(key), Utils.wrapNullable(value),
headers, callbacks, maxTimeToBlock, nowMs, cluster);
}

/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
*
* @param topic The topic to which this record is being sent
* @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
* if any partition could be used
* @param timestamp The timestamp of the record
* @param key The key for the record as a ByteBuffer
* @param value The value for the record as a ByteBuffer
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata
*/
public RecordAppendResult append(String topic,
int partition,
long timestamp,
ByteBuffer key,
ByteBuffer value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

// We keep track of the number of appending thread to make sure we do not miss batches in
Expand Down Expand Up @@ -376,8 +407,8 @@ private RecordAppendResult appendNewBatch(String topic,
int partition,
Deque<ProducerBatch> dq,
long timestamp,
byte[] key,
byte[] value,
ByteBuffer key,
ByteBuffer value,
Header[] headers,
AppendCallbacks callbacks,
ByteBuffer buffer,
Expand Down Expand Up @@ -422,7 +453,7 @@ private boolean allBatchesFull(Deque<ProducerBatch> deque) {
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
private RecordAppendResult tryAppend(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class BooleanSerializer implements Serializer<Boolean> {

private static final byte TRUE = 0x01;
Expand All @@ -30,4 +34,15 @@ public byte[] serialize(final String topic, final Boolean data) {
data ? TRUE : FALSE
};
}

@Override
public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Boolean data) {
    // A null payload serializes to null, mirroring serialize(String, Boolean).
    if (data == null)
        return null;

    // Single-byte encoding: TRUE (0x01) or FALSE (0x00); wrapping yields the same
    // position-0/limit-1 buffer as allocate + put + flip.
    return ByteBuffer.wrap(new byte[] {data ? TRUE : FALSE});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,19 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;

public class ByteArraySerializer implements Serializer<byte[]> {

    /**
     * Identity serialization: the caller's array is returned as-is (null-safe).
     */
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }

    /**
     * Zero-copy view of the caller's array; a null input serializes to null.
     */
    @Override
    public ByteBuffer serializeToByteBuffer(String topic, Headers headers, byte[] data) {
        return data == null ? null : ByteBuffer.wrap(data);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;

/**
Expand All @@ -33,17 +36,15 @@ public byte[] serialize(String topic, ByteBuffer data) {
return null;

data.rewind();
return Utils.toNullableArrayZeroCopy(data);
}

if (data.hasArray()) {
byte[] arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
return arr;
}
}
@Override
public ByteBuffer serializeToByteBuffer(String topic, Headers headers, ByteBuffer data) {
    if (data == null)
        return null;

    // Rewind for consistency with serialize(String, ByteBuffer): the byte[] path has
    // always emitted the buffer's full contents from position zero, so a caller that
    // hands over a partially-consumed buffer must not silently get a shorter payload
    // on the zero-copy path. NOTE(review): confirm this matches the intended
    // zero-copy contract for serializeToByteBuffer before relying on it.
    data.rewind();
    return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,24 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;

import java.nio.ByteBuffer;

public class BytesSerializer implements Serializer<Bytes> {

    /**
     * Serializes a {@link Bytes} wrapper to its backing byte array.
     *
     * @param topic topic associated with the data (unused)
     * @param data the value to serialize, or null
     * @return the backing array, or null if {@code data} is null
     */
    @Override // was missing; added for consistency with the sibling serializers
    public byte[] serialize(String topic, Bytes data) {
        if (data == null)
            return null;

        return data.get();
    }

    /**
     * Serializes a {@link Bytes} wrapper to a {@link ByteBuffer} view of its backing array.
     *
     * @param topic topic associated with the data (unused)
     * @param headers headers associated with the record (unused)
     * @param data the value to serialize, or null
     * @return a buffer wrapping the backing array, or null if {@code data} is null
     */
    @Override
    public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Bytes data) {
        if (data == null)
            return null;

        return ByteBuffer.wrap(data.get());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class DoubleSerializer implements Serializer<Double> {
@Override
public byte[] serialize(String topic, Double data) {
Expand All @@ -34,4 +38,15 @@ public byte[] serialize(String topic, Double data) {
(byte) bits
};
}
}

@Override
public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Double data) {
    // Null payloads stay null, matching serialize(String, Double).
    if (data == null)
        return null;

    // Big-endian IEEE-754 encoding, byte-for-byte identical to the byte[] path.
    final ByteBuffer encoded = ByteBuffer.allocate(Double.BYTES);
    encoded.putDouble(data);
    encoded.flip();
    return encoded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.nio.ByteBuffer;

public class FloatSerializer implements Serializer<Float> {
@Override
public byte[] serialize(final String topic, final Float data) {
Expand All @@ -30,4 +34,15 @@ public byte[] serialize(final String topic, final Float data) {
(byte) bits
};
}
}

@Override
public ByteBuffer serializeToByteBuffer(String topic, Headers headers, Float data) {
    // Null payloads stay null, matching serialize(String, Float).
    if (data == null)
        return null;

    // Big-endian IEEE-754 encoding, byte-for-byte identical to the byte[] path.
    final ByteBuffer encoded = ByteBuffer.allocate(Float.BYTES);
    encoded.putFloat(data);
    encoded.flip();
    return encoded;
}
}
Loading