|
@@ -31,13 +31,13 @@ import org.junit.jupiter.api.Test;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.test.StepVerifier;
|
|
import reactor.test.StepVerifier;
|
|
|
|
|
|
-
|
|
|
|
-public class CursorTest extends AbstractIntegrationTest {
|
|
|
|
|
|
+class CursorTest extends AbstractIntegrationTest {
|
|
|
|
|
|
static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID();
|
|
static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID();
|
|
-
|
|
|
|
static final int MSGS_IN_PARTITION = 20;
|
|
static final int MSGS_IN_PARTITION = 20;
|
|
- static final int PAGE_SIZE = 11;
|
|
|
|
|
|
+ static final int PAGE_SIZE = (MSGS_IN_PARTITION / 2) + 1; //to poll fill data set in 2 iterations
|
|
|
|
+
|
|
|
|
+ final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
|
|
|
|
|
|
@BeforeAll
|
|
@BeforeAll
|
|
static void setup() {
|
|
static void setup() {
|
|
@@ -56,56 +56,45 @@ public class CursorTest extends AbstractIntegrationTest {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
void backwardEmitter() {
|
|
void backwardEmitter() {
|
|
- var cursorsStorage = new PollingCursorsStorage();
|
|
|
|
var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null);
|
|
var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null);
|
|
-
|
|
|
|
- var cursor = new Cursor.Tracking(
|
|
|
|
- createRecordsDeserializer(),
|
|
|
|
- consumerPosition,
|
|
|
|
- m -> true,
|
|
|
|
- PAGE_SIZE,
|
|
|
|
- cursorsStorage::register
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- var emitter = createBackwardEmitter(consumerPosition, cursor);
|
|
|
|
- verifyMessagesEmitted(emitter);
|
|
|
|
- assertCursor(
|
|
|
|
- cursorsStorage,
|
|
|
|
|
|
+ var emitter = createBackwardEmitter(consumerPosition);
|
|
|
|
+ emitMessages(emitter, PAGE_SIZE);
|
|
|
|
+ var cursor = assertCursor(
|
|
PollingModeDTO.TO_OFFSET,
|
|
PollingModeDTO.TO_OFFSET,
|
|
offsets -> assertThat(offsets)
|
|
offsets -> assertThat(offsets)
|
|
.hasSize(1)
|
|
.hasSize(1)
|
|
.containsEntry(new TopicPartition(TOPIC, 0), 9L)
|
|
.containsEntry(new TopicPartition(TOPIC, 0), 9L)
|
|
);
|
|
);
|
|
|
|
+
|
|
|
|
+ // polling remaining records using registered cursor
|
|
|
|
+ emitter = createBackwardEmitterWithCursor(cursor);
|
|
|
|
+ emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
|
|
|
|
+ //checking no new cursors registered
|
|
|
|
+ assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
void forwardEmitter() {
|
|
void forwardEmitter() {
|
|
- var cursorsStorage = new PollingCursorsStorage();
|
|
|
|
var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null);
|
|
var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null);
|
|
-
|
|
|
|
- var cursor = new Cursor.Tracking(
|
|
|
|
- createRecordsDeserializer(),
|
|
|
|
- consumerPosition,
|
|
|
|
- m -> true,
|
|
|
|
- PAGE_SIZE,
|
|
|
|
- cursorsStorage::register
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- var emitter = createForwardEmitter(consumerPosition, cursor);
|
|
|
|
- verifyMessagesEmitted(emitter);
|
|
|
|
- assertCursor(
|
|
|
|
- cursorsStorage,
|
|
|
|
|
|
+ var emitter = createForwardEmitter(consumerPosition);
|
|
|
|
+ emitMessages(emitter, PAGE_SIZE);
|
|
|
|
+ var cursor = assertCursor(
|
|
PollingModeDTO.FROM_OFFSET,
|
|
PollingModeDTO.FROM_OFFSET,
|
|
offsets -> assertThat(offsets)
|
|
offsets -> assertThat(offsets)
|
|
.hasSize(1)
|
|
.hasSize(1)
|
|
.containsEntry(new TopicPartition(TOPIC, 0), 11L)
|
|
.containsEntry(new TopicPartition(TOPIC, 0), 11L)
|
|
);
|
|
);
|
|
|
|
+
|
|
|
|
+ //polling remaining records using registered cursor
|
|
|
|
+ emitter = createForwardEmitterWithCursor(cursor);
|
|
|
|
+ emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
|
|
|
|
+ //checking no new cursors registered
|
|
|
|
+ assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
|
|
}
|
|
}
|
|
|
|
|
|
- private void assertCursor(PollingCursorsStorage storage,
|
|
|
|
- PollingModeDTO expectedMode,
|
|
|
|
- Consumer<Map<TopicPartition, Long>> offsetsAssert) {
|
|
|
|
- Cursor registeredCursor = storage.asMap().values().stream().findFirst().orElse(null);
|
|
|
|
|
|
+ private Cursor assertCursor(PollingModeDTO expectedMode,
|
|
|
|
+ Consumer<Map<TopicPartition, Long>> offsetsAssert) {
|
|
|
|
+ Cursor registeredCursor = cursorsStorage.asMap().values().stream().findFirst().orElse(null);
|
|
assertThat(registeredCursor).isNotNull();
|
|
assertThat(registeredCursor).isNotNull();
|
|
assertThat(registeredCursor.limit()).isEqualTo(PAGE_SIZE);
|
|
assertThat(registeredCursor.limit()).isEqualTo(PAGE_SIZE);
|
|
assertThat(registeredCursor.deserializer()).isNotNull();
|
|
assertThat(registeredCursor.deserializer()).isNotNull();
|
|
@@ -118,36 +107,68 @@ public class CursorTest extends AbstractIntegrationTest {
|
|
assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
|
|
assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
|
|
|
|
|
|
offsetsAssert.accept(cursorPosition.offsets().tpOffsets());
|
|
offsetsAssert.accept(cursorPosition.offsets().tpOffsets());
|
|
|
|
+ return registeredCursor;
|
|
}
|
|
}
|
|
|
|
|
|
- private void verifyMessagesEmitted(AbstractEmitter emitter) {
|
|
|
|
|
|
+ private void emitMessages(AbstractEmitter emitter, int expectedCnt) {
|
|
StepVerifier.create(
|
|
StepVerifier.create(
|
|
Flux.create(emitter)
|
|
Flux.create(emitter)
|
|
.filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
|
|
.filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
|
|
.map(e -> e.getMessage().getContent())
|
|
.map(e -> e.getMessage().getContent())
|
|
)
|
|
)
|
|
- .expectNextCount(PAGE_SIZE)
|
|
|
|
|
|
+ .expectNextCount(expectedCnt)
|
|
.verifyComplete();
|
|
.verifyComplete();
|
|
}
|
|
}
|
|
|
|
|
|
- private BackwardRecordEmitter createBackwardEmitter(ConsumerPosition position, Cursor.Tracking cursor) {
|
|
|
|
|
|
+ private BackwardRecordEmitter createBackwardEmitter(ConsumerPosition position) {
|
|
return new BackwardRecordEmitter(
|
|
return new BackwardRecordEmitter(
|
|
this::createConsumer,
|
|
this::createConsumer,
|
|
position,
|
|
position,
|
|
PAGE_SIZE,
|
|
PAGE_SIZE,
|
|
new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
|
|
new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
|
|
PollingSettings.createDefault(),
|
|
PollingSettings.createDefault(),
|
|
- cursor
|
|
|
|
|
|
+ createCursor(position)
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
- private ForwardRecordEmitter createForwardEmitter(ConsumerPosition position, Cursor.Tracking cursor) {
|
|
|
|
|
|
+ private BackwardRecordEmitter createBackwardEmitterWithCursor(Cursor cursor) {
|
|
|
|
+ return new BackwardRecordEmitter(
|
|
|
|
+ this::createConsumer,
|
|
|
|
+ cursor.consumerPosition(),
|
|
|
|
+ cursor.limit(),
|
|
|
|
+ new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
|
|
|
|
+ PollingSettings.createDefault(),
|
|
|
|
+ createCursor(cursor.consumerPosition())
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ForwardRecordEmitter createForwardEmitterWithCursor(Cursor cursor) {
|
|
|
|
+ return new ForwardRecordEmitter(
|
|
|
|
+ this::createConsumer,
|
|
|
|
+ cursor.consumerPosition(),
|
|
|
|
+ new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
|
|
|
|
+ PollingSettings.createDefault(),
|
|
|
|
+ createCursor(cursor.consumerPosition())
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ForwardRecordEmitter createForwardEmitter(ConsumerPosition position) {
|
|
return new ForwardRecordEmitter(
|
|
return new ForwardRecordEmitter(
|
|
this::createConsumer,
|
|
this::createConsumer,
|
|
position,
|
|
position,
|
|
new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
|
|
new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
|
|
PollingSettings.createDefault(),
|
|
PollingSettings.createDefault(),
|
|
- cursor
|
|
|
|
|
|
+ createCursor(position)
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Cursor.Tracking createCursor(ConsumerPosition position) {
|
|
|
|
+ return new Cursor.Tracking(
|
|
|
|
+ createRecordsDeserializer(),
|
|
|
|
+ position,
|
|
|
|
+ m -> true,
|
|
|
|
+ PAGE_SIZE,
|
|
|
|
+ cursorsStorage::register
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|