فهرست منبع

[BE] Implement an internal topic name prefix (#3505)

* Revert "Fix topic internal indication (#2846)"

This reverts commit 2f2e19d1bce41ec8c0f867b20b4c72b20f764bf0.

* Fix #3135

* Add a nullable annotation for internalTopicPrefix

* Fix e2e tests

* upd isInternal

---------

Co-authored-by: VladSenyuta <vlad.senyuta@gmail.com>
Roman Zabaluev 2 سال پیش
والد
کامیت
d42e911379

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -25,6 +25,8 @@ public class ClustersProperties {
 
   List<Cluster> clusters = new ArrayList<>();
 
+  String internalTopicPrefix;
+
   @Data
   public static class Cluster {
     String name;

+ 19 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

@@ -1,9 +1,11 @@
 package com.provectus.kafka.ui.model;
 
+import com.provectus.kafka.ui.config.ClustersProperties;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -14,6 +16,8 @@ import org.apache.kafka.common.TopicPartition;
 @Builder(toBuilder = true)
 public class InternalTopic {
 
+  ClustersProperties clustersProperties;
+
   // from TopicDescription
   private final String name;
   private final boolean internal;
@@ -40,9 +44,17 @@ public class InternalTopic {
                                    List<ConfigEntry> configs,
                                    InternalPartitionsOffsets partitionsOffsets,
                                    Metrics metrics,
-                                   InternalLogDirStats logDirInfo) {
+                                   InternalLogDirStats logDirInfo,
+                                   @Nullable String internalTopicPrefix) {
     var topic = InternalTopic.builder();
-    topic.internal(topicDescription.isInternal());
+
+    internalTopicPrefix = internalTopicPrefix == null || internalTopicPrefix.isEmpty()
+        ? "_"
+        : internalTopicPrefix;
+
+    topic.internal(
+        topicDescription.isInternal() || topicDescription.name().startsWith(internalTopicPrefix)
+    );
     topic.name(topicDescription.name());
 
     List<InternalPartition> partitions = topicDescription.partitions().stream()
@@ -56,10 +68,10 @@ public class InternalTopic {
           List<InternalReplica> replicas = partition.replicas().stream()
               .map(r ->
                   InternalReplica.builder()
-                    .broker(r.id())
-                    .inSync(partition.isr().contains(r))
-                    .leader(partition.leader() != null && partition.leader().id() == r.id())
-                    .build())
+                      .broker(r.id())
+                      .inSync(partition.isr().contains(r))
+                      .leader(partition.leader() != null && partition.leader().id() == r.id())
+                      .build())
               .collect(Collectors.toList());
           partitionDto.replicas(replicas);
 
@@ -79,7 +91,7 @@ public class InternalTopic {
 
           return partitionDto.build();
         })
-        .collect(Collectors.toList());
+        .toList();
 
     topic.partitions(partitions.stream().collect(
         Collectors.toMap(InternalPartition::getPartition, t -> t)));

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.TopicRecreationException;
@@ -52,6 +53,7 @@ public class TopicsService {
 
   private final AdminClientService adminClientService;
   private final StatisticsCache statisticsCache;
+  private final ClustersProperties clustersProperties;
   @Value("${topic.recreate.maxRetries:15}")
   private int recreateMaxRetries;
   @Value("${topic.recreate.delay.seconds:1}")
@@ -127,7 +129,8 @@ public class TopicsService {
             configs.getOrDefault(t, List.of()),
             partitionsOffsets,
             metrics,
-            logDirInfo
+            logDirInfo,
+            clustersProperties.getInternalTopicPrefix()
         ))
         .collect(toList());
   }
@@ -459,7 +462,9 @@ public class TopicsService {
                     stats.getTopicConfigs().getOrDefault(topicName, List.of()),
                     InternalPartitionsOffsets.empty(),
                     stats.getMetrics(),
-                    stats.getLogDirInfo()))
+                    stats.getLogDirInfo(),
+                    clustersProperties.getInternalTopicPrefix()
+                    ))
             .collect(toList())
         );
   }

