Merge branch 'master' into ISSUE_754_acl

This commit is contained in:
Ilya Kuramshin 2023-04-24 15:47:52 +04:00 committed by GitHub
commit 2d90897aeb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 462 additions and 60 deletions

View file

@ -58,6 +58,8 @@ public class ClustersProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}
@Data

View file

@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
@Slf4j
public class MessagesController extends AbstractController implements MessagesApi {
private static final int MAX_LOAD_RECORD_LIMIT = 100;
private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesAp
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition(
seekType,
@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesAp
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde)
limit, seekDirection, keySerde, valueSerde)
)
);

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import javax.annotation.Nullable;
import lombok.Data;
import org.apache.kafka.common.Node;
@ -10,15 +11,27 @@ public class InternalBroker {
private final Integer id;
private final String host;
private final Integer port;
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
private final @Nullable BigDecimal bytesInPerSec;
private final @Nullable BigDecimal bytesOutPerSec;
private final @Nullable Integer partitionsLeader;
private final @Nullable Integer partitions;
private final @Nullable Integer inSyncPartitions;
private final @Nullable BigDecimal leadersSkew;
private final @Nullable BigDecimal partitionsSkew;
public InternalBroker(Node node, Statistics statistics) {
public InternalBroker(Node node,
PartitionDistributionStats partitionDistribution,
Statistics statistics) {
this.id = node.id();
this.host = node.host();
this.port = node.port();
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
this.partitions = partitionDistribution.getPartitionsCount().get(node);
this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
this.leadersSkew = partitionDistribution.leadersSkew(node);
this.partitionsSkew = partitionDistribution.partitionsSkew(node);
}
}

View file

@ -0,0 +1,93 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Slf4j
public class PartitionDistributionStats {
// avg skew will show unuseful results on low number of partitions
private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
private final Map<Node, Integer> partitionLeaders;
private final Map<Node, Integer> partitionsCount;
private final Map<Node, Integer> inSyncPartitions;
private final double avgLeadersCntPerBroker;
private final double avgPartitionsPerBroker;
private final boolean skewCanBeCalculated;
public static PartitionDistributionStats create(Statistics stats) {
return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
}
static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
var partitionLeaders = new HashMap<Node, Integer>();
var partitionsReplicated = new HashMap<Node, Integer>();
var isr = new HashMap<Node, Integer>();
int partitionsCnt = 0;
for (TopicDescription td : stats.getTopicDescriptions().values()) {
for (TopicPartitionInfo tp : td.partitions()) {
partitionsCnt++;
tp.replicas().forEach(r -> incr(partitionsReplicated, r));
tp.isr().forEach(r -> incr(isr, r));
if (tp.leader() != null) {
incr(partitionLeaders, tp.leader());
}
}
}
int nodesWithPartitions = partitionsReplicated.size();
int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
int nodesWithLeaders = partitionLeaders.size();
int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
return new PartitionDistributionStats(
partitionLeaders,
partitionsReplicated,
isr,
avgLeadersCntPerBroker,
avgPartitionsPerBroker,
partitionsCnt >= minPartitionsForSkewCalculation
);
}
private static void incr(Map<Node, Integer> map, Node n) {
map.compute(n, (k, c) -> c == null ? 1 : ++c);
}
@Nullable
public BigDecimal partitionsSkew(Node node) {
return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
}
@Nullable
public BigDecimal leadersSkew(Node node) {
return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
}
// Returns difference (in percents) from average value, null if it can't be calculated
@Nullable
private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
if (avgValue == 0 || !skewCanBeCalculated) {
return null;
}
value = value == null ? 0 : value;
return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
}
}

View file

