|
@@ -8,7 +8,9 @@ import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
|
import com.provectus.kafka.ui.model.SeekDirection;
|
|
|
import com.provectus.kafka.ui.model.SeekType;
|
|
|
+import com.provectus.kafka.ui.model.TopicMessageEvent;
|
|
|
import com.provectus.kafka.ui.producer.KafkaTestProducer;
|
|
|
+import com.provectus.kafka.ui.serde.SimpleRecordSerDe;
|
|
|
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
|
|
|
import com.provectus.kafka.ui.util.OffsetsSeekForward;
|
|
|
import java.io.Serializable;
|
|
@@ -24,19 +26,19 @@ import lombok.Value;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
import org.apache.kafka.clients.admin.NewTopic;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
-import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
|
-import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
|
import org.junit.jupiter.api.AfterAll;
|
|
|
import org.junit.jupiter.api.BeforeAll;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
+import org.springframework.test.context.ContextConfiguration;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
|
@Log4j2
|
|
|
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
|
|
|
class RecordEmitterTest extends AbstractBaseTest {
|
|
|
|
|
|
static final int PARTITIONS = 5;
|
|
@@ -80,7 +82,7 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
this::createConsumer,
|
|
|
new OffsetsSeekForward(EMPTY_TOPIC,
|
|
|
new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
@@ -89,10 +91,11 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
EMPTY_TOPIC,
|
|
|
new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.BACKWARD),
|
|
|
100
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
Long polledValues = Flux.create(forwardEmitter)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(100)
|
|
|
.count()
|
|
|
.block();
|
|
@@ -100,6 +103,7 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
assertThat(polledValues).isZero();
|
|
|
|
|
|
polledValues = Flux.create(backwardEmitter)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(100)
|
|
|
.count()
|
|
|
.block();
|
|
@@ -114,7 +118,7 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
this::createConsumer,
|
|
|
new OffsetsSeekForward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
@@ -122,12 +126,15 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
new OffsetsSeekBackward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD),
|
|
|
PARTITIONS * MSGS_PER_PARTITION
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var polledValues = Flux.create(forwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
|
|
|
@@ -135,8 +142,11 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
|
|
|
|
|
|
polledValues = Flux.create(backwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
|
|
|
@@ -157,7 +167,7 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
this::createConsumer,
|
|
|
new OffsetsSeekForward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.FORWARD)
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
@@ -165,12 +175,15 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
new OffsetsSeekBackward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
|
|
|
PARTITIONS * MSGS_PER_PARTITION
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var polledValues = Flux.create(forwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
|
|
|
@@ -187,8 +200,11 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
polledValues = Flux.create(backwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
|
|
|
@@ -214,7 +230,7 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
this::createConsumer,
|
|
|
new OffsetsSeekForward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.FORWARD)
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
@@ -222,11 +238,13 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
new OffsetsSeekBackward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.BACKWARD),
|
|
|
PARTITIONS * MSGS_PER_PARTITION
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var polledValues = Flux.create(forwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
@@ -239,7 +257,9 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
|
|
polledValues = Flux.create(backwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
@@ -266,11 +286,13 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
new OffsetsSeekBackward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
|
|
|
numMessages
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var polledValues = Flux.create(backwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.limitRequest(numMessages)
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
@@ -297,11 +319,13 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
new OffsetsSeekBackward(TOPIC,
|
|
|
new ConsumerPosition(SeekType.OFFSET, offsets, SeekDirection.BACKWARD),
|
|
|
100
|
|
|
- )
|
|
|
+ ), new SimpleRecordSerDe()
|
|
|
);
|
|
|
|
|
|
var polledValues = Flux.create(backwardEmitter)
|
|
|
- .map(this::deserialize)
|
|
|
+ .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
|
|
|
+ .map(TopicMessageEvent::getMessage)
|
|
|
+ .map(m -> m.getContent().toString())
|
|
|
.limitRequest(Long.MAX_VALUE)
|
|
|
.collect(Collectors.toList())
|
|
|
.block();
|
|
@@ -327,10 +351,6 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
|
return new KafkaConsumer<>(props);
|
|
|
}
|
|
|
|
|
|
- private String deserialize(ConsumerRecord<Bytes, Bytes> rec) {
|
|
|
- return new StringDeserializer().deserialize(TOPIC, rec.value().get());
|
|
|
- }
|
|
|
-
|
|
|
@Value
|
|
|
static class Record {
|
|
|
String value;
|