|
@@ -24,6 +24,8 @@ import java.util.Map;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
+import java.util.function.Consumer;
|
|
|
|
+import java.util.function.Function;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import lombok.Value;
|
|
import lombok.Value;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -40,6 +42,8 @@ import org.junit.jupiter.api.BeforeAll;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.springframework.test.context.ContextConfiguration;
|
|
import org.springframework.test.context.ContextConfiguration;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
|
|
+import reactor.core.publisher.FluxSink;
|
|
|
|
+import reactor.test.StepVerifier;
|
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
|
|
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
|
|
@@ -106,22 +110,17 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- Long polledValues = Flux.create(forwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(100)
|
|
|
|
- .count()
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
- assertThat(polledValues).isZero();
|
|
|
|
-
|
|
|
|
- polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(100)
|
|
|
|
- .count()
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
- assertThat(polledValues).isZero();
|
|
|
|
-
|
|
|
|
|
|
+ StepVerifier.create(
|
|
|
|
+ Flux.create(forwardEmitter)
|
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
+ .take(100)
|
|
|
|
+ ).expectNextCount(0).expectComplete().verify();
|
|
|
|
+
|
|
|
|
+ StepVerifier.create(
|
|
|
|
+ Flux.create(backwardEmitter)
|
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
+ .take(100)
|
|
|
|
+ ).expectNextCount(0).expectComplete().verify();
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -136,35 +135,15 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
var backwardEmitter = new BackwardRecordEmitter(
|
|
this::createConsumer,
|
|
this::createConsumer,
|
|
new OffsetsSeekBackward(TOPIC,
|
|
new OffsetsSeekBackward(TOPIC,
|
|
- new ConsumerPosition(BEGINNING, Map.of(), FORWARD),
|
|
|
|
|
|
+ new ConsumerPosition(BEGINNING, Map.of(), BACKWARD),
|
|
PARTITIONS * MSGS_PER_PARTITION
|
|
PARTITIONS * MSGS_PER_PARTITION
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- var polledValues = Flux.create(forwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
|
|
|
|
- SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
|
|
|
|
-
|
|
|
|
- polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
|
|
|
|
- SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
|
|
|
|
|
|
+ List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
|
|
|
|
|
|
|
|
+ expectEmitter(forwardEmitter, expectedValues);
|
|
|
|
+ expectEmitter(backwardEmitter, expectedValues);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -190,37 +169,19 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- var polledValues = Flux.create(forwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
.filter(r -> r.getOffset() >= targetOffsets.get(r.getTp()))
|
|
.filter(r -> r.getOffset() >= targetOffsets.get(r.getTp()))
|
|
.map(Record::getValue)
|
|
.map(Record::getValue)
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
|
|
+ expectEmitter(forwardEmitter, expectedValues);
|
|
|
|
|
|
expectedValues = SENT_RECORDS.stream()
|
|
expectedValues = SENT_RECORDS.stream()
|
|
.filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
|
|
.filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
|
|
.map(Record::getValue)
|
|
.map(Record::getValue)
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
- polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
|
|
+ expectEmitter(backwardEmitter, expectedValues);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -253,36 +214,19 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- var polledValues = Flux.create(forwardEmitter)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
.filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
|
|
.filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
|
|
.map(Record::getValue)
|
|
.map(Record::getValue)
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
-
|
|
|
|
- polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
|
|
+ expectEmitter(forwardEmitter, expectedValues);
|
|
|
|
|
|
expectedValues = SENT_RECORDS.stream()
|
|
expectedValues = SENT_RECORDS.stream()
|
|
.filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
|
|
.filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
|
|
.map(Record::getValue)
|
|
.map(Record::getValue)
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
-
|
|
|
|
|
|
+ expectEmitter(backwardEmitter, expectedValues);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -301,22 +245,15 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- var polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .limitRequest(numMessages)
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
-
|
|
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
.filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
|
|
.filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
|
|
- .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (100 / PARTITIONS)))
|
|
|
|
|
|
+ .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (numMessages / PARTITIONS)))
|
|
.map(Record::getValue)
|
|
.map(Record::getValue)
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
|
|
+ assertThat(expectedValues).size().isEqualTo(numMessages);
|
|
|
|
|
|
- assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
|
|
|
|
|
|
+ expectEmitter(backwardEmitter, expectedValues);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -334,15 +271,39 @@ class RecordEmitterTest extends AbstractBaseTest {
|
|
), new SimpleRecordSerDe()
|
|
), new SimpleRecordSerDe()
|
|
);
|
|
);
|
|
|
|
|
|
- var polledValues = Flux.create(backwardEmitter)
|
|
|
|
- .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
- .map(TopicMessageEventDTO::getMessage)
|
|
|
|
- .map(m -> m.getContent().toString())
|
|
|
|
- .limitRequest(Long.MAX_VALUE)
|
|
|
|
- .collect(Collectors.toList())
|
|
|
|
- .block();
|
|
|
|
|
|
+ expectEmitter(backwardEmitter,
|
|
|
|
+ 100,
|
|
|
|
+ e -> e.expectNextCount(0),
|
|
|
|
+ StepVerifier.Assertions::hasNotDroppedElements
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void expectEmitter(Consumer<FluxSink<TopicMessageEventDTO>> emitter, List<String> expectedValues) {
|
|
|
|
+ expectEmitter(emitter,
|
|
|
|
+ expectedValues.size(),
|
|
|
|
+ e -> e.recordWith(ArrayList::new)
|
|
|
|
+ .expectNextCount(expectedValues.size())
|
|
|
|
+ .expectRecordedMatches(r -> r.containsAll(expectedValues))
|
|
|
|
+ .consumeRecordedWith(r -> log.info("Collected collection: {}", r)),
|
|
|
|
+ v -> {}
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void expectEmitter(
|
|
|
|
+ Consumer<FluxSink<TopicMessageEventDTO>> emitter,
|
|
|
|
+ int take,
|
|
|
|
+ Function<StepVerifier.Step<String>, StepVerifier.Step<String>> stepConsumer,
|
|
|
|
+ Consumer<StepVerifier.Assertions> assertionsConsumer) {
|
|
|
|
+
|
|
|
|
+ StepVerifier.FirstStep<String> firstStep = StepVerifier.create(
|
|
|
|
+ Flux.create(emitter)
|
|
|
|
+ .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
|
|
+ .take(take)
|
|
|
|
+ .map(m -> m.getMessage().getContent())
|
|
|
|
+ );
|
|
|
|
|
|
- assertThat(polledValues).isEmpty();
|
|
|
|
|
|
+ StepVerifier.Step<String> step = stepConsumer.apply(firstStep);
|
|
|
|
+ assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
|
|
}
|
|
}
|
|
|
|
|
|
private KafkaConsumer<Bytes, Bytes> createConsumer() {
|
|
private KafkaConsumer<Bytes, Bytes> createConsumer() {
|