Forráskód Böngészése

Merge branch 'master' into vlad/develop

VladSenyuta 2 éve
szülő
commit
42077ce2e0

+ 29 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -4,6 +4,7 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Table;
@@ -498,6 +499,14 @@ public class ReactiveAdminClient implements Closeable {
         .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
   }
 
+  /**
+   * List offset for the specified topics, skipping no-leader partitions.
+   */
+  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
+                                                     OffsetSpec offsetSpec) {
+    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
+  }
+
   private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                            boolean failOnUnknownLeader) {
     var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@@ -507,34 +516,44 @@ public class ReactiveAdminClient implements Closeable {
                 descriptions.values(), partitions::contains, failOnUnknownLeader));
   }
 
-  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
+  @VisibleForTesting
+  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                               Predicate<TopicPartition> partitionPredicate,
                                                               boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
+      var goodTopicPartitions = new ArrayList<TopicPartition>();
       for (TopicPartitionInfo partitionInfo : description.partitions()) {
         TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
-        if (!partitionPredicate.test(topicPartition)) {
-          continue;
+        if (partitionInfo.leader() == null) {
+          if (failOnUnknownLeader) {
+            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+          } else {
+            // if ANY of topic partitions has no leader - we have to skip all topic partitions
+            goodTopicPartitions.clear();
+            break;
+          }
         }
-        if (partitionInfo.leader() != null) {
-          goodPartitions.add(topicPartition);
-        } else if (failOnUnknownLeader) {
-          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+        if (partitionPredicate.test(topicPartition)) {
+          goodTopicPartitions.add(topicPartition);
         }
       }
+      goodPartitions.addAll(goodTopicPartitions);
     }
     return goodPartitions;
   }
 
-  // 1. NOTE(!): should only apply for partitions with existing leader,
+  // 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
   //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
   // 2. NOTE(!): Skips partitions that were not initialized yet
   //    (UnknownTopicOrPartitionException thrown, ex. after topic creation)
   // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
   @KafkaClientInternalsDependant