@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
@ -64,11 +65,13 @@ public class BrokerService {
}
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
var stats = statisticsCache.get(cluster);
var partitionsDistribution = PartitionDistributionStats.create(stats);
return adminClientService
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream()
.map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
@RequiredArgsConstructor
@Slf4j
public class MessagesService {
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
// limiting UI messages rate to 20/sec in tailing mode
public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private final AdminClientService adminClientService;
private final DeserializationService deserializationService;
private final ConsumerGroupService consumerGroupService;
private final int maxPageSize;
private final int defaultPageSize;
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
ClustersProperties properties) {
this.adminClientService = adminClientService;
this.deserializationService = deserializationService;
this.consumerGroupService = consumerGroupService;
var pollingProps = Optional.ofNullable(properties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);
this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
.orElse(DEFAULT_MAX_PAGE_SIZE);
this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
.orElse(DEFAULT_PAGE_SIZE);
}
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster)
@ -139,7 +159,7 @@ public class MessagesService {
ConsumerPosition consumerPosition,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
@Nullable Integer pageSize,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
@ -147,7 +167,13 @@ public class MessagesService {
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, seekDirection, keySerde, valueSerde));
filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
}
private int fixPageSize(@Nullable Integer pageSize) {
return Optional.ofNullable(pageSize)
.filter(ps -> ps > 0 && ps <= maxPageSize)
.orElse(defaultPageSize);
}
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,

View file

@ -0,0 +1,83 @@
package com.provectus.kafka.ui.model;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;
class PartitionDistributionStatsTest {
@Test
void skewCalculatedBasedOnPartitionsCounts() {
Node n1 = new Node(1, "n1", 9092);
Node n2 = new Node(2, "n2", 9092);
Node n3 = new Node(3, "n3", 9092);
Node n4 = new Node(4, "n4", 9092);
var stats = PartitionDistributionStats.create(
Statistics.builder()
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
.topicDescriptions(
Map.of(
"t1", new TopicDescription(
"t1", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
)
),
"t2", new TopicDescription(
"t2", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
)
)
)
)
.build(), 4
);
assertThat(stats.getPartitionLeaders())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 2, n2, 1));
assertThat(stats.getPartitionsCount())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 4, n3, 1));
assertThat(stats.getInSyncPartitions())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 3, n3, 1));
// Node(partitions): n1(3), n2(4), n3(1), n4(0)
// average partitions cnt = (3+4+1) / 3 = 2.666 (counting only nodes with partitions!)
assertThat(stats.getAvgPartitionsPerBroker())
.isCloseTo(2.666, Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n1))
.isCloseTo(BigDecimal.valueOf(12.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n2))
.isCloseTo(BigDecimal.valueOf(50), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n3))
.isCloseTo(BigDecimal.valueOf(-62.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
// Node(leaders): n1(2), n2(1), n3(0), n4(0)
// average leaders cnt = (2+1) / 2 = 1.5 (counting only nodes with leaders!)
assertThat(stats.leadersSkew(n1))
.isCloseTo(BigDecimal.valueOf(33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n2))
.isCloseTo(BigDecimal.valueOf(-33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n3))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
}
}

View file

@ -2496,6 +2496,16 @@ components:
type: number
bytesOutPerSec:
type: number
partitionsLeader:
type: integer
partitions:
type: integer
inSyncPartitions:
type: integer
partitionsSkew:
type: number
leadersSkew:
type: number
required:
- id
@ -3663,6 +3673,10 @@ components:
type: integer
noDataEmptyPolls:
type: integer
maxPageSize:
type: integer
defaultPageSize:
type: integer
adminClientTimeout:
type: integer
internalTopicPrefix:

View file

