diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index 24b60b5711..1d5cc5393c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -58,6 +58,8 @@ public class ClustersProperties {
     Integer pollTimeoutMs;
     Integer partitionPollTimeout;
     Integer noDataEmptyPolls;
+    Integer maxPageSize;
+    Integer defaultPageSize;
   }
 
   @Data
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
index 1ba511ab07..aa9d7d5315 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
@@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
 @Slf4j
 public class MessagesController extends AbstractController implements MessagesApi {
 
-  private static final int MAX_LOAD_RECORD_LIMIT = 100;
-  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
-
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
   private final AccessControlService accessControlService;
@@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesApi {
     seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
     seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
     filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-    int recordsLimit =
-        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
 
     var positions = new ConsumerPosition(
         seekType,
@@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesApi {
         ResponseEntity.ok(
             messagesService.loadMessages(
                 getCluster(clusterName), topicName, positions, q, filterQueryType,
-                recordsLimit, seekDirection, keySerde, valueSerde)
+                limit, seekDirection, keySerde, valueSerde)
         )
     );
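
Note: the controller previously clamped the requested limit to a hard-coded maximum of 100 and defaulted it to 20. Both constants are gone; `limit` now flows through as a nullable value and is resolved inside MessagesService against the two new polling properties above. In application config these would presumably sit next to the other polling knobs (e.g. `kafka.polling.maxPageSize` / `kafka.polling.defaultPageSize`, assuming the standard `kafka` prefix that ClustersProperties binds to).
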
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
index edab9a8aeb..4a0d1ba0dd 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.model;
 
 import java.math.BigDecimal;
+import javax.annotation.Nullable;
 import lombok.Data;
 import org.apache.kafka.common.Node;
 
@@ -10,15 +11,27 @@ public class InternalBroker {
   private final Integer id;
   private final String host;
   private final Integer port;
-  private final BigDecimal bytesInPerSec;
-  private final BigDecimal bytesOutPerSec;
+  private final @Nullable BigDecimal bytesInPerSec;
+  private final @Nullable BigDecimal bytesOutPerSec;
+  private final @Nullable Integer partitionsLeader;
+  private final @Nullable Integer partitions;
+  private final @Nullable Integer inSyncPartitions;
+  private final @Nullable BigDecimal leadersSkew;
+  private final @Nullable BigDecimal partitionsSkew;
 
-  public InternalBroker(Node node, Statistics statistics) {
+  public InternalBroker(Node node,
+                        PartitionDistributionStats partitionDistribution,
+                        Statistics statistics) {
     this.id = node.id();
     this.host = node.host();
     this.port = node.port();
     this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
     this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
+    this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
+    this.partitions = partitionDistribution.getPartitionsCount().get(node);
+    this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
+    this.leadersSkew = partitionDistribution.leadersSkew(node);
+    this.partitionsSkew = partitionDistribution.partitionsSkew(node);
   }
 
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
new file mode 100644
index 0000000000..b625533d1d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
@@ -0,0 +1,93 @@
+package com.provectus.kafka.ui.model;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@Slf4j
+public class PartitionDistributionStats {
+
+  // avg skew will show unuseful results on low number of partitions
+  private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
+
+  private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
+
+  private final Map<Node, Integer> partitionLeaders;
+  private final Map<Node, Integer> partitionsCount;
+  private final Map<Node, Integer> inSyncPartitions;
+  private final double avgLeadersCntPerBroker;
+  private final double avgPartitionsPerBroker;
+  private final boolean skewCanBeCalculated;
+
+  public static PartitionDistributionStats create(Statistics stats) {
+    return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
+  }
+
+  static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
+    var partitionLeaders = new HashMap<Node, Integer>();
+    var partitionsReplicated = new HashMap<Node, Integer>();
+    var isr = new HashMap<Node, Integer>();
+    int partitionsCnt = 0;
+    for (TopicDescription td : stats.getTopicDescriptions().values()) {
+      for (TopicPartitionInfo tp : td.partitions()) {
+        partitionsCnt++;
+        tp.replicas().forEach(r -> incr(partitionsReplicated, r));
+        tp.isr().forEach(r -> incr(isr, r));
+        if (tp.leader() != null) {
+          incr(partitionLeaders, tp.leader());
+        }
+      }
+    }
+    int nodesWithPartitions = partitionsReplicated.size();
+    int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
+    var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
+
+    int nodesWithLeaders = partitionLeaders.size();
+    int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
+    var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
+
+    return new PartitionDistributionStats(
+        partitionLeaders,
+        partitionsReplicated,
+        isr,
+        avgLeadersCntPerBroker,
+        avgPartitionsPerBroker,
+        partitionsCnt >= minPartitionsForSkewCalculation
+    );
+  }
+
+  private static void incr(Map<Node, Integer> map, Node n) {
+    map.compute(n, (k, c) -> c == null ? 1 : ++c);
+  }
+
+  @Nullable
+  public BigDecimal partitionsSkew(Node node) {
+    return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
+  }
+
+  @Nullable
+  public BigDecimal leadersSkew(Node node) {
+    return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
+  }
+
+  // Returns difference (in percents) from average value, null if it can't be calculated
+  @Nullable
+  private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
+    if (avgValue == 0 || !skewCanBeCalculated) {
+      return null;
+    }
+    value = value == null ? 0 : value;
+    return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
+  }
+}
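
Note: a broker's skew is its percentage deviation from the cluster average, where the average is taken only over brokers that host at least one replica (respectively, lead at least one partition):

    skew(node) = (count(node) - avg) / avg * 100

For example, if three brokers hold 3, 4 and 1 replicas, avg = 8/3 ≈ 2.67, so the broker holding 4 reports roughly +50% and a broker holding none reports -100%. Results are rounded to three significant digits (ROUNDING_MATH_CTX) and suppressed entirely below 50 total partitions, where the percentages would mostly be noise.
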
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
index 720642157b..8a2ac1a63e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
@@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.InternalBroker;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.PartitionDistributionStats;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,11 +65,13 @@ public class BrokerService {
   }
 
   public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
+    var stats = statisticsCache.get(cluster);
+    var partitionsDistribution = PartitionDistributionStats.create(stats);
     return adminClientService
         .get(cluster)
         .flatMap(ReactiveAdminClient::describeCluster)
         .map(description -> description.getNodes().stream()
-            .map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
+            .map(node -> new InternalBroker(node, partitionsDistribution, stats))
            .collect(Collectors.toList()))
        .flatMapMany(Flux::fromIterable);
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index 4f9f0f59f4..f6ad42c110 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service;
 
 import com.google.common.util.concurrent.RateLimiter;
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
@@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 @Service
-@RequiredArgsConstructor
 @Slf4j
 public class MessagesService {
 
+  private static final int DEFAULT_MAX_PAGE_SIZE = 500;
+  private static final int DEFAULT_PAGE_SIZE = 100;
+
   // limiting UI messages rate to 20/sec in tailing mode
-  public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
+  private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
 
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
+  private final int maxPageSize;
+  private final int defaultPageSize;
+
+  public MessagesService(AdminClientService adminClientService,
+                         DeserializationService deserializationService,
+                         ConsumerGroupService consumerGroupService,
+                         ClustersProperties properties) {
+    this.adminClientService = adminClientService;
+    this.deserializationService = deserializationService;
+    this.consumerGroupService = consumerGroupService;
+
+    var pollingProps = Optional.ofNullable(properties.getPolling())
+        .orElseGet(ClustersProperties.PollingProperties::new);
+    this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
+        .orElse(DEFAULT_MAX_PAGE_SIZE);
+    this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
+        .orElse(DEFAULT_PAGE_SIZE);
+  }
 
   private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
@@ -139,7 +159,7 @@ public class MessagesService {
                                                  ConsumerPosition consumerPosition,
                                                  @Nullable String query,
                                                  MessageFilterTypeDTO filterQueryType,
-                                                 int limit,
+                                                 @Nullable Integer pageSize,
                                                  SeekDirectionDTO seekDirection,
                                                  @Nullable String keySerde,
                                                  @Nullable String valueSerde) {
@@ -147,7 +167,13 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
         .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
-            filterQueryType, limit, seekDirection, keySerde, valueSerde));
+            filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
+  }
+
+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
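
Note: fixPageSize does not clamp the way the removed controller code did (Math.min against the maximum). Any value outside (0, maxPageSize] falls back to the default instead. A minimal sketch of the resulting behavior, assuming the built-in defaults of 500/100 — illustrative calls only:

    fixPageSize(null);   // -> 100: nothing requested, use defaultPageSize
    fixPageSize(0);      // -> 100: non-positive values are rejected
    fixPageSize(50);     // -> 50:  values within (0, maxPageSize] pass through
    fixPageSize(10000);  // -> 100: oversized requests get the default, not the max
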
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java
new file mode 100644
index 0000000000..c83c4f5cd8
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/PartitionDistributionStatsTest.java
@@ -0,0 +1,83 @@
+package com.provectus.kafka.ui.model;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.assertj.core.data.Percentage;
+import org.junit.jupiter.api.Test;
+
+class PartitionDistributionStatsTest {
+
+  @Test
+  void skewCalculatedBasedOnPartitionsCounts() {
+    Node n1 = new Node(1, "n1", 9092);
+    Node n2 = new Node(2, "n2", 9092);
+    Node n3 = new Node(3, "n3", 9092);
+    Node n4 = new Node(4, "n4", 9092);
+
+    var stats = PartitionDistributionStats.create(
+        Statistics.builder()
+            .clusterDescription(
+                new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
+            .topicDescriptions(
+                Map.of(
+                    "t1", new TopicDescription(
+                        "t1", false,
+                        List.of(
+                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                            new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
+                        )
+                    ),
+                    "t2", new TopicDescription(
+                        "t2", false,
+                        List.of(
+                            new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
+                            new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
+                        )
+                    )
+                )
+            )
+            .build(), 4
+    );
+
+    assertThat(stats.getPartitionLeaders())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 2, n2, 1));
+    assertThat(stats.getPartitionsCount())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 4, n3, 1));
+    assertThat(stats.getInSyncPartitions())
+        .containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 3, n3, 1));
+
+    // Node(partitions): n1(3), n2(4), n3(1), n4(0)
+    // average partitions cnt = (3+4+1) / 3 = 2.666 (counting only nodes with partitions!)
+    assertThat(stats.getAvgPartitionsPerBroker())
+        .isCloseTo(2.666, Percentage.withPercentage(1));
+
+    assertThat(stats.partitionsSkew(n1))
+        .isCloseTo(BigDecimal.valueOf(12.5), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n2))
+        .isCloseTo(BigDecimal.valueOf(50), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n3))
+        .isCloseTo(BigDecimal.valueOf(-62.5), Percentage.withPercentage(1));
+    assertThat(stats.partitionsSkew(n4))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+
+    // Node(leaders): n1(2), n2(1), n3(0), n4(0)
+    // average leaders cnt = (2+1) / 2 = 1.5 (counting only nodes with leaders!)
+    assertThat(stats.leadersSkew(n1))
+        .isCloseTo(BigDecimal.valueOf(33.33), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n2))
+        .isCloseTo(BigDecimal.valueOf(-33.33), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n3))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+    assertThat(stats.leadersSkew(n4))
+        .isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
+  }
+
+}
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 6a3695e694..038d2d710a 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -2496,6 +2496,16 @@ components:
           type: number
         bytesOutPerSec:
           type: number
+        partitionsLeader:
+          type: integer
+        partitions:
+          type: integer
+        inSyncPartitions:
+          type: integer
+        partitionsSkew:
+          type: number
+        leadersSkew:
+          type: number
       required:
         - id
@@ -3663,6 +3673,10 @@ components:
         type: integer
       noDataEmptyPolls:
         type: integer
+      maxPageSize:
+        type: integer
+      defaultPageSize:
+        type: integer
       adminClientTimeout:
         type: integer
       internalTopicPrefix:
diff --git a/kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java b/kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java
index a041defc93..b4cc54a38f 100644
--- a/kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java
+++ b/kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java
@@ -36,29 +36,31 @@ import org.springframework.web.reactive.function.client.WebClientResponseException;
 @Slf4j
 public class ApiService extends BaseSource {
 
+  private final ApiClient apiClient = new ApiClient().setBasePath(BASE_API_URL);
+
   @SneakyThrows
   private TopicsApi topicApi() {
-    return new TopicsApi(new ApiClient().setBasePath(BASE_API_URL));
+    return new TopicsApi(apiClient);
   }
 
   @SneakyThrows
   private SchemasApi schemaApi() {
-    return new SchemasApi(new ApiClient().setBasePath(BASE_API_URL));
+    return new SchemasApi(apiClient);
   }
 
   @SneakyThrows
   private KafkaConnectApi connectorApi() {
-    return new KafkaConnectApi(new ApiClient().setBasePath(BASE_API_URL));
+    return new KafkaConnectApi(apiClient);
   }
 
   @SneakyThrows
   private MessagesApi messageApi() {
-    return new MessagesApi(new ApiClient().setBasePath(BASE_API_URL));
+    return new MessagesApi(apiClient);
   }
 
   @SneakyThrows
   private KsqlApi ksqlApi() {
-    return new KsqlApi(new ApiClient().setBasePath(BASE_API_URL));
+    return new KsqlApi(apiClient);
   }
 
   @SneakyThrows
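
Note: ApiService used to build a fresh ApiClient (always with the same BASE_API_URL) inside each factory method; the five factories now share one final instance, so identical client objects are no longer re-created on every call.
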
diff --git a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
index d96bbb7f3a..3ce086ee7b 100644
--- a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
+++ b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.manualsuite.backlog;
 
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.KSQL_DB_SUITE_ID;
+import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
 import static com.provectus.kafka.ui.utilities.qase.enums.State.TO_BE_AUTOMATED;
 
@@ -35,37 +36,65 @@ public class SmokeBacklog extends BaseManualTest {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
-  @Suite(id = KSQL_DB_SUITE_ID)
-  @QaseId(284)
+  @Suite(id = BROKERS_SUITE_ID)
+  @QaseId(331)
   @Test
   public void testCaseD() {
   }
 
-  @Automation(state = TO_BE_AUTOMATED)
-  @Suite(id = BROKERS_SUITE_ID)
-  @QaseId(331)
-  @Test
-  public void testCaseE() {
-  }
-
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = BROKERS_SUITE_ID)
   @QaseId(332)
   @Test
-  public void testCaseF() {
+  public void testCaseE() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(335)
   @Test
-  public void testCaseG() {
+  public void testCaseF() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(336)
   @Test
+  public void testCaseG() {
+  }
+
+  @Automation(state = TO_BE_AUTOMATED)
+  @Suite(id = TOPICS_PROFILE_SUITE_ID)
+  @QaseId(343)
+  @Test
   public void testCaseH() {
   }
+
+  @Automation(state = TO_BE_AUTOMATED)
+  @Suite(id = KSQL_DB_SUITE_ID)
+  @QaseId(344)
+  @Test
+  public void testCaseI() {
+  }
+
+  @Automation(state = TO_BE_AUTOMATED)
+  @Suite(id = SCHEMAS_SUITE_ID)
+  @QaseId(345)
+  @Test
+  public void testCaseJ() {
+  }
+
+  @Automation(state = TO_BE_AUTOMATED)
+  @Suite(id = SCHEMAS_SUITE_ID)
+  @QaseId(346)
+  @Test
+  public void testCaseK() {
+  }
+
+  @Automation(state = TO_BE_AUTOMATED)
+  @Suite(id = TOPICS_PROFILE_SUITE_ID)
+  @QaseId(347)
+  @Test
+  public void testCaseL() {
+  }
 }
diff --git a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/TopicsTest.java b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/TopicsTest.java
index 76f8506deb..758827e21b 100644
--- a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/TopicsTest.java
+++ b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/TopicsTest.java
@@ -92,4 +92,28 @@ public class TopicsTest extends BaseManualTest {
   @Test
   public void testCaseN() {
   }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(337)
+  @Test
+  public void testCaseO() {
+  }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(339)
+  @Test
+  public void testCaseP() {
+  }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(341)
+  @Test
+  public void testCaseQ() {
+  }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(342)
+  @Test
+  public void testCaseR() {
+  }
 }
diff --git a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/WizardTest.java b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/WizardTest.java
index 9621104b1a..c74c1ba6f0 100644
--- a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/WizardTest.java
+++ b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/suite/WizardTest.java
@@ -14,4 +14,16 @@ public class WizardTest extends BaseManualTest {
   @Test
   public void testCaseA() {
   }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(338)
+  @Test
+  public void testCaseB() {
+  }
+
+  @Automation(state = NOT_AUTOMATED)
+  @QaseId(340)
+  @Test
+  public void testCaseC() {
+  }
 }
diff --git a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java
index c4bbe0def4..22ef931bf1 100644
--- a/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java
+++ b/kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java
@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.smokesuite.ksqldb;
 
+import static com.provectus.kafka.ui.pages.ksqldb.enums.KsqlMenuTabs.STREAMS;
 import static com.provectus.kafka.ui.pages.ksqldb.enums.KsqlQueryConfig.SHOW_TABLES;
 import static com.provectus.kafka.ui.pages.panels.enums.MenuItem.KSQL_DB;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
@@ -39,17 +40,21 @@ public class KsqlDbTest extends BaseTest {
         FIRST_TABLE.getName(), SECOND_TABLE.getName()));
   }
 
-  @QaseId(86)
+  @QaseId(284)
   @Test(priority = 1)
-  public void clearResultsForExecutedRequest() {
-    navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
+  public void streamsAndTablesVisibilityCheck() {
+    naviSideBar
+        .openSideMenu(KSQL_DB);
+    ksqlDbList
+        .waitUntilScreenReady();
     SoftAssert softly = new SoftAssert();
-    softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
-    softly.assertAll();
-    ksqlQueryForm
-        .clickClearResultsBtn();
-    softly.assertFalse(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
+    softly.assertTrue(ksqlDbList.getTableByName(FIRST_TABLE.getName()).isVisible(), "getTableByName()");
+    softly.assertTrue(ksqlDbList.getTableByName(SECOND_TABLE.getName()).isVisible(), "getTableByName()");
     softly.assertAll();
+    ksqlDbList
+        .openDetailsTab(STREAMS)
+        .waitUntilScreenReady();
+    Assert.assertTrue(ksqlDbList.getStreamByName(DEFAULT_STREAM.getName()).isVisible(), "getStreamByName()");
   }
 
   @QaseId(276)
@@ -68,11 +73,31 @@ public class KsqlDbTest extends BaseTest {
     navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
     SoftAssert softly = new SoftAssert();
     softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
-    softly.assertTrue(ksqlQueryForm.getItemByName(FIRST_TABLE.getName()).isVisible(), "getItemByName()");
-    softly.assertTrue(ksqlQueryForm.getItemByName(SECOND_TABLE.getName()).isVisible(), "getItemByName()");
+    softly.assertTrue(ksqlQueryForm.getItemByName(FIRST_TABLE.getName()).isVisible(),
+        String.format("getItemByName(%s)", FIRST_TABLE.getName()));
+    softly.assertTrue(ksqlQueryForm.getItemByName(SECOND_TABLE.getName()).isVisible(),
+        String.format("getItemByName(%s)", SECOND_TABLE.getName()));
     softly.assertAll();
   }
 
+  @QaseId(86)
+  @Test(priority = 4)
+  public void clearResultsForExecutedRequest() {
+    navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
+    SoftAssert softly = new SoftAssert();
+    softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
+    softly.assertAll();
+    ksqlQueryForm
+        .clickClearResultsBtn();
+    softly.assertFalse(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
+    softly.assertAll();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void afterClass() {
+    TOPIC_NAMES_LIST.forEach(topicName -> apiService.deleteTopic(topicName));
+  }
+
   @Step
   private void navigateToKsqlDbAndExecuteRequest(String query) {
     naviSideBar
@@ -85,9 +110,4 @@ public class KsqlDbTest extends BaseTest {
         .setQuery(query)
         .clickExecuteBtn(query);
   }
-
-  @AfterClass(alwaysRun = true)
-  public void afterClass() {
-    TOPIC_NAMES_LIST.forEach(topicName -> apiService.deleteTopic(topicName));
-  }
 }
diff --git a/kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx b/kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx
index 30b3df8a56..5b3a24cdb7 100644
--- a/kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx
+++ b/kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx
@@ -1,26 +1,41 @@
 import React from 'react';
-import { FullConnectorInfo } from 'generated-sources';
+import {
+  Action,
+  ConnectorAction,
+  ConnectorState,
+  FullConnectorInfo,
+  ResourceType,
+} from 'generated-sources';
 import { CellContext } from '@tanstack/react-table';
 import { ClusterNameRoute } from 'lib/paths';
 import useAppParams from 'lib/hooks/useAppParams';
 import { Dropdown, DropdownItem } from 'components/common/Dropdown';
-import { useDeleteConnector } from 'lib/hooks/api/kafkaConnect';
+import {
+  useDeleteConnector,
+  useUpdateConnectorState,
+} from 'lib/hooks/api/kafkaConnect';
 import { useConfirm } from 'lib/hooks/useConfirm';
+import { useIsMutating } from '@tanstack/react-query';
+import { ActionDropdownItem } from 'components/common/ActionComponent';
 
 const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
   row,
 }) => {
-  const { connect, name } = row.original;
-
+  const { connect, name, status } = row.original;
   const { clusterName } = useAppParams<ClusterNameRoute>();
-
+  const mutationsNumber = useIsMutating();
+  const isMutating = mutationsNumber > 0;
   const confirm = useConfirm();
   const deleteMutation = useDeleteConnector({
     clusterName,
     connectName: connect,
     connectorName: name,
   });
-
+  const stateMutation = useUpdateConnectorState({
+    clusterName,
+    connectName: connect,
+    connectorName: name,
+  });
   const handleDelete = () => {
     confirm(
       <>
@@ -31,8 +46,66 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
       }
     );
   };
+  // const stateMutation = useUpdateConnectorState(routerProps);
+  const resumeConnectorHandler = () =>
+    stateMutation.mutateAsync(ConnectorAction.RESUME);
+  const restartConnectorHandler = () =>
+    stateMutation.mutateAsync(ConnectorAction.RESTART);
+
+  const restartAllTasksHandler = () =>
+    stateMutation.mutateAsync(ConnectorAction.RESTART_ALL_TASKS);
+
+  const restartFailedTasksHandler = () =>
+    stateMutation.mutateAsync(ConnectorAction.RESTART_FAILED_TASKS);
+
   return (
     <Dropdown>
+      {status.state === ConnectorState.PAUSED && (
+        <ActionDropdownItem
+          onClick={resumeConnectorHandler}
+          disabled={isMutating}
+          permission={{
+            resource: ResourceType.CONNECT,
+            action: Action.EDIT,
+            value: name,
+          }}
+        >
+          Resume
+        </ActionDropdownItem>
+      )}
+      <ActionDropdownItem
+        onClick={restartConnectorHandler}
+        disabled={isMutating}
+        danger
+        permission={{
+          resource: ResourceType.CONNECT,
+          action: Action.EDIT,
+          value: name,
+        }}
+      >
+        Restart Connector
+      </ActionDropdownItem>
+      <ActionDropdownItem
+        onClick={restartAllTasksHandler}
+        disabled={isMutating}
+        danger
+        permission={{
+          resource: ResourceType.CONNECT,
+          action: Action.EDIT,
+          value: name,
+        }}
+      >
+        Restart All Tasks
+      </ActionDropdownItem>
+      <ActionDropdownItem
+        onClick={restartFailedTasksHandler}
+        disabled={isMutating}
+        danger
+        permission={{
+          resource: ResourceType.CONNECT,
+          action: Action.EDIT,
+          value: name,
+        }}
+      >
+        Restart Failed Tasks
+      </ActionDropdownItem>
       <DropdownItem onClick={handleDelete} danger>
         Remove Connector
diff --git a/kafka-ui-react-app/src/components/Connect/List/__tests__/List.spec.tsx b/kafka-ui-react-app/src/components/Connect/List/__tests__/List.spec.tsx
index 9de28f38ff..82b4aab212 100644
--- a/kafka-ui-react-app/src/components/Connect/List/__tests__/List.spec.tsx
+++ b/kafka-ui-react-app/src/components/Connect/List/__tests__/List.spec.tsx
@@ -9,7 +9,11 @@ import { screen, waitFor } from '@testing-library/react';
 import userEvent from '@testing-library/user-event';
 import { render, WithRoute } from 'lib/testHelpers';
 import { clusterConnectConnectorPath, clusterConnectorsPath } from 'lib/paths';
-import { useConnectors, useDeleteConnector } from 'lib/hooks/api/kafkaConnect';
+import {
+  useConnectors,
+  useDeleteConnector,
+  useUpdateConnectorState,
+} from 'lib/hooks/api/kafkaConnect';
 
 const mockedUsedNavigate = jest.fn();
 const mockDelete = jest.fn();
@@ -22,6 +26,7 @@ jest.mock('react-router-dom', () => ({
 jest.mock('lib/hooks/api/kafkaConnect', () => ({
   useConnectors: jest.fn(),
   useDeleteConnector: jest.fn(),
+  useUpdateConnectorState: jest.fn(),
 }));
 
 const clusterName = 'local';
@@ -42,6 +47,10 @@ describe('Connectors List', () => {
     (useConnectors as jest.Mock).mockImplementation(() => ({
       data: connectors,
     }));
+    const restartConnector = jest.fn();
+    (useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
+      mutateAsync: restartConnector,
+    }));
   });
 
   it('renders', async () => {
diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx
index 757b6e171d..557db159ba 100644
--- a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx
+++ b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx
@@ -27,7 +27,7 @@ export interface AddEditFilterContainerProps {
   inputDisplayNameDefaultValue?: string;
   inputCodeDefaultValue?: string;
   isAdd?: boolean;
-  submitCallback?: (values: AddMessageFilters) => Promise<void>;
+  submitCallback?: (values: AddMessageFilters) => void;
 }
 
 const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({
diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Topic.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Topic.tsx
index 8945523576..9430e4b749 100644
--- a/kafka-ui-react-app/src/components/Topics/Topic/Topic.tsx
+++ b/kafka-ui-react-app/src/components/Topics/Topic/Topic.tsx
@@ -59,7 +59,7 @@ const Topic: React.FC = () => {
 
   const deleteTopicHandler = async () => {
     await deleteTopic.mutateAsync(topicName);
-    navigate('../..');
+    navigate(clusterTopicsPath(clusterName));
   };
 
   React.useEffect(() => {
diff --git a/kafka-ui-react-app/src/components/Topics/Topic/__test__/Topic.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/__test__/Topic.spec.tsx
index 460e4ad5de..4ec45c3a58 100644
--- a/kafka-ui-react-app/src/components/Topics/Topic/__test__/Topic.spec.tsx
+++ b/kafka-ui-react-app/src/components/Topics/Topic/__test__/Topic.spec.tsx
@@ -10,6 +10,7 @@ import {
   clusterTopicMessagesPath,
   clusterTopicPath,
   clusterTopicSettingsPath,
+  clusterTopicsPath,
   clusterTopicStatisticsPath,
   getNonExactPath,
 } from 'lib/paths';
@@ -179,7 +180,9 @@ describe('Details', () => {
       name: 'Confirm',
     });
     await userEvent.click(submitDeleteButton);
-    expect(mockNavigate).toHaveBeenCalledWith('../..');
+    expect(mockNavigate).toHaveBeenCalledWith(
+      clusterTopicsPath(mockClusterName)
+    );
   });
 
   it('shows a confirmation popup on deleting topic messages', async () => {
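
Note: deleting a topic now navigates to the explicitly constructed topics-list route instead of the relative '../..', which only resolved correctly when the page was mounted at the expected nesting depth; the test asserts against the same path helper.
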
diff --git a/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.styled.ts b/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.styled.ts
index f63fc5fe2a..d7db888a09 100644
--- a/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.styled.ts
+++ b/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.styled.ts
@@ -70,7 +70,7 @@ export const DropdownButton = styled.button`
 `;
 
 export const DangerItem = styled.div`
-  color: ${({ theme: { dropdown } }) => dropdown.item.color.normal};
+  color: ${({ theme: { dropdown } }) => dropdown.item.color.danger};
 `;
 
 export const DropdownItemHint = styled.div`
diff --git a/kafka-ui-react-app/src/lib/hooks/api/kafkaConnect.ts b/kafka-ui-react-app/src/lib/hooks/api/kafkaConnect.ts
index b8a17c558d..1d01d49195 100644
--- a/kafka-ui-react-app/src/lib/hooks/api/kafkaConnect.ts
+++ b/kafka-ui-react-app/src/lib/hooks/api/kafkaConnect.ts
@@ -76,7 +76,8 @@ export function useUpdateConnectorState(props: UseConnectorProps) {
   return useMutation(
     (action: ConnectorAction) => api.updateConnectorState({ ...props, action }),
     {
-      onSuccess: () => client.invalidateQueries(connectorKey(props)),
+      onSuccess: () =>
+        client.invalidateQueries(['clusters', props.clusterName, 'connectors']),
     }
   );
 }
diff --git a/pom.xml b/pom.xml
index 42ebd9addc..beb5744e81 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
         <assertj.version>3.19.0</assertj.version>
         <avro.version>1.11.1</avro.version>
         <byte-buddy.version>1.12.19</byte-buddy.version>
-        <confluent.version>7.3.0</confluent.version>
+        <confluent.version>7.3.3</confluent.version>
         <datasketches-java.version>3.1.0</datasketches-java.version>
         <groovy.version>3.0.13</groovy.version>
         <jackson.version>2.14.0</jackson.version>
@@ -43,7 +43,7 @@
         <junit.version>5.9.1</junit.version>
-        <mockito.version>5.1.1</mockito.version>
+        <mockito.version>5.3.0</mockito.version>
         <okhttp3.mockwebserver.version>4.10.0</okhttp3.mockwebserver.version>
         <testcontainers.version>1.17.5</testcontainers.version>
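
Note: on a successful state update the hook now invalidates the whole ['clusters', clusterName, 'connectors'] query subtree rather than the single connector's key, so the connectors list — where the restart/pause/resume actions now live — refetches connector statuses as well.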