|
@@ -29,7 +29,6 @@ class MessagesProcessing {
|
|
|
|
|
|
private final ConsumingStats consumingStats = new ConsumingStats();
|
|
|
private long sentMessages = 0;
|
|
|
- private int filterApplyErrors = 0;
|
|
|
|
|
|
private final ConsumerRecordDeserializer deserializer;
|
|
|
private final Predicate<TopicMessageDTO> filter;
|
|
@@ -40,32 +39,6 @@ class MessagesProcessing {
|
|
|
return limit != null && sentMessages >= limit;
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
|
|
|
- */
|
|
|
- @VisibleForTesting
|
|
|
- static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
|
|
|
- boolean asc) {
|
|
|
- Comparator<ConsumerRecord> offsetComparator = asc
|
|
|
- ? Comparator.comparingLong(ConsumerRecord::offset)
|
|
|
- : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
|
|
|
-
|
|
|
- // partition -> sorted by offsets records
|
|
|
- Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
|
|
|
- .collect(
|
|
|
- groupingBy(
|
|
|
- ConsumerRecord::partition,
|
|
|
- TreeMap::new,
|
|
|
- collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
|
|
|
-
|
|
|
- Comparator<ConsumerRecord> tsComparator = asc
|
|
|
- ? Comparator.comparing(ConsumerRecord::timestamp)
|
|
|
- : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
|
|
|
-
|
|
|
- // merge-sorting records from partitions one by one using timestamp comparator
|
|
|
- return Iterables.mergeSorted(perPartition.values(), tsComparator);
|
|
|
- }
|
|
|
-
|
|
|
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
|
|
|
sortForSending(polled, ascendingSortBeforeSend)
|
|
|
.forEach(rec -> {
|
|
@@ -81,7 +54,7 @@ class MessagesProcessing {
|
|
|
sentMessages++;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- filterApplyErrors++;
|
|
|
+ consumingStats.incFilterApplyError();
|
|
|
log.trace("Error applying filter for message {}", topicMessage);
|
|
|
}
|
|
|
}
|
|
@@ -90,13 +63,13 @@ class MessagesProcessing {
|
|
|
|
|
|
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
|
|
|
if (!sink.isCancelled()) {
|
|
|
- consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
|
|
|
+ consumingStats.sendConsumingEvt(sink, polledRecords);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
|
|
|
if (!sink.isCancelled()) {
|
|
|
- consumingStats.sendFinishEvent(sink, filterApplyErrors);
|
|
|
+ consumingStats.sendFinishEvent(sink);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -110,4 +83,30 @@ class MessagesProcessing {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
|
|
|
+ boolean asc) {
|
|
|
+ Comparator<ConsumerRecord> offsetComparator = asc
|
|
|
+ ? Comparator.comparingLong(ConsumerRecord::offset)
|
|
|
+ : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
|
|
|
+
|
|
|
+ // partition -> sorted by offsets records
|
|
|
+ Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
|
|
|
+ .collect(
|
|
|
+ groupingBy(
|
|
|
+ ConsumerRecord::partition,
|
|
|
+ TreeMap::new,
|
|
|
+ collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
|
|
|
+
|
|
|
+ Comparator<ConsumerRecord> tsComparator = asc
|
|
|
+ ? Comparator.comparing(ConsumerRecord::timestamp)
|
|
|
+ : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
|
|
|
+
|
|
|
+ // merge-sorting records from partitions one by one using timestamp comparator
|
|
|
+ return Iterables.mergeSorted(perPartition.values(), tsComparator);
|
|
|
+ }
|
|
|
+
|
|
|
}
|