
"Done" event sending fix (#3631)

* Done message sending fix: make filtering and limiting part of the emitter so that emitting can finish gracefully
Ilya Kuramshin, 2 years ago
commit e31580a16a
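
Why the DONE event was being lost: the result-size limit used to be applied
downstream of the emitter, via a takeWhile(ResultSizeLimiter) operator in
MessagesService (removed below). takeWhile cancels the upstream sink the
moment its predicate fails, so anything the emitter sends afterwards,
including the final DONE event, is silently dropped. A minimal Reactor
sketch of that failure mode (illustrative names, not the project's code):

    import java.util.concurrent.atomic.AtomicInteger;
    import reactor.core.publisher.Flux;

    public class TakeWhileDropsDone {
      public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        Flux.<String>create(sink -> {
              for (int i = 0; i < 10; i++) {
                sink.next("MESSAGE " + i);
              }
              sink.next("DONE");  // emitted after cancellation -> dropped
              sink.complete();
            })
            // old approach: the limit is enforced downstream of the source
            .takeWhile(evt -> evt.equals("DONE") || sent.incrementAndGet() <= 3)
            .subscribe(System.out::println);
        // prints MESSAGE 0..2, then completes; DONE never reaches the subscriber
      }
    }

Moving the limit inside the emitter (sendLimitReached below) lets the polling
loops exit normally and still emit DONE before completing the sink.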

+ 14 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,9 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
-import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.time.Duration;
 import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -14,13 +11,12 @@ import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
 
-  private final ConsumerRecordDeserializer recordDeserializer;
-  private final ConsumingStats consumingStats = new ConsumingStats();
+  private final MessagesProcessing messagesProcessing;
   private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
-    this.recordDeserializer = recordDeserializer;
+  protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
+    this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
     this.throttler = pollingSettings.getPollingThrottler();
   }
@@ -40,39 +36,27 @@ public abstract class AbstractEmitter {
     return records;
   }
 
+  protected boolean sendLimitReached() {
+    return messagesProcessing.limitReached();
+  }
+
   protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                                                       ConsumerRecord<Bytes, Bytes> msg) {
-    final TopicMessageDTO topicMessage = recordDeserializer.deserialize(msg);
-    sink.next(
-        new TopicMessageEventDTO()
-            .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-            .message(topicMessage)
-    );
+                             ConsumerRecord<Bytes, Bytes> msg) {
+    messagesProcessing.sendMsg(sink, msg);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
-    sink.next(
-        new TopicMessageEventDTO()
-            .type(TopicMessageEventDTO.TypeEnum.PHASE)
-            .phase(new TopicMessagePhaseDTO().name(name))
-    );
+    messagesProcessing.sendPhase(sink, name);
   }
 
   protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                               ConsumerRecords<Bytes, Bytes> records,
-                               long elapsed) {
-    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
+                              ConsumerRecords<Bytes, Bytes> records,
+                              long elapsed) {
+    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
-    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
+    messagesProcessing.sendFinishEvent(sink);
     sink.complete();
   }
-
-  protected Number getFilterApplyErrors(FluxSink<?> sink) {
-    return sink.contextView()
-        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
-        .<Number>map(MessageFilterStats::getFilterApplyErrors)
-        .orElse(0);
-  }
 }

+ 4 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -31,9 +30,9 @@ public class BackwardRecordEmitter
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
-      ConsumerRecordDeserializer recordDeserializer,
+      MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+    super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
@@ -52,7 +51,7 @@ public class BackwardRecordEmitter
       int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
       log.debug("'Until' offsets for polling: {}", readUntilOffsets);
 
-      while (!sink.isCancelled() && !readUntilOffsets.isEmpty()) {
+      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
         new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
           if (sink.isCancelled()) {
             return; //fast return in case of sink cancellation
@@ -61,8 +60,6 @@ public class BackwardRecordEmitter
           long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
 
           partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
-              .stream()
-              .filter(r -> !sink.isCancelled())
               .forEach(r -> sendMessage(sink, r));
 
           if (beginOffset == readFromOffset) {
@@ -106,6 +103,7 @@ public class BackwardRecordEmitter
 
     EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
+        && !sendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
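
Note: the per-record .filter(r -> !sink.isCancelled()) step removed above is
now redundant, since MessagesProcessing#sendMsg (added in this commit) checks
sink.isCancelled() and the send limit before emitting each record.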

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -19,7 +19,7 @@ class ConsumingStats {
   int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
                         ConsumerRecords<Bytes, Bytes> polledRecords,
                         long elapsed,
-                        Number filterApplyErrors) {
+                        int filterApplyErrors) {
     int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
     bytes += polledBytes;
     this.records += polledRecords.count();
@@ -32,7 +32,7 @@ class ConsumingStats {
     return polledBytes;
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Number filterApplyErrors) {
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
@@ -41,12 +41,12 @@ class ConsumingStats {
   }
 
   private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
-                                                        Number filterApplyErrors) {
+                                                        int filterApplyErrors) {
     return new TopicMessageConsumingDTO()
         .bytesConsumed(this.bytes)
         .elapsedMs(this.elapsed)
         .isCancelled(sink.isCancelled())
-        .filterApplyErrors(filterApplyErrors.intValue())
+        .filterApplyErrors(filterApplyErrors)
         .messagesConsumed(this.records);
   }
 }

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -23,9 +22,9 @@ public class ForwardRecordEmitter
   public ForwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
-      ConsumerRecordDeserializer recordDeserializer,
+      MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+    super(messagesProcessing, pollingSettings);
     this.position = position;
     this.consumerSupplier = consumerSupplier;
   }
@@ -40,6 +39,7 @@ public class ForwardRecordEmitter
 
       EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
       while (!sink.isCancelled()
+          && !sendLimitReached()
           && !seekOperations.assignedPartitionsFullyPolled()
           && !emptyPolls.noDataEmptyPollsReached()) {
 
@@ -50,11 +50,7 @@ public class ForwardRecordEmitter
         log.debug("{} records polled", records.count());
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
-          if (!sink.isCancelled()) {
-            sendMessage(sink, msg);
-          } else {
-            break;
-          }
+          sendMessage(sink, msg);
         }
       }
       sendFinishStatsAndCompleteSink(sink);

+ 0 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilterStats.java

@@ -1,16 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import java.util.concurrent.atomic.AtomicLong;
-import lombok.AccessLevel;
-import lombok.Getter;
-
-public class MessageFilterStats {
-
-  @Getter(AccessLevel.PACKAGE)
-  private final AtomicLong filterApplyErrors = new AtomicLong();
-
-  public final void incrementApplyErrors() {
-    filterApplyErrors.incrementAndGet();
-  }
-
-}

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+public class MessagesProcessing {
+
+  private final ConsumingStats consumingStats = new ConsumingStats();
+  private long sentMessages = 0;
+  private int filterApplyErrors = 0;
+
+  private final ConsumerRecordDeserializer deserializer;
+  private final Predicate<TopicMessageDTO> filter;
+  private final @Nullable Integer limit;
+
+  public MessagesProcessing(ConsumerRecordDeserializer deserializer,
+                            Predicate<TopicMessageDTO> filter,
+                            @Nullable Integer limit) {
+    this.deserializer = deserializer;
+    this.filter = filter;
+    this.limit = limit;
+  }
+
+  boolean limitReached() {
+    return limit != null && sentMessages >= limit;
+  }
+
+  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
+    if (!sink.isCancelled() && !limitReached()) {
+      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
+      try {
+        if (filter.test(topicMessage)) {
+          sink.next(
+              new TopicMessageEventDTO()
+                  .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                  .message(topicMessage)
+          );
+          sentMessages++;
+        }
+      } catch (Exception e) {
+        filterApplyErrors++;
+        log.trace("Error applying filter for message {}", topicMessage);
+      }
+    }
+  }
+
+  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
+                        ConsumerRecords<Bytes, Bytes> polledRecords,
+                        long elapsed) {
+    if (!sink.isCancelled()) {
+      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+    }
+    return 0;
+  }
+
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+    if (!sink.isCancelled()) {
+      consumingStats.sendFinishEvent(sink, filterApplyErrors);
+    }
+  }
+
+  void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
+    if (!sink.isCancelled()) {
+      sink.next(
+          new TopicMessageEventDTO()
+              .type(TopicMessageEventDTO.TypeEnum.PHASE)
+              .phase(new TopicMessagePhaseDTO().name(name))
+      );
+    }
+  }
+
+}
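
With filtering and limiting inside the emitter, the poll loops stop once
sendLimitReached() returns true and then fall through to
sendFinishStatsAndCompleteSink, so DONE is always emitted before the sink
completes. A minimal sketch of the fixed shape (illustrative, not the
project's code):

    import reactor.core.publisher.Flux;

    public class LimitInsideEmitter {
      public static void main(String[] args) {
        int limit = 3;
        Flux.<String>create(sink -> {
              int sent = 0;
              // the source itself enforces the limit, as sendLimitReached() now does
              while (!sink.isCancelled() && sent < limit) {
                sink.next("MESSAGE " + sent++);
              }
              sink.next("DONE");  // nothing downstream cancels early, so this is delivered
              sink.complete();
            })
            .subscribe(System.out::println);
        // prints MESSAGE 0..2, then DONE
      }
    }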

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
@@ -20,9 +19,9 @@ public class TailingEmitter extends AbstractEmitter
 
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        ConsumerRecordDeserializer recordDeserializer,
+                        MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+    super(messagesProcessing, pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }

+ 18 - 39
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -3,9 +3,8 @@ package com.provectus.kafka.ui.service;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.ResultSizeLimiter;
+import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -14,9 +13,9 @@ import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
@@ -162,13 +161,18 @@ public class MessagesService {
                                                       @Nullable String valueSerde) {
 
     java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
-    ConsumerRecordDeserializer recordDeserializer =
-        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+
+    var processing = new MessagesProcessing(
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
+        getMsgFilter(query, filterQueryType),
+        seekDirection == SeekDirectionDTO.TAILING ? null : limit
+    );
+
     if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
       emitter = new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer,
+          processing,
           cluster.getPollingSettings()
       );
     } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
@@ -176,33 +180,22 @@ public class MessagesService {
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          recordDeserializer,
+          processing,
           cluster.getPollingSettings()
       );
     } else {
       emitter = new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer,
+          processing,
           cluster.getPollingSettings()
       );
     }
-    MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
-        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterQueryType, filterStats))
         .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
         .map(throttleUiPublish(seekDirection));
   }
 
-  private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
-      SeekDirectionDTO seekDirection, int limit) {
-    return seekDirection == SeekDirectionDTO.TAILING
-        ? evt -> true // no limit for tailing
-        : new ResultSizeLimiter(limit);
-  }
-
   private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
     var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
     var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
@@ -211,32 +204,18 @@ public class MessagesService {
         return evt;
       }
       return evt.message(
-        evt.getMessage()
-            .key(keyMasker.apply(evt.getMessage().getKey()))
-            .content(valMasker.apply(evt.getMessage().getContent())));
+          evt.getMessage()
+              .key(keyMasker.apply(evt.getMessage().getKey()))
+              .content(valMasker.apply(evt.getMessage().getContent())));
     };
   }
 
-  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
-                                                       MessageFilterTypeDTO filterQueryType,
-                                                       MessageFilterStats filterStats) {
+  private Predicate<TopicMessageDTO> getMsgFilter(String query,
+                                                  MessageFilterTypeDTO filterQueryType) {
     if (StringUtils.isEmpty(query)) {
       return evt -> true;
     }
-    var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
-    return evt -> {
-      // we only apply filter for message events
-      if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        try {
-          return messageFilter.test(evt.getMessage());
-        } catch (Exception e) {
-          filterStats.incrementApplyErrors();
-          log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
-          return false;
-        }
-      }
-      return true;
-    };
+    return MessageFilters.createMsgFilter(query, filterQueryType);
   }
 
   private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
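
After this change the reactive pipeline in MessagesService only masks and
throttles events; filtering, limiting, and filter-error counting all happen
inside the emitter via MessagesProcessing, which is what guarantees the DONE
event is sent.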

+ 15 - 10
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
@@ -106,12 +107,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     );
   }
 
+  private MessagesProcessing createMessagesProcessing() {
+    return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
+  }
+
   @Test
   void pollNothingOnEmptyTopic() {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -119,7 +124,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -141,7 +146,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -149,7 +154,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -170,7 +175,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -178,7 +183,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -215,7 +220,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -223,7 +228,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -254,7 +259,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );
 
@@ -280,7 +285,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
-        RECORD_DESERIALIZER,
+        createMessagesProcessing(),
         PollingSettings.createDefault()
     );