This commit is contained in:
iliax 2023-08-07 17:47:00 +04:00
parent 053b5a7c04
commit 02967a990b
3 changed files with 28 additions and 26 deletions

View file

@@ -1,9 +1,8 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -32,7 +31,7 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
return poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
}
protected void buffer(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
protected void buffer(List<ConsumerRecord<Bytes, Bytes>> recs) {
messagesProcessing.buffer(recs);
}
@@ -56,6 +55,10 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
messagesProcessing.sentConsumingInfo(sink, records);
}
/**
 * Whether buffered messages should be emitted newest-first, i.e. the
 * configing component reports a non-ascending pre-send sort order.
 */
protected boolean descendingSendSorting() {
  boolean ascending = messagesProcessing.isAscendingSortBeforeSend();
  return !ascending;
}
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
sink.complete();

View file

@@ -41,30 +41,25 @@ class MessagesProcessing {
return limit != null && sentMessages >= limit;
}
void buffer(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
Streams.stream(recs)
.sorted(
Comparator.<ConsumerRecord>comparingInt(ConsumerRecord::partition)
.thenComparing(
ascendingSortBeforeSend
? Comparator.comparingLong(ConsumerRecord::offset)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed()
)
)
.forEach(rec -> {
if (!limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
buffer.add(topicMessage);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
// Read-only view of the configured pre-send sort direction,
// used by emitters to decide whether a batch needs reversing.
boolean isAscendingSortBeforeSend() {
  return this.ascendingSortBeforeSend;
}
void buffer(List<ConsumerRecord<Bytes, Bytes>> polled) {
polled.forEach(rec -> {
if (!limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
buffer.add(topicMessage);
sentMessages++;
}
});
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
});
}
@VisibleForTesting

View file

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
@@ -96,6 +97,9 @@ public abstract class PerPartitionEmitter extends AbstractEmitter {
.filter(r -> r.offset() < toOffset)
.forEach(recordsToSend::add);
}
if (descendingSendSorting()) {
Collections.reverse(recordsToSend);
}
return recordsToSend;
}
}