iliax 2023-08-07 19:25:04 +04:00
parent 02967a990b
commit 28f62f9a9a
8 changed files with 102 additions and 137 deletions

View file

@@ -27,26 +27,14 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
protected PolledRecords pollSinglePartition(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
}
protected void buffer(List<ConsumerRecord<Bytes, Bytes>> recs) {
messagesProcessing.buffer(recs);
}
protected void flushBuffer(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.flush(sink);
}
protected void sendWithoutBuffer(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
messagesProcessing.sendWithoutBuffer(sink, rec);
}
protected boolean sendLimitReached() {
return messagesProcessing.limitReached();
}
protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
}
protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
messagesProcessing.sendPhase(sink, name);
}
@@ -55,10 +43,6 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
messagesProcessing.sentConsumingInfo(sink, records);
}
protected boolean descendingSendSorting() {
return !messagesProcessing.isAscendingSortBeforeSend();
}
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
sink.complete();
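
Note: after this change AbstractEmitter no longer exposes buffer()/flushBuffer()/sendWithoutBuffer(); subclasses hand polled records straight to send(). A rough, hypothetical subclass fragment (names simplified, a consumerSupplier field assumed; not part of the commit) illustrating the reduced surface:

  @Override
  public void accept(FluxSink<TopicMessageEventDTO> sink) {
    try (EnhancedConsumer consumer = consumerSupplier.get()) {   // consumerSupplier: assumed field
      sendPhase(sink, "Consumer created");
      while (!sink.isCancelled() && !sendLimitReached()) {
        var polled = poll(sink, consumer);   // protected helper from AbstractEmitter
        send(sink, polled);                  // filter + sort + emit in one pass
      }
      sendFinishStatsAndCompleteSink(sink);
    }
  }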

View file

@@ -11,7 +11,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class BackwardEmitter extends PerPartitionEmitter {
public class BackwardEmitter extends RangePollingEmitter {
public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,

View file

@@ -11,7 +11,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class ForwardEmitter extends PerPartitionEmitter {
public class ForwardEmitter extends RangePollingEmitter {
public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,

View file

@@ -1,5 +1,9 @@
package com.provectus.kafka.ui.emitter;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
@@ -7,15 +11,11 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -28,7 +28,6 @@ import reactor.core.publisher.FluxSink;
class MessagesProcessing {
private final ConsumingStats consumingStats = new ConsumingStats();
private final List<TopicMessageDTO> buffer = new ArrayList<>();
private long sentMessages = 0;
private int filterApplyErrors = 0;
@@ -41,71 +40,48 @@ class MessagesProcessing {
return limit != null && sentMessages >= limit;
}
boolean isAscendingSortBeforeSend() {
return ascendingSortBeforeSend;
}
void buffer(List<ConsumerRecord<Bytes, Bytes>> polled) {
polled.forEach(rec -> {
if (!limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
buffer.add(topicMessage);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
});
}
@VisibleForTesting
static Stream<TopicMessageDTO> sorted(List<TopicMessageDTO> buffer, boolean asc) {
Comparator<TopicMessageDTO> offsetComparator = asc
? Comparator.comparingLong(TopicMessageDTO::getOffset)
: Comparator.comparingLong(TopicMessageDTO::getOffset).reversed();
static Iterable<ConsumerRecord<Bytes, Bytes>> sorted(Iterable<ConsumerRecord<Bytes, Bytes>> records, boolean asc) {
Comparator<ConsumerRecord> offsetComparator = asc
? Comparator.comparingLong(ConsumerRecord::offset)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
Comparator<TopicMessageDTO> tsComparator = asc
? Comparator.comparing(TopicMessageDTO::getTimestamp)
: Comparator.comparing(TopicMessageDTO::getTimestamp).reversed();
Comparator<ConsumerRecord> tsComparator = asc
? Comparator.comparing(ConsumerRecord::timestamp)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
TreeMap<Integer, List<TopicMessageDTO>> perPartition = buffer.stream()
TreeMap<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
.collect(
Collectors.groupingBy(
TopicMessageDTO::getPartition,
groupingBy(
ConsumerRecord::partition,
TreeMap::new,
Collectors.collectingAndThen(
Collectors.toList(),
collectingAndThen(
toList(),
lst -> lst.stream().sorted(offsetComparator).toList())));
return Streams.stream(
Iterables.mergeSorted(
perPartition.values(),
tsComparator
)
);
return Iterables.mergeSorted(perPartition.values(), tsComparator);
}
void flush(FluxSink<TopicMessageEventDTO> sink) {
sorted(buffer, ascendingSortBeforeSend)
.forEach(topicMessage -> {
if (!sink.isCancelled()) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
sorted(polled, ascendingSortBeforeSend)
.forEach(rec -> {
if (!limitReached() && !sink.isCancelled()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
});
buffer.clear();
}
void sendWithoutBuffer(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
buffer(List.of(rec));
flush(sink);
}
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
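
For context on the reworked sorted() above: records are grouped per partition (each group kept offset-ordered), and Guava's Iterables.mergeSorted interleaves those groups by timestamp. A minimal self-contained sketch of that merge, using a toy Rec type that is not from the commit:

  import com.google.common.collect.Iterables;
  import java.util.Comparator;
  import java.util.List;

  class MergeSortedSketch {
    record Rec(int partition, long offset, long ts) { }

    public static void main(String[] args) {
      // each inner list models one partition, already sorted by offset
      List<List<Rec>> perPartition = List.of(
          List.of(new Rec(0, 0, 10), new Rec(0, 1, 30)),
          List.of(new Rec(1, 100, 20), new Rec(1, 101, 40)));
      // mergeSorted lazily interleaves the per-partition lists by timestamp
      Iterable<Rec> merged =
          Iterables.mergeSorted(perPartition, Comparator.comparingLong(Rec::ts));
      merged.forEach(System.out::println);   // prints in ts order: 10, 20, 30, 40
    }
  }

The descending case works the same way, provided each inner list is sorted descending by offset and the timestamp comparator is reversed, which is what the asc flag controls in the commit.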

View file

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
@@ -15,13 +14,13 @@ import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public abstract class PerPartitionEmitter extends AbstractEmitter {
public abstract class RangePollingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
protected final ConsumerPosition consumerPosition;
protected final int messagesPerPage;
protected PerPartitionEmitter(Supplier<EnhancedConsumer> consumerSupplier,
protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
@@ -48,19 +47,13 @@ public abstract class PerPartitionEmitter extends AbstractEmitter {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Consumer created");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
TreeMap<TopicPartition, FromToOffset> readRange = nextPollingRange(new TreeMap<>(), seekOperations);
log.debug("Starting from offsets {}", readRange);
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
log.debug("Starting from offsets {}", pollRange);
while (!sink.isCancelled() && !readRange.isEmpty() && !sendLimitReached()) {
readRange.forEach((tp, fromTo) -> {
if (sink.isCancelled()) {
return; //fast return in case of sink cancellation
}
var polled = partitionPollIteration(tp, fromTo.from, fromTo.to, consumer, sink);
buffer(polled);
});
flushBuffer(sink);
readRange = nextPollingRange(readRange, seekOperations);
while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
var polled = poll(consumer, sink, pollRange);
send(sink, polled);
pollRange = nextPollingRange(pollRange, seekOperations);
}
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
@@ -76,30 +69,29 @@ public abstract class PerPartitionEmitter extends AbstractEmitter {
}
}
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(TopicPartition tp,
long fromOffset, //inclusive
long toOffset, //exclusive
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink) {
consumer.assign(List.of(tp));
consumer.seek(tp, fromOffset);
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink,
TreeMap<TopicPartition, FromToOffset> range) {
log.trace("Polling range {}", range);
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
while (!sink.isCancelled()
&& !sendLimitReached()
&& consumer.position(tp) < toOffset) {
sendPhase(sink, String.format("Polling partitions: %s", range.keySet()));
consumer.assign(range.keySet());
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
var polledRecords = pollSinglePartition(sink, consumer);
log.debug("{} records polled from {}", polledRecords.count(), tp);
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
var polledRecords = poll(sink, consumer);
range.forEach((tp, fromTo) -> {
polledRecords.records(tp).stream()
.filter(r -> r.offset() < fromTo.to)
.forEach(result::add);
polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.forEach(recordsToSend::add);
if (consumer.position(tp) >= fromTo.to) {
consumer.pause(List.of(tp));
}
});
}
if (descendingSendSorting()) {
Collections.reverse(recordsToSend);
}
return recordsToSend;
consumer.resume(consumer.paused());
return result;
}
}
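
The reshaped poll above works per offset range rather than per partition: assign every partition in the range, seek each to its from offset, then keep polling and pause a partition as soon as its position reaches to, stopping once all partitions are paused. A simplified self-contained sketch against the plain KafkaConsumer API (FromTo is a stand-in for the project's FromToOffset; sink-cancellation checks and poll timeouts are omitted):

  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.common.TopicPartition;

  class RangePollSketch {
    record FromTo(long from, long to) { }   // from inclusive, to exclusive

    static <K, V> List<ConsumerRecord<K, V>> pollRange(Consumer<K, V> consumer,
                                                       Map<TopicPartition, FromTo> range) {
      consumer.assign(range.keySet());
      range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from()));
      var result = new ArrayList<ConsumerRecord<K, V>>();
      // poll until every partition in the range has been paused
      while (consumer.paused().size() < range.size()) {
        var polled = consumer.poll(Duration.ofMillis(500));
        range.forEach((tp, fromTo) -> {
          polled.records(tp).stream()
              .filter(r -> r.offset() < fromTo.to())   // drop records past the range end
              .forEach(result::add);
          if (consumer.position(tp) >= fromTo.to()) {
            consumer.pause(List.of(tp));               // this partition is done
          }
        });
      }
      consumer.resume(consumer.paused());              // un-pause before the next range
      return result;
    }
  }

Compared with the old per-partition iteration, a single poll loop now serves all partitions in the range, and pausing finished partitions keeps later polls from re-reading them.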

View file

@@ -35,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter {
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
var polled = poll(sink, consumer);
polled.forEach(r -> sendWithoutBuffer(sink, r));
send(sink, polled);
}
sink.complete();
log.debug("Tailing finished");

View file

@@ -2,27 +2,32 @@ package com.provectus.kafka.ui.emitter;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.RepeatedTest;
class MessagesProcessingTest {
@RepeatedTest(5)
void testSortingAsc() {
var messagesInOrder = List.of(
new TopicMessageDTO(1, 100L, OffsetDateTime.parse("1999-01-01T00:00:00+00:00")),
new TopicMessageDTO(0, 0L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
new TopicMessageDTO(1, 200L, OffsetDateTime.parse("2000-01-05T00:00:00+00:00")),
new TopicMessageDTO(0, 10L, OffsetDateTime.parse("2000-01-10T00:00:00+00:00")),
new TopicMessageDTO(0, 20L, OffsetDateTime.parse("2000-01-20T00:00:00+00:00")),
new TopicMessageDTO(1, 300L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1000L, OffsetDateTime.parse("4000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1001L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1003L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00"))
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00"),
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00")
);
var shuffled = new ArrayList<>(messagesInOrder);
@@ -32,19 +37,26 @@ class MessagesProcessingTest {
assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
}
private ConsumerRecord<Bytes, Bytes> consumerRecord(int partition, long offset, String ts) {
return new ConsumerRecord<>(
"topic", partition, offset, OffsetDateTime.parse(ts).toInstant().toEpochMilli(),
TimestampType.CREATE_TIME,
0, 0, null, null, new RecordHeaders(), Optional.empty()
);
}
@RepeatedTest(5)
void testSortingDesc() {
var messagesInOrder = List.of(
new TopicMessageDTO(1, 300L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1003L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
new TopicMessageDTO(0, 20L, OffsetDateTime.parse("2000-01-20T00:00:00+00:00")),
new TopicMessageDTO(0, 10L, OffsetDateTime.parse("2000-01-10T00:00:00+00:00")),
new TopicMessageDTO(1, 200L, OffsetDateTime.parse("2000-01-05T00:00:00+00:00")),
new TopicMessageDTO(0, 0L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1001L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
new TopicMessageDTO(2, 1000L, OffsetDateTime.parse("4000-01-01T00:00:00+00:00")),
new TopicMessageDTO(1, 100L, OffsetDateTime.parse("1999-01-01T00:00:00+00:00"))
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00"),
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00")
);
var shuffled = new ArrayList<>(messagesInOrder);

View file

@@ -93,6 +93,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
static void cleanup() {
deleteTopic(TOPIC);
deleteTopic(EMPTY_TOPIC);
SENT_RECORDS.clear();
}
private static ConsumerRecordDeserializer createRecordsDeserializer() {