-  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
-                                                           OffsetSpec offsetSpec) {
+  @VisibleForTesting
+  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
+    if (partitions.isEmpty()) {
+      return Mono.just(Map.of());
+    }
 
     Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
         parts -> {

+ 7 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.google.common.collect.Sets;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
@@ -136,22 +137,14 @@ public class TopicsService {
   }
 
   private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
-                                                                  descriptions,
+                                                                  descriptionsMap,
                                                               ReactiveAdminClient ac) {
-    var topicPartitions = descriptions.values().stream()
-        .flatMap(desc ->
-            desc.partitions().stream()
-                // list offsets should only be applied to partitions with existing leader
-                // (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
-                .filter(tp -> tp.leader() != null)
-                .map(p -> new TopicPartition(desc.name(), p.partition())))
-        .collect(toList());
-
-    return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
-        .zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
+    var descriptions = descriptionsMap.values();
+    return ac.listOffsets(descriptions, OffsetSpec.earliest())
+        .zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
             (earliest, latest) ->
-                topicPartitions.stream()
-                    .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
+                Sets.intersection(earliest.keySet(), latest.keySet())
+                    .stream()
                     .map(tp ->
                         Map.entry(tp,
                             new InternalPartitionsOffsets.Offsets(

+ 57 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -4,8 +4,11 @@ import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExcep
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -22,16 +25,20 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.assertj.core.api.ThrowableAssert;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -133,6 +140,56 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
         .verifyComplete();
   }
 
+  @Test
+  void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
+    var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("noLeaderTopic", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            // should be skipped by predicate
+            new TopicDescription("skippingByPredicate", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
+            // good topic
+            new TopicDescription("good", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
+            )),
+        p -> !p.topic().equals("skippingByPredicate"),
+        false
+    );
+
+    assertThat(filteredPartitions)
+        .containsExactlyInAnyOrder(
+            new TopicPartition("good", 0),
+            new TopicPartition("good", 1)
+        );
+  }
+
+  @Test
+  void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
+    ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("t1", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            new TopicDescription("t2", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
+            )),
+        p -> true,
+        // setting failOnNoLeader flag
+        true
+    );
+    assertThatThrownBy(call).isInstanceOf(ValidationException.class);
+  }
+
   @Test
   void testListOffsetsUnsafe() {
     String topic = UUID.randomUUID().toString();

+ 1 - 1
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/models/Topic.java

@@ -11,7 +11,7 @@ import lombok.experimental.Accessors;
 @Accessors(chain = true)
 public class Topic {
 
-  private String name, timeToRetainData, maxMessageBytes, messageKey, messageContent, customParameterValue;
+  private String name, timeToRetainData, maxMessageBytes, messageKey, messageValue, customParameterValue;
   private int numberOfPartitions;
   private CustomParameterType customParameterType;
   private CleanupPolicyValue cleanupPolicyValue;

+ 5 - 5
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/topics/ProduceMessagePanel.java

@@ -12,7 +12,7 @@ import java.util.Arrays;
 public class ProduceMessagePanel extends BasePage {
 
   protected SelenideElement keyTextArea = $x("//div[@id='key']/textarea");
-  protected SelenideElement contentTextArea = $x("//div[@id='content']/textarea");
+  protected SelenideElement valueTextArea = $x("//div[@id='content']/textarea");
   protected SelenideElement headersTextArea = $x("//div[@id='headers']/textarea");
   protected SelenideElement submitBtn = headersTextArea.$x("../../../..//button[@type='submit']");
   protected SelenideElement partitionDdl = $x("//ul[@name='partition']");
@@ -34,14 +34,14 @@ public class ProduceMessagePanel extends BasePage {
   }
 
   @Step
-  public ProduceMessagePanel setContentFiled(String value) {
-    clearByKeyboard(contentTextArea);
-    contentTextArea.setValue(value);
+  public ProduceMessagePanel setValueFiled(String value) {
+    clearByKeyboard(valueTextArea);
+    valueTextArea.setValue(value);
     return this;
   }
 
   @Step
-  public ProduceMessagePanel setHeaderFiled(String value) {
+  public ProduceMessagePanel setHeadersFld(String value) {
     headersTextArea.setValue(value);
     return this;
   }

+ 8 - 12
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/topics/TopicDetails.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.pages.topics;
 
-import static com.codeborne.selenide.Selenide.$;
 import static com.codeborne.selenide.Selenide.$$x;
 import static com.codeborne.selenide.Selenide.$x;
 import static com.codeborne.selenide.Selenide.sleep;
@@ -296,16 +295,6 @@ public class TopicDetails extends BasePage {
     return this;
   }
 
-  @Step
-  public boolean isKeyMessageVisible(String keyMessage) {
-    return keyMessage.equals($("td[title]").getText());
-  }
-
-  @Step
-  public boolean isContentMessageVisible(String contentMessage) {
-    return contentMessage.matches(contentMessageTab.getText().trim());
-  }
-
   private void selectYear(int expectedYear) {
     while (getActualCalendarDate().getYear() > expectedYear) {
       clickByJavaScript(previousMonthButton);
@@ -382,6 +371,13 @@ public class TopicDetails extends BasePage {
         .findFirst().orElseThrow();
   }
 
+  @Step
+  public TopicDetails.MessageGridItem getMessageByKey(String key) {
+    return initItems().stream()
+        .filter(e -> e.getKey().equals(key))
+        .findFirst().orElseThrow();
+  }
+
   @Step
   public List<MessageGridItem> getAllMessages() {
     return initItems();
@@ -451,7 +447,7 @@ public class TopicDetails extends BasePage {
 
     @Step
     public String getValue() {
-      return element.$x("./td[6]/span/p").getText().trim();
+      return element.$x("./td[6]").getAttribute("title");
     }
 
     @Step

+ 1 - 1
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java

@@ -185,7 +185,7 @@ public class ApiService extends BaseSource {
     createMessage.setKeySerde("String");
     createMessage.setValueSerde("String");
     createMessage.setKey(topic.getMessageKey());
-    createMessage.setContent(topic.getMessageContent());
+    createMessage.setContent(topic.getMessageValue());
     try {
       messageApi().sendTopicMessages(clusterName, topic.getName(), createMessage).block();
     } catch (WebClientResponseException ex) {

+ 3 - 3
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/connectors/ConnectorsTest.java

@@ -23,13 +23,13 @@ public class ConnectorsTest extends BaseTest {
   private static final String MESSAGE_KEY = " ";
   private static final Topic TOPIC_FOR_CREATE = new Topic()
       .setName("topic-for-create-connector-" + randomAlphabetic(5))
-      .setMessageContent(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
+      .setMessageValue(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
   private static final Topic TOPIC_FOR_DELETE = new Topic()
       .setName("topic-for-delete-connector-" + randomAlphabetic(5))
-      .setMessageContent(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
+      .setMessageValue(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
   private static final Topic TOPIC_FOR_UPDATE = new Topic()
       .setName("topic-for-update-connector-" + randomAlphabetic(5))
-      .setMessageContent(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
+      .setMessageValue(MESSAGE_CONTENT).setMessageKey(MESSAGE_KEY);
   private static final Connector CONNECTOR_FOR_DELETE = new Connector()
       .setName("connector-for-delete-" + randomAlphabetic(5))
       .setConfig(getResourceAsString("testData/connectors/delete_connector_config.json"));

+ 8 - 12
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/topics/MessagesTest.java

@@ -28,23 +28,23 @@ public class MessagesTest extends BaseTest {
   private static final Topic TOPIC_FOR_MESSAGES = new Topic()
       .setName("topic-with-clean-message-attribute-" + randomAlphabetic(5))
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final Topic TOPIC_TO_CLEAR_AND_PURGE_MESSAGES = new Topic()
       .setName("topic-to-clear-and-purge-messages-" + randomAlphabetic(5))
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final Topic TOPIC_FOR_CHECK_FILTERS = new Topic()
       .setName("topic-for-check-filters-" + randomAlphabetic(5))
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final Topic TOPIC_TO_RECREATE = new Topic()
       .setName("topic-to-recreate-attribute-" + randomAlphabetic(5))
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final Topic TOPIC_FOR_CHECK_MESSAGES_COUNT = new Topic()
       .setName("topic-for-check-messages-count" + randomAlphabetic(5))
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final List<Topic> TOPIC_LIST = new ArrayList<>();
 
   @BeforeClass(alwaysRun = true)
@@ -65,12 +65,8 @@ public class MessagesTest extends BaseTest {
     topicDetails
         .openDetailsTab(MESSAGES);
     produceMessage(TOPIC_FOR_MESSAGES);
-    SoftAssert softly = new SoftAssert();
-    softly.assertTrue(topicDetails.isKeyMessageVisible((TOPIC_FOR_MESSAGES.getMessageKey())),
-        "isKeyMessageVisible()");
-    softly.assertTrue(topicDetails.isContentMessageVisible((TOPIC_FOR_MESSAGES.getMessageContent()).trim()),
-        "isContentMessageVisible()");
-    softly.assertAll();
+    Assert.assertEquals(topicDetails.getMessageByKey(TOPIC_FOR_MESSAGES.getMessageKey()).getValue(),
+        TOPIC_FOR_MESSAGES.getMessageValue(), "message.getValue()");
   }
 
   @QaseId(19)
@@ -266,7 +262,7 @@ public class MessagesTest extends BaseTest {
     produceMessagePanel
         .waitUntilScreenReady()
         .setKeyField(topic.getMessageKey())
-        .setContentFiled(topic.getMessageContent())
+        .setValueFiled(topic.getMessageValue())
         .submitProduceMessage();
     topicDetails
         .waitUntilScreenReady();

+ 1 - 1
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/topics/TopicsTest.java

@@ -45,7 +45,7 @@ public class TopicsTest extends BaseTest {
       .setMaxSizeOnDisk(NOT_SET)
       .setMaxMessageBytes("1048588")
       .setMessageKey(randomAlphabetic(5))
-      .setMessageContent(randomAlphabetic(10));
+      .setMessageValue(randomAlphabetic(10));
   private static final Topic TOPIC_TO_CHECK_SETTINGS = new Topic()
       .setName("new-topic-" + randomAlphabetic(5))
       .setNumberOfPartitions(1)

+ 10 - 9
kafka-ui-react-app/src/components/Topics/Topic/Messages/Message.tsx

@@ -80,19 +80,20 @@ const Message: React.FC<Props> = ({
     filters?: PreviewFilter[]
   ) => {
     if (!filters?.length || !jsonValue) return jsonValue;
-
     const parsedJson = getParsedJson(jsonValue);
 
     return (
       <>
-        {filters.map((item) => (
-          <span key={`${item.path}--${item.field}`}>
-            {item.field}:{' '}
-            {JSON.stringify(
-              JSONPath({ path: item.path, json: parsedJson, wrap: false })
-            )}
-          </span>
-        ))}
+        {filters.map((item) => {
+          return (
+            <div key={`${item.path}--${item.field}`}>
+              {item.field}:{' '}
+              {JSON.stringify(
+                JSONPath({ path: item.path, json: parsedJson, wrap: false })
+              )}
+            </div>
+          );
+        })}
       </>
     );
   };

+ 1 - 1
kafka-ui-react-app/src/components/Topics/Topic/Messages/MessageContent/MessageContent.styled.ts

@@ -58,7 +58,7 @@ export const MetadataLabel = styled.p`
   width: 80px;
 `;
 
-export const MetadataValue = styled.p`
+export const MetadataValue = styled.div`
   color: ${({ theme }) => theme.topicMetaData.color.value};
   font-size: 14px;
 `;

+ 39 - 4
kafka-ui-react-app/src/components/Topics/Topic/Messages/__test__/Message.spec.tsx

@@ -1,6 +1,9 @@
 import React from 'react';
 import { TopicMessage, TopicMessageTimestampTypeEnum } from 'generated-sources';
-import Message, { Props } from 'components/Topics/Topic/Messages/Message';
+import Message, {
+  PreviewFilter,
+  Props,
+} from 'components/Topics/Topic/Messages/Message';
 import { screen } from '@testing-library/react';
 import { render } from 'lib/testHelpers';
 import userEvent from '@testing-library/user-event';
@@ -8,6 +11,9 @@ import { formatTimestamp } from 'lib/dateTimeHelpers';
 
 const messageContentText = 'messageContentText';
 
+const keyTest = '{"payload":{"subreddit":"learnprogramming"}}';
+const contentTest =
+  '{"payload":{"author":"DwaywelayTOP","archived":false,"name":"t3_11jshwd","id":"11jshwd"}}';
 jest.mock(
   'components/Topics/Topic/Messages/MessageContent/MessageContent',
   () => () =>
@@ -28,10 +34,19 @@ describe('Message component', () => {
     content: '{"data": "test"}',
     headers: { header: 'test' },
   };
-
+  const mockKeyFilters: PreviewFilter = {
+    field: 'sub',
+    path: '$.payload.subreddit',
+  };
+  const mockContentFilters: PreviewFilter = {
+    field: 'author',
+    path: '$.payload.author',
+  };
   const renderComponent = (
     props: Partial<Props> = {
       message: mockMessage,
+      keyFilters: [],
+      contentFilters: [],
     }
   ) =>
     render(
@@ -39,8 +54,8 @@ describe('Message component', () => {
         <tbody>
           <Message
             message={props.message || mockMessage}
-            keyFilters={[]}
-            contentFilters={[]}
+            keyFilters={props.keyFilters || []}
+            contentFilters={props.contentFilters || []}
           />
         </tbody>
       </table>
@@ -88,4 +103,24 @@ describe('Message component', () => {
     await userEvent.click(messageToggleIcon);
     expect(screen.getByText(messageContentText)).toBeInTheDocument();
   });
+
+  it('should check if Preview filter showing for key', () => {
+    const props = {
+      message: { ...mockMessage, key: keyTest as string },
+      keyFilters: [mockKeyFilters],
+    };
+    renderComponent(props);
+    const keyFiltered = screen.getByText('sub: "learnprogramming"');
+    expect(keyFiltered).toBeInTheDocument();
+  });
+
+  it('should check if Preview filter showing for Value', () => {
+    const props = {
+      message: { ...mockMessage, content: contentTest as string },
+      contentFilters: [mockContentFilters],
+    };
+    renderComponent(props);
+    const keyFiltered = screen.getByText('author: "DwaywelayTOP"');
+    expect(keyFiltered).toBeInTheDocument();
+  });
 });