diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index 9b7905fe3e..30a5f566d3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -32,10 +32,10 @@ public abstract class AbstractEmitter { return records; } - protected FluxSink sendMessage(FluxSink sink, + protected void sendMessage(FluxSink sink, ConsumerRecord msg) { final TopicMessageDTO topicMessage = ClusterUtil.mapToTopicMessage(msg, recordDeserializer); - return sink.next( + sink.next( new TopicMessageEventDTO() .type(TopicMessageEventDTO.TypeEnum.MESSAGE) .message(topicMessage) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java index cb3d4ae89b..ff0e2fca4b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java @@ -4,5 +4,5 @@ public enum Feature { KAFKA_CONNECT, KSQL_DB, SCHEMA_REGISTRY, - TOPIC_DELETION; + TOPIC_DELETION } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 7b93b1420e..d8c96f8cbc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @Service diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 34eca2ac3f..b991884548 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -48,8 +48,6 @@ import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @Slf4j public class MessagesService { - - private final AdminClientService adminClientService; private final DeserializationService deserializationService; private final ConsumerGroupService consumerGroupService; @@ -85,7 +83,7 @@ public class MessagesService { if (msg.getPartition() != null && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions() .get(topic).partitions().size() - 1) { - throw new ValidationException("Invalid partition"); + return Mono.error(new ValidationException("Invalid partition")); } RecordSerDe serde = deserializationService.getRecordDeserializerForCluster(cluster); @@ -118,6 +116,8 @@ public class MessagesService { } }); return Mono.fromFuture(cf); + } catch (Throwable e) { + return Mono.error(e); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index ae57a668a9..184a10e482 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -296,7 +296,7 @@ public class TopicsService { .collect(toList()); // Iterate brokers and try to add them in assignment - // while (partition replicas count != requested replication factor) + // while partition replicas count != requested replication factor for (Integer broker : brokers) { if (!assignmentList.contains(broker)) { assignmentList.add(broker); @@ -324,7 +324,7 @@ public class TopicsService { .collect(toList()); // Iterate brokers and try to remove them from assignment - // while (partition replicas count != requested replication factor) + // while partition replicas count != requested replication factor for (Integer broker : brokersUsageList) { // Check is the broker the leader of partition if (!topic.getPartitions().get(partition).getLeader() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java index eccc19f0f6..137e9aa3a8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java @@ -18,7 +18,7 @@ public class ResponseParser { } public static Optional parseSelectResponse(JsonNode jsonNode) { - // in response we getting either header record or row data + // in response, we're getting either header record or row data if (arrayFieldNonEmpty(jsonNode, "header")) { return Optional.of( KsqlApiClient.KsqlResponseTable.builder() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java index b0b2b36d81..1edd85c245 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java @@ -156,21 +156,21 @@ public class JmxClusterUtil { .metrics(metrics) .internalBrokerMetrics(perBrokerJmxMetrics) .bytesInPerSec(findTopicMetrics( - metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate)) + metrics, JmxMetricsName.BYTES_IN_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE)) .bytesOutPerSec(findTopicMetrics( - metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate)) + metrics, JmxMetricsName.BYTES_OUT_PER_SEC, JmxMetricsValueName.FIFTEEN_MINUTE_RATE)) .build(); } private Map findTopicMetrics(List metrics, JmxMetricsName metricsName, JmxMetricsValueName valueName) { - return metrics.stream().filter(m -> metricsName.name().equals(m.getName())) + return metrics.stream().filter(m -> metricsName.getValue().equals(m.getName())) .filter(m -> m.getParams().containsKey("topic")) - .filter(m -> m.getValue().containsKey(valueName.name())) + .filter(m -> m.getValue().containsKey(valueName.getValue())) .map(m -> Tuples.of( m.getParams().get("topic"), - m.getValue().get(valueName.name()) + m.getValue().get(valueName.getValue()) )).collect(groupingBy( Tuple2::getT1, reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add) @@ -204,7 +204,7 @@ public class JmxClusterUtil { private boolean isWellKnownMetric(MetricDTO metric) { final Optional param = Optional.ofNullable(metric.getParams().get(NAME_METRIC_FIELD)).filter(p -> - Arrays.stream(JmxMetricsName.values()).map(Enum::name) + Arrays.stream(JmxMetricsName.values()).map(JmxMetricsName::getValue) .anyMatch(n -> n.equals(p)) ); return metric.getCanonicalName().contains(KAFKA_SERVER_PARAM) && param.isPresent(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java index 6dccf6f9b3..2384fc8536 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsName.java @@ -1,31 +1,41 @@ package com.provectus.kafka.ui.util; public enum JmxMetricsName { - MessagesInPerSec, - BytesInPerSec, - ReplicationBytesInPerSec, - RequestsPerSec, - ErrorsPerSec, - MessageConversionsPerSec, - BytesOutPerSec, - ReplicationBytesOutPerSec, - NoKeyCompactedTopicRecordsPerSec, - InvalidMagicNumberRecordsPerSec, - InvalidMessageCrcRecordsPerSec, - InvalidOffsetOrSequenceRecordsPerSec, - UncleanLeaderElectionsPerSec, - IsrShrinksPerSec, - IsrExpandsPerSec, - ReassignmentBytesOutPerSec, - ReassignmentBytesInPerSec, - ProduceMessageConversionsPerSec, - FailedFetchRequestsPerSec, - ZooKeeperSyncConnectsPerSec, - BytesRejectedPerSec, - ZooKeeperAuthFailuresPerSec, - TotalFetchRequestsPerSec, - FailedIsrUpdatesPerSec, - IncrementalFetchSessionEvictionsPerSec, - FetchMessageConversionsPerSec, - FailedProduceRequestsPerSec + MESSAGES_IN_PER_SEC("MessagesInPerSec"), + BYTES_IN_PER_SEC("BytesInPerSec"), + REPLICATION_BYTES_IN_PER_SEC("ReplicationBytesInPerSec"), + REQUESTS_PER_SEC("RequestsPerSec"), + ERRORS_PER_SEC("ErrorsPerSec"), + MESSAGE_CONVERSIONS_PER_SEC("MessageConversionsPerSec"), + BYTES_OUT_PER_SEC("BytesOutPerSec"), + REPLICATION_BYTES_OUT_PER_SEC("ReplicationBytesOutPerSec"), + NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC("NoKeyCompactedTopicRecordsPerSec"), + INVALID_MAGIC_NUMBER_RECORDS_PER_SEC("InvalidMagicNumberRecordsPerSec"), + INVALID_MESSAGE_CRC_RECORDS_PER_SEC("InvalidMessageCrcRecordsPerSec"), + INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC("InvalidOffsetOrSequenceRecordsPerSec"), + UNCLEAN_LEADER_ELECTIONS_PER_SEC("UncleanLeaderElectionsPerSec"), + ISR_SHRINKS_PER_SEC("IsrShrinksPerSec"), + ISR_EXPANDS_PER_SEC("IsrExpandsPerSec"), + REASSIGNMENT_BYTES_OUT_PER_SEC("ReassignmentBytesOutPerSec"), + REASSIGNMENT_BYTES_IN_PER_SEC("ReassignmentBytesInPerSec"), + PRODUCE_MESSAGE_CONVERSIONS_PER_SEC("ProduceMessageConversionsPerSec"), + FAILED_FETCH_REQUESTS_PER_SEC("FailedFetchRequestsPerSec"), + ZOOKEEPER_SYNC_CONNECTS_PER_SEC("ZooKeeperSyncConnectsPerSec"), + BYTES_REJECTED_PER_SEC("BytesRejectedPerSec"), + ZOO_KEEPER_AUTH_FAILURES_PER_SEC("ZooKeeperAuthFailuresPerSec"), + TOTAL_FETCH_REQUESTS_PER_SEC("TotalFetchRequestsPerSec"), + FAILED_ISR_UPDATES_PER_SEC("FailedIsrUpdatesPerSec"), + INCREMENTAL_FETCH_SESSION_EVICTIONS_PER_SEC("IncrementalFetchSessionEvictionsPerSec"), + FETCH_MESSAGE_CONVERSIONS_PER_SEC("FetchMessageConversionsPerSec"), + FAILED_PRODUCE_REQUESTS_PER_SEC("FailedProduceRequestsPerSe"); + + private final String value; + + JmxMetricsName(String value) { + this.value = value; + } + + public String getValue() { + return value; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java index f70ac9d1c6..cbcc6cee07 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxMetricsValueName.java @@ -1,9 +1,19 @@ package com.provectus.kafka.ui.util; public enum JmxMetricsValueName { - Count, - OneMinuteRate, - FifteenMinuteRate, - FiveMinuteRate, - MeanRate + COUNT("Count"), + ONE_MINUTE_RATE("OneMinuteRate"), + FIFTEEN_MINUTE_RATE("FifteenMinuteRate"), + FIVE_MINUTE_RATE("FiveMinuteRate"), + MEAN_RATE("MeanRate"); + + private final String value; + + JmxMetricsValueName(String value) { + this.value = value; + } + + public String getValue() { + return value; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java index 7b044fe087..aa482c57b5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java @@ -30,7 +30,7 @@ import java.util.Map; public final class KafkaConstants { - private static final String LONG_MAX_STRING = Long.valueOf(Long.MAX_VALUE).toString(); + private static final String LONG_MAX_STRING = Long.toString(Long.MAX_VALUE); public static final Map TOPIC_DEFAULT_CONFIGS = Map.ofEntries( new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java index ea07105be1..64fcb21509 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java @@ -16,9 +16,7 @@ public class ResultSizeLimiter implements Predicate { public boolean test(TopicMessageEventDTO event) { if (event.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) { final int i = processed.incrementAndGet(); - if (i > limit) { - return false; - } + return i <= limit; } return true; } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index 73d9331b41..96e8cf371e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -22,6 +22,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) @Slf4j @@ -48,8 +50,13 @@ public class KafkaConsumerTests extends AbstractBaseTest { .isOk(); try (KafkaTestProducer producer = KafkaTestProducer.forKafka(kafka)) { - Stream.of("one", "two", "three", "four") - .forEach(value -> producer.send(topicName, value)); + Flux.fromStream( + Stream.of("one", "two", "three", "four") + .map(value -> Mono.fromFuture(producer.send(topicName, value))) + ).blockLast(); + } catch (Throwable e) { + log.error("Error on sending", e); + throw new RuntimeException(e); } long count = webTestClient.get() diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java index 4a7e82833b..48ce2cac38 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java @@ -76,12 +76,12 @@ class TailingEmitterTest extends AbstractBaseTest { Awaitility.await() .atMost(Duration.ofSeconds(60)) .pollInSameThread() - .untilAsserted(() -> { - assertThat(fluxOutput) + .untilAsserted(() -> + assertThat(fluxOutput) .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) .extracting(msg -> msg.getMessage().getContent()) - .hasSameElementsAs(expectedValues); - }); + .hasSameElementsAs(expectedValues) + ); } @Test @@ -101,12 +101,12 @@ class TailingEmitterTest extends AbstractBaseTest { Awaitility.await() .atMost(Duration.ofSeconds(60)) .pollInSameThread() - .untilAsserted(() -> { - assertThat(fluxOutput) + .untilAsserted(() -> + assertThat(fluxOutput) .filteredOn(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) .extracting(msg -> msg.getMessage().getContent()) - .hasSameElementsAs(expectedValues); - }); + .hasSameElementsAs(expectedValues) + ); } private Flux createTailingFlux( diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java index eb59d4977a..c892418843 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.producer; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -25,12 +26,20 @@ public class KafkaTestProducer implements AutoCloseable { ))); } - public Future send(String topic, ValueT value) { - return producer.send(new ProducerRecord<>(topic, value)); + public CompletableFuture send(String topic, ValueT value) { + return send(new ProducerRecord<>(topic, value)); } - public Future send(ProducerRecord record) { - return producer.send(record); + public CompletableFuture send(ProducerRecord record) { + CompletableFuture cf = new CompletableFuture<>(); + producer.send(record, (m, e) -> { + if (e != null) { + cf.completeExceptionally(e); + } else { + cf.complete(m); + } + }); + return cf; } @Override diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/BrokerServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/BrokerServiceTest.java index 816ec61eee..188a2f77b4 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/BrokerServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/BrokerServiceTest.java @@ -1,7 +1,5 @@ package com.provectus.kafka.ui.service; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - import com.provectus.kafka.ui.AbstractBaseTest; import com.provectus.kafka.ui.mapper.ClusterMapperImpl; import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper; @@ -45,15 +43,4 @@ class BrokerServiceTest extends AbstractBaseTest { .verifyComplete(); } - @Test - void getBrokersNull() { - assertThatThrownBy(() -> brokerService.getBrokers(null)).isInstanceOf(NullPointerException.class); - } - - @Test - void getBrokersEmpty() { - assertThatThrownBy(() -> brokerService.getBrokers(KafkaCluster.builder().build())).isInstanceOf( - NullPointerException.class); - } - } \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ConfigTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ConfigTest.java index 03c65fd9f4..f696220bb7 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ConfigTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ConfigTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractBaseTest; import com.provectus.kafka.ui.model.BrokerConfigDTO; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -13,6 +14,7 @@ import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWeb import org.springframework.core.ParameterizedTypeReference; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.web.reactive.server.WebTestClient; +import org.testcontainers.shaded.org.awaitility.Awaitility; @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) @AutoConfigureWebTestClient(timeout = "60000") @@ -22,29 +24,33 @@ public class ConfigTest extends AbstractBaseTest { private WebTestClient webTestClient; @Test - public void testAlterConfig() throws Exception { + public void testAlterConfig() { String name = "background.threads"; Optional bc = getConfig(name); assertThat(bc.isPresent()).isTrue(); assertThat(bc.get().getValue()).isEqualTo("10"); + final String newValue = "5"; + webTestClient.put() .uri("/api/clusters/{clusterName}/brokers/{id}/configs/{name}", LOCAL, 1, name) .bodyValue(Map.of( "name", name, - "value", "5" + "value", newValue ) ) .exchange() .expectStatus().isOk(); - // Without sleep it returns old config so we need to wait a little bit - Thread.sleep(1000); - - Optional bcc = getConfig(name); - assertThat(bcc.isPresent()).isTrue(); - assertThat(bcc.get().getValue()).isEqualTo("5"); + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInSameThread() + .untilAsserted(() -> { + Optional bcc = getConfig(name); + assertThat(bcc.isPresent()).isTrue(); + assertThat(bcc.get().getValue()).isEqualTo(newValue); + }); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java index 0efef1d205..4fe49b5e29 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java @@ -1,7 +1,6 @@ package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.provectus.kafka.ui.AbstractBaseTest; import com.provectus.kafka.ui.exception.NotFoundException; @@ -32,6 +31,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.test.context.ContextConfiguration; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) public class OffsetsResetServiceTest extends AbstractBaseTest { @@ -75,45 +76,45 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { @Test void failsIfGroupDoesNotExists() { - assertThatThrownBy( - () -> offsetsResetService - .resetToEarliest(CLUSTER, "non-existing-group", topic, null).block() - ).isInstanceOf(NotFoundException.class); - assertThatThrownBy( - () -> offsetsResetService - .resetToLatest(CLUSTER, "non-existing-group", topic, null).block() - ).isInstanceOf(NotFoundException.class); - assertThatThrownBy(() -> offsetsResetService - .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()) - .block() - ).isInstanceOf(NotFoundException.class); - assertThatThrownBy( - () -> offsetsResetService + List> expectedNotFound = List.of( + offsetsResetService + .resetToEarliest(CLUSTER, "non-existing-group", topic, null), + offsetsResetService + .resetToLatest(CLUSTER, "non-existing-group", topic, null), + offsetsResetService + .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()), + offsetsResetService .resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()) - .block() - ).isInstanceOf(NotFoundException.class); + ); + + for (Mono mono : expectedNotFound) { + StepVerifier.create(mono) + .expectErrorMatches(t -> t instanceof NotFoundException) + .verify(); + } } @Test void failsIfGroupIsActive() { // starting consumer to activate group try (var consumer = groupConsumer()) { + consumer.subscribe(Pattern.compile("no-such-topic-pattern")); consumer.poll(Duration.ofMillis(100)); - assertThatThrownBy(() -> - offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block() - ).isInstanceOf(ValidationException.class); - assertThatThrownBy( - () -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block() - ).isInstanceOf(ValidationException.class); - assertThatThrownBy(() -> offsetsResetService - .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()) - .block() - ).isInstanceOf(ValidationException.class); - assertThatThrownBy( - () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()).block() - ).isInstanceOf(ValidationException.class); + List> expectedValidationError = List.of( + offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null), + offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null), + offsetsResetService + .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()), + offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()) + ); + + for (Mono mono : expectedValidationError) { + StepVerifier.create(mono) + .expectErrorMatches(t -> t instanceof ValidationException) + .verify(); + } } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 280aceac86..e947486da6 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.Value; import lombok.extern.slf4j.Slf4j; @@ -40,6 +42,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.test.context.ContextConfiguration; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.test.StepVerifier; @Slf4j @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) @@ -106,22 +110,17 @@ class RecordEmitterTest extends AbstractBaseTest { ), new SimpleRecordSerDe() ); - Long polledValues = Flux.create(forwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(100) - .count() - .block(); - - assertThat(polledValues).isZero(); - - polledValues = Flux.create(backwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(100) - .count() - .block(); - - assertThat(polledValues).isZero(); + StepVerifier.create( + Flux.create(forwardEmitter) + .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) + .take(100) + ).expectNextCount(0).expectComplete().verify(); + StepVerifier.create( + Flux.create(backwardEmitter) + .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) + .take(100) + ).expectNextCount(0).expectComplete().verify(); } @Test @@ -136,35 +135,15 @@ class RecordEmitterTest extends AbstractBaseTest { var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(BEGINNING, Map.of(), FORWARD), + new ConsumerPosition(BEGINNING, Map.of(), BACKWARD), PARTITIONS * MSGS_PER_PARTITION ), new SimpleRecordSerDe() ); - var polledValues = Flux.create(forwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(Long.MAX_VALUE) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .collect(Collectors.toList()) - .block(); - - assertThat(polledValues).containsExactlyInAnyOrderElementsOf( - SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList())); - - polledValues = Flux.create(backwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(Long.MAX_VALUE) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .collect(Collectors.toList()) - .block(); - - assertThat(polledValues).containsExactlyInAnyOrderElementsOf( - SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList())); + List expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()); + expectEmitter(forwardEmitter, expectedValues); + expectEmitter(backwardEmitter, expectedValues); } @Test @@ -190,37 +169,19 @@ class RecordEmitterTest extends AbstractBaseTest { ), new SimpleRecordSerDe() ); - var polledValues = Flux.create(forwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(Long.MAX_VALUE) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .collect(Collectors.toList()) - .block(); - var expectedValues = SENT_RECORDS.stream() .filter(r -> r.getOffset() >= targetOffsets.get(r.getTp())) .map(Record::getValue) .collect(Collectors.toList()); - assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); + expectEmitter(forwardEmitter, expectedValues); expectedValues = SENT_RECORDS.stream() .filter(r -> r.getOffset() < targetOffsets.get(r.getTp())) .map(Record::getValue) .collect(Collectors.toList()); - polledValues = Flux.create(backwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .limitRequest(Long.MAX_VALUE) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .collect(Collectors.toList()) - .block(); - - assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); + expectEmitter(backwardEmitter, expectedValues); } @Test @@ -253,36 +214,19 @@ class RecordEmitterTest extends AbstractBaseTest { ), new SimpleRecordSerDe() ); - var polledValues = Flux.create(forwardEmitter) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .limitRequest(Long.MAX_VALUE) - .collect(Collectors.toList()) - .block(); - var expectedValues = SENT_RECORDS.stream() .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp())) .map(Record::getValue) .collect(Collectors.toList()); - assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); - - polledValues = Flux.create(backwardEmitter) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .limitRequest(Long.MAX_VALUE) - .collect(Collectors.toList()) - .block(); + expectEmitter(forwardEmitter, expectedValues); expectedValues = SENT_RECORDS.stream() .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp())) .map(Record::getValue) .collect(Collectors.toList()); - assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); - + expectEmitter(backwardEmitter, expectedValues); } @Test @@ -301,22 +245,15 @@ class RecordEmitterTest extends AbstractBaseTest { ), new SimpleRecordSerDe() ); - var polledValues = Flux.create(backwardEmitter) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .limitRequest(numMessages) - .collect(Collectors.toList()) - .block(); - var expectedValues = SENT_RECORDS.stream() .filter(r -> r.getOffset() < targetOffsets.get(r.getTp())) - .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (100 / PARTITIONS))) + .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (numMessages / PARTITIONS))) .map(Record::getValue) .collect(Collectors.toList()); + assertThat(expectedValues).size().isEqualTo(numMessages); - assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); + expectEmitter(backwardEmitter, expectedValues); } @Test @@ -334,15 +271,39 @@ class RecordEmitterTest extends AbstractBaseTest { ), new SimpleRecordSerDe() ); - var polledValues = Flux.create(backwardEmitter) - .filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .map(TopicMessageEventDTO::getMessage) - .map(m -> m.getContent().toString()) - .limitRequest(Long.MAX_VALUE) - .collect(Collectors.toList()) - .block(); + expectEmitter(backwardEmitter, + 100, + e -> e.expectNextCount(0), + StepVerifier.Assertions::hasNotDroppedElements + ); + } - assertThat(polledValues).isEmpty(); + private void expectEmitter(Consumer> emitter, List expectedValues) { + expectEmitter(emitter, + expectedValues.size(), + e -> e.recordWith(ArrayList::new) + .expectNextCount(expectedValues.size()) + .expectRecordedMatches(r -> r.containsAll(expectedValues)) + .consumeRecordedWith(r -> log.info("Collected collection: {}", r)), + v -> {} + ); + } + + private void expectEmitter( + Consumer> emitter, + int take, + Function, StepVerifier.Step> stepConsumer, + Consumer assertionsConsumer) { + + StepVerifier.FirstStep firstStep = StepVerifier.create( + Flux.create(emitter) + .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) + .take(take) + .map(m -> m.getMessage().getContent()) + ); + + StepVerifier.Step step = stepConsumer.apply(firstStep); + assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat()); } private KafkaConsumer createConsumer() { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java index 5b4cee3a1f..558a5bafb9 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java @@ -85,7 +85,7 @@ public class SchemaRegistryPaginationTest { .toArray(String[]::new) ); var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME, - 0, -1, null, null).block();; + 0, -1, null, null).block(); assertThat(schemas.getBody().getPageCount()).isEqualTo(4); assertThat(schemas.getBody().getSchemas()).hasSize(25); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 84edb4518e..d7e7e8e02c 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -1,7 +1,6 @@ package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.AbstractBaseTest; @@ -29,6 +28,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; +import reactor.test.StepVerifier; @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) public class SendAndReadTests extends AbstractBaseTest { @@ -526,8 +526,9 @@ public class SendAndReadTests extends AbstractBaseTest { public void assertSendThrowsException() { String topic = createTopicAndCreateSchemas(); try { - assertThatThrownBy(() -> - messagesService.sendMessage(targetCluster, topic, msgToSend).block()); + StepVerifier.create( + messagesService.sendMessage(targetCluster, topic, msgToSend) + ).expectError().verify(); } finally { deleteTopic(topic); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java index 988279bceb..3b12afa71a 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategyTest.java @@ -13,7 +13,9 @@ import com.provectus.kafka.ui.model.TableDTO; import java.util.List; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; @@ -53,64 +55,29 @@ class ShowStrategyTest { assertFalse(strategy.test("LIST PROPERTIES;")); } - @Test - void shouldSerializeStreamsResponse() { - JsonNode node = getResponseWithData("streams"); - strategy.test("show streams;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + @TestFactory + public Iterable shouldSerialize() { + return List.of( + shouldSerializeGenerate("streams", "show streams;"), + shouldSerializeGenerate("tables", "show tables;"), + shouldSerializeGenerate("topics", "show topics;"), + shouldSerializeGenerate("properties", "show properties;"), + shouldSerializeGenerate("functions", "show functions;"), + shouldSerializeGenerate("queries", "show queries;") + ); } - @Test - void shouldSerializeTablesResponse() { - JsonNode node = getResponseWithData("tables"); - strategy.test("show tables;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); - } - - @Test - void shouldSerializeTopicsResponse() { - JsonNode node = getResponseWithData("topics"); - strategy.test("show topics;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); - } - - @Test - void shouldSerializePropertiesResponse() { - JsonNode node = getResponseWithData("properties"); - strategy.test("show properties;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); - } - - @Test - void shouldSerializeFunctionsResponse() { - JsonNode node = getResponseWithData("functions"); - strategy.test("show functions;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); - } - - @Test - void shouldSerializeQueriesResponse() { - JsonNode node = getResponseWithData("queries"); - strategy.test("show queries;"); - KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); - TableDTO table = serializedResponse.getData(); - assertThat(table.getHeaders()).isEqualTo(List.of("header")); - assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + public DynamicTest shouldSerializeGenerate(final String key, final String sql) { + return DynamicTest.dynamicTest("Should serialize " + key, + () -> { + JsonNode node = getResponseWithData(key); + strategy.test(sql); + KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node); + TableDTO table = serializedResponse.getData(); + assertThat(table.getHeaders()).isEqualTo(List.of("header")); + assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); + } + ); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java index f1c33836da..d78426d48a 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.github.fge.jsonschema.core.exceptions.ProcessingException; import com.github.fge.jsonschema.core.report.ProcessingReport; import com.github.fge.jsonschema.main.JsonSchemaFactory; -import com.provectus.kafka.ui.serde.schemaregistry.AvroMessageFormatter; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import java.io.IOException; import java.net.URI;