// TailingEmitter.java
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.HashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
  11. @Slf4j
  12. public class TailingEmitter extends AbstractEmitter
  13. implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
  14. private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
  15. private final ConsumerPosition consumerPosition;
  16. public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
  17. ConsumerPosition consumerPosition,
  18. MessagesProcessing messagesProcessing,
  19. PollingSettings pollingSettings) {
  20. super(messagesProcessing, pollingSettings);
  21. this.consumerSupplier = consumerSupplier;
  22. this.consumerPosition = consumerPosition;
  23. }
  24. @Override
  25. public void accept(FluxSink<TopicMessageEventDTO> sink) {
  26. log.debug("Starting tailing polling for {}", consumerPosition);
  27. try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
  28. assignAndSeek(consumer);
  29. while (!sink.isCancelled()) {
  30. sendPhase(sink, "Polling");
  31. var polled = poll(sink, consumer);
  32. polled.forEach(r -> sendMessage(sink, r));
  33. }
  34. sink.complete();
  35. log.debug("Tailing finished");
  36. } catch (InterruptException kafkaInterruptException) {
  37. log.debug("Tailing finished due to thread interruption");
  38. sink.complete();
  39. } catch (Exception e) {
  40. log.error("Error consuming {}", consumerPosition, e);
  41. sink.error(e);
  42. }
  43. }
  44. private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
  45. var seekOperations = SeekOperations.create(consumer, consumerPosition);
  46. var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
  47. seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions
  48. consumer.assign(seekOffsets.keySet());
  49. seekOffsets.forEach(consumer::seek);
  50. }
  51. }