ConsumingStats.java 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
  3. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  4. import reactor.core.publisher.FluxSink;
  5. class ConsumingStats {
  6. private long bytes = 0;
  7. private int records = 0;
  8. private long elapsed = 0;
  9. private int filterApplyErrors = 0;
  10. void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
  11. bytes += polledRecords.bytes();
  12. records += polledRecords.count();
  13. elapsed += polledRecords.elapsed().toMillis();
  14. sink.next(
  15. new TopicMessageEventDTO()
  16. .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
  17. .consuming(createConsumingStats())
  18. );
  19. }
  20. void incFilterApplyError() {
  21. filterApplyErrors++;
  22. }
  23. void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
  24. sink.next(
  25. new TopicMessageEventDTO()
  26. .type(TopicMessageEventDTO.TypeEnum.DONE)
  27. .consuming(createConsumingStats())
  28. );
  29. }
  30. private TopicMessageConsumingDTO createConsumingStats() {
  31. return new TopicMessageConsumingDTO()
  32. .bytesConsumed(bytes)
  33. .elapsedMs(elapsed)
  34. .isCancelled(false)
  35. .filterApplyErrors(filterApplyErrors)
  36. .messagesConsumed(records);
  37. }
  38. }