@ -36,29 +36,31 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
@Slf4j
public class ApiService extends BaseSource {
private final ApiClient apiClient = new ApiClient().setBasePath(BASE_API_URL);
@SneakyThrows
private TopicsApi topicApi() {
return new TopicsApi(new ApiClient().setBasePath(BASE_API_URL));
return new TopicsApi(apiClient);
}
@SneakyThrows
private SchemasApi schemaApi() {
return new SchemasApi(new ApiClient().setBasePath(BASE_API_URL));
return new SchemasApi(apiClient);
}
@SneakyThrows
private KafkaConnectApi connectorApi() {
return new KafkaConnectApi(new ApiClient().setBasePath(BASE_API_URL));
return new KafkaConnectApi(apiClient);
}
@SneakyThrows
private MessagesApi messageApi() {
return new MessagesApi(new ApiClient().setBasePath(BASE_API_URL));
return new MessagesApi(apiClient);
}
@SneakyThrows
private KsqlApi ksqlApi() {
return new KsqlApi(new ApiClient().setBasePath(BASE_API_URL));
return new KsqlApi(apiClient);
}
@SneakyThrows

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.manualsuite.backlog;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.KSQL_DB_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
import static com.provectus.kafka.ui.utilities.qase.enums.State.TO_BE_AUTOMATED;
@ -35,37 +36,65 @@ public class SmokeBacklog extends BaseManualTest {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = KSQL_DB_SUITE_ID)
@QaseId(284)
@Suite(id = BROKERS_SUITE_ID)
@QaseId(331)
@Test
public void testCaseD() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = BROKERS_SUITE_ID)
@QaseId(331)
@Test
public void testCaseE() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = BROKERS_SUITE_ID)
@QaseId(332)
@Test
public void testCaseF() {
public void testCaseE() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(335)
@Test
public void testCaseG() {
public void testCaseF() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(336)
@Test
public void testCaseG() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(343)
@Test
public void testCaseH() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = KSQL_DB_SUITE_ID)
@QaseId(344)
@Test
public void testCaseI() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(345)
@Test
public void testCaseJ() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(346)
@Test
public void testCaseK() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(347)
@Test
public void testCaseL() {
}
}

View file

@ -92,4 +92,28 @@ public class TopicsTest extends BaseManualTest {
@Test
public void testCaseN() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(337)
@Test
public void testCaseO() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(339)
@Test
public void testCaseP() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(341)
@Test
public void testCaseQ() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(342)
@Test
public void testCaseR() {
}
}

View file

