|
@@ -9,15 +9,13 @@ import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
|
import com.provectus.kafka.ui.AbstractIntegrationTest;
|
|
|
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
|
|
-import com.provectus.kafka.ui.emitter.Cursor;
|
|
|
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
|
|
|
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
|
|
|
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
|
|
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
|
|
|
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
|
|
|
import com.provectus.kafka.ui.emitter.PollingSettings;
|
|
|
import com.provectus.kafka.ui.emitter.PollingThrottler;
|
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
|
-import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
|
|
|
+import com.provectus.kafka.ui.model.TopicMessageDTO;
|
|
|
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
|
|
import com.provectus.kafka.ui.producer.KafkaTestProducer;
|
|
|
import com.provectus.kafka.ui.serde.api.Serde;
|
|
@@ -35,6 +33,7 @@ import java.util.UUID;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
|
+import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
import lombok.Value;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -62,6 +61,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
static final List<Record> SENT_RECORDS = new ArrayList<>();
|
|
|
static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
|
|
|
static final Cursor.Tracking CURSOR_MOCK = Mockito.mock(Cursor.Tracking.class);
|
|
|
+ static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
|
|
|
|
|
|
@BeforeAll
|
|
|
static void generateMsgs() throws Exception {
|
|
@@ -98,6 +98,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
static void cleanup() {
|
|
|
deleteTopic(TOPIC);
|
|
|
deleteTopic(EMPTY_TOPIC);
|
|
|
+ SENT_RECORDS.clear();
|
|
|
}
|
|
|
|
|
|
private static ConsumerRecordDeserializer createRecordsDeserializer() {
|
|
@@ -110,31 +111,36 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
s.deserializer(null, Serde.Target.VALUE),
|
|
|
StringSerde.name(),
|
|
|
s.deserializer(null, Serde.Target.KEY),
|
|
|
- s.deserializer(null, Serde.Target.VALUE)
|
|
|
+ s.deserializer(null, Serde.Target.VALUE),
|
|
|
+ msg -> msg
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- private MessagesProcessing createMessagesProcessing() {
|
|
|
- return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
|
|
|
- }
|
|
|
-
|
|
|
@Test
|
|
|
void pollNothingOnEmptyTopic() {
|
|
|
- var forwardEmitter = new ForwardRecordEmitter(
|
|
|
+ var forwardEmitter = new ForwardEmitter(
|
|
|
this::createConsumer,
|
|
|
+ new ConsumerPosition(EARLIEST, EMPTY_TOPIC, null),
|
|
|
+ 100,
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
);
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
|
|
|
100,
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
- CURSOR_MOCK
|
|
|
+ CURSOR_MOCK,
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
StepVerifier.create(Flux.create(forwardEmitter))
|
|
@@ -152,7 +158,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
|
|
|
@Test
|
|
|
void pollFullTopicFromBeginning() {
|
|
|
- var forwardEmitter = new ForwardRecordEmitter(
|
|
|
+ var forwardEmitter = new ForwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
|
|
|
createMessagesProcessing(),
|
|
@@ -160,13 +166,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
CURSOR_MOCK
|
|
|
);
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
|
|
|
PARTITIONS * MSGS_PER_PARTITION,
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
|
|
@@ -183,16 +192,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
targetOffsets.put(new TopicPartition(TOPIC, i), offset);
|
|
|
}
|
|
|
|
|
|
- var forwardEmitter = new ForwardRecordEmitter(
|
|
|
+ var forwardEmitter = new ForwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
|
|
|
new Offsets(null, targetOffsets)),
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
+ new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
|
|
|
+ PARTITIONS * MSGS_PER_PARTITION,
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
|
|
|
new Offsets(null, targetOffsets)),
|
|
@@ -200,6 +214,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
@@ -223,7 +240,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
//choosing ts in the middle
|
|
|
long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2);
|
|
|
|
|
|
- var forwardEmitter = new ForwardRecordEmitter(
|
|
|
+ var forwardEmitter = new ForwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
|
|
|
createMessagesProcessing(),
|
|
@@ -237,15 +254,23 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
.filter(r -> r.getTimestamp() >= targetTimestamp)
|
|
|
.map(Record::getValue)
|
|
|
.collect(Collectors.toList())
|
|
|
+ new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
|
|
|
+ PARTITIONS * MSGS_PER_PARTITION,
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
|
|
|
PARTITIONS * MSGS_PER_PARTITION,
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
expectEmitter(
|
|
@@ -265,7 +290,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
|
|
|
}
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
|
|
|
new Offsets(null, targetOffsets)),
|
|
@@ -273,6 +298,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
);
|
|
|
|
|
|
var expectedValues = SENT_RECORDS.stream()
|
|
@@ -293,10 +321,13 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|
|
offsets.put(new TopicPartition(TOPIC, i), 0L);
|
|
|
}
|
|
|
|
|
|
- var backwardEmitter = new BackwardRecordEmitter(
|
|
|
+ var backwardEmitter = new BackwardEmitter(
|
|
|
this::createConsumer,
|
|
|
new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
|
|
|
100,
|
|
|
+ RECORD_DESERIALIZER,
|
|
|
+ NOOP_FILTER,
|
|
|
+ PollingSettings.createDefault()
|
|
|
createMessagesProcessing(),
|
|
|
PollingSettings.createDefault(),
|
|
|
CURSOR_MOCK
|