123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- package com.provectus.kafka.ui.emitter;
- import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
- import com.provectus.kafka.ui.model.TopicMessageEventDTO;
- import reactor.core.publisher.FluxSink;
- class ConsumingStats {
- private long bytes = 0;
- private int records = 0;
- private long elapsed = 0;
- private int filterApplyErrors = 0;
- void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
- bytes += polledRecords.bytes();
- records += polledRecords.count();
- elapsed += polledRecords.elapsed().toMillis();
- sink.next(
- new TopicMessageEventDTO()
- .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
- .consuming(createConsumingStats())
- );
- }
- void incFilterApplyError() {
- filterApplyErrors++;
- }
- void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
- sink.next(
- new TopicMessageEventDTO()
- .type(TopicMessageEventDTO.TypeEnum.DONE)
- .consuming(createConsumingStats())
- );
- }
- private TopicMessageConsumingDTO createConsumingStats() {
- return new TopicMessageConsumingDTO()
- .bytesConsumed(bytes)
- .elapsedMs(elapsed)
- .isCancelled(false)
- .filterApplyErrors(filterApplyErrors)
- .messagesConsumed(records);
- }
- }
|