
ISSUE-943: Topic messages tailing implementation (#1515)

* Topic messages tailing implementation
Ilya Kuramshin 3 years ago
parent
commit
772b878d90

+ 32 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.controller;
 
+import static java.util.stream.Collectors.toMap;
+
 import com.provectus.kafka.ui.api.MessagesApi;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
@@ -10,10 +12,9 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.TopicsService;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,10 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 @Slf4j
 public class MessagesController extends AbstractController implements MessagesApi {
+
+  private static final int MAX_LOAD_RECORD_LIMIT = 100;
+  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
+
   private final MessagesService messagesService;
   private final TopicsService topicsService;
 
@@ -48,13 +53,20 @@ public class MessagesController extends AbstractController implements MessagesApi {
       String clusterName, String topicName, SeekTypeDTO seekType, List<String> seekTo,
       Integer limit, String q, MessageFilterTypeDTO filterQueryType,
       SeekDirectionDTO seekDirection, ServerWebExchange exchange) {
-    return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
-        .map(position ->
-            ResponseEntity.ok(
-                messagesService.loadMessages(
-                    getCluster(clusterName), topicName, position, q, filterQueryType, limit)
-            )
-        );
+    var positions = new ConsumerPosition(
+        seekType != null ? seekType : SeekTypeDTO.BEGINNING,
+        parseSeekTo(topicName, seekTo),
+        seekDirection
+    );
+    int recordsLimit = Optional.ofNullable(limit)
+        .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
+        .orElse(DEFAULT_LOAD_RECORD_LIMIT);
+    return Mono.just(
+        ResponseEntity.ok(
+            messagesService.loadMessages(
+                getCluster(clusterName), topicName, positions, q, filterQueryType, recordsLimit)
+        )
+    );
   }
 
   @Override
@@ -73,13 +85,15 @@ public class MessagesController extends AbstractController implements MessagesApi {
     ).map(ResponseEntity::ok);
   }
 
-
-  private Mono<ConsumerPosition> parseConsumerPosition(
-      String topicName, SeekTypeDTO seekType, List<String> seekTo,
-      SeekDirectionDTO seekDirection) {
-    return Mono.justOrEmpty(seekTo)
-        .defaultIfEmpty(Collections.emptyList())
-        .flatMapIterable(Function.identity())
+  /**
+   * The format is [partition]::[offset] for specifying offsets
+   * or [partition]::[timestamp in millis] for specifying timestamps.
+   */
+  private Map<TopicPartition, Long> parseSeekTo(String topic, List<String> seekTo) {
+    if (seekTo == null || seekTo.isEmpty()) {
+      return Map.of();
+    }
+    return seekTo.stream()
         .map(p -> {
           String[] split = p.split("::");
           if (split.length != 2) {
@@ -88,13 +102,11 @@ public class MessagesController extends AbstractController implements MessagesApi {
           }
 
           return Pair.of(
-              new TopicPartition(topicName, Integer.parseInt(split[0])),
+              new TopicPartition(topic, Integer.parseInt(split[0])),
               Long.parseLong(split[1])
           );
         })
-        .collectMap(Pair::getKey, Pair::getValue)
-        .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekTypeDTO.BEGINNING,
-            positions, seekDirection));
+        .collect(toMap(Pair::getKey, Pair::getValue));
   }
 
 }
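
The seekTo format described in the new Javadoc maps "[partition]::[offset or timestamp]" strings to per-partition longs. A minimal sketch with invented values (not taken from the change):

    // parseSeekTo("my-topic", List.of("0::15", "1::42")) is expected to yield the equivalent of:
    Map<TopicPartition, Long> expected = Map.of(
        new TopicPartition("my-topic", 0), 15L,
        new TopicPartition("my-topic", 1), 42L);
    // An entry that does not split into exactly two parts fails the length check above;
    // the error branch sits between the two hunks and is not shown here.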

+ 2 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.emitter;
 
-import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
@@ -11,7 +10,6 @@ import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -19,9 +17,7 @@ public abstract class AbstractEmitter {
   private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
   private final RecordSerDe recordDeserializer;
-  private long bytes = 0;
-  private int records = 0;
-  private long elapsed = 0;
+  private final ConsumingStats consumingStats = new ConsumingStats();
 
   public AbstractEmitter(RecordSerDe recordDeserializer) {
     this.recordDeserializer = recordDeserializer;
@@ -57,25 +53,6 @@ public abstract class AbstractEmitter {
   protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                                ConsumerRecords<Bytes, Bytes> records,
                                long elapsed) {
-    for (ConsumerRecord<Bytes, Bytes> record : records) {
-      for (Header header : record.headers()) {
-        bytes +=
-            (header.key() != null ? header.key().getBytes().length : 0L)
-            + (header.value() != null ? header.value().length : 0L);
-      }
-      bytes += record.serializedKeySize() + record.serializedValueSize();
-    }
-    this.records += records.count();
-    this.elapsed += elapsed;
-    final TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO()
-        .bytesConsumed(this.bytes)
-        .elapsedMs(this.elapsed)
-        .isCancelled(sink.isCancelled())
-        .messagesConsumed(this.records);
-    sink.next(
-        new TopicMessageEventDTO()
-            .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(consuming)
-    );
+    consumingStats.sendConsumingEvt(sink, records, elapsed);
   }
 }

+ 41 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -0,0 +1,41 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+class ConsumingStats {
+
+  private long bytes = 0;
+  private int records = 0;
+  private long elapsed = 0;
+
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                               ConsumerRecords<Bytes, Bytes> polledRecords,
+                               long elapsed) {
+    for (ConsumerRecord<Bytes, Bytes> record : polledRecords) {
+      for (Header header : record.headers()) {
+        bytes +=
+            (header.key() != null ? header.key().getBytes().length : 0L)
+                + (header.value() != null ? header.value().length : 0L);
+      }
+      bytes += record.serializedKeySize() + record.serializedValueSize();
+    }
+    this.records += polledRecords.count();
+    this.elapsed += elapsed;
+    final TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO()
+        .bytesConsumed(this.bytes)
+        .elapsedMs(this.elapsed)
+        .isCancelled(sink.isCancelled())
+        .messagesConsumed(this.records);
+    sink.next(
+        new TopicMessageEventDTO()
+            .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
+            .consuming(consuming)
+    );
+  }
+}
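
Note that the counters are instance fields, so the CONSUMING events carry running totals for the whole session rather than per-poll figures. A small sketch of that accumulation, with stand-in sink and poll results:

    var stats = new ConsumingStats();
    stats.sendConsumingEvt(sink, firstPoll, 12);  // totals cover firstPoll only, elapsedMs = 12
    stats.sendConsumingEvt(sink, secondPoll, 8);  // totals now cover both polls, elapsedMs = 20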

+ 0 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -3,8 +3,6 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.RecordSerDe;
 import com.provectus.kafka.ui.util.OffsetsSeek;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -18,8 +16,6 @@ public class ForwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
-
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final OffsetsSeek offsetsSeek;
 

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serde.RecordSerDe;
+import com.provectus.kafka.ui.util.OffsetsSeek;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+public class TailingEmitter extends AbstractEmitter
+    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+
+  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final OffsetsSeek offsetsSeek;
+
+  public TailingEmitter(RecordSerDe recordDeserializer,
+                        Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+                        OffsetsSeek offsetsSeek) {
+    super(recordDeserializer);
+    this.consumerSupplier = consumerSupplier;
+    this.offsetsSeek = offsetsSeek;
+  }
+
+  @Override
+  public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+      log.debug("Starting topic tailing");
+      offsetsSeek.assignAndSeek(consumer);
+      while (!sink.isCancelled()) {
+        sendPhase(sink, "Polling");
+        var polled = poll(sink, consumer);
+        polled.forEach(r -> sendMessage(sink, r));
+      }
+      sink.complete();
+      log.debug("Tailing finished");
+    } catch (InterruptException kafkaInterruptException) {
+      sink.complete();
+    } catch (Exception e) {
+      log.error("Error consuming {}", offsetsSeek.getConsumerPosition(), e);
+      sink.error(e);
+    }
+  }
+
+}
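
A minimal usage sketch, assuming a deserializer, consumer supplier and seek are already built (as MessagesService does below). The poll loop runs until the sink is cancelled, so the resulting Flux is stopped by disposing the subscription:

    var emitter = new TailingEmitter(recordDeserializer, consumerSupplier, offsetsSeek);
    Disposable tail = Flux.<TopicMessageEventDTO>create(emitter)
        .subscribe(evt -> log.info("tailing event: {}", evt.getType()));
    // ... later: disposing cancels the sink, which ends the poll loop above
    tail.dispose();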

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -202,6 +202,7 @@ public class ConsumerGroupService {
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.putAll(properties);
 
     return new KafkaConsumer<>(props);

+ 20 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
+import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
@@ -13,9 +14,9 @@ import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.serde.RecordSerDe;
-import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
 import com.provectus.kafka.ui.util.OffsetsSeekBackward;
 import com.provectus.kafka.ui.util.OffsetsSeekForward;
+import com.provectus.kafka.ui.util.ResultSizeLimiter;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -48,8 +49,6 @@ import reactor.core.scheduler.Schedulers;
 @Slf4j
 public class MessagesService {
 
-  private static final int MAX_LOAD_RECORD_LIMIT = 100;
-  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
 
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
@@ -134,10 +133,7 @@ public class MessagesService {
   public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                  ConsumerPosition consumerPosition, String query,
                                                  MessageFilterTypeDTO filterQueryType,
-                                                 Integer limit) {
-    int recordsLimit = Optional.ofNullable(limit)
-        .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
-        .orElse(DEFAULT_LOAD_RECORD_LIMIT);
+                                                 int limit) {
 
     java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
     RecordSerDe recordDeserializer =
@@ -148,20 +144,33 @@ public class MessagesService {
           new OffsetsSeekForward(topic, consumerPosition),
           recordDeserializer
       );
-    } else {
+    } else if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.BACKWARD)) {
       emitter = new BackwardRecordEmitter(
           (Map<String, Object> props) -> consumerGroupService.createConsumer(cluster, props),
-          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit),
+          new OffsetsSeekBackward(topic, consumerPosition, limit),
           recordDeserializer
       );
+    } else {
+      emitter = new TailingEmitter(
+          recordDeserializer,
+          () -> consumerGroupService.createConsumer(cluster),
+          new OffsetsSeekForward(topic, consumerPosition)
+      );
     }
     return Flux.create(emitter)
         .filter(getMsgFilter(query, filterQueryType))
-        .takeWhile(new FilterTopicMessageEvents(recordsLimit))
-        .subscribeOn(Schedulers.elastic())
+        .takeWhile(createTakeWhilePredicate(consumerPosition, limit))
+        .subscribeOn(Schedulers.boundedElastic())
         .share();
   }
 
+  private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
+      ConsumerPosition consumerPosition, int limit) {
+    return consumerPosition.getSeekDirection() == SeekDirectionDTO.TAILING
+        ? evt -> true // no limit for tailing
+        : new ResultSizeLimiter(limit);
+  }
+
   private Predicate<TopicMessageEventDTO> getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
     if (StringUtils.isEmpty(query)) {
       return evt -> true;

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java

@@ -43,6 +43,9 @@ public abstract class OffsetsSeek {
       case BEGINNING:
         offsets = offsetsFromBeginning(consumer, partitions);
         break;
+      case LATEST:
+        offsets = endOffsets(consumer, partitions);
+        break;
       default:
         throw new IllegalArgumentException("Unknown seekType: " + seekType);
     }
@@ -73,6 +76,10 @@ public abstract class OffsetsSeek {
         .collect(Collectors.toList());
   }
 
+  protected Map<TopicPartition, Long> endOffsets(
+      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions) {
+    return consumer.endOffsets(partitions);
+  }
 
   protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
       Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
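
The new LATEST case seeks to the end offsets, so a tailing session only sees records produced after it starts. Roughly what that lookup amounts to, with hypothetical topic and partition values (the concrete assign-and-seek logic lives in the OffsetsSeek subclasses):

    // Consumer#endOffsets returns the next offset to be written for each partition.
    List<TopicPartition> partitions = List.of(new TopicPartition("my-topic", 0));
    Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
    offsets.forEach(consumer::seek);  // position at the log end before polling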

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/FilterTopicMessageEvents.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java

@@ -4,11 +4,11 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
-public class FilterTopicMessageEvents implements Predicate<TopicMessageEventDTO> {
+public class ResultSizeLimiter implements Predicate<TopicMessageEventDTO> {
   private final AtomicInteger processed = new AtomicInteger();
   private final int limit;
 
-  public FilterTopicMessageEvents(int limit) {
+  public ResultSizeLimiter(int limit) {
     this.limit = limit;
   }
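
Only the rename is shown here; the predicate's test method lies outside the hunk. Judging from the fields above, a limiter of this kind would typically count only MESSAGE events against the limit, roughly (a sketch, not the class's actual body):

    @Override
    public boolean test(TopicMessageEventDTO event) {
      if (event.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
        return processed.incrementAndGet() <= limit;
      }
      return true;  // phase/consuming events are not counted
    }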
 

+ 150 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -0,0 +1,150 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.MessagesService;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+class TailingEmitterTest extends AbstractBaseTest {
+
+  @Autowired
+  private ApplicationContext ctx;
+
+  private String topic;
+
+  private KafkaProducer<String, String> producer;
+
+  private Disposable tailingFluxDispose;
+
+  @BeforeEach
+  void init() {
+    topic = "TopicTailingTest_" + UUID.randomUUID();
+    createTopic(new NewTopic(topic, 2, (short) 1));
+    producer = new KafkaProducer<>(
+        Map.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+        ));
+  }
+
+  @AfterEach
+  void tearDown() {
+    deleteTopic(topic);
+    if (tailingFluxDispose != null) {
+      tailingFluxDispose.dispose();
+    }
+  }
+
+  @Test
+  void allNewMessagesShouldBeEmitted() throws Exception {
+    var fluxOutput = startTailing(null);
+
+    List<String> expectedValues = new ArrayList<>();
+    for (int i = 0; i < 50; i++) {
+      producer.send(new ProducerRecord<>(topic, i + "", i + "")).get();
+      expectedValues.add(i + "");
+    }
+
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(60))
+        .pollInSameThread()
+        .untilAsserted(() -> {
+          assertThat(fluxOutput)
+              .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+              .extracting(msg -> msg.getMessage().getContent())
+              .hasSameElementsAs(expectedValues);
+        });
+  }
+
+  @Test
+  void allNewMessageThatFitFilterConditionShouldBeEmitted() throws Exception {
+    var fluxOutput = startTailing("good");
+
+    List<String> expectedValues = new ArrayList<>();
+    for (int i = 0; i < 50; i++) {
+      if (i % 2 == 0) {
+        producer.send(new ProducerRecord<>(topic, i + "", i + "_good")).get();
+        expectedValues.add(i + "_good");
+      } else {
+        producer.send(new ProducerRecord<>(topic, i + "", i + "_bad")).get();
+      }
+    }
+
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(60))
+        .pollInSameThread()
+        .untilAsserted(() -> {
+          assertThat(fluxOutput)
+              .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+              .extracting(msg -> msg.getMessage().getContent())
+              .hasSameElementsAs(expectedValues);
+        });
+  }
+
+  private Flux<TopicMessageEventDTO> createTailingFlux(
+      String topicName,
+      String query) {
+    var cluster = ctx.getBean(ClustersStorage.class)
+        .getClusterByName(LOCAL)
+        .get();
+
+    return ctx.getBean(MessagesService.class)
+        .loadMessages(cluster, topicName,
+            new ConsumerPosition(SeekTypeDTO.LATEST, Map.of(), SeekDirectionDTO.TAILING),
+            query,
+            MessageFilterTypeDTO.STRING_CONTAINS,
+            0);
+  }
+
+  private List<TopicMessageEventDTO> startTailing(String filterQuery) {
+    List<TopicMessageEventDTO> fluxOutput = new CopyOnWriteArrayList<>();
+    tailingFluxDispose = createTailingFlux(topic, filterQuery)
+        .doOnNext(fluxOutput::add)
+        .subscribe();
+
+    // this is needed to be sure that tailing is initialized
+    // and we can start to produce test messages
+    waitUntilTailingInitialized(fluxOutput);
+
+    return fluxOutput;
+  }
+
+
+  private void waitUntilTailingInitialized(List<TopicMessageEventDTO> fluxOutput) {
+    Awaitility.await()
+        .pollInSameThread()
+        .pollDelay(Duration.ofMillis(100))
+        .atMost(Duration.ofSeconds(10))
+        .until(() -> fluxOutput.stream()
+            .anyMatch(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.CONSUMING));
+  }
+
+}

+ 3 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2010,6 +2010,7 @@ components:
             - MESSAGE
             - CONSUMING
             - DONE
+            - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
         phase:
@@ -2091,6 +2092,7 @@ components:
         - BEGINNING
         - OFFSET
         - TIMESTAMP
+        - LATEST
 
     MessageFilterType:
       type: string
@@ -2103,6 +2105,7 @@ components:
       enum:
         - FORWARD
         - BACKWARD
+        - TAILING
       default: FORWARD
 
     Partition:
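
The LATEST and TAILING additions correspond to the Java-side enum values used above; a tailing read combines them, as in the test:

    // Mirrors the position built in TailingEmitterTest: seek to the log end, then keep polling.
    var position = new ConsumerPosition(SeekTypeDTO.LATEST, Map.of(), SeekDirectionDTO.TAILING);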