
Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE-3504_messagesApiV2

 Conflicts:
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
	kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
iliax, 1 year ago
parent
current commit df663967a9
40 changed files with 1019 additions and 309 deletions
  1. kafka-ui-api/pom.xml (+5 -0)
  2. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java (+1 -0)
  3. kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java (+54 -0)
  4. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java (+7 -19)
  5. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java (+7 -9)
  6. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java (+5 -14)
  7. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java (+2 -2)
  8. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java (+82 -0)
  9. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java (+9 -6)
  10. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java (+2 -6)
  11. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java (+48 -0)
  12. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java (+4 -8)
  13. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java (+5 -8)
  14. kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java (+1 -1)
  15. kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java (+5 -0)
  16. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java (+0 -6)
  17. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java (+9 -23)
  18. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java (+2 -6)
  19. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java (+3 -9)
  20. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java (+22 -28)
  21. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java (+10 -9)
  22. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java (+21 -35)
  23. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java (+37 -30)
  24. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java (+181 -4)
  25. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java (+4 -9)
  26. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java (+36 -18)
  27. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java (+2 -2)
  28. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java (+1 -1)
  29. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java (+14 -1)
  30. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java (+82 -0)
  31. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java (+0 -29)
  32. kafka-ui-api/src/main/resources/application.yml (+1 -1)
  33. kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java (+2 -0)
  34. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java (+7 -9)
  35. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java (+214 -6)
  36. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java (+4 -4)
  37. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java (+1 -1)
  38. kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml (+127 -1)
  39. kafka-ui-react-app/src/components/NavBar/UserInfo/UserInfo.tsx (+2 -2)
  40. kafka-ui-react-app/src/components/NavBar/UserInfo/__tests__/UserInfo.spec.tsx (+0 -2)

+ 5 - 0
kafka-ui-api/pom.xml

@@ -114,6 +114,11 @@
             <artifactId>json</artifactId>
             <version>${org.json.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java

@@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
       "/resources/**",
       "/actuator/health/**",
       "/actuator/info",
+      "/actuator/prometheus",
       "/auth",
       "/login",
       "/logout",

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.AclsApi;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
 import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
@@ -123,4 +126,55 @@ public class AclsController extends AbstractController implements AclsApi {
         .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
+                                                      Mono<CreateConsumerAclDTO> createConsumerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createConsumerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createConsumerAclDto)
+        .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
+                                                      Mono<CreateProducerAclDTO> createProducerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createProducerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createProducerAclDto)
+        .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
+                                                       Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
+                                                       ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createStreamAppAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createStreamAppAclDto)
+        .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
 }
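
All three new endpoints follow the controller's existing pattern: build an AccessContext requiring AclAction.EDIT on the cluster, validate it, delegate to AclsService, audit the resulting signal, and return an empty 200 response.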

+ 7 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -2,38 +2,28 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
-import java.time.Instant;
-import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
   private final MessagesProcessing messagesProcessing;
-  private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
-    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+  protected PolledRecords poll(
+      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
-    Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
-    Instant finish = Instant.now();
-    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
-    throttler.throttleAfterPoll(polledBytes);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
+    var records = consumer.pollEnhanced(timeout);
+    sendConsuming(sink, records);
     return records;
   }
 
@@ -50,10 +40,8 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     messagesProcessing.sendPhase(sink, name);
   }
 
-  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                              ConsumerRecords<Bytes, Bytes> records,
-                              long elapsed) {
-    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
+  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
+    messagesProcessing.sentConsumingInfo(sink, records);
   }
 
  // cursor is null if target partitions were fully polled (no need to do paging)

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -9,9 +9,7 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
@@ -20,13 +18,13 @@ import reactor.core.publisher.FluxSink;
 @Slf4j
 public class BackwardRecordEmitter extends AbstractEmitter {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
   private final Cursor.Tracking cursor;
 
   public BackwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       MessagesProcessing messagesProcessing,
@@ -42,7 +40,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting backward polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -93,7 +91,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
       TopicPartition tp,
       long fromOffset,
       long toOffset,
-      Consumer<Bytes, Bytes> consumer,
+      EnhancedConsumer consumer,
       FluxSink<TopicMessageEventDTO> sink
   ) {
     consumer.assign(Collections.singleton(tp));
@@ -104,13 +102,13 @@ public class BackwardRecordEmitter extends AbstractEmitter {
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
-    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !isSendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords);
+      emptyPolls.count(polledRecords.count());
 
       log.debug("{} records polled from {}", polledRecords.count(), tp);
 
@@ -118,7 +116,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
           .filter(r -> r.offset() < toOffset)
           .toList();
 
-      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+      if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval
         break;
       }

+ 5 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -3,10 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -15,23 +12,17 @@ class ConsumingStats {
   private int records = 0;
   private long elapsed = 0;
 
-  /**
-   * returns bytes polled.
-   */
-  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                       ConsumerRecords<Bytes, Bytes> polledRecords,
-                       long elapsed,
-                       int filterApplyErrors) {
-    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
-    bytes += polledBytes;
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                        PolledRecords polledRecords,
+                        int filterApplyErrors) {
+    bytes += polledRecords.bytes();
     this.records += polledRecords.count();
-    this.elapsed += elapsed;
+    this.elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
             .consuming(createConsumingStats(sink, filterApplyErrors))
     );
-    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors, @Nullable Cursor.Tracking cursor) {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
     this.maxEmptyPolls = maxEmptyPolls;
   }
 
-  public void count(ConsumerRecords<?, ?> polled) {
-    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  public void count(int polledCount) {
+    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
   }
 
   public boolean noDataEmptyPollsReached() {

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+
+public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
+
+  private final PollingThrottler throttler;
+  private final ApplicationMetrics metrics;
+  private String pollingTopic;
+
+  public EnhancedConsumer(Properties properties,
+                          PollingThrottler throttler,
+                          ApplicationMetrics metrics) {
+    super(properties, new BytesDeserializer(), new BytesDeserializer());
+    this.throttler = throttler;
+    this.metrics = metrics;
+    metrics.activeConsumers().incrementAndGet();
+  }
+
+  public PolledRecords pollEnhanced(Duration dur) {
+    var stopwatch = Stopwatch.createStarted();
+    ConsumerRecords<Bytes, Bytes> polled = poll(dur);
+    PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
+    var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
+    metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
+    return polledEnhanced;
+  }
+
+  @Override
+  public void assign(Collection<TopicPartition> partitions) {
+    super.assign(partitions);
+    Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    Preconditions.checkState(assignedTopics.size() == 1);
+    this.pollingTopic = assignedTopics.iterator().next();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close(Duration timeout) {
+    metrics.activeConsumers().decrementAndGet();
+    super.close(timeout);
+  }
+
+}
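
A minimal usage sketch (not project code) of the new wrapper; the topic name and method are illustrative only:

    import com.provectus.kafka.ui.emitter.EnhancedConsumer;
    import com.provectus.kafka.ui.emitter.PolledRecords;
    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: EnhancedConsumer meters and throttles each poll itself,
    // so callers read byte/time totals from the returned PolledRecords.
    void pollOnce(EnhancedConsumer consumer) {
      // assign() asserts exactly one topic, keeping per-topic metrics unambiguous
      consumer.assign(List.of(new TopicPartition("orders", 0)));
      PolledRecords polled = consumer.pollEnhanced(Duration.ofSeconds(1));
      System.out.printf("%d records / %d bytes in %d ms%n",
          polled.count(), polled.bytes(), polled.elapsed().toMillis());
      consumer.close(Duration.ofSeconds(1)); // decrements the active-consumers gauge
    }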

+ 9 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -13,14 +13,16 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class ForwardRecordEmitter extends AbstractEmitter {
+public class ForwardRecordEmitter
+    extends AbstractEmitter
+    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition position;
   private final Cursor.Tracking cursor;
 
   public ForwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition position,
       MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings,
@@ -34,7 +36,7 @@ public class ForwardRecordEmitter extends AbstractEmitter {
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting forward polling for {}", position);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeek();
@@ -47,8 +49,9 @@ public class ForwardRecordEmitter extends AbstractEmitter {
           && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
-        ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        emptyPolls.count(records);
+        var records = poll(sink, consumer);
+        emptyPolls.count(records.count());
+
         log.debug("{} records polled", records.count());
 
         for (TopicPartition tp : records.partitions()) {

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -8,7 +8,6 @@ import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -54,13 +53,10 @@ public class MessagesProcessing {
     }
   }
 
-  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
     }
-    return 0;
   }
 
   void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.emitter;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public record PolledRecords(int count,
+                            int bytes,
+                            Duration elapsed,
+                            ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
+
+  static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
+    return new PolledRecords(
+        polled.count(),
+        calculatePolledRecSize(polled),
+        pollDuration,
+        polled
+    );
+  }
+
+  public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
+    return records.records(tp);
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
+    return records.iterator();
+  }
+
+  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      for (Header header : rec.headers()) {
+        polledBytes +=
+            (header.key() != null ? header.key().getBytes().length : 0)
+                + (header.value() != null ? header.value().length : 0);
+      }
+      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    }
+    return polledBytes;
+  }
+}
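
PolledRecords carries the raw batch plus precomputed totals, so emitters can iterate it like ConsumerRecords while reporting size and timing without re-walking the batch. A short sketch under those assumptions (the handler is hypothetical):

    // Sketch: records()/iterator() delegate to the wrapped ConsumerRecords.
    void drain(EnhancedConsumer consumer) {
      PolledRecords polled = consumer.pollEnhanced(Duration.ofSeconds(1));
      for (ConsumerRecord<Bytes, Bytes> rec : polled) { // Iterable view over the wrapped batch
        handle(rec); // hypothetical per-record handler
      }
      int batchBytes = polled.bytes();               // serialized key/value sizes plus header bytes
      long pollMillis = polled.elapsed().toMillis(); // wall time measured around the underlying poll()
    }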

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 
 @Slf4j
 public class PollingThrottler {
@@ -36,18 +33,17 @@ public class PollingThrottler {
     return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
   }
 
-  public void throttleAfterPoll(int polledBytes) {
+  // returns true if polling was throttled
+  public boolean throttleAfterPoll(int polledBytes) {
     if (polledBytes > 0) {
       double sleptSeconds = rateLimiter.acquire(polledBytes);
       if (!throttled && sleptSeconds > 0.0) {
         throttled = true;
         log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+        return true;
       }
     }
-  }
-
-  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
-    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
+    return false;
   }
 
 }
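
The signature change lets callers record whether a poll was actually delayed; EnhancedConsumer.pollEnhanced above feeds the result into ApplicationMetrics. As written, true is returned only for the first poll that trips the rate limiter, since the throttled flag latches after that point.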

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -2,21 +2,18 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter {
+public class TailingEmitter extends AbstractEmitter
+    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
-  public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+  public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
@@ -28,7 +25,7 @@ public class TailingEmitter extends AbstractEmitter {
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       SeekOperations.create(consumer, consumerPosition)
           .assignAndSeek();
       while (!sink.isCancelled()) {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -34,7 +34,7 @@ public interface KafkaConnectMapper {
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
+  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
     ConnectorDTO connector = connectInfo.getConnector();
     List<TaskDTO> tasks = connectInfo.getTasks();
     int failedTasksCount = (int) tasks.stream()

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -73,6 +74,10 @@ public class Permission {
   }
 
   private List<String> getAllActionValues() {
+    if (resource == null) {
+      return Collections.emptyList();
+    }
+
     return switch (this.resource) {
       case APPLICATIONCONFIG -> Arrays.stream(ApplicationConfigAction.values()).map(Enum::toString).toList();
       case CLUSTERCONFIG -> Arrays.stream(ClusterConfigAction.values()).map(Enum::toString).toList();

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java

@@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
     return "Avro (Embedded)";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 9 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java

@@ -1,8 +1,6 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Base64;
@@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
     return "Base64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();
@@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        input = input.trim();
-        // it is actually a hack to provide ability to sent empty array as a key/value
-        if (input.length() == 0) {
-          return new byte[]{};
-        }
-        return Base64.getDecoder().decode(input);
+    var decoder = Base64.getDecoder();
+    return inputString -> {
+      inputString = inputString.trim();
+      // it is actually a hack to provide the ability to send an empty array as a key/value
+      if (inputString.length() == 0) {
+        return new byte[] {};
       }
+      return decoder.decode(inputString);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
     var encoder = Base64.getEncoder();
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             encoder.encodeToString(data),
             DeserializeResult.Type.STRING,
             Map.of()
         );
-      }
-    };
   }
 }
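
The anonymous Serializer/Deserializer classes collapse into lambdas here, which compiles because both are single-abstract-method interfaces in the serde API; hoisting Base64.getDecoder() out of the lambda also avoids re-fetching it on every call. The same refactor is applied to Int64Serde, UInt64Serde, and UuidBinarySerde below.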

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java

@@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             String.valueOf(Longs.fromByteArray(data)),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 3 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java

@@ -1,10 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedInteger;
 import com.google.common.primitives.UnsignedLong;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
@@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
                     + "  \"minimum\" : 0, "
                     + "  \"maximum\" : %s "
                     + "}",
-                UnsignedInteger.MAX_VALUE
+                UnsignedLong.MAX_VALUE
             ),
             Map.of()
         )
@@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 22 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java

@@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        UUID uuid = UUID.fromString(input);
-        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-        if (mostSignificantBitsFirst) {
-          bb.putLong(uuid.getMostSignificantBits());
-          bb.putLong(uuid.getLeastSignificantBits());
-        } else {
-          bb.putLong(uuid.getLeastSignificantBits());
-          bb.putLong(uuid.getMostSignificantBits());
-        }
-        return bb.array();
+    return input -> {
+      UUID uuid = UUID.fromString(input);
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      if (mostSignificantBitsFirst) {
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+      } else {
+        bb.putLong(uuid.getLeastSignificantBits());
+        bb.putLong(uuid.getMostSignificantBits());
       }
+      return bb.array();
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        if (data.length != 16) {
-          throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
-        }
-        ByteBuffer bb = ByteBuffer.wrap(data);
-        long msb = bb.getLong();
-        long lsb = bb.getLong();
-        UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
-        return new DeserializeResult(
-            uuid.toString(),
-            DeserializeResult.Type.STRING,
-            Map.of()
-        );
+    return (headers, data) -> {
+      if (data.length != 16) {
+        throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
       }
+      ByteBuffer bb = ByteBuffer.wrap(data);
+      long msb = bb.getLong();
+      long lsb = bb.getLong();
+      UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
+      return new DeserializeResult(
+          uuid.toString(),
+          DeserializeResult.Type.STRING,
+          Map.of()
+      );
     };
   }
 }

+ 10 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,12 +2,14 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.collect.Streams;
 import com.google.common.collect.Table;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,11 +28,8 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
@@ -248,25 +247,27 @@ public class ConsumerGroupService {
         .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster) {
     return createConsumer(cluster, Map.of());
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
-                                                    Map<String, Object> properties) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster,
+                                         Map<String, Object> properties) {
     Properties props = new Properties();
     SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
     props.putAll(cluster.getProperties());
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
     props.putAll(properties);
 
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(
+        props,
+        cluster.getPollingSettings().getPollingThrottler(),
+        ApplicationMetrics.forCluster(cluster)
+    );
   }
 
 }

+ 21 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
@@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
 
 @Service
 @Slf4j
@@ -61,39 +59,22 @@ public class KafkaConnectService {
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
     return getConnects(cluster)
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
-        .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
-        .flatMap(connector ->
-            getConnectorConfig(cluster, connector.getConnect(), connector.getName())
-                .map(config -> InternalConnectInfo.builder()
-                    .connector(connector)
-                    .config(config)
-                    .build()
-                )
-        )
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
-              .collectList()
-              .map(tasks -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(tasks)
-                  .build()
-              );
-        })
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
-              .map(ct -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(connectInfo.getTasks())
-                  .topics(ct.getTopics())
-                  .build()
-              );
-        })
-        .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
+        .flatMap(connect ->
+            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
+                .flatMap(connectorName ->
+                    Mono.zip(
+                        getConnector(cluster, connect.getName(), connectorName),
+                        getConnectorConfig(cluster, connect.getName(), connectorName),
+                        getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
+                        getConnectorTopics(cluster, connect.getName(), connectorName)
+                    ).map(tuple ->
+                        InternalConnectInfo.builder()
+                            .connector(tuple.getT1())
+                            .config(tuple.getT2())
+                            .tasks(tuple.getT3())
+                            .topics(tuple.getT4().getTopics())
+                            .build())))
+        .map(kafkaConnectMapper::fullConnectorInfo)
         .filter(matchesSearchTerm(search));
   }
 
@@ -132,6 +113,11 @@ public class KafkaConnectService {
         .flatMapMany(Flux::fromIterable);
   }
 
+  // returns empty flux if there was an error communicating with Connect
+  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
+    return getConnectorNames(cluster, connectName).onErrorComplete();
+  }
+
   @SneakyThrows
   private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {
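
Two behavioral notes on the refactor above: Mono.zip fetches a connector's info, config, tasks, and topics concurrently instead of rebuilding InternalConnectInfo in three sequential flatMap steps, and getConnectorNamesWithErrorsSuppress turns a failing Connect instance into an empty Flux via onErrorComplete, so one unreachable Connect no longer fails the whole getAllConnectors listing.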

+ 37 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -15,6 +15,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.KafkaVersion;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -129,38 +131,41 @@ public class ReactiveAdminClient implements Closeable {
                                    Set<SupportedFeature> features,
                                    boolean topicDeletionIsAllowed) {
 
-    private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
-      return loadBrokersConfig(ac, List.of(controllerId))
-          .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
-          .flatMap(configs -> {
-            String version = "1.0-UNKNOWN";
-            boolean topicDeletionEnabled = true;
-            for (ConfigEntry entry : configs) {
-              if (entry.name().contains("inter.broker.protocol.version")) {
-                version = entry.value();
-              }
-              if (entry.name().equals("delete.topic.enable")) {
-                topicDeletionEnabled = Boolean.parseBoolean(entry.value());
-              }
-            }
-            var builder = ConfigRelatedInfo.builder()
-                .version(version)
-                .topicDeletionIsAllowed(topicDeletionEnabled);
-            return SupportedFeature.forVersion(ac, version)
-                .map(features -> builder.features(features).build());
-          });
+    static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
+
+    private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
+      return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
+          .flatMap(desc -> {
+            // choosing node from which we will get configs (starting with controller)
+            var targetNodeId = Optional.ofNullable(desc.controller)
+                .map(Node::id)
+                .orElse(desc.getNodes().iterator().next().id());
+            return loadBrokersConfig(ac, List.of(targetNodeId))
+                .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
+                .flatMap(configs -> {
+                  String version = "1.0-UNKNOWN";
+                  boolean topicDeletionEnabled = true;
+                  for (ConfigEntry entry : configs) {
+                    if (entry.name().contains("inter.broker.protocol.version")) {
+                      version = entry.value();
+                    }
+                    if (entry.name().equals("delete.topic.enable")) {
+                      topicDeletionEnabled = Boolean.parseBoolean(entry.value());
+                    }
+                  }
+                  final String finalVersion = version;
+                  final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
+                  return SupportedFeature.forVersion(ac, version)
+                      .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
+                });
+          })
+          .cache(UPDATE_DURATION);
     }
   }
 
   public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
-    return describeClusterImpl(adminClient, Set.of())
-        // choosing node from which we will get configs (starting with controller)
-        .flatMap(descr -> descr.controller != null
-            ? Mono.just(descr.controller)
-            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
-        )
-        .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
-        .map(info -> new ReactiveAdminClient(adminClient, info));
+    Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
+    return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
   }
 
 
@@ -170,7 +175,7 @@ public class ReactiveAdminClient implements Closeable {
         .doOnError(th -> !(th instanceof SecurityDisabledException)
                 && !(th instanceof InvalidRequestException)
                 && !(th instanceof UnsupportedVersionException),
-            th -> log.warn("Error checking if security enabled", th))
+            th -> log.debug("Error checking if security enabled", th))
         .onErrorReturn(false);
   }
 
@@ -202,6 +207,8 @@ public class ReactiveAdminClient implements Closeable {
 
   @Getter(AccessLevel.PACKAGE) // visible for testing
   private final AdminClient client;
+  private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
+
   private volatile ConfigRelatedInfo configRelatedInfo;
 
   public Set<SupportedFeature> getClusterFeatures() {
@@ -228,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
     if (controller == null) {
       return Mono.empty();
     }
-    return ConfigRelatedInfo.extract(client, controller.id())
+    return configRelatedInfoMono
         .doOnNext(info -> this.configRelatedInfo = info)
         .then();
   }
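
The core change: ConfigRelatedInfo.extract now returns a Mono cached for UPDATE_DURATION (one hour), so the refresh path above reuses one describe-cluster/config lookup instead of repeating it per call. A minimal standalone sketch of the Mono.cache(Duration) behavior this relies on (names illustrative):

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;
    import reactor.core.publisher.Mono;

    public class CacheDemo {
      public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Mono<Integer> cached = Mono.fromCallable(calls::incrementAndGet) // stands in for the expensive lookup
            .cache(Duration.ofHours(1)); // first subscriber computes; later ones reuse it until the TTL expires
        System.out.println(cached.block()); // 1 -> computed
        System.out.println(cached.block()); // 1 -> served from cache
      }
    }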

+ 181 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java

@@ -1,16 +1,44 @@
 package com.provectus.kafka.ui.service.acl;
 
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+
 import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -22,11 +50,14 @@ public class AclsService {
   private final AdminClientService adminClientService;
 
   public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
-    var aclString = AclCsv.createAclString(aclBinding);
-    log.info("CREATING ACL: [{}]", aclString);
     return adminClientService.get(cluster)
-        .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
-        .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+        .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
+  }
+
+  private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
+    bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
+    return ac.createAcls(bindings)
+        .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
   }
 
   public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
@@ -92,4 +123,150 @@ public class AclsService {
     }
   }
 
+  // creates allow binding for resources by prefix or specific names list
+  private List<AclBinding> createAllowBindings(ResourceType resourceType,
+                                               List<AclOperation> opsToAllow,
+                                               String principal,
+                                               String host,
+                                               @Nullable String resourcePrefix,
+                                               @Nullable Collection<String> resourceNames) {
+    List<AclBinding> bindings = new ArrayList<>();
+    if (resourcePrefix != null) {
+      for (var op : opsToAllow) {
+        bindings.add(
+            new AclBinding(
+                new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
+                new AccessControlEntry(principal, host, op, ALLOW)));
+      }
+    }
+    if (!CollectionUtils.isEmpty(resourceNames)) {
+      resourceNames.stream()
+          .distinct()
+          .forEach(resource ->
+              opsToAllow.forEach(op ->
+                  bindings.add(
+                      new AclBinding(
+                          new ResourcePattern(resourceType, resource, LITERAL),
+                          new AccessControlEntry(principal, host, op, ALLOW)))));
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
+        .then();
+  }
+
+  //Read, Describe on topics, Read on consumerGroups
+  private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(TOPIC,
+            List.of(READ, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getConsumerGroupsPrefix(),
+            request.getConsumerGroups()));
+    return bindings;
+  }
+
+  public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
+        .then();
+  }
+
+  //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+  //IDEMPOTENT_WRITE on cluster if idempotent is enabled
+  private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE, DESCRIBE, CREATE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TRANSACTIONAL_ID,
+            List.of(WRITE, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTransactionsIdPrefix(),
+            Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
+
+    if (Boolean.TRUE.equals(request.getIdempotent())) {
+      bindings.addAll(
+          createAllowBindings(
+              CLUSTER,
+              List.of(IDEMPOTENT_WRITE),
+              request.getPrincipal(),
+              request.getHost(),
+              null,
+              List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
+        .then();
+  }
+
+  // Read on input topics, Write on output topics
+  // ALL on applicationId-prefixed Groups and Topics
+  private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getInputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getOutputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+    return bindings;
+  }
+
 }
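
For concreteness, a hedged sketch of what createConsumerBindings expands a request into, assuming illustrative values topicsPrefix = "orders-", consumerGroups = ["app1"], principal "User:svc", and host "*" (none of these appear in the source):

    import java.util.List;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    // READ + DESCRIBE on the topic prefix, READ on the literal group name.
    List<AclBinding> expected = List.of(
        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "orders-", PatternType.PREFIXED),
            new AccessControlEntry("User:svc", "*", AclOperation.READ, AclPermissionType.ALLOW)),
        new AclBinding(new ResourcePattern(ResourceType.TOPIC, "orders-", PatternType.PREFIXED),
            new AccessControlEntry("User:svc", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
        new AclBinding(new ResourcePattern(ResourceType.GROUP, "app1", PatternType.LITERAL),
            new AccessControlEntry("User:svc", "*", AclOperation.READ, AclPermissionType.ALLOW)));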

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service.analyze;
 
 import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
 import com.provectus.kafka.ui.emitter.PollingSettings;
-import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
     private final EmptyPollsCounter emptyPollsCounter;
-    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
-    private final KafkaConsumer<Bytes, Bytes> consumer;
+    private final EnhancedConsumer consumer;
 
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
                  long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
-      this.throttler = pollingSettings.getPollingThrottler();
       this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
 
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
 
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
         while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
-          var polled = consumer.poll(Duration.ofSeconds(3));
-          throttler.throttleAfterPoll(polled);
-          emptyPollsCounter.count(polled);
+          var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
+          emptyPollsCounter.count(polled.count());
           polled.forEach(r -> {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);

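EnhancedConsumer is new in this commit (+82 lines, hunk not shown here). From the call sites in this file and in RecordEmitterTest below, a plausible reading is a KafkaConsumer<Bytes, Bytes> wrapper whose pollEnhanced(..) combines polling, throttling and metering; roughly (a sketch -- the internals below are assumptions):

    // Hedged sketch -- only the constructor and pollEnhanced(..) signatures are
    // visible in this diff; the body is an inference, not the committed code.
    public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {

      private final PollingThrottler throttler;
      private final ApplicationMetrics metrics;

      public EnhancedConsumer(Properties props, PollingThrottler throttler, ApplicationMetrics metrics) {
        super(props, new BytesDeserializer(), new BytesDeserializer());
        this.throttler = throttler;
        this.metrics = metrics;
      }

      public PolledRecords pollEnhanced(Duration timeout) {
        long start = System.nanoTime();
        var records = poll(timeout); // plain KafkaConsumer.poll
        var polled = PolledRecords.create(records, Duration.ofNanos(System.nanoTime() - start)); // assumed factory
        // throttling and per-topic metering are presumably applied here, e.g.:
        // throttler.throttleAfterPoll(polled.bytes());
        // metrics.meterPolledRecords(topic, polled, throttled);
        return polled;
      }
    }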
+ 36 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import reactor.core.publisher.Signal;
 public class AuditService implements Closeable {
 
   private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
+  private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
 
   private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
   private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
@@ -56,14 +58,8 @@ public class AuditService implements Closeable {
   public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
     Map<String, AuditWriter> auditWriters = new HashMap<>();
     for (var cluster : clustersStorage.getKafkaClusters()) {
-      ReactiveAdminClient adminClient;
-      try {
-        adminClient = adminClientService.get(cluster).block();
-      } catch (Exception e) {
-        printAuditInitError(cluster, "Error connect to cluster", e);
-        continue;
-      }
-      createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
+      Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
+      createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
           .ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
     }
     this.auditWriters = auditWriters;
@@ -76,7 +72,7 @@ public class AuditService implements Closeable {
 
   @VisibleForTesting
   static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
-                                                 ReactiveAdminClient ac,
+                                                 Supplier<ReactiveAdminClient> acSupplier,
                                                  Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
     var auditProps = cluster.getOriginalProperties().getAudit();
     if (auditProps == null) {
@@ -87,32 +83,54 @@ public class AuditService implements Closeable {
     if (!topicAudit && !consoleAudit) {
       return Optional.empty();
     }
+    if (!topicAudit) {
+      log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
+      return Optional.of(consoleOnlyWriter(cluster));
+    }
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
-    @Nullable KafkaProducer<byte[], byte[]> producer = null;
-    if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
-      producer = producerFactory.get();
+    boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
+    if (!topicAuditCanBeDone) {
+      if (consoleAudit) {
+        log.info(
+            "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
+            cluster.getName()
+        );
+        return Optional.of(consoleOnlyWriter(cluster));
+      }
+      return Optional.empty();
     }
-    log.info("Audit service initialized for cluster '{}'", cluster.getName());
+    log.info("Audit initialization finished for cluster '{}'", cluster.getName());
     return Optional.of(
         new AuditWriter(
             cluster.getName(),
             auditTopicName,
-            producer,
+            producerFactory.get(),
             consoleAudit ? AUDIT_LOGGER : null
         )
     );
   }
 
+  private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
+    return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
+  }
+
   /**
     * Returns true if the topic was created or already exists, i.e. producing to it can be enabled.
    */
   private static boolean createTopicIfNeeded(KafkaCluster cluster,
-                                             ReactiveAdminClient ac,
+                                             Supplier<ReactiveAdminClient> acSupplier,
                                              String auditTopicName,
                                              ClustersProperties.AuditProperties auditProps) {
+    ReactiveAdminClient ac;
+    try {
+      ac = acSupplier.get();
+    } catch (Exception e) {
+      printAuditInitError(cluster, "Error while connecting to the cluster", e);
+      return false;
+    }
     boolean topicExists;
     try {
-      topicExists = ac.listTopics(true).block().contains(auditTopicName);
+      topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
     } catch (Exception e) {
       printAuditInitError(cluster, "Error checking audit topic existence", e);
       return false;
@@ -130,7 +148,7 @@ public class AuditService implements Closeable {
           .ifPresent(topicConfig::putAll);
 
       log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
-      ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
+      ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
       log.info("Audit topic created for cluster '{}'", cluster.getName());
       return true;
     } catch (Exception e) {
@@ -142,7 +160,7 @@ public class AuditService implements Closeable {
   private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
     log.error("-----------------------------------------------------------------");
     log.error(
-        "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
+        "Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
         cluster.getName()
     );
     log.error("{}", errorMsg, cause);

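The net effect of this refactoring: the admin client is no longer resolved eagerly in the AuditService constructor, and every block() now carries BLOCK_TIMEOUT, so a single unreachable cluster can no longer hang application startup indefinitely. Reduced to its essentials (illustrative):

    // Before: eager resolution with an unbounded block() in the constructor, per cluster.
    // ReactiveAdminClient ac = adminClientService.get(cluster).block();

    // After: resolution deferred until topic audit actually needs the client,
    // and bounded at 5 seconds.
    Supplier<ReactiveAdminClient> acSupplier =
        () -> adminClientService.get(cluster).block(Duration.ofSeconds(5));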
+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -18,7 +18,7 @@ import org.slf4j.Logger;
 
 @Slf4j
 record AuditWriter(String clusterName,
-                   String targetTopic,
+                   @Nullable String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    @Nullable Logger consoleLogger) implements Closeable {
 
@@ -43,7 +43,7 @@ record AuditWriter(String clusterName,
     if (consoleLogger != null) {
       consoleLogger.info(json);
     }
-    if (producer != null) {
+    if (targetTopic != null && producer != null) {
       producer.send(
           new ProducerRecord<>(targetTopic, null, json.getBytes(UTF_8)),
           (metadata, ex) -> {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -25,7 +25,7 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster)
-        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
                 kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

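getConnectorNamesWithErrorsSuppress is added to KafkaConnectService in this commit (hunk not shown). Judging by the name and by the deliberately unreachable "notavailable" Connect registered in AbstractIntegrationTest below, it presumably wraps getConnectorNames so that one dead Connect no longer fails the whole export stream -- something like:

    // Hedged sketch; the committed method may differ in logging and signature.
    public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
      return getConnectorNames(cluster, connectName)
          .onErrorResume(e -> {
            log.warn("Error getting connector names for connect '{}'", connectName, e);
            return Flux.empty(); // skip this Connect instead of propagating the error
          });
    }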
+ 14 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -51,6 +51,8 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class AccessControlService {
 
+  private static final String ACCESS_DENIED = "Access denied";
+
   @Nullable
   private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
   private final RoleBasedAccessControlProperties properties;
@@ -97,6 +99,17 @@ public class AccessControlService {
       return Mono.empty();
     }
 
+    if (CollectionUtils.isNotEmpty(context.getApplicationConfigActions())) {
+      return getUser()
+          .doOnNext(user -> {
+            boolean accessGranted = isApplicationConfigAccessible(context, user);
+
+            if (!accessGranted) {
+              throw new AccessDeniedException(ACCESS_DENIED);
+            }
+          }).then();
+    }
+
     return getUser()
         .doOnNext(user -> {
           boolean accessGranted =
@@ -113,7 +126,7 @@ public class AccessControlService {
                   && isAuditAccessible(context, user);
 
           if (!accessGranted) {
-            throw new AccessDeniedException("Access denied");
+            throw new AccessDeniedException(ACCESS_DENIED);
           }
         })
         .then();

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.util;
+
+import static lombok.AccessLevel.PRIVATE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.provectus.kafka.ui.emitter.PolledRecords;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor(access = PRIVATE)
+public class ApplicationMetrics {
+
+  private final String clusterName;
+  private final MeterRegistry registry;
+
+  public static ApplicationMetrics forCluster(KafkaCluster cluster) {
+    return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
+  }
+
+  @VisibleForTesting
+  public static ApplicationMetrics noop() {
+    return new ApplicationMetrics("noop", new SimpleMeterRegistry());
+  }
+
+  public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
+    pollTimer(topic).record(polled.elapsed());
+    polledRecords(topic).increment(polled.count());
+    polledBytes(topic).record(polled.bytes());
+    if (throttled) {
+      pollThrottlingActivations().increment();
+    }
+  }
+
+  private Counter polledRecords(String topic) {
+    return Counter.builder("topic_records_polled")
+        .description("Number of records polled from topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private DistributionSummary polledBytes(String topic) {
+    return DistributionSummary.builder("topic_polled_bytes")
+        .description("Bytes polled from kafka topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Timer pollTimer(String topic) {
+    return Timer.builder("topic_poll_time")
+        .description("Time spend in polling for topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Counter pollThrottlingActivations() {
+    return Counter.builder("poll_throttling_activations")
+        .description("Number of poll throttling activations")
+        .tag("cluster", clusterName)
+        .register(registry);
+  }
+
+  public AtomicInteger activeConsumers() {
+    var count = new AtomicInteger();
+    Gauge.builder("active_consumers", () -> count)
+        .description("Number of active consumers")
+        .tag("cluster", clusterName)
+        .register(registry);
+    return count;
+  }
+
+}

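Intended call pattern for the new metrics class, based on how meterPolledRecords is used in this commit (the EnhancedConsumer is the natural caller). Note that the builder-plus-register calls are cheap to repeat: Micrometer deduplicates meters by name and tags, so register(..) returns the already-registered instance on subsequent polls.

    // Per-cluster handle; meters are tagged with cluster and topic.
    ApplicationMetrics metrics = ApplicationMetrics.forCluster(cluster);
    metrics.meterPolledRecords("orders", polledRecords, /* throttled */ false);

    // Gauge backing for the live consumer count -- mutate around the consumer lifecycle.
    AtomicInteger active = metrics.activeConsumers();
    active.incrementAndGet(); // consumer started
    active.decrementAndGet(); // consumer closed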
+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -1,29 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.Bytes;
-
-public class ConsumerRecordsUtil {
-
-  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
-    int polledBytes = 0;
-    for (Header header : rec.headers()) {
-      polledBytes +=
-          (header.key() != null ? header.key().getBytes().length : 0)
-              + (header.value() != null ? header.value().length : 0);
-    }
-    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
-    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
-    return polledBytes;
-  }
-
-  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
-    int polledBytes = 0;
-    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
-      polledBytes += calculatePolledRecSize(rec);
-    }
-    return polledBytes;
-  }
-
-}

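The byte accounting deleted above does not disappear: PolledRecords.java (+48 lines in this commit, hunk not shown) now owns it. Presumably it keeps the same per-record arithmetic, along these lines:

    // Sketch based on the deleted util; the committed PolledRecords may differ.
    static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
      int polledBytes = 0;
      for (ConsumerRecord<Bytes, Bytes> rec : recs) {
        for (Header header : rec.headers()) {
          polledBytes += (header.key() != null ? header.key().getBytes().length : 0)
              + (header.value() != null ? header.value().length : 0);
        }
        polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
        polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
      }
      return polledBytes;
    }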
+ 1 - 1
kafka-ui-api/src/main/resources/application.yml

@@ -10,7 +10,7 @@ management:
   endpoints:
     web:
       exposure:
-        include: "info,health"
+        include: "info,health,prometheus"
 
 logging:
   level:

+ 2 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -77,6 +77,8 @@ public abstract class AbstractIntegrationTest {
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
       System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
       System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
       System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");

+ 7 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -11,9 +11,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
@@ -22,6 +24,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -37,12 +40,9 @@ import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -338,22 +338,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+  private EnhancedConsumer createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
     );
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   @Value

+ 214 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java

@@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -53,12 +58,12 @@ class AclsServiceTest {
     when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
         .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
 
-    ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
         .thenReturn(Mono.empty());
 
-    ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.deleteAcls(deletedCaptor.capture()))
         .thenReturn(Mono.empty());
 
     aclsService.syncAclWithAclCsv(
@@ -68,15 +73,218 @@ class AclsServiceTest {
             + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
     ).block();
 
-    Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
     assertThat(createdBindings)
         .hasSize(1)
         .contains(newBindingToBeAdded);
 
-    Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
+    Collection<AclBinding> deletedBindings = deletedCaptor.getValue();
     assertThat(deletedBindings)
         .hasSize(1)
         .contains(existingBinding2);
   }
 
+
+  @Test
+  void createsConsumerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroups(List.of("cg1", "cg2"))
+            .topics(List.of("t1", "t2"))
+    ).block();
+
+    // Read, Describe on topics; Read on consumer groups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroupsPrefix("cgPref")
+            .topicsPrefix("topicPref")
+    ).block();
+
+    // Read, Describe on topics; Read on consumer groups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(3)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsProducerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topics(List.of("t1"))
+            .idempotent(true)
+            .transactionalId("txId1")
+    ).block();
+
+    // Write, Describe, Create permissions on topics; Write, Describe on transactionalIds
+    // IDEMPOTENT_WRITE on cluster since idempotent is enabled (true)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topicsPrefix("topicPref")
+            .transactionsIdPrefix("txIdPref")
+            .idempotent(false)
+    ).block();
+
+    // Write, Describe, Create permissions on topics; Write, Describe on transactionalIds
+    // no IDEMPOTENT_WRITE on cluster since idempotent is disabled (false)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsStreamAppDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createStreamAppAcl(
+        CLUSTER,
+        new CreateStreamAppAclDTO()
+            .principal(principal)
+            .host(host)
+            .inputTopics(List.of("t1"))
+            .outputTopics(List.of("t2", "t3"))
+            .applicationId("appId1")
+    ).block();
+
+    // Read on input topics, Write on output topics
+    // ALL on applicationId-prefixed Groups and Topics
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
+  }
 }

+ 4 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java

@@ -81,7 +81,7 @@ class AuditServiceTest {
 
     @Test
     void noWriterIfNoAuditPropsSet() {
-      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
       assertThat(maybeWriter).isEmpty();
     }
 
@@ -91,7 +91,7 @@ class AuditServiceTest {
       auditProps.setConsoleAuditEnabled(true);
       clustersProperties.setAudit(auditProps);
 
-      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
       assertThat(maybeWriter).isPresent();
 
       var writer = maybeWriter.get();
@@ -116,7 +116,7 @@ class AuditServiceTest {
         when(adminClientMock.listTopics(true))
             .thenReturn(Mono.just(Set.of("test_audit_topic")));
 
-        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
         assertThat(maybeWriter).isPresent();
 
         //checking there was no topic creation request
@@ -136,7 +136,7 @@ class AuditServiceTest {
         when(adminClientMock.createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap()))
             .thenReturn(Mono.empty());
 
-        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
         assertThat(maybeWriter).isPresent();
 
         //verifying topic created

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -61,7 +61,7 @@ class ConnectorsExporterTest {
     when(kafkaConnectService.getConnects(CLUSTER))
         .thenReturn(Flux.just(connect));
 
-    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
         .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
 
     when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))

+ 127 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1981,6 +1981,69 @@ paths:
         404:
           description: Acl not found
 
+  /api/clusters/{clusterName}/acl/consumer:
+    post:
+      tags:
+        - Acls
+      summary: createConsumerAcl
+      operationId: createConsumerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateConsumerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/producer:
+    post:
+      tags:
+        - Acls
+      summary: createProducerAcl
+      operationId: createProducerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateProducerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/streamApp:
+    post:
+      tags:
+        - Acls
+      summary: createStreamAppAcl
+      operationId: createStreamAppAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateStreamAppAcl'
+      responses:
+        200:
+          description: OK
+
   /api/authorization:
     get:
       tags:
@@ -3693,7 +3756,7 @@ components:
         principal:
           type: string
         host:
-          type: string  # "*" if acl can be applied to any resource of given type
+          type: string
         operation:
           type: string
           enum:
@@ -3717,6 +3780,69 @@ components:
             - ALLOW
             - DENY
 
+    CreateConsumerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        consumerGroups:
+          type: array
+          items:
+            type: string
+        consumerGroupsPrefix:
+          type: string
+
+    CreateProducerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        transactionalId:
+          type: string
+        transactionsIdPrefix:
+          type: string
+        idempotent:
+          type: boolean
+          default: false
+
+    CreateStreamAppAcl:
+      type: object
+      required: [principal, host, applicationId, inputTopics, outputTopics]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        inputTopics:
+          type: array
+          items:
+            type: string
+        outputTopics:
+          type: array
+          items:
+            type: string
+        applicationId:
+          nullable: false
+          type: string
+
     KafkaAclResourceType:
       type: string
       enum:

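Tying the contract back to the service layer: a POST against the new consumer endpoint with literal names (or prefixes) yields exactly the binding sets asserted in AclsServiceTest above. An illustrative client call (WebClient and the payload values are arbitrary choices, not part of the commit):

    // Endpoint and schema are from the swagger above; everything else is illustrative.
    WebClient.create("http://localhost:8080")
        .post()
        .uri("/api/clusters/{clusterName}/acl/consumer", "local")
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(Map.of(
            "principal", "User:app1",
            "host", "*",
            "topics", List.of("t1", "t2"),
            "consumerGroups", List.of("cg1")))
        .retrieve()
        .toBodilessEntity()
        .block();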
+ 2 - 2
kafka-ui-react-app/src/components/NavBar/UserInfo/UserInfo.tsx

@@ -19,8 +19,8 @@ const UserInfo = () => {
         </S.Wrapper>
       }
     >
-      <DropdownItem>
-        <S.LogoutLink href={`${window.basePath}/logout`}>Log out</S.LogoutLink>
+      <DropdownItem href={`${window.basePath}/logout`}>
+        <S.LogoutLink>Log out</S.LogoutLink>
       </DropdownItem>
     </Dropdown>
   ) : null;

+ 0 - 2
kafka-ui-react-app/src/components/NavBar/UserInfo/__tests__/UserInfo.spec.tsx

@@ -34,7 +34,6 @@ describe('UserInfo', () => {
 
     const logout = screen.getByText('Log out');
     expect(logout).toBeInTheDocument();
-    expect(logout).toHaveAttribute('href', '/logout');
   });
 
   it('should render correct url during basePath initialization', async () => {
@@ -50,7 +49,6 @@ describe('UserInfo', () => {
 
     const logout = screen.getByText('Log out');
     expect(logout).toBeInTheDocument();
-    expect(logout).toHaveAttribute('href', `${baseUrl}/logout`);
   });
 
   it('should not render anything if the username does not exists', () => {