TailingEmitter.java 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package com.provectus.kafka.ui.emitter;
  2. import com.provectus.kafka.ui.model.ConsumerPosition;
  3. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.kafka.common.errors.InterruptException;
  6. import reactor.core.publisher.FluxSink;
  7. @Slf4j
  8. public class TailingEmitter extends AbstractEmitter
  9. implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
  10. private final Supplier<EnhancedConsumer> consumerSupplier;
  11. private final ConsumerPosition consumerPosition;
  12. public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
  13. ConsumerPosition consumerPosition,
  14. MessagesProcessing messagesProcessing,
  15. PollingSettings pollingSettings) {
  16. super(messagesProcessing, pollingSettings);
  17. this.consumerSupplier = consumerSupplier;
  18. this.consumerPosition = consumerPosition;
  19. }
  20. @Override
  21. public void accept(FluxSink<TopicMessageEventDTO> sink) {
  22. log.debug("Starting tailing polling for {}", consumerPosition);
  23. try (EnhancedConsumer consumer = consumerSupplier.get()) {
  24. SeekOperations.create(consumer, consumerPosition)
  25. .assignAndSeek();
  26. while (!sink.isCancelled()) {
  27. sendPhase(sink, "Polling");
  28. var polled = poll(sink, consumer);
  29. polled.forEach(r -> sendMessage(sink, r));
  30. }
  31. sink.complete();
  32. log.debug("Tailing finished");
  33. } catch (InterruptException kafkaInterruptException) {
  34. log.debug("Tailing finished due to thread interruption");
  35. sink.complete();
  36. } catch (Exception e) {
  37. log.error("Error consuming {}", consumerPosition, e);
  38. sink.error(e);
  39. }
  40. }
  41. }