iliax 2023-08-07 17:18:35 +04:00
parent a47c8e474b
commit e22a34cfbc
4 changed files with 28 additions and 27 deletions

AbstractEmitter.java

@@ -32,8 +32,8 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     return poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
   }
 
-  protected void buffer(ConsumerRecord<Bytes, Bytes> rec) {
-    messagesProcessing.buffer(rec);
+  protected void buffer(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    messagesProcessing.buffer(recs);
   }
 
   protected void flushBuffer(FluxSink<TopicMessageEventDTO> sink) {
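Aside, not part of the commit: a minimal caller sketch under assumed names (BufferCallSketch, bufferBatch and pollAndBuffer are hypothetical) showing why the widened Iterable-based signature is convenient — both a ConsumerRecords poll result and a plain List of records implement Iterable<ConsumerRecord<Bytes, Bytes>>, so a whole polled batch can be buffered in one call.

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;

class BufferCallSketch {

  // Same shape as the new AbstractEmitter.buffer(Iterable<ConsumerRecord<Bytes, Bytes>>)
  void bufferBatch(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
    recs.forEach(r -> System.out.println(r.partition() + "/" + r.offset()));
  }

  void pollAndBuffer(Consumer<Bytes, Bytes> consumer, List<ConsumerRecord<Bytes, Bytes>> collected) {
    // ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>>, so a poll result
    // can be handed over directly; so can any List of records.
    ConsumerRecords<Bytes, Bytes> polled = consumer.poll(Duration.ofMillis(100));
    bufferBatch(polled);
    bufferBatch(collected);
  }
}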

MessagesProcessing.java

@@ -41,19 +41,30 @@ class MessagesProcessing {
     return limit != null && sentMessages >= limit;
   }
 
-  void buffer(ConsumerRecord<Bytes, Bytes> rec) {
-    if (!limitReached()) {
-      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
-      try {
-        if (filter.test(topicMessage)) {
-          buffer.add(topicMessage);
-          sentMessages++;
-        }
-      } catch (Exception e) {
-        filterApplyErrors++;
-        log.trace("Error applying filter for message {}", topicMessage);
-      }
-    }
+  void buffer(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    Streams.stream(recs)
+        .sorted(
+            Comparator.<ConsumerRecord>comparingInt(ConsumerRecord::partition)
+                .thenComparing(
+                    ascendingSortBeforeSend
+                        ? Comparator.comparingLong(ConsumerRecord::offset)
+                        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed()
+                )
+        )
+        .forEach(rec -> {
+          if (!limitReached()) {
+            TopicMessageDTO topicMessage = deserializer.deserialize(rec);
+            try {
+              if (filter.test(topicMessage)) {
+                buffer.add(topicMessage);
+                sentMessages++;
+              }
+            } catch (Exception e) {
+              filterApplyErrors++;
+              log.trace("Error applying filter for message {}", topicMessage);
+            }
+          }
+        });
   }
 
   @VisibleForTesting
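Aside, not part of the commit: a standalone sketch (SortOrderDemo and its sortLikeBuffer helper are hypothetical names) reproducing the ordering the new buffer() applies before records reach the output buffer — ascending by partition, then by offset ascending or descending within each partition depending on ascendingSortBeforeSend. It uses List.stream() instead of Guava's Streams.stream(Iterable), but the comparator is the same one shown in the diff above.

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

public class SortOrderDemo {

  // Orders records by partition, then by offset (direction controlled by the flag),
  // mirroring the comparator used in MessagesProcessing.buffer.
  static List<ConsumerRecord<Bytes, Bytes>> sortLikeBuffer(List<ConsumerRecord<Bytes, Bytes>> recs,
                                                           boolean ascendingSortBeforeSend) {
    return recs.stream()
        .sorted(
            Comparator.<ConsumerRecord>comparingInt(ConsumerRecord::partition)
                .thenComparing(
                    ascendingSortBeforeSend
                        ? Comparator.comparingLong(ConsumerRecord::offset)
                        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed()
                )
        )
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ConsumerRecord<Bytes, Bytes>> recs = List.of(
        new ConsumerRecord<Bytes, Bytes>("topic", 1, 5L, null, null),
        new ConsumerRecord<Bytes, Bytes>("topic", 0, 7L, null, null),
        new ConsumerRecord<Bytes, Bytes>("topic", 0, 3L, null, null));

    // ascending within partition: 0/3, 0/7, 1/5
    sortLikeBuffer(recs, true).forEach(r -> System.out.println(r.partition() + "/" + r.offset()));
    // descending within partition: 0/7, 0/3, 1/5
    sortLikeBuffer(recs, false).forEach(r -> System.out.println(r.partition() + "/" + r.offset()));
  }
}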

PerPartitionEmitter.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.ArrayList;
 import java.util.List;
@@ -56,8 +55,8 @@ public abstract class PerPartitionEmitter extends AbstractEmitter {
         if (sink.isCancelled()) {
           return; //fast return in case of sink cancellation
         }
-        partitionPollIteration(tp, fromTo.from, fromTo.to, consumer, sink)
-            .forEach(this::buffer);
+        var polled = partitionPollIteration(tp, fromTo.from, fromTo.to, consumer, sink);
+        buffer(polled);
       });
       flushBuffer(sink);
       readRange = nextPollingRange(readRange, seekOperations);

PollingSettings.java

@@ -13,7 +13,6 @@ public class PollingSettings {
   private final Duration pollTimeout;
   private final Duration partitionPollTimeout;
-  private final int notDataEmptyPolls; //see EmptyPollsCounter docs
   private final Supplier<PollingThrottler> throttlerSupplier;
@@ -37,7 +36,6 @@
     return new PollingSettings(
         pollTimeout,
         partitionPollTimeout,
-        noDataEmptyPolls,
         PollingThrottler.throttlerSupplier(cluster)
     );
   }
@@ -46,25 +44,18 @@
     return new PollingSettings(
         DEFAULT_POLL_TIMEOUT,
         DEFAULT_PARTITION_POLL_TIMEOUT,
-        DEFAULT_NO_DATA_EMPTY_POLLS,
         PollingThrottler::noop
     );
   }
 
   private PollingSettings(Duration pollTimeout,
                           Duration partitionPollTimeout,
-                          int notDataEmptyPolls,
                           Supplier<PollingThrottler> throttlerSupplier) {
     this.pollTimeout = pollTimeout;
     this.partitionPollTimeout = partitionPollTimeout;
-    this.notDataEmptyPolls = notDataEmptyPolls;
     this.throttlerSupplier = throttlerSupplier;
   }
 
-  public EmptyPollsCounter createEmptyPollsCounter() {
-    return new EmptyPollsCounter(notDataEmptyPolls);
-  }
-
   public Duration getPollTimeout() {
     return pollTimeout;
   }
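Aside, not part of the commit: a minimal sketch of the shape PollingSettings is left with after this change — only the two poll timeouts and the throttler supplier remain, while the notDataEmptyPolls field and createEmptyPollsCounter() are gone. PollingSettingsSketch is a hypothetical stand-in class, and Supplier<Runnable> stands in for the project's Supplier<PollingThrottler>.

import java.time.Duration;
import java.util.function.Supplier;

final class PollingSettingsSketch {

  private final Duration pollTimeout;
  private final Duration partitionPollTimeout;
  private final Supplier<Runnable> throttlerSupplier; // stand-in for Supplier<PollingThrottler>

  PollingSettingsSketch(Duration pollTimeout,
                        Duration partitionPollTimeout,
                        Supplier<Runnable> throttlerSupplier) {
    this.pollTimeout = pollTimeout;
    this.partitionPollTimeout = partitionPollTimeout;
    this.throttlerSupplier = throttlerSupplier;
  }

  Duration getPollTimeout() {
    return pollTimeout;
  }

  Duration getPartitionPollTimeout() {
    return partitionPollTimeout;
  }

  Supplier<Runnable> getThrottlerSupplier() {
    return throttlerSupplier;
  }
}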