Merge branch 'master' into issues/4125

Roman Zabaluev 2023-08-17 16:06:12 +07:00 committed by GitHub
commit 361b03eb15
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 583 additions and 495 deletions

View file

@@ -23,7 +23,7 @@ jobs:
# Disabling shallow clone is recommended for improving relevancy of reporting
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}
- uses: pnpm/action-setup@v2.2.4
- uses: pnpm/action-setup@v2.4.0
with:
version: 7.4.0
- name: Install node

View file

@@ -34,7 +34,7 @@ jobs:
echo "version=${VERSION}" >> $GITHUB_OUTPUT
- name: Upload files to a GitHub release
uses: svenstaro/upload-release-action@2.6.1
uses: svenstaro/upload-release-action@2.7.0
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: kafka-ui-api/target/kafka-ui-api-${{ steps.build.outputs.version }}.jar

View file

@@ -57,8 +57,6 @@ public class ClustersProperties {
@Data
public static class PollingProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}

View file

@@ -1,28 +1,22 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final MessagesProcessing messagesProcessing;
protected final PollingSettings pollingSettings;
private final PollingSettings pollingSettings;
protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
}
protected PolledRecords poll(
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
sendConsuming(sink, records);
return records;
}
@@ -31,9 +25,8 @@ public abstract class AbstractEmitter {
return messagesProcessing.limitReached();
}
protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecord<Bytes, Bytes> msg) {
messagesProcessing.sendMsg(sink, msg);
protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
}
protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {

View file

@@ -0,0 +1,60 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class BackwardEmitter extends RangePollingEmitter {
public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
);
}
@Override
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
SeekOperations seekOperations) {
TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
if (prevRange.isEmpty()) {
readToOffsets.putAll(seekOperations.getOffsetsForSeek());
} else {
readToOffsets.putAll(
prevRange.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
);
}
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
readToOffsets.forEach((tp, toOffset) -> {
long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
if (toOffset > tpStartOffset) {
result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
}
});
return result;
}
}
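
To make the paging arithmetic above concrete: a minimal standalone sketch of the same backward-range computation, using hypothetical offsets and plain Integer keys in place of TopicPartition (the ForwardEmitter below mirrors it in the opposite direction):

import java.util.Map;
import java.util.TreeMap;

class BackwardRangeSketch {
  record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) { }

  public static void main(String[] args) {
    int messagesPerPage = 100;
    // assumed begin offsets and current 'read to' positions for two partitions
    Map<Integer, Long> beginOffsets = Map.of(0, 0L, 1, 0L);
    TreeMap<Integer, Long> readToOffsets = new TreeMap<>(Map.of(0, 250L, 1, 40L));
    // ceil(100 / 2) = 50 messages requested from each partition
    int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
    TreeMap<Integer, FromToOffset> result = new TreeMap<>();
    readToOffsets.forEach((partition, toOffset) -> {
      long begin = beginOffsets.get(partition);
      if (toOffset > begin) {
        result.put(partition, new FromToOffset(Math.max(begin, toOffset - msgsToPollPerPartition), toOffset));
      }
    });
    // prints ranges [200, 250) and [0, 40): partition 1 is clamped at its begin offset
    System.out.println(result);
  }
}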

View file

@@ -1,126 +0,0 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
public BackwardRecordEmitter(
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
log.debug("'Until' offsets for polling: {}", readUntilOffsets);
while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
if (sink.isCancelled()) {
return; //fast return in case of sink cancellation
}
long beginOffset = seekOperations.getBeginOffsets().get(tp);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.forEach(r -> sendMessage(sink, r));
if (beginOffset == readFromOffset) {
// we fully read this partition -> removing it from polling iterations
readUntilOffsets.remove(tp);
} else {
// updating 'to' offset for next polling iteration
readUntilOffsets.put(tp, readFromOffset);
}
});
if (readUntilOffsets.isEmpty()) {
log.debug("begin reached after partitions poll iteration");
} else if (sink.isCancelled()) {
log.debug("sink is cancelled after partitions poll iteration");
}
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,
long toOffset,
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, fromOffset);
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
int desiredMsgsToPoll = (int) (toOffset - fromOffset);
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords.count());
log.debug("{} records polled from {}", polledRecords.count(), tp);
var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.toList();
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}
recordsToSend.addAll(filteredRecords);
}
log.debug("{} records to send", recordsToSend.size());
Collections.reverse(recordsToSend);
return recordsToSend;
}
}

View file

@@ -9,35 +9,37 @@ class ConsumingStats {
private long bytes = 0;
private int records = 0;
private long elapsed = 0;
private int filterApplyErrors = 0;
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
PolledRecords polledRecords,
int filterApplyErrors) {
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
bytes += polledRecords.bytes();
this.records += polledRecords.count();
this.elapsed += polledRecords.elapsed().toMillis();
records += polledRecords.count();
elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
.consuming(createConsumingStats())
);
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
void incFilterApplyError() {
filterApplyErrors++;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.consuming(createConsumingStats(sink, filterApplyErrors))
.consuming(createConsumingStats())
);
}
private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
int filterApplyErrors) {
private TopicMessageConsumingDTO createConsumingStats() {
return new TopicMessageConsumingDTO()
.bytesConsumed(this.bytes)
.elapsedMs(this.elapsed)
.isCancelled(sink.isCancelled())
.bytesConsumed(bytes)
.elapsedMs(elapsed)
.isCancelled(false)
.filterApplyErrors(filterApplyErrors)
.messagesConsumed(this.records);
.messagesConsumed(records);
}
}

View file

@@ -1,28 +0,0 @@
package com.provectus.kafka.ui.emitter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// In some situations it is hard to say whether a records range (between two offsets) was fully polled.
// This happens when there are holes in record sequences, which is a usual case for compacted topics or
// topics with transactional writes. In such cases, if you want to poll all records between offsets X and Y,
// there is no guarantee that you will ever see a record with offset Y.
// To work around this we can assume that after N consecutive empty polls all target messages were read.
public class EmptyPollsCounter {
private final int maxEmptyPolls;
private int emptyPolls = 0;
EmptyPollsCounter(int maxEmptyPolls) {
this.maxEmptyPolls = maxEmptyPolls;
}
public void count(int polledCount) {
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
}
public boolean noDataEmptyPollsReached() {
return emptyPolls >= maxEmptyPolls;
}
}

View file

@@ -0,0 +1,61 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class ForwardEmitter extends RangePollingEmitter {
public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
true,
messagesPerPage
),
pollingSettings
);
}
@Override
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
SeekOperations seekOperations) {
TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
if (prevRange.isEmpty()) {
readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
} else {
readFromOffsets.putAll(
prevRange.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
);
}
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
readFromOffsets.forEach((tp, fromOffset) -> {
long tpEndOffset = seekOperations.getEndOffsets().get(tp);
if (fromOffset < tpEndOffset) {
result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
}
});
return result;
}
}

View file

@@ -1,64 +0,0 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;
public ForwardRecordEmitter(
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.position = position;
this.consumerSupplier = consumerSupplier;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& !seekOperations.assignedPartitionsFullyPolled()
&& !emptyPolls.noDataEmptyPollsReached()) {
sendPhase(sink, "Polling");
var records = poll(sink, consumer);
emptyPolls.count(records.count());
log.debug("{} records polled", records.count());
for (ConsumerRecord<Bytes, Bytes> msg : records) {
sendMessage(sink, msg);
}
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
}

View file

@@ -1,67 +1,75 @@
package com.provectus.kafka.ui.emitter;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class MessagesProcessing {
@RequiredArgsConstructor
class MessagesProcessing {
private final ConsumingStats consumingStats = new ConsumingStats();
private long sentMessages = 0;
private int filterApplyErrors = 0;
private final ConsumerRecordDeserializer deserializer;
private final Predicate<TopicMessageDTO> filter;
private final boolean ascendingSortBeforeSend;
private final @Nullable Integer limit;
public MessagesProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
@Nullable Integer limit) {
this.deserializer = deserializer;
this.filter = filter;
this.limit = limit;
}
boolean limitReached() {
return limit != null && sentMessages >= limit;
}
void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
if (!sink.isCancelled() && !limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
sortForSending(polled, ascendingSortBeforeSend)
.forEach(rec -> {
if (!limitReached() && !sink.isCancelled()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
sentMessages++;
}
} catch (Exception e) {
consumingStats.incFilterApplyError();
log.trace("Error applying filter for message {}", topicMessage);
}
}
});
}
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords);
}
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
if (!sink.isCancelled()) {
consumingStats.sendFinishEvent(sink, filterApplyErrors);
consumingStats.sendFinishEvent(sink);
}
}
@@ -75,4 +83,30 @@ public class MessagesProcessing {
}
}
/*
* Sorting by timestamps, BUT requiring that records within the same partition remain ordered by offset.
*/
@VisibleForTesting
static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
boolean asc) {
Comparator<ConsumerRecord> offsetComparator = asc
? Comparator.comparingLong(ConsumerRecord::offset)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
// partition -> records sorted by offset
Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
.collect(
groupingBy(
ConsumerRecord::partition,
TreeMap::new,
collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
Comparator<ConsumerRecord> tsComparator = asc
? Comparator.comparing(ConsumerRecord::timestamp)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
// merge-sorting records from partitions one by one using timestamp comparator
return Iterables.mergeSorted(perPartition.values(), tsComparator);
}
}
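
A quick illustration of the merge invariant (a hypothetical standalone snippet, not part of this commit): Guava's mergeSorted never reorders elements within a single input, so per-partition offset order survives even where timestamps regress:

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Comparator;
import java.util.List;

class MergeSortedSketch {
  public static void main(String[] args) {
    // each inner list is one partition's timestamps, already in offset order;
    // partition 0's timestamps are not monotonic (300 after 500)
    List<List<Long>> perPartitionTimestamps = List.of(
        List.of(100L, 500L, 300L),
        List.of(200L, 400L)
    );
    Iterable<Long> merged = Iterables.mergeSorted(perPartitionTimestamps, Comparator.naturalOrder());
    // per-input order is preserved, so 300 still follows 500
    System.out.println(Lists.newArrayList(merged)); // [100, 200, 400, 500, 300]
  }
}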

View file

@@ -8,12 +8,13 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@Slf4j
@Getter
public class OffsetsInfo {
class OffsetsInfo {
private final Consumer<?, ?> consumer;
@@ -23,7 +24,7 @@ public class OffsetsInfo {
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer,
consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition()))
@@ -31,8 +32,7 @@ public class OffsetsInfo {
);
}
public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
@@ -46,8 +46,8 @@ public class OffsetsInfo {
});
}
public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) {
boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) {
return false;
@@ -56,4 +56,10 @@ public class OffsetsInfo {
return true;
}
long summaryOffsetsRange() {
MutableLong cnt = new MutableLong();
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
return cnt.getValue();
}
}

View file

@@ -8,13 +8,8 @@ import java.util.function.Supplier;
public class PollingSettings {
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
private final Duration pollTimeout;
private final Duration partitionPollTimeout;
private final int notDataEmptyPolls; //see EmptyPollsCounter docs
private final Supplier<PollingThrottler> throttlerSupplier;
public static PollingSettings create(ClustersProperties.Cluster cluster,
@@ -26,18 +21,8 @@ public class PollingSettings {
? Duration.ofMillis(pollingProps.getPollTimeoutMs())
: DEFAULT_POLL_TIMEOUT;
var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
: Duration.ofMillis(pollTimeout.toMillis() / 5);
int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
? pollingProps.getNoDataEmptyPolls()
: DEFAULT_NO_DATA_EMPTY_POLLS;
return new PollingSettings(
pollTimeout,
partitionPollTimeout,
noDataEmptyPolls,
PollingThrottler.throttlerSupplier(cluster)
);
}
@@ -45,34 +30,20 @@ public class PollingSettings {
public static PollingSettings createDefault() {
return new PollingSettings(
DEFAULT_POLL_TIMEOUT,
DEFAULT_PARTITION_POLL_TIMEOUT,
DEFAULT_NO_DATA_EMPTY_POLLS,
PollingThrottler::noop
);
}
private PollingSettings(Duration pollTimeout,
Duration partitionPollTimeout,
int notDataEmptyPolls,
Supplier<PollingThrottler> throttlerSupplier) {
this.pollTimeout = pollTimeout;
this.partitionPollTimeout = partitionPollTimeout;
this.notDataEmptyPolls = notDataEmptyPolls;
this.throttlerSupplier = throttlerSupplier;
}
public EmptyPollsCounter createEmptyPollsCounter() {
return new EmptyPollsCounter(notDataEmptyPolls);
}
public Duration getPollTimeout() {
return pollTimeout;
}
public Duration getPartitionPollTimeout() {
return partitionPollTimeout;
}
public PollingThrottler getPollingThrottler() {
return throttlerSupplier.get();
}

View file

@@ -0,0 +1,98 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
abstract class RangePollingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
protected final ConsumerPosition consumerPosition;
protected final int messagesPerPage;
protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
}
protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
}
// should return an empty map if polling should be stopped
protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
SeekOperations seekOperations
);
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting polling for {}", consumerPosition);
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Consumer created");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
log.debug("Starting from offsets {}", pollRange);
while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
var polled = poll(consumer, sink, pollRange);
send(sink, polled);
pollRange = nextPollingRange(pollRange, seekOperations);
}
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink,
TreeMap<TopicPartition, FromToOffset> range) {
log.trace("Polling range {}", range);
sendPhase(sink,
"Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
consumer.assign(range.keySet());
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
var polledRecords = poll(sink, consumer);
range.forEach((tp, fromTo) -> {
polledRecords.records(tp).stream()
.filter(r -> r.offset() < fromTo.to)
.forEach(result::add);
//next position is out of target range -> pausing partition
if (consumer.position(tp) >= fromTo.to) {
consumer.pause(List.of(tp));
}
});
}
consumer.resume(consumer.paused());
return result;
}
}

View file

@@ -10,17 +10,18 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
class SeekOperations {
public class SeekOperations {
private final Consumer<?, ?> consumer;
private final OffsetsInfo offsetsInfo;
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo;
if (consumerPosition.getSeekTo() == null) {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
@@ -34,25 +35,37 @@ class SeekOperations {
);
}
void assignAndSeekNonEmptyPartitions() {
public void assignAndSeekNonEmptyPartitions() {
consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek);
}
Map<TopicPartition, Long> getBeginOffsets() {
public Map<TopicPartition, Long> getBeginOffsets() {
return offsetsInfo.getBeginOffsets();
}
Map<TopicPartition, Long> getEndOffsets() {
public Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}
boolean assignedPartitionsFullyPolled() {
public boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled();
}
// sum of (end - start) offsets for all partitions
public long summaryOffsetsRange() {
return offsetsInfo.summaryOffsetsRange();
}
// sum of differences between initial consumer seek and current consumer position (across all partitions)
public long offsetsProcessedFromSeek() {
MutableLong count = new MutableLong();
offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
return count.getValue();
}
// Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
Map<TopicPartition, Long> getOffsetsForSeek() {
public Map<TopicPartition, Long> getOffsetsForSeek() {
return offsetsForSeek;
}
@@ -61,19 +74,19 @@ class SeekOperations {
*/
@VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
switch (seekType) {
case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case BEGINNING:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
case OFFSET:
Preconditions.checkNotNull(offsetsInfo);
Preconditions.checkNotNull(seekTo);
return fixOffsets(offsetsInfo, seekTo);
case TIMESTAMP:
Preconditions.checkNotNull(offsetsInfo);
Preconditions.checkNotNull(seekTo);
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
default:
throw new IllegalStateException();
@@ -100,7 +113,7 @@ class SeekOperations {
}
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
Map<TopicPartition, Long> timestamps) {
Map<TopicPartition, Long> timestamps) {
timestamps = new HashMap<>(timestamps);
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());

View file

@@ -1,25 +1,28 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.InterruptException;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class TailingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
MessagesProcessing messagesProcessing,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
this.consumerSupplier = consumerSupplier;
this.consumerPosition = consumerPosition;
}
@@ -32,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
var polled = poll(sink, consumer);
polled.forEach(r -> sendMessage(sink, r));
send(sink, polled);
}
sink.complete();
log.debug("Tailing finished");

View file

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.serdes;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
import com.provectus.kafka.ui.serde.api.Serde;
import java.time.Instant;
import java.time.OffsetDateTime;
@@ -8,6 +9,7 @@ import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,6 +34,8 @@ public class ConsumerRecordDeserializer {
private final Serde.Deserializer fallbackKeyDeserializer;
private final Serde.Deserializer fallbackValueDeserializer;
private final UnaryOperator<TopicMessageDTO> masker;
public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
var message = new TopicMessageDTO();
fillKey(message, rec);
@@ -47,20 +51,15 @@ public class ConsumerRecordDeserializer {
message.setValueSize(getValueSize(rec));
message.setHeadersSize(getHeadersSize(rec));
return message;
return masker.apply(message);
}
private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
switch (timestampType) {
case CREATE_TIME:
return TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
case LOG_APPEND_TIME:
return TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
case NO_TIMESTAMP_TYPE:
return TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
default:
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
}
private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
return switch (timestampType) {
case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
};
}
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {

View file

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.serdes.builtin;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

View file

@@ -102,7 +102,8 @@ public class DeserializationService implements Closeable {
valueSerde.deserializer(topic, Serde.Target.VALUE),
fallbackSerde.getName(),
fallbackSerde.deserializer(topic, Serde.Target.KEY),
fallbackSerde.deserializer(topic, Serde.Target.VALUE)
fallbackSerde.deserializer(topic, Serde.Target.VALUE),
cluster.getMasking().getMaskerForTopic(topic)
);
}

View file

@@ -2,10 +2,9 @@ package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.TailingEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
@@ -18,7 +17,6 @@ import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
@@ -45,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -231,54 +228,24 @@ public class MessagesService {
@Nullable String keySerde,
@Nullable String valueSerde) {
java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
var processing = new MessagesProcessing(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
getMsgFilter(query, filterQueryType),
seekDirection == SeekDirectionDTO.TAILING ? null : limit
);
if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
emitter = new ForwardRecordEmitter(
var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
var filter = getMsgFilter(query, filterQueryType);
var emitter = switch (seekDirection) {
case FORWARD -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
cluster.getPollingSettings()
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
);
} else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
emitter = new BackwardRecordEmitter(
case BACKWARD -> new BackwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
limit,
processing,
cluster.getPollingSettings()
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
);
} else {
emitter = new TailingEmitter(
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
cluster.getPollingSettings()
consumerPosition, deserializer, filter, cluster.getPollingSettings()
);
}
return Flux.create(emitter)
.map(getDataMasker(cluster, topic))
.map(throttleUiPublish(seekDirection));
}
private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
return evt -> {
if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
return evt;
}
return evt.message(
evt.getMessage()
.key(keyMasker.apply(evt.getMessage().getKey()))
.content(valMasker.apply(evt.getMessage().getContent())));
};
return Flux.create(emitter)
.map(throttleUiPublish(seekDirection));
}
private Predicate<TopicMessageDTO> getMsgFilter(String query,

View file

@@ -92,14 +92,12 @@ class AnalysisTasksStore {
.result(completedState);
}
@Value
@Builder(toBuilder = true)
private static class RunningAnalysis {
Instant startedAt;
double completenessPercent;
long msgsScanned;
long bytesScanned;
Closeable task;
private record RunningAnalysis(Instant startedAt,
double completenessPercent,
long msgsScanned,
long bytesScanned,
Closeable task) {
TopicAnalysisProgressDTO toDto() {
return new TopicAnalysisProgressDTO()

View file

@@ -1,10 +1,11 @@
package com.provectus.kafka.ui.service.analyze;
import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;
@@ -15,16 +16,14 @@ import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -33,6 +32,14 @@ import reactor.core.scheduler.Schedulers;
@RequiredArgsConstructor
public class TopicAnalysisService {
private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"topic-analysis-tasks",
10, //ttl for idle threads (in sec)
true //daemon
);
private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
private final TopicsService topicsService;
@@ -40,30 +47,18 @@ public class TopicAnalysisService {
public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
return topicsService.getTopicDetails(cluster, topicName)
.doOnNext(topic ->
startAnalysis(
cluster,
topicName,
topic.getPartitionCount(),
topic.getPartitions().values()
.stream()
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
.sum()
)
).then();
.doOnNext(topic -> startAnalysis(cluster, topicName))
.then();
}
private synchronized void startAnalysis(KafkaCluster cluster,
String topic,
int partitionsCnt,
long approxNumberOfMsgs) {
private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
var topicId = new TopicIdentity(cluster, topic);
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
throw new TopicAnalysisException("Topic is already analyzing");
}
var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
var task = new AnalysisTask(cluster, topicId);
analysisTasksStore.registerNewTask(topicId, task);
Schedulers.boundedElastic().schedule(task);
SCHEDULER.schedule(task);
}
public void cancelAnalysis(KafkaCluster cluster, String topicName) {
@@ -79,20 +74,14 @@ public class TopicAnalysisService {
private final Instant startedAt = Instant.now();
private final TopicIdentity topicId;
private final int partitionsCnt;
private final long approxNumberOfMsgs;
private final EmptyPollsCounter emptyPollsCounter;
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
private final EnhancedConsumer consumer;
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
long approxNumberOfMsgs, PollingSettings pollingSettings) {
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId) {
this.topicId = topicId;
this.approxNumberOfMsgs = approxNumberOfMsgs;
this.partitionsCnt = partitionsCnt;
this.consumer = consumerGroupService.createConsumer(
cluster,
// to improve polling throughput
@@ -101,7 +90,6 @@ public class TopicAnalysisService {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
}
@Override
@@ -113,23 +101,20 @@ public class TopicAnalysisService {
public void run() {
try {
log.info("Starting {} topic analysis", topicId);
var topicPartitions = IntStream.range(0, partitionsCnt)
.peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
.mapToObj(i -> new TopicPartition(topicId.topicName, i))
.collect(Collectors.toList());
consumer.partitionsFor(topicId.topicName)
.forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
seekOperations.assignAndSeekNonEmptyPartitions();
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
while (!seekOperations.assignedPartitionsFullyPolled()) {
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
emptyPollsCounter.count(polled.count());
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);
});
updateProgress();
updateProgress(seekOperations.offsetsProcessedFromSeek(), summaryOffsetsRange);
}
analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
log.info("{} topic analysis finished", topicId);
@@ -145,13 +130,13 @@ public class TopicAnalysisService {
}
}
private void updateProgress() {
if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
private void updateProgress(long processedOffsets, long summaryOffsetsRange) {
if (processedOffsets > 0 && summaryOffsetsRange != 0) {
analysisTasksStore.updateProgress(
topicId,
totalStats.totalMsgs,
totalStats.keysSize.sum + totalStats.valuesSize.sum,
Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100)
);
}
}
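
For intuition, a small sketch of the new progress computation with assumed numbers (not taken from the commit); offset ranges can include gaps left by compaction or transaction markers, so the value is an estimate clamped at 100:

class ProgressSketch {
  public static void main(String[] args) {
    // assumed: the consumer has advanced 750 offsets past its initial seek positions,
    // and the non-empty partitions span 3000 offsets in total
    long processedOffsets = 750;
    long summaryOffsetsRange = 3000;
    double completenessPercent = Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100);
    System.out.println(completenessPercent); // 25.0
  }
}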

View file

@@ -1,7 +1,5 @@
package com.provectus.kafka.ui.service.masking;
import static java.util.stream.Collectors.toList;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -9,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
import java.util.List;
@@ -54,7 +53,8 @@ public class DataMasking {
Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
MaskingPolicy.create(property)
);
}).collect(toList()));
}).toList()
);
}
@VisibleForTesting
@@ -62,8 +62,17 @@ public class DataMasking {
this.masks = masks;
}
public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {
var keyMasker = getMaskingFunction(topic, Serde.Target.KEY);
var valMasker = getMaskingFunction(topic, Serde.Target.VALUE);
return msg -> msg
.key(keyMasker.apply(msg.getKey()))
.content(valMasker.apply(msg.getContent()));
}
@VisibleForTesting
UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
if (targetMasks.isEmpty()) {
return UnaryOperator.identity();
}

View file

@@ -0,0 +1,69 @@
package com.provectus.kafka.ui.emitter;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.RepeatedTest;
class MessagesProcessingTest {
@RepeatedTest(5)
void testSortingAsc() {
var messagesInOrder = List.of(
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00"),
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00")
);
var shuffled = new ArrayList<>(messagesInOrder);
Collections.shuffle(shuffled);
var sortedList = MessagesProcessing.sortForSending(shuffled, true);
assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
}
@RepeatedTest(5)
void testSortingDesc() {
var messagesInOrder = List.of(
consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00"),
consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00")
);
var shuffled = new ArrayList<>(messagesInOrder);
Collections.shuffle(shuffled);
var sortedList = MessagesProcessing.sortForSending(shuffled, false);
assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
}
private ConsumerRecord<Bytes, Bytes> consumerRecord(int partition, long offset, String ts) {
return new ConsumerRecord<>(
"topic", partition, offset, OffsetDateTime.parse(ts).toInstant().toEpochMilli(),
TimestampType.CREATE_TIME,
0, 0, null, null, new RecordHeaders(), Optional.empty()
);
}
}

View file

@@ -0,0 +1,30 @@
package com.provectus.kafka.ui.serdes;
import static com.provectus.kafka.ui.serde.api.DeserializeResult.Type.STRING;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.Serde;
import java.util.Map;
import java.util.function.UnaryOperator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.Test;
class ConsumerRecordDeserializerTest {
@Test
void dataMaskingAppliedOnDeserializedMessage() {
UnaryOperator<TopicMessageDTO> maskerMock = mock();
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
verify(maskerMock).apply(any(TopicMessageDTO.class));
}
}

View file

@@ -7,13 +7,13 @@ import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
import com.provectus.kafka.ui.serde.api.Serde;
@@ -31,16 +31,15 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -58,6 +57,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
static final String EMPTY_TOPIC = TOPIC + "_empty";
static final List<Record> SENT_RECORDS = new ArrayList<>();
static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
@BeforeAll
static void generateMsgs() throws Exception {
@@ -93,6 +93,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
static void cleanup() {
deleteTopic(TOPIC);
deleteTopic(EMPTY_TOPIC);
SENT_RECORDS.clear();
}
private static ConsumerRecordDeserializer createRecordsDeserializer() {
@@ -105,28 +106,28 @@ class RecordEmitterTest extends AbstractIntegrationTest {
s.deserializer(null, Serde.Target.VALUE),
StringSerde.name(),
s.deserializer(null, Serde.Target.KEY),
s.deserializer(null, Serde.Target.VALUE)
s.deserializer(null, Serde.Target.VALUE),
msg -> msg
);
}
private MessagesProcessing createMessagesProcessing() {
return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
}
@Test
void pollNothingOnEmptyTopic() {
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
createMessagesProcessing(),
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardRecordEmitter(
var forwardEmitter = new ForwardEmitter(
this::createConsumer,
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
100,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
100,
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
@@ -145,18 +146,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
@Test
void pollFullTopicFromBeginning() {
var forwardEmitter = new ForwardRecordEmitter(
var forwardEmitter = new ForwardEmitter(
this::createConsumer,
new ConsumerPosition(BEGINNING, TOPIC, null),
createMessagesProcessing(),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardRecordEmitter(
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(LATEST, TOPIC, null),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
@@ -174,18 +178,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
targetOffsets.put(new TopicPartition(TOPIC, i), offset);
}
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
createMessagesProcessing(),
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardRecordEmitter(
var forwardEmitter = new ForwardEmitter(
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
@@ -219,18 +226,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
);
}
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
createMessagesProcessing(),
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardRecordEmitter(
var forwardEmitter = new ForwardEmitter(
this::createConsumer,
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
PARTITIONS * MSGS_PER_PARTITION,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
@@ -257,11 +267,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
}
var backwardEmitter = new BackwardRecordEmitter(
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
numMessages,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);
@@ -283,11 +294,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
offsets.put(new TopicPartition(TOPIC, i), 0L);
}
var backwardEmitter = new BackwardRecordEmitter(
var backwardEmitter = new BackwardEmitter(
this::createConsumer,
new ConsumerPosition(OFFSET, TOPIC, offsets),
100,
createMessagesProcessing(),
RECORD_DESERIALIZER,
NOOP_FILTER,
PollingSettings.createDefault()
);

View file

@@ -3868,10 +3868,6 @@ components:
properties:
pollTimeoutMs:
type: integer
partitionPollTimeout:
type: integer
noDataEmptyPolls:
type: integer
maxPageSize:
type: integer
defaultPageSize: