|
@@ -37,24 +37,16 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
|
|
|
private static final int PARTITIONS = 5;
|
|
|
|
|
|
- private static final KafkaCluster CLUSTER =
|
|
|
- KafkaCluster.builder()
|
|
|
- .name(LOCAL)
|
|
|
- .bootstrapServers(kafka.getBootstrapServers())
|
|
|
- .properties(new Properties())
|
|
|
- .build();
|
|
|
-
|
|
|
private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID();
|
|
|
private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID();
|
|
|
|
|
|
+ private KafkaCluster cluster;
|
|
|
private OffsetsResetService offsetsResetService;
|
|
|
|
|
|
@BeforeEach
|
|
|
void init() {
|
|
|
- AdminClientServiceImpl adminClientService = new AdminClientServiceImpl();
|
|
|
- adminClientService.setClientTimeout(5_000);
|
|
|
- offsetsResetService = new OffsetsResetService(adminClientService);
|
|
|
-
|
|
|
+ cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get();
|
|
|
+ offsetsResetService = new OffsetsResetService(applicationContext.getBean(AdminClientService.class));
|
|
|
createTopic(new NewTopic(topic, PARTITIONS, (short) 1));
|
|
|
createConsumerGroup();
|
|
|
}
|
|
@@ -76,13 +68,13 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
void failsIfGroupDoesNotExists() {
|
|
|
List<Mono<?>> expectedNotFound = List.of(
|
|
|
offsetsResetService
|
|
|
- .resetToEarliest(CLUSTER, "non-existing-group", topic, null),
|
|
|
+ .resetToEarliest(cluster, "non-existing-group", topic, null),
|
|
|
offsetsResetService
|
|
|
- .resetToLatest(CLUSTER, "non-existing-group", topic, null),
|
|
|
+ .resetToLatest(cluster, "non-existing-group", topic, null),
|
|
|
offsetsResetService
|
|
|
- .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()),
|
|
|
+ .resetToTimestamp(cluster, "non-existing-group", topic, null, System.currentTimeMillis()),
|
|
|
offsetsResetService
|
|
|
- .resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of())
|
|
|
+ .resetToOffsets(cluster, "non-existing-group", topic, Map.of())
|
|
|
);
|
|
|
|
|
|
for (Mono<?> mono : expectedNotFound) {
|
|
@@ -101,11 +93,11 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
consumer.poll(Duration.ofMillis(100));
|
|
|
|
|
|
List<Mono<?>> expectedValidationError = List.of(
|
|
|
- offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null),
|
|
|
- offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null),
|
|
|
+ offsetsResetService.resetToEarliest(cluster, groupId, topic, null),
|
|
|
+ offsetsResetService.resetToLatest(cluster, groupId, topic, null),
|
|
|
offsetsResetService
|
|
|
- .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()),
|
|
|
- offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of())
|
|
|
+ .resetToTimestamp(cluster, groupId, topic, null, System.currentTimeMillis()),
|
|
|
+ offsetsResetService.resetToOffsets(cluster, groupId, topic, Map.of())
|
|
|
);
|
|
|
|
|
|
for (Mono<?> mono : expectedValidationError) {
|
|
@@ -121,7 +113,7 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
|
|
|
|
|
|
var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L);
|
|
|
- offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets).block();
|
|
|
+ offsetsResetService.resetToOffsets(cluster, groupId, topic, expectedOffsets).block();
|
|
|
assertOffsets(expectedOffsets);
|
|
|
}
|
|
|
|
|
@@ -131,7 +123,7 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
|
|
|
var offsetsWithInValidBounds = Map.of(0, -2L, 1, 5L, 2, 500L);
|
|
|
var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L);
|
|
|
- offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds).block();
|
|
|
+ offsetsResetService.resetToOffsets(cluster, groupId, topic, offsetsWithInValidBounds).block();
|
|
|
assertOffsets(expectedOffsets);
|
|
|
}
|
|
|
|
|
@@ -140,11 +132,11 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
|
|
|
|
|
|
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
|
|
|
- offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1)).block();
|
|
|
+ offsetsResetService.resetToEarliest(cluster, groupId, topic, List.of(0, 1)).block();
|
|
|
assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L));
|
|
|
|
|
|
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
|
|
|
- offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block();
|
|
|
+ offsetsResetService.resetToEarliest(cluster, groupId, topic, null).block();
|
|
|
assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L));
|
|
|
}
|
|
|
|
|
@@ -153,11 +145,11 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10));
|
|
|
|
|
|
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
|
|
|
- offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1)).block();
|
|
|
+ offsetsResetService.resetToLatest(cluster, groupId, topic, List.of(0, 1)).block();
|
|
|
assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L));
|
|
|
|
|
|
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
|
|
|
- offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block();
|
|
|
+ offsetsResetService.resetToLatest(cluster, groupId, topic, null).block();
|
|
|
assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L));
|
|
|
}
|
|
|
|
|
@@ -175,7 +167,7 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
new ProducerRecord<Bytes, Bytes>(topic, 2, 1200L, null, null)));
|
|
|
|
|
|
offsetsResetService.resetToTimestamp(
|
|
|
- CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L
|
|
|
+ cluster, groupId, topic, List.of(0, 1, 2, 3), 1600L
|
|
|
).block();
|
|
|
assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L));
|
|
|
}
|
|
@@ -227,7 +219,7 @@ public class OffsetsResetServiceTest extends AbstractIntegrationTest {
|
|
|
private Consumer<?, ?> groupConsumer() {
|
|
|
Properties props = new Properties();
|
|
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
|
|
|
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.getBootstrapServers());
|
|
|
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
|
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|