+ 8 - 8
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -69,7 +69,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -95,7 +95,7 @@ class TopicsServicePaginationTest {
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-            Metrics.empty(), InternalLogDirStats.empty()))
+            Metrics.empty(), InternalLogDirStats.empty(), "_"))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
     init(internalTopics);
 
@@ -122,7 +122,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -141,7 +141,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -160,7 +160,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -181,7 +181,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -202,7 +202,7 @@ class TopicsServicePaginationTest {
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
             .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
-                Metrics.empty(), InternalLogDirStats.empty()))
+                Metrics.empty(), InternalLogDirStats.empty(), "_"))
             .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
@@ -224,7 +224,7 @@ class TopicsServicePaginationTest {
                     new TopicPartitionInfo(p, null, List.of(), List.of()))
                 .collect(Collectors.toList())))
         .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
-            Metrics.empty(), InternalLogDirStats.empty()))
+            Metrics.empty(), InternalLogDirStats.empty(), "_"))
         .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
 
     init(internalTopics);

+ 7 - 3
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/topics/TopicsList.java

@@ -6,7 +6,6 @@ import com.codeborne.selenide.SelenideElement;
 import com.provectus.kafka.ui.pages.BasePage;
 import io.qameta.allure.Step;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -175,6 +174,12 @@ public class TopicsList extends BasePage {
                 .findFirst().orElseThrow();
     }
 
+    @Step
+    public TopicGridItem getAnyNonInternalTopic() {
+        return getNonInternalTopics().stream()
+                .findAny().orElseThrow();
+    }
+
     @Step
     public List<TopicGridItem> getNonInternalTopics() {
         return initGridItems().stream()
@@ -207,8 +212,7 @@ public class TopicsList extends BasePage {
         public boolean isInternal() {
             boolean internal = false;
             try {
-                element.$x("./td[2]/a/span").shouldBe(visible, Duration.ofMillis(500));
-                internal = true;
+                internal = element.$x("./td[2]/a/span").isDisplayed();
             } catch (Throwable ignored) {
             }
             return internal;

+ 5 - 4
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokeSuite/topics/TopicsTest.java

@@ -52,7 +52,8 @@ public class TopicsTest extends BaseTest {
             .setMaxSizeOnDisk(NOT_SET);
     private static final Topic TOPIC_FOR_CHECK_FILTERS = new Topic()
             .setName("topic-for-check-filters-" + randomAlphabetic(5));
-    private static final Topic TOPIC_FOR_DELETE = new Topic().setName("topic-to-delete-" + randomAlphabetic(5));
+    private static final Topic TOPIC_FOR_DELETE = new Topic()
+            .setName("topic-to-delete-" + randomAlphabetic(5));
     private static final List<Topic> TOPIC_LIST = new ArrayList<>();
 
     @BeforeClass(alwaysRun = true)
@@ -89,11 +90,11 @@ public class TopicsTest extends BaseTest {
     void checkAvailableOperations() {
         navigateToTopics();
         topicsList
-                .getTopicItem("my_ksql_1ksql_processing_log")
+                .getTopicItem(TOPIC_TO_UPDATE_AND_DELETE.getName())
                 .selectItem(true);
         verifyElementsCondition(topicsList.getActionButtons(), Condition.enabled);
         topicsList
-                .getTopicItem("_confluent-ksql-my_ksql_1_command_topic")
+                .getTopicItem(TOPIC_FOR_CHECK_FILTERS.getName())
                 .selectItem(true);
         Assert.assertFalse(topicsList.isCopySelectedTopicBtnEnabled(), "isCopySelectedTopicBtnEnabled()");
     }
@@ -456,7 +457,7 @@ public class TopicsTest extends BaseTest {
                 .setNumberOfPartitions(1);
         navigateToTopics();
         topicsList
-                .getTopicItem("_schemas")
+                .getAnyNonInternalTopic()
                 .selectItem(true)
                 .clickCopySelectedTopicBtn();
         topicCreateEditForm