|
@@ -13,12 +13,12 @@ class ConsumingStats {
|
|
|
|
|
|
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
|
|
|
bytes += polledRecords.bytes();
|
|
|
- this.records += polledRecords.count();
|
|
|
- this.elapsed += polledRecords.elapsed().toMillis();
|
|
|
+ records += polledRecords.count();
|
|
|
+ elapsed += polledRecords.elapsed().toMillis();
|
|
|
sink.next(
|
|
|
new TopicMessageEventDTO()
|
|
|
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
|
|
|
- .consuming(createConsumingStats(sink))
|
|
|
+ .consuming(createConsumingStats())
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -30,16 +30,16 @@ class ConsumingStats {
|
|
|
sink.next(
|
|
|
new TopicMessageEventDTO()
|
|
|
.type(TopicMessageEventDTO.TypeEnum.DONE)
|
|
|
- .consuming(createConsumingStats(sink))
|
|
|
+ .consuming(createConsumingStats())
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink) {
|
|
|
+ private TopicMessageConsumingDTO createConsumingStats() {
|
|
|
return new TopicMessageConsumingDTO()
|
|
|
- .bytesConsumed(this.bytes)
|
|
|
- .elapsedMs(this.elapsed)
|
|
|
- .isCancelled(sink.isCancelled())
|
|
|
+ .bytesConsumed(bytes)
|
|
|
+ .elapsedMs(elapsed)
|
|
|
+ .isCancelled(false)
|
|
|
.filterApplyErrors(filterApplyErrors)
|
|
|
- .messagesConsumed(this.records);
|
|
|
+ .messagesConsumed(records);
|
|
|
}
|
|
|
}
|