Merge branch 'master' into tarun_samanta/#3944
This commit is contained in:
commit
caf1a5c8af
27 changed files with 583 additions and 495 deletions
2
.github/workflows/frontend.yaml
vendored
2
.github/workflows/frontend.yaml
vendored
|
@ -23,7 +23,7 @@ jobs:
|
|||
# Disabling shallow clone is recommended for improving relevancy of reporting
|
||||
fetch-depth: 0
|
||||
ref: ${{ github.event.pull_request.head.sha }}
|
||||
- uses: pnpm/action-setup@v2.2.4
|
||||
- uses: pnpm/action-setup@v2.4.0
|
||||
with:
|
||||
version: 7.4.0
|
||||
- name: Install node
|
||||
|
|
2
.github/workflows/release.yaml
vendored
2
.github/workflows/release.yaml
vendored
|
@ -34,7 +34,7 @@ jobs:
|
|||
echo "version=${VERSION}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Upload files to a GitHub release
|
||||
uses: svenstaro/upload-release-action@2.6.1
|
||||
uses: svenstaro/upload-release-action@2.7.0
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
file: kafka-ui-api/target/kafka-ui-api-${{ steps.build.outputs.version }}.jar
|
||||
|
|
|
@ -57,8 +57,6 @@ public class ClustersProperties {
|
|||
@Data
|
||||
public static class PollingProperties {
|
||||
Integer pollTimeoutMs;
|
||||
Integer partitionPollTimeout;
|
||||
Integer noDataEmptyPolls;
|
||||
Integer maxPageSize;
|
||||
Integer defaultPageSize;
|
||||
}
|
||||
|
|
|
@ -1,28 +1,22 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.time.Duration;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
public abstract class AbstractEmitter {
|
||||
abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
|
||||
private final MessagesProcessing messagesProcessing;
|
||||
protected final PollingSettings pollingSettings;
|
||||
private final PollingSettings pollingSettings;
|
||||
|
||||
protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
|
||||
this.messagesProcessing = messagesProcessing;
|
||||
this.pollingSettings = pollingSettings;
|
||||
}
|
||||
|
||||
protected PolledRecords poll(
|
||||
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
|
||||
return poll(sink, consumer, pollingSettings.getPollTimeout());
|
||||
}
|
||||
|
||||
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
|
||||
var records = consumer.pollEnhanced(timeout);
|
||||
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
|
||||
var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
|
||||
sendConsuming(sink, records);
|
||||
return records;
|
||||
}
|
||||
|
@ -31,9 +25,8 @@ public abstract class AbstractEmitter {
|
|||
return messagesProcessing.limitReached();
|
||||
}
|
||||
|
||||
protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
|
||||
ConsumerRecord<Bytes, Bytes> msg) {
|
||||
messagesProcessing.sendMsg(sink, msg);
|
||||
protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
|
||||
messagesProcessing.send(sink, records);
|
||||
}
|
||||
|
||||
protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
public class BackwardEmitter extends RangePollingEmitter {
|
||||
|
||||
public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
PollingSettings pollingSettings) {
|
||||
super(
|
||||
consumerSupplier,
|
||||
consumerPosition,
|
||||
messagesPerPage,
|
||||
new MessagesProcessing(
|
||||
deserializer,
|
||||
filter,
|
||||
false,
|
||||
messagesPerPage
|
||||
),
|
||||
pollingSettings
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
|
||||
SeekOperations seekOperations) {
|
||||
TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
if (prevRange.isEmpty()) {
|
||||
readToOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
} else {
|
||||
readToOffsets.putAll(
|
||||
prevRange.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
|
||||
);
|
||||
}
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
|
||||
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readToOffsets.forEach((tp, toOffset) -> {
|
||||
long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
|
||||
if (toOffset > tpStartOffset) {
|
||||
result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class BackwardRecordEmitter
|
||||
extends AbstractEmitter
|
||||
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
private final ConsumerPosition consumerPosition;
|
||||
private final int messagesPerPage;
|
||||
|
||||
public BackwardRecordEmitter(
|
||||
Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.consumerPosition = consumerPosition;
|
||||
this.messagesPerPage = messagesPerPage;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting backward polling for {}", consumerPosition);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Created consumer");
|
||||
|
||||
var seekOperations = SeekOperations.create(consumer, consumerPosition);
|
||||
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
|
||||
log.debug("'Until' offsets for polling: {}", readUntilOffsets);
|
||||
|
||||
while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
|
||||
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
|
||||
if (sink.isCancelled()) {
|
||||
return; //fast return in case of sink cancellation
|
||||
}
|
||||
long beginOffset = seekOperations.getBeginOffsets().get(tp);
|
||||
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
|
||||
|
||||
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
|
||||
.forEach(r -> sendMessage(sink, r));
|
||||
|
||||
if (beginOffset == readFromOffset) {
|
||||
// we fully read this partition -> removing it from polling iterations
|
||||
readUntilOffsets.remove(tp);
|
||||
} else {
|
||||
// updating 'to' offset for next polling iteration
|
||||
readUntilOffsets.put(tp, readFromOffset);
|
||||
}
|
||||
});
|
||||
if (readUntilOffsets.isEmpty()) {
|
||||
log.debug("begin reached after partitions poll iteration");
|
||||
} else if (sink.isCancelled()) {
|
||||
log.debug("sink is cancelled after partitions poll iteration");
|
||||
}
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
|
||||
TopicPartition tp,
|
||||
long fromOffset,
|
||||
long toOffset,
|
||||
EnhancedConsumer consumer,
|
||||
FluxSink<TopicMessageEventDTO> sink
|
||||
) {
|
||||
consumer.assign(Collections.singleton(tp));
|
||||
consumer.seek(tp, fromOffset);
|
||||
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
|
||||
int desiredMsgsToPoll = (int) (toOffset - fromOffset);
|
||||
|
||||
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
|
||||
|
||||
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
|
||||
while (!sink.isCancelled()
|
||||
&& !sendLimitReached()
|
||||
&& recordsToSend.size() < desiredMsgsToPoll
|
||||
&& !emptyPolls.noDataEmptyPollsReached()) {
|
||||
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
|
||||
emptyPolls.count(polledRecords.count());
|
||||
|
||||
log.debug("{} records polled from {}", polledRecords.count(), tp);
|
||||
|
||||
var filteredRecords = polledRecords.records(tp).stream()
|
||||
.filter(r -> r.offset() < toOffset)
|
||||
.toList();
|
||||
|
||||
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
|
||||
// we already read all messages in target offsets interval
|
||||
break;
|
||||
}
|
||||
recordsToSend.addAll(filteredRecords);
|
||||
}
|
||||
log.debug("{} records to send", recordsToSend.size());
|
||||
Collections.reverse(recordsToSend);
|
||||
return recordsToSend;
|
||||
}
|
||||
}
|
|
@ -9,35 +9,37 @@ class ConsumingStats {
|
|||
private long bytes = 0;
|
||||
private int records = 0;
|
||||
private long elapsed = 0;
|
||||
private int filterApplyErrors = 0;
|
||||
|
||||
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
|
||||
PolledRecords polledRecords,
|
||||
int filterApplyErrors) {
|
||||
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
|
||||
bytes += polledRecords.bytes();
|
||||
this.records += polledRecords.count();
|
||||
this.elapsed += polledRecords.elapsed().toMillis();
|
||||
records += polledRecords.count();
|
||||
elapsed += polledRecords.elapsed().toMillis();
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
|
||||
.consuming(createConsumingStats(sink, filterApplyErrors))
|
||||
.consuming(createConsumingStats())
|
||||
);
|
||||
}
|
||||
|
||||
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
|
||||
void incFilterApplyError() {
|
||||
filterApplyErrors++;
|
||||
}
|
||||
|
||||
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.DONE)
|
||||
.consuming(createConsumingStats(sink, filterApplyErrors))
|
||||
.consuming(createConsumingStats())
|
||||
);
|
||||
}
|
||||
|
||||
private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
|
||||
int filterApplyErrors) {
|
||||
private TopicMessageConsumingDTO createConsumingStats() {
|
||||
return new TopicMessageConsumingDTO()
|
||||
.bytesConsumed(this.bytes)
|
||||
.elapsedMs(this.elapsed)
|
||||
.isCancelled(sink.isCancelled())
|
||||
.bytesConsumed(bytes)
|
||||
.elapsedMs(elapsed)
|
||||
.isCancelled(false)
|
||||
.filterApplyErrors(filterApplyErrors)
|
||||
.messagesConsumed(this.records);
|
||||
.messagesConsumed(records);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
|
||||
// In some situations it is hard to say whether records range (between two offsets) was fully polled.
|
||||
// This happens when we have holes in records sequences that is usual case for compact topics or
|
||||
// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
|
||||
// there is no guarantee that you will ever see record with offset Y.
|
||||
// To workaround this we can assume that after N consecutive empty polls all target messages were read.
|
||||
public class EmptyPollsCounter {
|
||||
|
||||
private final int maxEmptyPolls;
|
||||
|
||||
private int emptyPolls = 0;
|
||||
|
||||
EmptyPollsCounter(int maxEmptyPolls) {
|
||||
this.maxEmptyPolls = maxEmptyPolls;
|
||||
}
|
||||
|
||||
public void count(int polledCount) {
|
||||
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
|
||||
}
|
||||
|
||||
public boolean noDataEmptyPollsReached() {
|
||||
return emptyPolls >= maxEmptyPolls;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
public class ForwardEmitter extends RangePollingEmitter {
|
||||
|
||||
public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
PollingSettings pollingSettings) {
|
||||
super(
|
||||
consumerSupplier,
|
||||
consumerPosition,
|
||||
messagesPerPage,
|
||||
new MessagesProcessing(
|
||||
deserializer,
|
||||
filter,
|
||||
true,
|
||||
messagesPerPage
|
||||
),
|
||||
pollingSettings
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
|
||||
SeekOperations seekOperations) {
|
||||
TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
if (prevRange.isEmpty()) {
|
||||
readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
} else {
|
||||
readFromOffsets.putAll(
|
||||
prevRange.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
|
||||
);
|
||||
}
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
|
||||
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readFromOffsets.forEach((tp, fromOffset) -> {
|
||||
long tpEndOffset = seekOperations.getEndOffsets().get(tp);
|
||||
if (fromOffset < tpEndOffset) {
|
||||
result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,64 +0,0 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class ForwardRecordEmitter
|
||||
extends AbstractEmitter
|
||||
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
private final ConsumerPosition position;
|
||||
|
||||
public ForwardRecordEmitter(
|
||||
Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition position,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.position = position;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting forward polling for {}", position);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Assigning partitions");
|
||||
var seekOperations = SeekOperations.create(consumer, position);
|
||||
seekOperations.assignAndSeekNonEmptyPartitions();
|
||||
|
||||
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
|
||||
while (!sink.isCancelled()
|
||||
&& !sendLimitReached()
|
||||
&& !seekOperations.assignedPartitionsFullyPolled()
|
||||
&& !emptyPolls.noDataEmptyPollsReached()) {
|
||||
|
||||
sendPhase(sink, "Polling");
|
||||
var records = poll(sink, consumer);
|
||||
emptyPolls.count(records.count());
|
||||
|
||||
log.debug("{} records polled", records.count());
|
||||
|
||||
for (ConsumerRecord<Bytes, Bytes> msg : records) {
|
||||
sendMessage(sink, msg);
|
||||
}
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,67 +1,75 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import static java.util.stream.Collectors.collectingAndThen;
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Streams;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class MessagesProcessing {
|
||||
@RequiredArgsConstructor
|
||||
class MessagesProcessing {
|
||||
|
||||
private final ConsumingStats consumingStats = new ConsumingStats();
|
||||
private long sentMessages = 0;
|
||||
private int filterApplyErrors = 0;
|
||||
|
||||
private final ConsumerRecordDeserializer deserializer;
|
||||
private final Predicate<TopicMessageDTO> filter;
|
||||
private final boolean ascendingSortBeforeSend;
|
||||
private final @Nullable Integer limit;
|
||||
|
||||
public MessagesProcessing(ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
@Nullable Integer limit) {
|
||||
this.deserializer = deserializer;
|
||||
this.filter = filter;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
boolean limitReached() {
|
||||
return limit != null && sentMessages >= limit;
|
||||
}
|
||||
|
||||
void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
|
||||
if (!sink.isCancelled() && !limitReached()) {
|
||||
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
|
||||
try {
|
||||
if (filter.test(topicMessage)) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
|
||||
.message(topicMessage)
|
||||
);
|
||||
sentMessages++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
filterApplyErrors++;
|
||||
log.trace("Error applying filter for message {}", topicMessage);
|
||||
}
|
||||
}
|
||||
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
|
||||
sortForSending(polled, ascendingSortBeforeSend)
|
||||
.forEach(rec -> {
|
||||
if (!limitReached() && !sink.isCancelled()) {
|
||||
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
|
||||
try {
|
||||
if (filter.test(topicMessage)) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
|
||||
.message(topicMessage)
|
||||
);
|
||||
sentMessages++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
consumingStats.incFilterApplyError();
|
||||
log.trace("Error applying filter for message {}", topicMessage);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
|
||||
if (!sink.isCancelled()) {
|
||||
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
|
||||
consumingStats.sendConsumingEvt(sink, polledRecords);
|
||||
}
|
||||
}
|
||||
|
||||
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
|
||||
if (!sink.isCancelled()) {
|
||||
consumingStats.sendFinishEvent(sink, filterApplyErrors);
|
||||
consumingStats.sendFinishEvent(sink);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,4 +83,30 @@ public class MessagesProcessing {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
|
||||
boolean asc) {
|
||||
Comparator<ConsumerRecord> offsetComparator = asc
|
||||
? Comparator.comparingLong(ConsumerRecord::offset)
|
||||
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
|
||||
|
||||
// partition -> sorted by offsets records
|
||||
Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
|
||||
.collect(
|
||||
groupingBy(
|
||||
ConsumerRecord::partition,
|
||||
TreeMap::new,
|
||||
collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
|
||||
|
||||
Comparator<ConsumerRecord> tsComparator = asc
|
||||
? Comparator.comparing(ConsumerRecord::timestamp)
|
||||
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
|
||||
|
||||
// merge-sorting records from partitions one by one using timestamp comparator
|
||||
return Iterables.mergeSorted(perPartition.values(), tsComparator);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,12 +8,13 @@ import java.util.Set;
|
|||
import java.util.stream.Collectors;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.mutable.MutableLong;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
public class OffsetsInfo {
|
||||
class OffsetsInfo {
|
||||
|
||||
private final Consumer<?, ?> consumer;
|
||||
|
||||
|
@ -23,7 +24,7 @@ public class OffsetsInfo {
|
|||
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
|
||||
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
|
||||
|
||||
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
|
||||
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
|
||||
this(consumer,
|
||||
consumer.partitionsFor(topic).stream()
|
||||
.map(pi -> new TopicPartition(topic, pi.partition()))
|
||||
|
@ -31,8 +32,7 @@ public class OffsetsInfo {
|
|||
);
|
||||
}
|
||||
|
||||
public OffsetsInfo(Consumer<?, ?> consumer,
|
||||
Collection<TopicPartition> targetPartitions) {
|
||||
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
|
||||
this.consumer = consumer;
|
||||
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
|
||||
this.endOffsets = consumer.endOffsets(targetPartitions);
|
||||
|
@ -46,8 +46,8 @@ public class OffsetsInfo {
|
|||
});
|
||||
}
|
||||
|
||||
public boolean assignedPartitionsFullyPolled() {
|
||||
for (var tp: consumer.assignment()) {
|
||||
boolean assignedPartitionsFullyPolled() {
|
||||
for (var tp : consumer.assignment()) {
|
||||
Preconditions.checkArgument(endOffsets.containsKey(tp));
|
||||
if (endOffsets.get(tp) > consumer.position(tp)) {
|
||||
return false;
|
||||
|
@ -56,4 +56,10 @@ public class OffsetsInfo {
|
|||
return true;
|
||||
}
|
||||
|
||||
long summaryOffsetsRange() {
|
||||
MutableLong cnt = new MutableLong();
|
||||
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
|
||||
return cnt.getValue();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,13 +8,8 @@ import java.util.function.Supplier;
|
|||
public class PollingSettings {
|
||||
|
||||
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
|
||||
private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
|
||||
private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
|
||||
|
||||
private final Duration pollTimeout;
|
||||
private final Duration partitionPollTimeout;
|
||||
private final int notDataEmptyPolls; //see EmptyPollsCounter docs
|
||||
|
||||
private final Supplier<PollingThrottler> throttlerSupplier;
|
||||
|
||||
public static PollingSettings create(ClustersProperties.Cluster cluster,
|
||||
|
@ -26,18 +21,8 @@ public class PollingSettings {
|
|||
? Duration.ofMillis(pollingProps.getPollTimeoutMs())
|
||||
: DEFAULT_POLL_TIMEOUT;
|
||||
|
||||
var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
|
||||
? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
|
||||
: Duration.ofMillis(pollTimeout.toMillis() / 5);
|
||||
|
||||
int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
|
||||
? pollingProps.getNoDataEmptyPolls()
|
||||
: DEFAULT_NO_DATA_EMPTY_POLLS;
|
||||
|
||||
return new PollingSettings(
|
||||
pollTimeout,
|
||||
partitionPollTimeout,
|
||||
noDataEmptyPolls,
|
||||
PollingThrottler.throttlerSupplier(cluster)
|
||||
);
|
||||
}
|
||||
|
@ -45,34 +30,20 @@ public class PollingSettings {
|
|||
public static PollingSettings createDefault() {
|
||||
return new PollingSettings(
|
||||
DEFAULT_POLL_TIMEOUT,
|
||||
DEFAULT_PARTITION_POLL_TIMEOUT,
|
||||
DEFAULT_NO_DATA_EMPTY_POLLS,
|
||||
PollingThrottler::noop
|
||||
);
|
||||
}
|
||||
|
||||
private PollingSettings(Duration pollTimeout,
|
||||
Duration partitionPollTimeout,
|
||||
int notDataEmptyPolls,
|
||||
Supplier<PollingThrottler> throttlerSupplier) {
|
||||
this.pollTimeout = pollTimeout;
|
||||
this.partitionPollTimeout = partitionPollTimeout;
|
||||
this.notDataEmptyPolls = notDataEmptyPolls;
|
||||
this.throttlerSupplier = throttlerSupplier;
|
||||
}
|
||||
|
||||
public EmptyPollsCounter createEmptyPollsCounter() {
|
||||
return new EmptyPollsCounter(notDataEmptyPolls);
|
||||
}
|
||||
|
||||
public Duration getPollTimeout() {
|
||||
return pollTimeout;
|
||||
}
|
||||
|
||||
public Duration getPartitionPollTimeout() {
|
||||
return partitionPollTimeout;
|
||||
}
|
||||
|
||||
public PollingThrottler getPollingThrottler() {
|
||||
return throttlerSupplier.get();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
abstract class RangePollingEmitter extends AbstractEmitter {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
protected final ConsumerPosition consumerPosition;
|
||||
protected final int messagesPerPage;
|
||||
|
||||
protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.consumerPosition = consumerPosition;
|
||||
this.messagesPerPage = messagesPerPage;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
|
||||
}
|
||||
|
||||
//should return empty map if polling should be stopped
|
||||
protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
|
||||
TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
|
||||
SeekOperations seekOperations
|
||||
);
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting polling for {}", consumerPosition);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Consumer created");
|
||||
var seekOperations = SeekOperations.create(consumer, consumerPosition);
|
||||
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
|
||||
log.debug("Starting from offsets {}", pollRange);
|
||||
|
||||
while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
|
||||
var polled = poll(consumer, sink, pollRange);
|
||||
send(sink, polled);
|
||||
pollRange = nextPollingRange(pollRange, seekOperations);
|
||||
}
|
||||
if (sink.isCancelled()) {
|
||||
log.debug("Polling finished due to sink cancellation");
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
|
||||
FluxSink<TopicMessageEventDTO> sink,
|
||||
TreeMap<TopicPartition, FromToOffset> range) {
|
||||
log.trace("Polling range {}", range);
|
||||
sendPhase(sink,
|
||||
"Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
|
||||
|
||||
consumer.assign(range.keySet());
|
||||
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
|
||||
|
||||
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
|
||||
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
|
||||
var polledRecords = poll(sink, consumer);
|
||||
range.forEach((tp, fromTo) -> {
|
||||
polledRecords.records(tp).stream()
|
||||
.filter(r -> r.offset() < fromTo.to)
|
||||
.forEach(result::add);
|
||||
|
||||
//next position is out of target range -> pausing partition
|
||||
if (consumer.position(tp) >= fromTo.to) {
|
||||
consumer.pause(List.of(tp));
|
||||
}
|
||||
});
|
||||
}
|
||||
consumer.resume(consumer.paused());
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -10,17 +10,18 @@ import java.util.stream.Collectors;
|
|||
import javax.annotation.Nullable;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang3.mutable.MutableLong;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
|
||||
class SeekOperations {
|
||||
public class SeekOperations {
|
||||
|
||||
private final Consumer<?, ?> consumer;
|
||||
private final OffsetsInfo offsetsInfo;
|
||||
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
|
||||
|
||||
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
|
||||
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
|
||||
OffsetsInfo offsetsInfo;
|
||||
if (consumerPosition.getSeekTo() == null) {
|
||||
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
|
||||
|
@ -34,25 +35,37 @@ class SeekOperations {
|
|||
);
|
||||
}
|
||||
|
||||
void assignAndSeekNonEmptyPartitions() {
|
||||
public void assignAndSeekNonEmptyPartitions() {
|
||||
consumer.assign(offsetsForSeek.keySet());
|
||||
offsetsForSeek.forEach(consumer::seek);
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> getBeginOffsets() {
|
||||
public Map<TopicPartition, Long> getBeginOffsets() {
|
||||
return offsetsInfo.getBeginOffsets();
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> getEndOffsets() {
|
||||
public Map<TopicPartition, Long> getEndOffsets() {
|
||||
return offsetsInfo.getEndOffsets();
|
||||
}
|
||||
|
||||
boolean assignedPartitionsFullyPolled() {
|
||||
public boolean assignedPartitionsFullyPolled() {
|
||||
return offsetsInfo.assignedPartitionsFullyPolled();
|
||||
}
|
||||
|
||||
// sum of (end - start) offsets for all partitions
|
||||
public long summaryOffsetsRange() {
|
||||
return offsetsInfo.summaryOffsetsRange();
|
||||
}
|
||||
|
||||
// sum of differences between initial consumer seek and current consumer position (across all partitions)
|
||||
public long offsetsProcessedFromSeek() {
|
||||
MutableLong count = new MutableLong();
|
||||
offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
|
||||
return count.getValue();
|
||||
}
|
||||
|
||||
// Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
|
||||
Map<TopicPartition, Long> getOffsetsForSeek() {
|
||||
public Map<TopicPartition, Long> getOffsetsForSeek() {
|
||||
return offsetsForSeek;
|
||||
}
|
||||
|
||||
|
@ -61,19 +74,19 @@ class SeekOperations {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
|
||||
OffsetsInfo offsetsInfo,
|
||||
SeekTypeDTO seekType,
|
||||
@Nullable Map<TopicPartition, Long> seekTo) {
|
||||
OffsetsInfo offsetsInfo,
|
||||
SeekTypeDTO seekType,
|
||||
@Nullable Map<TopicPartition, Long> seekTo) {
|
||||
switch (seekType) {
|
||||
case LATEST:
|
||||
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
|
||||
case BEGINNING:
|
||||
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
|
||||
case OFFSET:
|
||||
Preconditions.checkNotNull(offsetsInfo);
|
||||
Preconditions.checkNotNull(seekTo);
|
||||
return fixOffsets(offsetsInfo, seekTo);
|
||||
case TIMESTAMP:
|
||||
Preconditions.checkNotNull(offsetsInfo);
|
||||
Preconditions.checkNotNull(seekTo);
|
||||
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
|
@ -100,7 +113,7 @@ class SeekOperations {
|
|||
}
|
||||
|
||||
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
|
||||
Map<TopicPartition, Long> timestamps) {
|
||||
Map<TopicPartition, Long> timestamps) {
|
||||
timestamps = new HashMap<>(timestamps);
|
||||
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
|
||||
|
||||
|
|
|
@ -1,25 +1,28 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.HashMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class TailingEmitter extends AbstractEmitter
|
||||
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
public class TailingEmitter extends AbstractEmitter {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
private final ConsumerPosition consumerPosition;
|
||||
|
||||
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
MessagesProcessing messagesProcessing,
|
||||
ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
this.consumerPosition = consumerPosition;
|
||||
}
|
||||
|
@ -32,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter
|
|||
while (!sink.isCancelled()) {
|
||||
sendPhase(sink, "Polling");
|
||||
var polled = poll(sink, consumer);
|
||||
polled.forEach(r -> sendMessage(sink, r));
|
||||
send(sink, polled);
|
||||
}
|
||||
sink.complete();
|
||||
log.debug("Tailing finished");
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.provectus.kafka.ui.serdes;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
|
@ -8,6 +9,7 @@ import java.time.ZoneId;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.UnaryOperator;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@ -32,6 +34,8 @@ public class ConsumerRecordDeserializer {
|
|||
private final Serde.Deserializer fallbackKeyDeserializer;
|
||||
private final Serde.Deserializer fallbackValueDeserializer;
|
||||
|
||||
private final UnaryOperator<TopicMessageDTO> masker;
|
||||
|
||||
public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
|
||||
var message = new TopicMessageDTO();
|
||||
fillKey(message, rec);
|
||||
|
@ -47,20 +51,15 @@ public class ConsumerRecordDeserializer {
|
|||
message.setValueSize(getValueSize(rec));
|
||||
message.setHeadersSize(getHeadersSize(rec));
|
||||
|
||||
return message;
|
||||
return masker.apply(message);
|
||||
}
|
||||
|
||||
private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
||||
switch (timestampType) {
|
||||
case CREATE_TIME:
|
||||
return TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
|
||||
case LOG_APPEND_TIME:
|
||||
return TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
|
||||
case NO_TIMESTAMP_TYPE:
|
||||
return TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
|
||||
}
|
||||
private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
||||
return switch (timestampType) {
|
||||
case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
|
||||
case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
|
||||
case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
||||
};
|
||||
}
|
||||
|
||||
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.serdes.builtin;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
|
|
|
@ -102,7 +102,8 @@ public class DeserializationService implements Closeable {
|
|||
valueSerde.deserializer(topic, Serde.Target.VALUE),
|
||||
fallbackSerde.getName(),
|
||||
fallbackSerde.deserializer(topic, Serde.Target.KEY),
|
||||
fallbackSerde.deserializer(topic, Serde.Target.VALUE)
|
||||
fallbackSerde.deserializer(topic, Serde.Target.VALUE),
|
||||
cluster.getMasking().getMaskerForTopic(topic)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -2,10 +2,9 @@ package com.provectus.kafka.ui.service;
|
|||
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.BackwardEmitter;
|
||||
import com.provectus.kafka.ui.emitter.ForwardEmitter;
|
||||
import com.provectus.kafka.ui.emitter.MessageFilters;
|
||||
import com.provectus.kafka.ui.emitter.MessagesProcessing;
|
||||
import com.provectus.kafka.ui.emitter.TailingEmitter;
|
||||
import com.provectus.kafka.ui.exception.TopicNotFoundException;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
|
@ -18,7 +17,6 @@ import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
|
|||
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
||||
import java.time.Instant;
|
||||
|
@ -45,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
|
@ -231,54 +228,24 @@ public class MessagesService {
|
|||
@Nullable String keySerde,
|
||||
@Nullable String valueSerde) {
|
||||
|
||||
java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
|
||||
|
||||
var processing = new MessagesProcessing(
|
||||
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
|
||||
getMsgFilter(query, filterQueryType),
|
||||
seekDirection == SeekDirectionDTO.TAILING ? null : limit
|
||||
);
|
||||
|
||||
if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
|
||||
emitter = new ForwardRecordEmitter(
|
||||
var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
|
||||
var filter = getMsgFilter(query, filterQueryType);
|
||||
var emitter = switch (seekDirection) {
|
||||
case FORWARD -> new ForwardEmitter(
|
||||
() -> consumerGroupService.createConsumer(cluster),
|
||||
consumerPosition,
|
||||
processing,
|
||||
cluster.getPollingSettings()
|
||||
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
|
||||
);
|
||||
} else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
|
||||
emitter = new BackwardRecordEmitter(
|
||||
case BACKWARD -> new BackwardEmitter(
|
||||
() -> consumerGroupService.createConsumer(cluster),
|
||||
consumerPosition,
|
||||
limit,
|
||||
processing,
|
||||
cluster.getPollingSettings()
|
||||
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
|
||||
);
|
||||
} else {
|
||||
emitter = new TailingEmitter(
|
||||
case TAILING -> new TailingEmitter(
|
||||
() -> consumerGroupService.createConsumer(cluster),
|
||||
consumerPosition,
|
||||
processing,
|
||||
cluster.getPollingSettings()
|
||||
consumerPosition, deserializer, filter, cluster.getPollingSettings()
|
||||
);
|
||||
}
|
||||
return Flux.create(emitter)
|
||||
.map(getDataMasker(cluster, topic))
|
||||
.map(throttleUiPublish(seekDirection));
|
||||
}
|
||||
|
||||
private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
|
||||
var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
|
||||
var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
|
||||
return evt -> {
|
||||
if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
|
||||
return evt;
|
||||
}
|
||||
return evt.message(
|
||||
evt.getMessage()
|
||||
.key(keyMasker.apply(evt.getMessage().getKey()))
|
||||
.content(valMasker.apply(evt.getMessage().getContent())));
|
||||
};
|
||||
return Flux.create(emitter)
|
||||
.map(throttleUiPublish(seekDirection));
|
||||
}
|
||||
|
||||
private Predicate<TopicMessageDTO> getMsgFilter(String query,
|
||||
|
|
|
@ -92,14 +92,12 @@ class AnalysisTasksStore {
|
|||
.result(completedState);
|
||||
}
|
||||
|
||||
@Value
|
||||
@Builder(toBuilder = true)
|
||||
private static class RunningAnalysis {
|
||||
Instant startedAt;
|
||||
double completenessPercent;
|
||||
long msgsScanned;
|
||||
long bytesScanned;
|
||||
Closeable task;
|
||||
private record RunningAnalysis(Instant startedAt,
|
||||
double completenessPercent,
|
||||
long msgsScanned,
|
||||
long bytesScanned,
|
||||
Closeable task) {
|
||||
|
||||
TopicAnalysisProgressDTO toDto() {
|
||||
return new TopicAnalysisProgressDTO()
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package com.provectus.kafka.ui.service.analyze;
|
||||
|
||||
import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
|
||||
import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
|
||||
|
||||
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
|
||||
import com.provectus.kafka.ui.emitter.OffsetsInfo;
|
||||
import com.provectus.kafka.ui.emitter.PollingSettings;
|
||||
import com.provectus.kafka.ui.emitter.SeekOperations;
|
||||
import com.provectus.kafka.ui.exception.TopicAnalysisException;
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
|
||||
import com.provectus.kafka.ui.service.ConsumerGroupService;
|
||||
|
@ -15,16 +16,14 @@ import java.time.Instant;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
|
||||
|
@ -33,6 +32,14 @@ import reactor.core.scheduler.Schedulers;
|
|||
@RequiredArgsConstructor
|
||||
public class TopicAnalysisService {
|
||||
|
||||
private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
|
||||
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
|
||||
"topic-analysis-tasks",
|
||||
10, //ttl for idle threads (in sec)
|
||||
true //daemon
|
||||
);
|
||||
|
||||
private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
|
||||
|
||||
private final TopicsService topicsService;
|
||||
|
@ -40,30 +47,18 @@ public class TopicAnalysisService {
|
|||
|
||||
public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
|
||||
return topicsService.getTopicDetails(cluster, topicName)
|
||||
.doOnNext(topic ->
|
||||
startAnalysis(
|
||||
cluster,
|
||||
topicName,
|
||||
topic.getPartitionCount(),
|
||||
topic.getPartitions().values()
|
||||
.stream()
|
||||
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
|
||||
.sum()
|
||||
)
|
||||
).then();
|
||||
.doOnNext(topic -> startAnalysis(cluster, topicName))
|
||||
.then();
|
||||
}
|
||||
|
||||
private synchronized void startAnalysis(KafkaCluster cluster,
|
||||
String topic,
|
||||
int partitionsCnt,
|
||||
long approxNumberOfMsgs) {
|
||||
private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
|
||||
var topicId = new TopicIdentity(cluster, topic);
|
||||
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
|
||||
throw new TopicAnalysisException("Topic is already analyzing");
|
||||
}
|
||||
var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
|
||||
var task = new AnalysisTask(cluster, topicId);
|
||||
analysisTasksStore.registerNewTask(topicId, task);
|
||||
Schedulers.boundedElastic().schedule(task);
|
||||
SCHEDULER.schedule(task);
|
||||
}
|
||||
|
||||
public void cancelAnalysis(KafkaCluster cluster, String topicName) {
|
||||
|
@ -79,20 +74,14 @@ public class TopicAnalysisService {
|
|||
private final Instant startedAt = Instant.now();
|
||||
|
||||
private final TopicIdentity topicId;
|
||||
private final int partitionsCnt;
|
||||
private final long approxNumberOfMsgs;
|
||||
private final EmptyPollsCounter emptyPollsCounter;
|
||||
|
||||
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
|
||||
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
|
||||
|
||||
private final EnhancedConsumer consumer;
|
||||
|
||||
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
|
||||
long approxNumberOfMsgs, PollingSettings pollingSettings) {
|
||||
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId) {
|
||||
this.topicId = topicId;
|
||||
this.approxNumberOfMsgs = approxNumberOfMsgs;
|
||||
this.partitionsCnt = partitionsCnt;
|
||||
this.consumer = consumerGroupService.createConsumer(
|
||||
cluster,
|
||||
// to improve polling throughput
|
||||
|
@ -101,7 +90,6 @@ public class TopicAnalysisService {
|
|||
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
|
||||
)
|
||||
);
|
||||
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -113,23 +101,20 @@ public class TopicAnalysisService {
|
|||
public void run() {
|
||||
try {
|
||||
log.info("Starting {} topic analysis", topicId);
|
||||
var topicPartitions = IntStream.range(0, partitionsCnt)
|
||||
.peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
|
||||
.mapToObj(i -> new TopicPartition(topicId.topicName, i))
|
||||
.collect(Collectors.toList());
|
||||
consumer.partitionsFor(topicId.topicName)
|
||||
.forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
|
||||
|
||||
consumer.assign(topicPartitions);
|
||||
consumer.seekToBeginning(topicPartitions);
|
||||
var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
|
||||
long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
|
||||
seekOperations.assignAndSeekNonEmptyPartitions();
|
||||
|
||||
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
|
||||
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
|
||||
while (!seekOperations.assignedPartitionsFullyPolled()) {
|
||||
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
|
||||
emptyPollsCounter.count(polled.count());
|
||||
polled.forEach(r -> {
|
||||
totalStats.apply(r);
|
||||
partitionStats.get(r.partition()).apply(r);
|
||||
});
|
||||
updateProgress();
|
||||
updateProgress(seekOperations.offsetsProcessedFromSeek(), summaryOffsetsRange);
|
||||
}
|
||||
analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
|
||||
log.info("{} topic analysis finished", topicId);
|
||||
|
@ -145,13 +130,13 @@ public class TopicAnalysisService {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateProgress() {
|
||||
if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
|
||||
private void updateProgress(long processedOffsets, long summaryOffsetsRange) {
|
||||
if (processedOffsets > 0 && summaryOffsetsRange != 0) {
|
||||
analysisTasksStore.updateProgress(
|
||||
topicId,
|
||||
totalStats.totalMsgs,
|
||||
totalStats.keysSize.sum + totalStats.valuesSize.sum,
|
||||
Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
|
||||
Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package com.provectus.kafka.ui.service.masking;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
|
@ -9,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
|
||||
import java.util.List;
|
||||
|
@ -54,7 +53,8 @@ public class DataMasking {
|
|||
Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
|
||||
MaskingPolicy.create(property)
|
||||
);
|
||||
}).collect(toList()));
|
||||
}).toList()
|
||||
);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -62,8 +62,17 @@ public class DataMasking {
|
|||
this.masks = masks;
|
||||
}
|
||||
|
||||
public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
|
||||
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
|
||||
public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {
|
||||
var keyMasker = getMaskingFunction(topic, Serde.Target.KEY);
|
||||
var valMasker = getMaskingFunction(topic, Serde.Target.VALUE);
|
||||
return msg -> msg
|
||||
.key(keyMasker.apply(msg.getKey()))
|
||||
.content(valMasker.apply(msg.getContent()));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
|
||||
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
|
||||
if (targetMasks.isEmpty()) {
|
||||
return UnaryOperator.identity();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
|
||||
class MessagesProcessingTest {
|
||||
|
||||
|
||||
@RepeatedTest(5)
|
||||
void testSortingAsc() {
|
||||
var messagesInOrder = List.of(
|
||||
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00"),
|
||||
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
|
||||
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
|
||||
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
|
||||
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00")
|
||||
);
|
||||
|
||||
var shuffled = new ArrayList<>(messagesInOrder);
|
||||
Collections.shuffle(shuffled);
|
||||
|
||||
var sortedList = MessagesProcessing.sortForSending(shuffled, true);
|
||||
assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
|
||||
}
|
||||
|
||||
@RepeatedTest(5)
|
||||
void testSortingDesc() {
|
||||
var messagesInOrder = List.of(
|
||||
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
|
||||
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
|
||||
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
|
||||
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
|
||||
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00")
|
||||
);
|
||||
|
||||
var shuffled = new ArrayList<>(messagesInOrder);
|
||||
Collections.shuffle(shuffled);
|
||||
|
||||
var sortedList = MessagesProcessing.sortForSending(shuffled, false);
|
||||
assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
|
||||
}
|
||||
|
||||
private ConsumerRecord<Bytes, Bytes> consumerRecord(int partition, long offset, String ts) {
|
||||
return new ConsumerRecord<>(
|
||||
"topic", partition, offset, OffsetDateTime.parse(ts).toInstant().toEpochMilli(),
|
||||
TimestampType.CREATE_TIME,
|
||||
0, 0, null, null, new RecordHeaders(), Optional.empty()
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package com.provectus.kafka.ui.serdes;
|
||||
|
||||
import static com.provectus.kafka.ui.serde.api.DeserializeResult.Type.STRING;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serde.api.DeserializeResult;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import java.util.Map;
|
||||
import java.util.function.UnaryOperator;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class ConsumerRecordDeserializerTest {
|
||||
|
||||
@Test
|
||||
void dataMaskingAppliedOnDeserializedMessage() {
|
||||
UnaryOperator<TopicMessageDTO> maskerMock = mock();
|
||||
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
|
||||
|
||||
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
|
||||
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
|
||||
|
||||
verify(maskerMock).apply(any(TopicMessageDTO.class));
|
||||
}
|
||||
|
||||
}
|
|
@ -7,13 +7,13 @@ import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
|
|||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import com.provectus.kafka.ui.AbstractIntegrationTest;
|
||||
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.BackwardEmitter;
|
||||
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
|
||||
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.MessagesProcessing;
|
||||
import com.provectus.kafka.ui.emitter.ForwardEmitter;
|
||||
import com.provectus.kafka.ui.emitter.PollingSettings;
|
||||
import com.provectus.kafka.ui.emitter.PollingThrottler;
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.producer.KafkaTestProducer;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
|
@ -31,16 +31,15 @@ import java.util.UUID;
|
|||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Value;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.serialization.BytesDeserializer;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -58,6 +57,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
static final String EMPTY_TOPIC = TOPIC + "_empty";
|
||||
static final List<Record> SENT_RECORDS = new ArrayList<>();
|
||||
static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
|
||||
static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
|
||||
|
||||
@BeforeAll
|
||||
static void generateMsgs() throws Exception {
|
||||
|
@ -93,6 +93,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
static void cleanup() {
|
||||
deleteTopic(TOPIC);
|
||||
deleteTopic(EMPTY_TOPIC);
|
||||
SENT_RECORDS.clear();
|
||||
}
|
||||
|
||||
private static ConsumerRecordDeserializer createRecordsDeserializer() {
|
||||
|
@ -105,28 +106,28 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
s.deserializer(null, Serde.Target.VALUE),
|
||||
StringSerde.name(),
|
||||
s.deserializer(null, Serde.Target.KEY),
|
||||
s.deserializer(null, Serde.Target.VALUE)
|
||||
s.deserializer(null, Serde.Target.VALUE),
|
||||
msg -> msg
|
||||
);
|
||||
}
|
||||
|
||||
private MessagesProcessing createMessagesProcessing() {
|
||||
return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void pollNothingOnEmptyTopic() {
|
||||
var forwardEmitter = new ForwardRecordEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
|
||||
createMessagesProcessing(),
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var forwardEmitter = new ForwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
|
||||
100,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
|
||||
100,
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
@ -145,18 +146,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
|
||||
@Test
|
||||
void pollFullTopicFromBeginning() {
|
||||
var forwardEmitter = new ForwardRecordEmitter(
|
||||
var forwardEmitter = new ForwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(BEGINNING, TOPIC, null),
|
||||
createMessagesProcessing(),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(LATEST, TOPIC, null),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
@ -174,18 +178,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
targetOffsets.put(new TopicPartition(TOPIC, i), offset);
|
||||
}
|
||||
|
||||
var forwardEmitter = new ForwardRecordEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
|
||||
createMessagesProcessing(),
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var forwardEmitter = new ForwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
@ -219,18 +226,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
);
|
||||
}
|
||||
|
||||
var forwardEmitter = new ForwardRecordEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
|
||||
createMessagesProcessing(),
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var forwardEmitter = new ForwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
|
||||
PARTITIONS * MSGS_PER_PARTITION,
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
@ -257,11 +267,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
|
||||
}
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
|
||||
numMessages,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
@ -283,11 +294,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
offsets.put(new TopicPartition(TOPIC, i), 0L);
|
||||
}
|
||||
|
||||
var backwardEmitter = new BackwardRecordEmitter(
|
||||
var backwardEmitter = new BackwardEmitter(
|
||||
this::createConsumer,
|
||||
new ConsumerPosition(OFFSET, TOPIC, offsets),
|
||||
100,
|
||||
createMessagesProcessing(),
|
||||
RECORD_DESERIALIZER,
|
||||
NOOP_FILTER,
|
||||
PollingSettings.createDefault()
|
||||
);
|
||||
|
||||
|
|
|
@ -3868,10 +3868,6 @@ components:
|
|||
properties:
|
||||
pollTimeoutMs:
|
||||
type: integer
|
||||
partitionPollTimeout:
|
||||
type: integer
|
||||
noDataEmptyPolls:
|
||||
type: integer
|
||||
maxPageSize:
|
||||
type: integer
|
||||
defaultPageSize:
|
||||
|
|
Loading…
Add table
Reference in a new issue