iliax 2023-08-05 16:58:45 +04:00
parent 0a8d3f1ef4
commit 8d30d14458
11 changed files with 151 additions and 76 deletions

View file

@@ -6,7 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final MessagesProcessing messagesProcessing;
protected final PollingSettings pollingSettings;
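
With Consumer&lt;FluxSink&lt;TopicMessageEventDTO&gt;&gt; now implemented by AbstractEmitter itself, every concrete emitter can be handed straight to reactor's Flux.create. A minimal sketch of that wiring (the constructor arguments mirror the MessagesService changes further down; consumerGroupService, cluster, consumerPosition, deserializer, filter and limit are assumed to be in scope):

// Sketch only: any emitter is now a Consumer<FluxSink<TopicMessageEventDTO>>.
AbstractEmitter emitter = new ForwardRecordEmitter(
    () -> consumerGroupService.createConsumer(cluster),   // assumed consumer supplier
    consumerPosition,
    new MessagesProcessing(deserializer, filter, limit),
    cluster.getPollingSettings());
Flux<TopicMessageEventDTO> events = Flux.create(emitter);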

View file

@@ -16,21 +16,22 @@ import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class BackwardRecordEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
private final TimestampsSortedMessageProcessing messagesProcessing;
public BackwardRecordEmitter(
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
TimestampsSortedMessageProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.messagesProcessing = messagesProcessing;
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
@@ -73,6 +74,7 @@ public class BackwardRecordEmitter
} else if (sink.isCancelled()) {
log.debug("sink is cancelled after partitions poll iteration");
}
messagesProcessing.flush(sink);
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
@@ -108,14 +110,9 @@ public class BackwardRecordEmitter
.filter(r -> r.offset() < toOffset)
.toList();
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we have already read all messages in the target offsets interval
break;
}
recordsToSend.addAll(filteredRecords);
}
log.debug("{} records to send", recordsToSend.size());
Collections.reverse(recordsToSend);
return recordsToSend;
}
}

View file

@@ -6,7 +6,9 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -14,15 +16,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class ForwardRecordEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;
@@ -68,18 +67,39 @@ public class ForwardRecordEmitter
}
}
// @SneakyThrows
// public static void main(String[] args) {
// String topic = "test";
// String topic = "test2tx";
//
// Properties properties = new Properties();
// properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
//
// try (var producer = new KafkaProducer<>(properties)) {
// for (int i = 0; i < 10; i++) {
// for (int j = 0; j < 30; j++) {
// producer.send(new ProducerRecord<>(topic, (i + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)));
// producer.initTransactions();
//
// for (int i = 0; i < 5; i++) {
// producer.beginTransaction();
// for (int j = 0; j < 300; j++) {
// producer.send(new ProducerRecord<>(topic, (i + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)))
// .get();
// }
// producer.abortTransaction();
//
// producer.beginTransaction();
// producer.send(new ProducerRecord<>(topic, (i + 1) + "", "VISIBLE" + "-" + RandomStringUtils.random(5)))
// .get();
// producer.commitTransaction();
//
// producer.beginTransaction();
// for (int j = 0; j < 300; j++) {
// producer.send(
// new ProducerRecord<>(topic, ((i * 10) + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)))
// .get();
// }
// producer.abortTransaction();
// }
// }
// }

View file

@@ -14,13 +14,13 @@ import reactor.core.publisher.FluxSink;
@Slf4j
public class MessagesProcessing {
private final ConsumingStats consumingStats = new ConsumingStats();
private long sentMessages = 0;
private int filterApplyErrors = 0;
protected final ConsumingStats consumingStats = new ConsumingStats();
protected long sentMessages = 0;
protected int filterApplyErrors = 0;
private final ConsumerRecordDeserializer deserializer;
private final Predicate<TopicMessageDTO> filter;
private final @Nullable Integer limit;
protected final ConsumerRecordDeserializer deserializer;
protected final Predicate<TopicMessageDTO> filter;
protected final @Nullable Integer limit;
public MessagesProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,

View file

@@ -14,13 +14,13 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
class SeekOperations {
public class SeekOperations {
private final Consumer<?, ?> consumer;
private final OffsetsInfo offsetsInfo;
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo;
if (consumerPosition.getSeekTo() == null) {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
@@ -34,25 +34,25 @@ class SeekOperations {
);
}
void assignAndSeekNonEmptyPartitions() {
public void assignAndSeekNonEmptyPartitions() {
consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek);
}
Map<TopicPartition, Long> getBeginOffsets() {
public Map<TopicPartition, Long> getBeginOffsets() {
return offsetsInfo.getBeginOffsets();
}
Map<TopicPartition, Long> getEndOffsets() {
public Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}
boolean assignedPartitionsFullyPolled() {
public boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled();
}
// Get offsets to seek to. NOTE: the result does not contain offsets for empty partitions
Map<TopicPartition, Long> getOffsetsForSeek() {
public Map<TopicPartition, Long> getOffsetsForSeek() {
return offsetsForSeek;
}
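
Since SeekOperations is now public, code outside the emitter package (such as TopicAnalysisService below) can drive its own poll loop with it. A minimal sketch under assumed parameters (an EnhancedConsumer, a topic name, and an illustrative 3-second poll timeout):

// Sketch: read a topic from the beginning until all assigned partitions are fully polled.
static void pollWholeTopic(EnhancedConsumer consumer, String topic) {
  var seekOperations = SeekOperations.create(
      consumer,
      new ConsumerPosition(SeekTypeDTO.BEGINNING, topic, null));
  seekOperations.assignAndSeekNonEmptyPartitions();
  while (!seekOperations.assignedPartitionsFullyPolled()) {
    consumer.pollEnhanced(Duration.ofSeconds(3))
        .forEach(rec -> {
          // handle each polled record, e.g. collect per-partition stats
        });
  }
}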

View file

@@ -9,8 +9,7 @@ import org.apache.kafka.common.errors.InterruptException;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class TailingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;

View file

@@ -0,0 +1,71 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TimestampsSortedMessageProcessing extends MessagesProcessing {
private final List<TopicMessageDTO> buffer = new ArrayList<>();
public TimestampsSortedMessageProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
@Nullable Integer limit) {
super(deserializer, filter, limit);
}
@Override
void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
if (!sink.isCancelled() && !limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
buffer.add(topicMessage);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
}
@Override
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
flush(sink);
super.sendFinishEvent(sink);
}
void flush(FluxSink<TopicMessageEventDTO> sink) {
sorted(buffer)
.forEach(topicMessage ->
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)));
buffer.clear();
}
static Stream<TopicMessageDTO> sorted(List<TopicMessageDTO> messages) {
return messages.stream()
.sorted(Comparator.comparingLong(TopicMessageDTO::getOffset).reversed())
.sorted(Comparator.comparingInt(TopicMessageDTO::getPartition))
.sorted((m1, m2) -> {
if (m1.getPartition().equals(m2.getPartition())) {
return 0; // sorting is stable, so messages within the same partition keep their existing order
}
return -m1.getTimestamp().compareTo(m2.getTimestamp());
});
}
}
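
The chained stable sorts in sorted() are easiest to follow by example: the last comparator orders partitions newest-timestamp-first, while messages of the same partition keep the offset-descending order produced by the earlier sorts. A minimal sketch (it lives in the same package because sorted() is package-private, and it assumes the generated TopicMessageDTO exposes fluent partition/offset/timestamp setters):

package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageDTO;
import java.time.OffsetDateTime;
import java.util.List;

class SortedOrderingSketch {
  public static void main(String[] args) {
    var p0 = new TopicMessageDTO().partition(0).offset(10L)
        .timestamp(OffsetDateTime.parse("2023-08-05T10:00:00Z"));
    var p1Newest = new TopicMessageDTO().partition(1).offset(3L)
        .timestamp(OffsetDateTime.parse("2023-08-05T12:00:00Z"));
    var p1Older = new TopicMessageDTO().partition(1).offset(2L)
        .timestamp(OffsetDateTime.parse("2023-08-05T11:00:00Z"));

    // Partition 1 has the newest timestamp, so it is emitted first; within it,
    // offsets stay in descending order.
    TimestampsSortedMessageProcessing.sorted(List.of(p0, p1Newest, p1Older))
        .forEach(m -> System.out.printf("p=%d offset=%d%n", m.getPartition(), m.getOffset()));
    // prints: p=1 offset=3, p=1 offset=2, p=0 offset=10
  }
}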

View file

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.serdes.builtin;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

View file

@@ -2,11 +2,13 @@ package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.AbstractEmitter;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.TailingEmitter;
import com.provectus.kafka.ui.emitter.TimestampsSortedMessageProcessing;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
@@ -231,37 +233,29 @@ public class MessagesService {
@Nullable String keySerde,
@Nullable String valueSerde) {
java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
var processing = new MessagesProcessing(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
getMsgFilter(query, filterQueryType),
seekDirection == SeekDirectionDTO.TAILING ? null : limit
);
if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
emitter = new ForwardRecordEmitter(
var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
var filter = getMsgFilter(query, filterQueryType);
AbstractEmitter emitter = switch (seekDirection) {
case FORWARD -> new ForwardRecordEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
new MessagesProcessing(deserializer, filter, limit),
cluster.getPollingSettings()
);
} else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
emitter = new BackwardRecordEmitter(
case BACKWARD -> new BackwardRecordEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
limit,
processing,
new TimestampsSortedMessageProcessing(deserializer, filter, limit),
cluster.getPollingSettings()
);
} else {
emitter = new TailingEmitter(
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
new MessagesProcessing(deserializer, filter, null),
cluster.getPollingSettings()
);
}
};
return Flux.create(emitter)
.map(getDataMasker(cluster, topic))
.map(throttleUiPublish(seekDirection));

View file

@@ -4,8 +4,11 @@ import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.TopicsService;
@@ -44,7 +47,6 @@ public class TopicAnalysisService {
startAnalysis(
cluster,
topicName,
topic.getPartitionCount(),
topic.getPartitions().values()
.stream()
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
@@ -55,13 +57,12 @@ public class TopicAnalysisService {
private synchronized void startAnalysis(KafkaCluster cluster,
String topic,
int partitionsCnt,
long approxNumberOfMsgs) {
var topicId = new TopicIdentity(cluster, topic);
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
throw new TopicAnalysisException("Topic is already analyzing");
}
var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
var task = new AnalysisTask(cluster, topicId, approxNumberOfMsgs);
analysisTasksStore.registerNewTask(topicId, task);
Schedulers.boundedElastic().schedule(task);
}
@@ -79,20 +80,16 @@ public class TopicAnalysisService {
private final Instant startedAt = Instant.now();
private final TopicIdentity topicId;
private final int partitionsCnt;
private final long approxNumberOfMsgs;
private final EmptyPollsCounter emptyPollsCounter;
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
private final EnhancedConsumer consumer;
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
long approxNumberOfMsgs, PollingSettings pollingSettings) {
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, long approxNumberOfMsgs) {
this.topicId = topicId;
this.approxNumberOfMsgs = approxNumberOfMsgs;
this.partitionsCnt = partitionsCnt;
this.consumer = consumerGroupService.createConsumer(
cluster,
// to improve polling throughput
@@ -101,7 +98,6 @@ public class TopicAnalysisService {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
}
@Override
@@ -113,18 +109,14 @@ public class TopicAnalysisService {
public void run() {
try {
log.info("Starting {} topic analysis", topicId);
var topicPartitions = IntStream.range(0, partitionsCnt)
.peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
.mapToObj(i -> new TopicPartition(topicId.topicName, i))
.collect(Collectors.toList());
var seekOperations = SeekOperations.create(
consumer,
new ConsumerPosition(SeekTypeDTO.BEGINNING, topicId.topicName, null)
);
seekOperations.assignAndSeekNonEmptyPartitions();
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
while (!seekOperations.assignedPartitionsFullyPolled()) {
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
emptyPollsCounter.count(polled.count());
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);

View file

@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.emitter.TimestampsSortedMessageProcessing;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -126,7 +127,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
100,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);
@@ -156,7 +157,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(LATEST, TOPIC, null),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);
@@ -185,7 +186,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);
@@ -230,7 +231,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);
@@ -261,7 +262,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
numMessages,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);
@@ -287,7 +288,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, offsets),
100,
createMessagesProcessing(),
new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
PollingSettings.createDefault()
);