@ -14,4 +14,16 @@ public class WizardTest extends BaseManualTest {
@Test
public void testCaseA() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(338)
@Test
public void testCaseB() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(340)
@Test
public void testCaseC() {
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.smokesuite.ksqldb;
import static com.provectus.kafka.ui.pages.ksqldb.enums.KsqlMenuTabs.STREAMS;
import static com.provectus.kafka.ui.pages.ksqldb.enums.KsqlQueryConfig.SHOW_TABLES;
import static com.provectus.kafka.ui.pages.panels.enums.MenuItem.KSQL_DB;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
@ -39,17 +40,21 @@ public class KsqlDbTest extends BaseTest {
FIRST_TABLE.getName(), SECOND_TABLE.getName()));
}
@QaseId(86)
@QaseId(284)
@Test(priority = 1)
public void clearResultsForExecutedRequest() {
navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
public void streamsAndTablesVisibilityCheck() {
naviSideBar
.openSideMenu(KSQL_DB);
ksqlDbList
.waitUntilScreenReady();
SoftAssert softly = new SoftAssert();
softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
softly.assertAll();
ksqlQueryForm
.clickClearResultsBtn();
softly.assertFalse(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
softly.assertTrue(ksqlDbList.getTableByName(FIRST_TABLE.getName()).isVisible(), "getTableByName()");
softly.assertTrue(ksqlDbList.getTableByName(SECOND_TABLE.getName()).isVisible(), "getTableByName()");
softly.assertAll();
ksqlDbList
.openDetailsTab(STREAMS)
.waitUntilScreenReady();
Assert.assertTrue(ksqlDbList.getStreamByName(DEFAULT_STREAM.getName()).isVisible(), "getStreamByName()");
}
@QaseId(276)
@ -68,11 +73,31 @@ public class KsqlDbTest extends BaseTest {
navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
SoftAssert softly = new SoftAssert();
softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
softly.assertTrue(ksqlQueryForm.getItemByName(FIRST_TABLE.getName()).isVisible(), "getItemByName()");
softly.assertTrue(ksqlQueryForm.getItemByName(SECOND_TABLE.getName()).isVisible(), "getItemByName()");
softly.assertTrue(ksqlQueryForm.getItemByName(FIRST_TABLE.getName()).isVisible(),
String.format("getItemByName(%s)", FIRST_TABLE.getName()));
softly.assertTrue(ksqlQueryForm.getItemByName(SECOND_TABLE.getName()).isVisible(),
String.format("getItemByName(%s)", SECOND_TABLE.getName()));
softly.assertAll();
}
@QaseId(86)
@Test(priority = 4)
public void clearResultsForExecutedRequest() {
navigateToKsqlDbAndExecuteRequest(SHOW_TABLES.getQuery());
SoftAssert softly = new SoftAssert();
softly.assertTrue(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
softly.assertAll();
ksqlQueryForm
.clickClearResultsBtn();
softly.assertFalse(ksqlQueryForm.areResultsVisible(), "areResultsVisible()");
softly.assertAll();
}
@AfterClass(alwaysRun = true)
public void afterClass() {
TOPIC_NAMES_LIST.forEach(topicName -> apiService.deleteTopic(topicName));
}
@Step
private void navigateToKsqlDbAndExecuteRequest(String query) {
naviSideBar
@ -85,9 +110,4 @@ public class KsqlDbTest extends BaseTest {
.setQuery(query)
.clickExecuteBtn(query);
}
@AfterClass(alwaysRun = true)
public void afterClass() {
TOPIC_NAMES_LIST.forEach(topicName -> apiService.deleteTopic(topicName));
}
}

View file

@ -1,26 +1,41 @@
import React from 'react';
import { FullConnectorInfo } from 'generated-sources';
import {
Action,
ConnectorAction,
ConnectorState,
FullConnectorInfo,
ResourceType,
} from 'generated-sources';
import { CellContext } from '@tanstack/react-table';
import { ClusterNameRoute } from 'lib/paths';
import useAppParams from 'lib/hooks/useAppParams';
import { Dropdown, DropdownItem } from 'components/common/Dropdown';
import { useDeleteConnector } from 'lib/hooks/api/kafkaConnect';
import {
useDeleteConnector,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
import { useConfirm } from 'lib/hooks/useConfirm';
import { useIsMutating } from '@tanstack/react-query';
import { ActionDropdownItem } from 'components/common/ActionComponent';
const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
row,
}) => {
const { connect, name } = row.original;
const { connect, name, status } = row.original;
const { clusterName } = useAppParams<ClusterNameRoute>();
const mutationsNumber = useIsMutating();
const isMutating = mutationsNumber > 0;
const confirm = useConfirm();
const deleteMutation = useDeleteConnector({
clusterName,
connectName: connect,
connectorName: name,
});
const stateMutation = useUpdateConnectorState({
clusterName,
connectName: connect,
connectorName: name,
});
const handleDelete = () => {
confirm(
<>
@ -31,8 +46,66 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
}
);
};
// const stateMutation = useUpdateConnectorState(routerProps);
const resumeConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESUME);
const restartConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART);
const restartAllTasksHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART_ALL_TASKS);
const restartFailedTasksHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART_FAILED_TASKS);
return (
<Dropdown>
{status.state === ConnectorState.PAUSED && (
<ActionDropdownItem
onClick={resumeConnectorHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Resume
</ActionDropdownItem>
)}
<ActionDropdownItem
onClick={restartConnectorHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart Connector
</ActionDropdownItem>
<ActionDropdownItem
onClick={restartAllTasksHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart All Tasks
</ActionDropdownItem>
<ActionDropdownItem
onClick={restartFailedTasksHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart Failed Tasks
</ActionDropdownItem>
<DropdownItem onClick={handleDelete} danger>
Remove Connector
</DropdownItem>

View file

@ -9,7 +9,11 @@ import { screen, waitFor } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { render, WithRoute } from 'lib/testHelpers';
import { clusterConnectConnectorPath, clusterConnectorsPath } from 'lib/paths';
import { useConnectors, useDeleteConnector } from 'lib/hooks/api/kafkaConnect';
import {
useConnectors,
useDeleteConnector,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
const mockedUsedNavigate = jest.fn();
const mockDelete = jest.fn();
@ -22,6 +26,7 @@ jest.mock('react-router-dom', () => ({
jest.mock('lib/hooks/api/kafkaConnect', () => ({
useConnectors: jest.fn(),
useDeleteConnector: jest.fn(),
useUpdateConnectorState: jest.fn(),
}));
const clusterName = 'local';
@ -42,6 +47,10 @@ describe('Connectors List', () => {
(useConnectors as jest.Mock).mockImplementation(() => ({
data: connectors,
}));
const restartConnector = jest.fn();
(useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
mutateAsync: restartConnector,
}));
});
it('renders', async () => {

View file

@ -27,7 +27,7 @@ export interface AddEditFilterContainerProps {
inputDisplayNameDefaultValue?: string;
inputCodeDefaultValue?: string;
isAdd?: boolean;
submitCallback?: (values: AddMessageFilters) => Promise<void>;
submitCallback?: (values: AddMessageFilters) => void;
}
const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({

View file

@ -59,7 +59,7 @@ const Topic: React.FC = () => {
const deleteTopicHandler = async () => {
await deleteTopic.mutateAsync(topicName);
navigate('../..');
navigate(clusterTopicsPath(clusterName));
};
React.useEffect(() => {

View file

@ -10,6 +10,7 @@ import {
clusterTopicMessagesPath,
clusterTopicPath,
clusterTopicSettingsPath,
clusterTopicsPath,
clusterTopicStatisticsPath,
getNonExactPath,
} from 'lib/paths';
@ -179,7 +180,9 @@ describe('Details', () => {
name: 'Confirm',
});
await userEvent.click(submitDeleteButton);
expect(mockNavigate).toHaveBeenCalledWith('../..');
expect(mockNavigate).toHaveBeenCalledWith(
clusterTopicsPath(mockClusterName)
);
});
it('shows a confirmation popup on deleting topic messages', async () => {

View file

@ -70,7 +70,7 @@ export const DropdownButton = styled.button`
`;
export const DangerItem = styled.div`
color: ${({ theme: { dropdown } }) => dropdown.item.color.normal};
color: ${({ theme: { dropdown } }) => dropdown.item.color.danger};
`;
export const DropdownItemHint = styled.div`

View file

@ -76,7 +76,8 @@ export function useUpdateConnectorState(props: UseConnectorProps) {
return useMutation(
(action: ConnectorAction) => api.updateConnectorState({ ...props, action }),
{
onSuccess: () => client.invalidateQueries(connectorKey(props)),
onSuccess: () =>
client.invalidateQueries(['clusters', props.clusterName, 'connectors']),
}
);
}

View file

@ -26,7 +26,7 @@
<assertj.version>3.19.0</assertj.version>
<avro.version>1.11.1</avro.version>
<byte-buddy.version>1.12.19</byte-buddy.version>
<confluent.version>7.3.0</confluent.version>
<confluent.version>7.3.3</confluent.version>
<datasketches-java.version>3.1.0</datasketches-java.version>
<groovy.version>3.0.13</groovy.version>
<jackson.version>2.14.0</jackson.version>
@ -43,7 +43,7 @@
<!-- Test dependency versions -->
<junit.version>5.9.1</junit.version>
<mockito.version>5.1.1</mockito.version>
<mockito.version>5.3.0</mockito.version>
<okhttp3.mockwebserver.version>4.10.0</okhttp3.mockwebserver.version>
<testcontainers.version>1.17.5</testcontainers.version>