TailingEmitter.java 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  4. import java.util.function.Supplier;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.kafka.common.errors.InterruptException;
  7. import reactor.core.publisher.FluxSink;
  8. @Slf4j
  9. public class TailingEmitter extends AbstractEmitter
  10. implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
  11. private final Supplier<EnhancedConsumer> consumerSupplier;
  12. private final ConsumerPosition consumerPosition;
  13. public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
  14. ConsumerPosition consumerPosition,
  15. MessagesProcessing messagesProcessing,
  16. PollingSettings pollingSettings) {
  17. super(messagesProcessing, pollingSettings);
  18. this.consumerSupplier = consumerSupplier;
  19. this.consumerPosition = consumerPosition;
  20. }
  21. @Override
  22. public void accept(FluxSink<TopicMessageEventDTO> sink) {
  23. log.debug("Starting tailing polling for {}", consumerPosition);
  24. try (EnhancedConsumer consumer = consumerSupplier.get()) {
  25. SeekOperations.create(consumer, consumerPosition)
  26. .assignAndSeek();
  27. while (!sink.isCancelled()) {
  28. sendPhase(sink, "Polling");
  29. var polled = poll(sink, consumer);
  30. polled.forEach(r -> sendMessage(sink, r));
  31. }
  32. sink.complete();
  33. log.debug("Tailing finished");
  34. } catch (InterruptException kafkaInterruptException) {
  35. log.debug("Tailing finished due to thread interruption");
  36. sink.complete();
  37. } catch (Exception e) {
  38. log.error("Error consuming {}", consumerPosition, e);
  39. sink.error(e);
  40. }
  41. }
  42. }