Browse Source

Merge branch 'master' into issue/3163

Roman Zabaluev 1 year ago
parent
commit
8da6ac3e1d
48 changed files with 1197 additions and 308 deletions
  1. README.md (+4, -0)
  2. kafka-ui-api/pom.xml (+5, -0)
  3. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java (+1, -0)
  4. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java (+3, -0)
  5. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/condition/CognitoCondition.java (+3, -2)
  6. kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java (+54, -0)
  7. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java (+7, -18)
  8. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java (+7, -9)
  9. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java (+4, -13)
  10. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java (+2, -2)
  11. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java (+82, -0)
  12. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java (+5, -7)
  13. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java (+2, -6)
  14. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java (+48, -0)
  15. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java (+4, -8)
  16. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java (+4, -6)
  17. kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java (+1, -1)
  18. kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java (+5, -0)
  19. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java (+2, -0)
  20. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java (+0, -6)
  21. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java (+9, -23)
  22. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java (+89, -0)
  23. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java (+2, -6)
  24. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java (+3, -9)
  25. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java (+22, -28)
  26. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java (+10, -9)
  27. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java (+21, -35)
  28. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java (+37, -30)
  29. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java (+181, -4)
  30. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java (+4, -9)
  31. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java (+36, -18)
  32. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java (+2, -2)
  33. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java (+1, -1)
  34. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java (+14, -1)
  35. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java (+82, -0)
  36. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java (+0, -29)
  37. kafka-ui-api/src/main/resources/application.yml (+1, -1)
  38. kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java (+2, -0)
  39. kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java (+80, -0)
  40. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java (+7, -7)
  41. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java (+214, -6)
  42. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java (+4, -4)
  43. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java (+1, -1)
  44. kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml (+127, -1)
  45. kafka-ui-react-app/src/components/ConsumerGroups/Details/ResetOffsets/Form.tsx (+1, -1)
  46. kafka-ui-react-app/src/components/ConsumerGroups/Details/ResetOffsets/ResetOffsets.tsx (+2, -1)
  47. kafka-ui-react-app/src/components/NavBar/UserInfo/UserInfo.tsx (+2, -2)
  48. kafka-ui-react-app/src/components/NavBar/UserInfo/__tests__/UserInfo.spec.tsx (+0, -2)

+ 4 - 0
README.md

@@ -18,6 +18,10 @@
     <a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
 </p>
 
+<p align="center">
+  <img src="https://repobeats.axiom.co/api/embed/2e8a7c2d711af9daddd34f9791143e7554c35d0f.svg" />
+</p>
+
 #### UI for Apache Kafka is a free, open-source web UI to monitor and manage Apache Kafka clusters.
 
 UI for Apache Kafka is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.

+ 5 - 0
kafka-ui-api/pom.xml

@@ -114,6 +114,11 @@
             <artifactId>json</artifactId>
             <version>${org.json.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java

@@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
       "/resources/**",
       "/actuator/health/**",
       "/actuator/info",
+      "/actuator/prometheus",
       "/auth",
       "/login",
       "/logout",

+ 3 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java

@@ -99,6 +99,9 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
     final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
     final List<ClientRegistration> registrations =
         new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
+    if (registrations.isEmpty()) {
+      throw new IllegalArgumentException("OAuth2 authentication is enabled but no providers specified.");
+    }
     return new InMemoryReactiveClientRegistrationRepository(registrations);
   }
 

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/condition/CognitoCondition.java

@@ -1,13 +1,14 @@
 package com.provectus.kafka.ui.config.auth.condition;
 
 import com.provectus.kafka.ui.service.rbac.AbstractProviderCondition;
+import org.jetbrains.annotations.NotNull;
 import org.springframework.context.annotation.Condition;
 import org.springframework.context.annotation.ConditionContext;
 import org.springframework.core.type.AnnotatedTypeMetadata;
 
 public class CognitoCondition extends AbstractProviderCondition implements Condition {
   @Override
-  public boolean matches(final ConditionContext context, final AnnotatedTypeMetadata metadata) {
+  public boolean matches(final ConditionContext context, final @NotNull AnnotatedTypeMetadata metadata) {
     return getRegisteredProvidersTypes(context.getEnvironment()).stream().anyMatch(a -> a.equalsIgnoreCase("cognito"));
   }
-}
+}

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.AclsApi;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
 import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
@@ -123,4 +126,55 @@ public class AclsController extends AbstractController implements AclsApi {
         .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
+                                                      Mono<CreateConsumerAclDTO> createConsumerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createConsumerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createConsumerAclDto)
+        .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
+                                                      Mono<CreateProducerAclDTO> createProducerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createProducerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createProducerAclDto)
+        .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
+                                                       Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
+                                                       ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createStreamAppAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createStreamAppAclDto)
+        .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
 }

+ 7 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -2,37 +2,28 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
-import java.time.Instant;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
 
   private final MessagesProcessing messagesProcessing;
-  private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
-    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+  protected PolledRecords poll(
+      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
-    Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
-    Instant finish = Instant.now();
-    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
-    throttler.throttleAfterPoll(polledBytes);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
+    var records = consumer.pollEnhanced(timeout);
+    sendConsuming(sink, records);
     return records;
   }
 
@@ -49,10 +40,8 @@ public abstract class AbstractEmitter {
     messagesProcessing.sendPhase(sink, name);
   }
 
-  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                              ConsumerRecords<Bytes, Bytes> records,
-                              long elapsed) {
-    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
+  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
+    messagesProcessing.sentConsumingInfo(sink, records);
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -9,9 +9,7 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
@@ -22,12 +20,12 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
 
   public BackwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       MessagesProcessing messagesProcessing,
@@ -41,7 +39,7 @@ public class BackwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting backward polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -91,7 +89,7 @@ public class BackwardRecordEmitter
       TopicPartition tp,
       long fromOffset,
       long toOffset,
-      Consumer<Bytes, Bytes> consumer,
+      EnhancedConsumer consumer,
       FluxSink<TopicMessageEventDTO> sink
   ) {
     consumer.assign(Collections.singleton(tp));
@@ -101,13 +99,13 @@ public class BackwardRecordEmitter
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
-    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !sendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords);
+      emptyPolls.count(polledRecords.count());
 
       log.debug("{} records polled from {}", polledRecords.count(), tp);
 
@@ -115,7 +113,7 @@ public class BackwardRecordEmitter
           .filter(r -> r.offset() < toOffset)
           .toList();
 
-      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+      if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval
         break;
       }

+ 4 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -13,23 +10,17 @@
   private int records = 0;
   private long elapsed = 0;
 
-  /**
-   * returns bytes polled.
-   */
-  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed,
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                        PolledRecords polledRecords,
                         int filterApplyErrors) {
-    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
-    bytes += polledBytes;
+    bytes += polledRecords.bytes();
     this.records += polledRecords.count();
-    this.elapsed += elapsed;
+    this.elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
            .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
            .consuming(createConsumingStats(sink, filterApplyErrors))
     );
-    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
     this.maxEmptyPolls = maxEmptyPolls;
   }
 
-  public void count(ConsumerRecords<?, ?> polled) {
-    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  public void count(int polledCount) {
+    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
   }
 
   public boolean noDataEmptyPollsReached() {

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+
+public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
+
+  private final PollingThrottler throttler;
+  private final ApplicationMetrics metrics;
+  private String pollingTopic;
+
+  public EnhancedConsumer(Properties properties,
+                          PollingThrottler throttler,
+                          ApplicationMetrics metrics) {
+    super(properties, new BytesDeserializer(), new BytesDeserializer());
+    this.throttler = throttler;
+    this.metrics = metrics;
+    metrics.activeConsumers().incrementAndGet();
+  }
+
+  public PolledRecords pollEnhanced(Duration dur) {
+    var stopwatch = Stopwatch.createStarted();
+    ConsumerRecords<Bytes, Bytes> polled = poll(dur);
+    PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
+    var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
+    metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
+    return polledEnhanced;
+  }
+
+  @Override
+  public void assign(Collection<TopicPartition> partitions) {
+    super.assign(partitions);
+    Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    Preconditions.checkState(assignedTopics.size() == 1);
+    this.pollingTopic = assignedTopics.iterator().next();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close(Duration timeout) {
+    metrics.activeConsumers().decrementAndGet();
+    super.close(timeout);
+  }
+
+}

+ 5 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -16,11 +14,11 @@ public class ForwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition position;
 
   public ForwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition position,
       MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
@@ -32,7 +30,7 @@ public class ForwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting forward polling for {}", position);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeekNonEmptyPartitions();
@@ -44,8 +42,8 @@ public class ForwardRecordEmitter
           && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
-        ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        emptyPolls.count(records);
+        var records = poll(sink, consumer);
+        emptyPolls.count(records.count());
 
         log.debug("{} records polled", records.count());
 

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -8,7 +8,6 @@ import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -54,13 +53,10 @@ public class MessagesProcessing {
     }
   }
 
-  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
     }
-    return 0;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.emitter;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public record PolledRecords(int count,
+                            int bytes,
+                            Duration elapsed,
+                            ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
+
+  static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
+    return new PolledRecords(
+        polled.count(),
+        calculatePolledRecSize(polled),
+        pollDuration,
+        polled
+    );
+  }
+
+  public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
+    return records.records(tp);
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
+    return records.iterator();
+  }
+
+  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      for (Header header : rec.headers()) {
+        polledBytes +=
+            (header.key() != null ? header.key().getBytes().length : 0)
+                + (header.value() != null ? header.value().length : 0);
+      }
+      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    }
+    return polledBytes;
+  }
+}

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 
 @Slf4j
 public class PollingThrottler {
@@ -36,18 +33,17 @@ public class PollingThrottler {
     return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
   }
 
-  public void throttleAfterPoll(int polledBytes) {
+  //returns true if polling was throttled
+  public boolean throttleAfterPoll(int polledBytes) {
     if (polledBytes > 0) {
       double sleptSeconds = rateLimiter.acquire(polledBytes);
       if (!throttled && sleptSeconds > 0.0) {
         throttled = true;
         log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+        return true;
       }
     }
-  }
-
-  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
-    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
+    return false;
   }
 
 }

+ 4 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -5,19 +5,17 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
 public class TailingEmitter extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
-  public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+  public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
@@ -29,7 +27,7 @@ public class TailingEmitter extends AbstractEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
@@ -47,7 +45,7 @@ public class TailingEmitter extends AbstractEmitter
     }
   }
 
-  private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
+  private void assignAndSeek(EnhancedConsumer consumer) {
     var seekOperations = SeekOperations.create(consumer, consumerPosition);
     var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
     seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -34,7 +34,7 @@ public interface KafkaConnectMapper {
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
+  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
     ConnectorDTO connector = connectInfo.getConnector();
     List<TaskDTO> tasks = connectInfo.getTasks();
     int failedTasksCount = (int) tasks.stream()

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -73,6 +74,10 @@ public class Permission {
   }
 
   private List<String> getAllActionValues() {
+    if (resource == null) {
+      return Collections.emptyList();
+    }
+
     return switch (this.resource) {
       case APPLICATIONCONFIG -> Arrays.stream(ApplicationConfigAction.values()).map(Enum::toString).toList();
       case CLUSTERCONFIG -> Arrays.stream(ClusterConfigAction.values()).map(Enum::toString).toList();

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
+import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -47,6 +48,7 @@ public class SerdesInitializer {
             .put(UInt64Serde.name(), UInt64Serde.class)
             .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
+            .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .build(),
         new CustomSerdeLoader()

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java

@@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
     return "Avro (Embedded)";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 9 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java

@@ -1,8 +1,6 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Base64;
@@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
     return "Base64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();
@@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        input = input.trim();
-        // it is actually a hack to provide ability to sent empty array as a key/value
-        if (input.length() == 0) {
-          return new byte[]{};
-        }
-        return Base64.getDecoder().decode(input);
+    var decoder = Base64.getDecoder();
+    return inputString -> {
+      inputString = inputString.trim();
+      // it is actually a hack to provide ability to sent empty array as a key/value
+      if (inputString.length() == 0) {
+        return new byte[] {};
       }
+      return decoder.decode(inputString);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
     var encoder = Base64.getEncoder();
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
            encoder.encodeToString(data),
            DeserializeResult.Type.STRING,
            Map.of()
        );
-      }
-    };
   }
 }

+ 89 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java

@@ -0,0 +1,89 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+
+public class HexSerde implements BuiltInSerde {
+
+  private HexFormat deserializeHexFormat;
+
+  public static String name() {
+    return "Hex";
+  }
+
+  @Override
+  public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
+    configure(" ", true);
+  }
+
+  @Override
+  public void configure(PropertyResolver serdeProperties,
+                        PropertyResolver kafkaClusterProperties,
+                        PropertyResolver globalProperties) {
+    String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
+    boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
+    configure(delim, uppercase);
+  }
+
+  private void configure(String delim, boolean uppercase) {
+    deserializeHexFormat = HexFormat.ofDelimiter(delim);
+    if (uppercase) {
+      deserializeHexFormat = deserializeHexFormat.withUpperCase();
+    }
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    return input -> {
+      input = input.trim();
+      // it is a hack to provide ability to sent empty array as a key/value
+      if (input.length() == 0) {
+        return new byte[] {};
+      }
+      return HexFormat.of().parseHex(prepareInputForParse(input));
+    };
+  }
+
+  // removing most-common delimiters and prefixes
+  private static String prepareInputForParse(String input) {
+    return input
+        .replaceAll(" ", "")
+        .replaceAll("#", "")
+        .replaceAll(":", "");
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return (headers, data) ->
+        new DeserializeResult(
+            deserializeHexFormat.formatHex(data),
+            DeserializeResult.Type.STRING,
+            Map.of()
+        );
+  }
+}

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java

@@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
            String.valueOf(Longs.fromByteArray(data)),
            DeserializeResult.Type.JSON,
            Map.of()
        );
-      }
-    };
   }
 }

+ 3 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java

@@ -1,10 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedInteger;
 import com.google.common.primitives.UnsignedLong;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
@@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
                     + "  \"minimum\" : 0, "
                     + "  \"maximum\" : %s "
                     + "}",
-                UnsignedInteger.MAX_VALUE
+                UnsignedLong.MAX_VALUE
             ),
             Map.of()
         )
@@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
            UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
            DeserializeResult.Type.JSON,
            Map.of()
        );
-      }
-    };
   }
 }

+ 22 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java

@@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        UUID uuid = UUID.fromString(input);
-        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-        if (mostSignificantBitsFirst) {
-          bb.putLong(uuid.getMostSignificantBits());
-          bb.putLong(uuid.getLeastSignificantBits());
-        } else {
-          bb.putLong(uuid.getLeastSignificantBits());
-          bb.putLong(uuid.getMostSignificantBits());
-        }
-        return bb.array();
+    return input -> {
+      UUID uuid = UUID.fromString(input);
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      if (mostSignificantBitsFirst) {
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+      } else {
+        bb.putLong(uuid.getLeastSignificantBits());
+        bb.putLong(uuid.getMostSignificantBits());
      }
+      return bb.array();
    };
  }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        if (data.length != 16) {
-          throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
-        }
-        ByteBuffer bb = ByteBuffer.wrap(data);
-        long msb = bb.getLong();
-        long lsb = bb.getLong();
-        UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
-        return new DeserializeResult(
-            uuid.toString(),
-            DeserializeResult.Type.STRING,
-            Map.of()
-        );
+    return (headers, data) -> {
+      if (data.length != 16) {
+        throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
      }
+      ByteBuffer bb = ByteBuffer.wrap(data);
+      long msb = bb.getLong();
+      long lsb = bb.getLong();
+      UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
+      return new DeserializeResult(
+          uuid.toString(),
+          DeserializeResult.Type.STRING,
+          Map.of()
+      );
    };
  }
}

+ 10 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,12 +2,14 @@ package com.provectus.kafka.ui.service;
 
 
 import com.google.common.collect.Streams;
 import com.google.common.collect.Streams;
 import com.google.common.collect.Table;
 import com.google.common.collect.Table;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
@@ -26,11 +28,8 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
@@ -248,25 +247,27 @@ public class ConsumerGroupService {
         .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster) {
     return createConsumer(cluster, Map.of());
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
-                                                    Map<String, Object> properties) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster,
+                                         Map<String, Object> properties) {
     Properties props = new Properties();
     SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
     props.putAll(cluster.getProperties());
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
     props.putAll(properties);
 
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(
+        props,
+        cluster.getPollingSettings().getPollingThrottler(),
+        ApplicationMetrics.forCluster(cluster)
+    );
   }
 
 }
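Note: createConsumer now hands back the project's EnhancedConsumer wrapper instead of a raw KafkaConsumer<Bytes, Bytes>, so throttling and per-cluster metrics travel with every consumer created here. A hedged usage sketch (the wrapper's exact surface is not shown in this diff; pollEnhanced and PolledRecords are assumed from their usages in TopicAnalysisService and RecordEmitterTest further down):

    EnhancedConsumer consumer = createConsumer(cluster);        // as defined above
    consumer.assign(List.of(new TopicPartition("orders", 0)));  // assumed to delegate to the underlying consumer
    PolledRecords polled = consumer.pollEnhanced(Duration.ofSeconds(3));
    log.debug("polled {} records ({} bytes)", polled.count(), polled.bytes());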

+ 21 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
@@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
 
 @Service
 @Slf4j
@@ -61,39 +59,22 @@ public class KafkaConnectService {
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
     return getConnects(cluster)
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
-        .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
-        .flatMap(connector ->
-            getConnectorConfig(cluster, connector.getConnect(), connector.getName())
-                .map(config -> InternalConnectInfo.builder()
-                    .connector(connector)
-                    .config(config)
-                    .build()
-                )
-        )
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
-              .collectList()
-              .map(tasks -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(tasks)
-                  .build()
-              );
-        })
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
-              .map(ct -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(connectInfo.getTasks())
-                  .topics(ct.getTopics())
-                  .build()
-              );
-        })
-        .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
+        .flatMap(connect ->
+            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
+                .flatMap(connectorName ->
+                    Mono.zip(
+                        getConnector(cluster, connect.getName(), connectorName),
+                        getConnectorConfig(cluster, connect.getName(), connectorName),
+                        getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
+                        getConnectorTopics(cluster, connect.getName(), connectorName)
+                    ).map(tuple ->
+                        InternalConnectInfo.builder()
+                            .connector(tuple.getT1())
+                            .config(tuple.getT2())
+                            .tasks(tuple.getT3())
+                            .topics(tuple.getT4().getTopics())
+                            .build())))
+        .map(kafkaConnectMapper::fullConnectorInfo)
         .filter(matchesSearchTerm(search));
   }
 
@@ -132,6 +113,11 @@ public class KafkaConnectService {
         .flatMapMany(Flux::fromIterable);
   }
 
+  // returns empty flux if there was an error communicating with Connect
+  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
+    return getConnectorNames(cluster, connectName).onErrorComplete();
+  }
+
   @SneakyThrows
   private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {
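Note: getAllConnectors now zips the four per-connector lookups (connector, config, tasks, topics) into a single Mono, and getConnectorNamesWithErrorsSuppress turns a failing Connect instance into an empty Flux, so one unreachable Connect no longer fails the whole listing. A tiny standalone Reactor sketch of the suppression idea (illustrative values only):

    Flux<String> names = Flux.<String>error(new IllegalStateException("connect unreachable"))
        .onErrorComplete();          // swallow the error, complete empty
    names.collectList().block();     // -> [] instead of an exception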

+ 37 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -15,6 +15,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.KafkaVersion;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -129,38 +131,41 @@ public class ReactiveAdminClient implements Closeable {
                                    Set<SupportedFeature> features,
                                    boolean topicDeletionIsAllowed) {
 
-    private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
-      return loadBrokersConfig(ac, List.of(controllerId))
-          .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
-          .flatMap(configs -> {
-            String version = "1.0-UNKNOWN";
-            boolean topicDeletionEnabled = true;
-            for (ConfigEntry entry : configs) {
-              if (entry.name().contains("inter.broker.protocol.version")) {
-                version = entry.value();
-              }
-              if (entry.name().equals("delete.topic.enable")) {
-                topicDeletionEnabled = Boolean.parseBoolean(entry.value());
-              }
-            }
-            var builder = ConfigRelatedInfo.builder()
-                .version(version)
-                .topicDeletionIsAllowed(topicDeletionEnabled);
-            return SupportedFeature.forVersion(ac, version)
-                .map(features -> builder.features(features).build());
-          });
+    static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
+
+    private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
+      return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
+          .flatMap(desc -> {
+            // choosing node from which we will get configs (starting with controller)
+            var targetNodeId = Optional.ofNullable(desc.controller)
+                .map(Node::id)
+                .orElse(desc.getNodes().iterator().next().id());
+            return loadBrokersConfig(ac, List.of(targetNodeId))
+                .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
+                .flatMap(configs -> {
+                  String version = "1.0-UNKNOWN";
+                  boolean topicDeletionEnabled = true;
+                  for (ConfigEntry entry : configs) {
+                    if (entry.name().contains("inter.broker.protocol.version")) {
+                      version = entry.value();
+                    }
+                    if (entry.name().equals("delete.topic.enable")) {
+                      topicDeletionEnabled = Boolean.parseBoolean(entry.value());
+                    }
+                  }
+                  final String finalVersion = version;
+                  final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
+                  return SupportedFeature.forVersion(ac, version)
+                      .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
+                });
+          })
+          .cache(UPDATE_DURATION);
     }
   }
 
   public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
-    return describeClusterImpl(adminClient, Set.of())
-        // choosing node from which we will get configs (starting with controller)
-        .flatMap(descr -> descr.controller != null
-            ? Mono.just(descr.controller)
-            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
-        )
-        .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
-        .map(info -> new ReactiveAdminClient(adminClient, info));
+    Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
+    return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
   }
 
 
@@ -170,7 +175,7 @@ public class ReactiveAdminClient implements Closeable {
         .doOnError(th -> !(th instanceof SecurityDisabledException)
                 && !(th instanceof InvalidRequestException)
                 && !(th instanceof UnsupportedVersionException),
-            th -> log.warn("Error checking if security enabled", th))
+            th -> log.debug("Error checking if security enabled", th))
         .onErrorReturn(false);
   }
 
@@ -202,6 +207,8 @@ public class ReactiveAdminClient implements Closeable {
 
   @Getter(AccessLevel.PACKAGE) // visible for testing
   private final AdminClient client;
+  private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
+
   private volatile ConfigRelatedInfo configRelatedInfo;
 
   public Set<SupportedFeature> getClusterFeatures() {
@@ -228,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
     if (controller == null) {
       return Mono.empty();
     }
-    return ConfigRelatedInfo.extract(client, controller.id())
+    return configRelatedInfoMono
         .doOnNext(info -> this.configRelatedInfo = info)
         .then();
   }
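Note: extract() now returns a Mono cached with cache(UPDATE_DURATION), and the same Mono instance is kept in the client, so config-related info is re-read from the brokers at most once per hour no matter how often it is re-subscribed. A minimal sketch of that caching behavior (hypothetical supplier, not the actual admin-client call):

    AtomicInteger calls = new AtomicInteger();
    Mono<Integer> cached = Mono.fromSupplier(calls::incrementAndGet)
        .cache(Duration.ofHours(1));   // value is replayed until the TTL expires
    cached.block();                    // 1 (supplier invoked)
    cached.block();                    // 1 (served from cache)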

+ 181 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java

@@ -1,16 +1,44 @@
 package com.provectus.kafka.ui.service.acl;
 
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+
 import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -22,11 +50,14 @@ public class AclsService {
   private final AdminClientService adminClientService;
 
   public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
-    var aclString = AclCsv.createAclString(aclBinding);
-    log.info("CREATING ACL: [{}]", aclString);
     return adminClientService.get(cluster)
-        .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
-        .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+        .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
+  }
+
+  private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
+    bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
+    return ac.createAcls(bindings)
+        .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
   }
 
   public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
@@ -92,4 +123,150 @@ public class AclsService {
     }
   }
 
+  // creates allow binding for resources by prefix or specific names list
+  private List<AclBinding> createAllowBindings(ResourceType resourceType,
+                                               List<AclOperation> opsToAllow,
+                                               String principal,
+                                               String host,
+                                               @Nullable String resourcePrefix,
+                                               @Nullable Collection<String> resourceNames) {
+    List<AclBinding> bindings = new ArrayList<>();
+    if (resourcePrefix != null) {
+      for (var op : opsToAllow) {
+        bindings.add(
+            new AclBinding(
+                new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
+                new AccessControlEntry(principal, host, op, ALLOW)));
+      }
+    }
+    if (!CollectionUtils.isEmpty(resourceNames)) {
+      resourceNames.stream()
+          .distinct()
+          .forEach(resource ->
+              opsToAllow.forEach(op ->
+                  bindings.add(
+                      new AclBinding(
+                          new ResourcePattern(resourceType, resource, LITERAL),
+                          new AccessControlEntry(principal, host, op, ALLOW)))));
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
+        .then();
+  }
+
+  //Read, Describe on topics, Read on consumerGroups
+  private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(TOPIC,
+            List.of(READ, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getConsumerGroupsPrefix(),
+            request.getConsumerGroups()));
+    return bindings;
+  }
+
+  public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
+        .then();
+  }
+
+  //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+  //IDEMPOTENT_WRITE on cluster if idempotent is enabled
+  private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE, DESCRIBE, CREATE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TRANSACTIONAL_ID,
+            List.of(WRITE, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTransactionsIdPrefix(),
+            Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
+
+    if (Boolean.TRUE.equals(request.getIdempotent())) {
+      bindings.addAll(
+          createAllowBindings(
+              CLUSTER,
+              List.of(IDEMPOTENT_WRITE),
+              request.getPrincipal(),
+              request.getHost(),
+              null,
+              List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
+        .then();
+  }
+
+  // Read on input topics, Write on output topics
+  // ALL on applicationId-prefixed Groups and Topics
+  private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getInputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getOutputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+    return bindings;
+  }
+
 }
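Note: createAllowBindings is the single building block here: it emits ALLOW entries either for a PREFIXED pattern or for each literal resource name, and the consumer/producer/stream-app helpers simply combine those per resource type. For reference, one binding built with the same Kafka Admin API types (values are illustrative):

    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "orders-", PatternType.PREFIXED),
        new AccessControlEntry("User:analytics", "*", AclOperation.READ, AclPermissionType.ALLOW));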

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service.analyze;
 
 import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
 import com.provectus.kafka.ui.emitter.PollingSettings;
-import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
     private final EmptyPollsCounter emptyPollsCounter;
-    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
-    private final KafkaConsumer<Bytes, Bytes> consumer;
+    private final EnhancedConsumer consumer;
 
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
                  long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
-      this.throttler = pollingSettings.getPollingThrottler();
       this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
 
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
 
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
         while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
-          var polled = consumer.poll(Duration.ofSeconds(3));
-          throttler.throttleAfterPoll(polled);
-          emptyPollsCounter.count(polled);
+          var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
+          emptyPollsCounter.count(polled.count());
           polled.forEach(r -> {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);
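Note: the analysis loop no longer throttles explicitly; pollEnhanced returns the PolledRecords wrapper and, judging by the PollingThrottler that is passed into EnhancedConsumer's constructor elsewhere in this change, throttling now happens inside the wrapper. Condensed shape of the loop above (a sketch, not the full task):

    while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
      var polled = consumer.pollEnhanced(Duration.ofSeconds(3));  // PolledRecords: iterable, plus count()/bytes()/elapsed()
      emptyPollsCounter.count(polled.count());
      polled.forEach(rec -> totalStats.apply(rec));
    }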

+ 36 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import reactor.core.publisher.Signal;
 public class AuditService implements Closeable {
 
   private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
+  private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
 
   private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
   private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
@@ -56,14 +58,8 @@ public class AuditService implements Closeable {
   public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
     Map<String, AuditWriter> auditWriters = new HashMap<>();
     for (var cluster : clustersStorage.getKafkaClusters()) {
-      ReactiveAdminClient adminClient;
-      try {
-        adminClient = adminClientService.get(cluster).block();
-      } catch (Exception e) {
-        printAuditInitError(cluster, "Error connect to cluster", e);
-        continue;
-      }
-      createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
+      Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
+      createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
           .ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
     }
     this.auditWriters = auditWriters;
@@ -76,7 +72,7 @@ public class AuditService implements Closeable {
 
   @VisibleForTesting
   static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
-                                                 ReactiveAdminClient ac,
+                                                 Supplier<ReactiveAdminClient> acSupplier,
                                                  Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
     var auditProps = cluster.getOriginalProperties().getAudit();
     if (auditProps == null) {
@@ -87,32 +83,54 @@ public class AuditService implements Closeable {
     if (!topicAudit && !consoleAudit) {
       return Optional.empty();
     }
+    if (!topicAudit) {
+      log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
+      return Optional.of(consoleOnlyWriter(cluster));
+    }
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
-    @Nullable KafkaProducer<byte[], byte[]> producer = null;
-    if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
-      producer = producerFactory.get();
+    boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
+    if (!topicAuditCanBeDone) {
+      if (consoleAudit) {
+        log.info(
+            "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
+            cluster.getName()
+        );
+        return Optional.of(consoleOnlyWriter(cluster));
+      }
+      return Optional.empty();
     }
-    log.info("Audit service initialized for cluster '{}'", cluster.getName());
+    log.info("Audit initialization finished for cluster '{}'", cluster.getName());
     return Optional.of(
         new AuditWriter(
             cluster.getName(),
             auditTopicName,
-            producer,
+            producerFactory.get(),
             consoleAudit ? AUDIT_LOGGER : null
         )
     );
   }
 
+  private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
+    return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
+  }
+
   /**
    * return true if topic created/existing and producing can be enabled.
    */
   private static boolean createTopicIfNeeded(KafkaCluster cluster,
-                                             ReactiveAdminClient ac,
+                                             Supplier<ReactiveAdminClient> acSupplier,
                                              String auditTopicName,
                                              ClustersProperties.AuditProperties auditProps) {
+    ReactiveAdminClient ac;
+    try {
+      ac = acSupplier.get();
+    } catch (Exception e) {
+      printAuditInitError(cluster, "Error while connecting to the cluster", e);
+      return false;
+    }
     boolean topicExists;
     try {
-      topicExists = ac.listTopics(true).block().contains(auditTopicName);
+      topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
     } catch (Exception e) {
       printAuditInitError(cluster, "Error checking audit topic existence", e);
       return false;
@@ -130,7 +148,7 @@ public class AuditService implements Closeable {
           .ifPresent(topicConfig::putAll);
 
       log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
-      ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
+      ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
       log.info("Audit topic created for cluster '{}'", cluster.getName());
       return true;
     } catch (Exception e) {
@@ -142,7 +160,7 @@ public class AuditService implements Closeable {
   private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
     log.error("-----------------------------------------------------------------");
     log.error(
-        "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
+        "Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
         cluster.getName()
     );
     log.error("{}", errorMsg, cause);

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -18,7 +18,7 @@ import org.slf4j.Logger;
 
 @Slf4j
 record AuditWriter(String clusterName,
-                   String targetTopic,
+                   @Nullable String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    @Nullable Logger consoleLogger) implements Closeable {
 
@@ -43,7 +43,7 @@ record AuditWriter(String clusterName,
     if (consoleLogger != null) {
       consoleLogger.info(json);
     }
-    if (producer != null) {
+    if (targetTopic != null && producer != null) {
       producer.send(
           new ProducerRecord<>(targetTopic, null, json.getBytes(UTF_8)),
           (metadata, ex) -> {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -25,7 +25,7 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster)
-        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
                 kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

+ 14 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -51,6 +51,8 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class AccessControlService {
 
+  private static final String ACCESS_DENIED = "Access denied";
+
   @Nullable
   private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
   private final RoleBasedAccessControlProperties properties;
@@ -97,6 +99,17 @@ public class AccessControlService {
       return Mono.empty();
     }
 
+    if (CollectionUtils.isNotEmpty(context.getApplicationConfigActions())) {
+      return getUser()
+          .doOnNext(user -> {
+            boolean accessGranted = isApplicationConfigAccessible(context, user);
+
+            if (!accessGranted) {
+              throw new AccessDeniedException(ACCESS_DENIED);
+            }
+          }).then();
+    }
+
     return getUser()
         .doOnNext(user -> {
           boolean accessGranted =
@@ -113,7 +126,7 @@ public class AccessControlService {
                   && isAuditAccessible(context, user);
                   && isAuditAccessible(context, user);
 
 
           if (!accessGranted) {
           if (!accessGranted) {
-            throw new AccessDeniedException("Access denied");
+            throw new AccessDeniedException(ACCESS_DENIED);
           }
           }
         })
         })
         .then();
         .then();
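Note: validateAccess now short-circuits for application-config actions: when the context carries ApplicationConfig actions only isApplicationConfigAccessible is evaluated, instead of the full chain of per-resource checks. The reactive pattern used in both branches, condensed (allowed(...) below stands in for the real checks):

    return getUser()
        .doOnNext(user -> {
          if (!allowed(user)) {
            throw new AccessDeniedException(ACCESS_DENIED);  // surfaces as an error signal downstream
          }
        })
        .then();  // keep only completion or error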

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.util;
+
+import static lombok.AccessLevel.PRIVATE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.provectus.kafka.ui.emitter.PolledRecords;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor(access = PRIVATE)
+public class ApplicationMetrics {
+
+  private final String clusterName;
+  private final MeterRegistry registry;
+
+  public static ApplicationMetrics forCluster(KafkaCluster cluster) {
+    return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
+  }
+
+  @VisibleForTesting
+  public static ApplicationMetrics noop() {
+    return new ApplicationMetrics("noop", new SimpleMeterRegistry());
+  }
+
+  public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
+    pollTimer(topic).record(polled.elapsed());
+    polledRecords(topic).increment(polled.count());
+    polledBytes(topic).record(polled.bytes());
+    if (throttled) {
+      pollThrottlingActivations().increment();
+    }
+  }
+
+  private Counter polledRecords(String topic) {
+    return Counter.builder("topic_records_polled")
+        .description("Number of records polled from topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private DistributionSummary polledBytes(String topic) {
+    return DistributionSummary.builder("topic_polled_bytes")
+        .description("Bytes polled from kafka topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Timer pollTimer(String topic) {
+    return Timer.builder("topic_poll_time")
+        .description("Time spend in polling for topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Counter pollThrottlingActivations() {
+    return Counter.builder("poll_throttling_activations")
+        .description("Number of poll throttling activations")
+        .tag("cluster", clusterName)
+        .register(registry);
+  }
+
+  public AtomicInteger activeConsumers() {
+    var count = new AtomicInteger();
+    Gauge.builder("active_consumers", () -> count)
+        .description("Number of active consumers")
+        .tag("cluster", clusterName)
+        .register(registry);
+    return count;
+  }
+
+}
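Note: ApplicationMetrics registers Micrometer meters (counter, distribution summary, timer and a gauge) tagged with cluster/topic; forCluster() writes to Metrics.globalRegistry while noop() keeps tests in an isolated SimpleMeterRegistry. Together with the application.yml change below that adds "prometheus" to the exposed actuator endpoints, these meters should become scrapeable, assuming a Prometheus registry is on the classpath. A short usage sketch:

    ApplicationMetrics metrics = ApplicationMetrics.forCluster(cluster);
    metrics.meterPolledRecords("orders", polled, false);   // after each poll; 'polled' is a PolledRecords
    AtomicInteger active = metrics.activeConsumers();      // gauge-backed counter
    active.incrementAndGet();                               // consumer started
    active.decrementAndGet();                               // consumer closed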

+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -1,29 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.Bytes;
-
-public class ConsumerRecordsUtil {
-
-  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
-    int polledBytes = 0;
-    for (Header header : rec.headers()) {
-      polledBytes +=
-          (header.key() != null ? header.key().getBytes().length : 0)
-              + (header.value() != null ? header.value().length : 0);
-    }
-    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
-    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
-    return polledBytes;
-  }
-
-  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
-    int polledBytes = 0;
-    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
-      polledBytes += calculatePolledRecSize(rec);
-    }
-    return polledBytes;
-  }
-
-}

+ 1 - 1
kafka-ui-api/src/main/resources/application.yml

@@ -10,7 +10,7 @@ management:
   endpoints:
     web:
       exposure:
-        include: "info,health"
+        include: "info,health,prometheus"
 
 logging:
   level:

+ 2 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -77,6 +77,8 @@ public abstract class AbstractIntegrationTest {
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
       System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
       System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
       System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
       System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
       System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");
       System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");

+ 80 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java

@@ -0,0 +1,80 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class HexSerdeTest {
+
+  private static final byte[] TEST_BYTES = "hello world".getBytes();
+  private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";
+
+  private HexSerde hexSerde;
+
+  @BeforeEach
+  void init() {
+    hexSerde = new HexSerde();
+    hexSerde.autoConfigure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
+  }
+
+
+  @ParameterizedTest
+  @CsvSource({
+      "68656C6C6F20776F726C64", // uppercase
+      "68656c6c6f20776f726c64", // lowercase
+      "68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
+      "68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
+      "68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
+      "#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64"  // '#' prefix, space delim
+  })
+  void serializesInputAsHexString(String hexString) {
+    for (Serde.Target type : Serde.Target.values()) {
+      var serializer = hexSerde.serializer("anyTopic", type);
+      byte[] bytes = serializer.serialize(hexString);
+      assertThat(bytes).isEqualTo(TEST_BYTES);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
+    var serializer = hexSerde.serializer("anyTopic", type);
+    byte[] bytes = serializer.serialize("");
+    assertThat(bytes).isEqualTo(new byte[] {});
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void deserializesDataAsHexBytes(Serde.Target type) {
+    var deserializer = hexSerde.deserializer("anyTopic", type);
+    var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
+    assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
+    assertThat(result.getAdditionalProperties()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void getSchemaReturnsEmpty(Serde.Target type) {
+    assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
+    assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canSerializeReturnsTrueForAllInput(Serde.Target type) {
+    assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
+  }
+}
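Note: these tests pin the new HexSerde's contract: serialization accepts upper/lower case digits, optional delimiters and '#' prefixes, and deserialization renders bytes as upper-case, space-delimited hex. The serde implementation itself isn't shown in this listing; one way to produce that output format (a sketch, not necessarily what HexSerde.java does) is Java 17's HexFormat:

    String hex = HexFormat.ofDelimiter(" ").withUpperCase().formatHex("hello world".getBytes());
    // -> "68 65 6C 6C 6F 20 77 6F 72 6C 64", i.e. TEST_BYTES_HEX_ENCODED above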

+ 7 - 7
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -8,9 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -18,6 +20,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -325,22 +327,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+  private EnhancedConsumer createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
     );
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   @Value

+ 214 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java

@@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -53,12 +58,12 @@ class AclsServiceTest {
     when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
         .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
 
-    ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
         .thenReturn(Mono.empty());
 
-    ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.deleteAcls(deletedCaptor.capture()))
         .thenReturn(Mono.empty());
 
     aclsService.syncAclWithAclCsv(
@@ -68,15 +73,218 @@ class AclsServiceTest {
             + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
             + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
     ).block();
     ).block();
 
 
-    Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
     assertThat(createdBindings)
     assertThat(createdBindings)
         .hasSize(1)
         .hasSize(1)
         .contains(newBindingToBeAdded);
         .contains(newBindingToBeAdded);
 
 
-    Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
+    Collection<AclBinding> deletedBindings = deletedCaptor.getValue();
     assertThat(deletedBindings)
     assertThat(deletedBindings)
         .hasSize(1)
         .hasSize(1)
         .contains(existingBinding2);
         .contains(existingBinding2);
   }
   }
 
 
+
+  @Test
+  void createsConsumerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroups(List.of("cg1", "cg2"))
+            .topics(List.of("t1", "t2"))
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroupsPrefix("cgPref")
+            .topicsPrefix("topicPref")
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(3)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsProducerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topics(List.of("t1"))
+            .idempotent(true)
+            .transactionalId("txId1")
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (true)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topicsPrefix("topicPref")
+            .transactionsIdPrefix("txIdPref")
+            .idempotent(false)
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (false)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsStreamAppDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createStreamAppAcl(
+        CLUSTER,
+        new CreateStreamAppAclDTO()
+            .principal(principal)
+            .host(host)
+            .inputTopics(List.of("t1"))
+            .outputTopics(List.of("t2", "t3"))
+            .applicationId("appId1")
+    ).block();
+
+    // Read on input topics, Write on output topics
+    // ALL on applicationId-prefixed Groups and Topics
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
+  }
 }
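
For reviewers cross-checking the new assertions, the following is a minimal Java sketch of the bindings the createsConsumerDependantAcls test expects: READ and DESCRIBE on each topic plus READ on each consumer group, all as LITERAL patterns (the prefix variants switch to PatternType.PREFIXED). It is reconstructed from the test above, not taken from AclsService, and the class and method names are hypothetical. The producer and stream-app tests follow the same shape, additionally expecting WRITE/DESCRIBE/CREATE on topics, WRITE/DESCRIBE on the transactional id, IDEMPOTENT_WRITE on the cluster only when idempotent is true, and ALL on the applicationId-prefixed topics and groups.

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

class ConsumerAclSketch {

  // Sketch only: rebuilds the six bindings asserted in createsConsumerDependantAcls.
  static List<AclBinding> consumerBindings(String principal, String host,
                                           List<String> topics, List<String> groups) {
    var bindings = new ArrayList<AclBinding>();
    for (String topic : topics) {
      var pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL);
      bindings.add(new AclBinding(pattern,
          new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
      bindings.add(new AclBinding(pattern,
          new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
    }
    for (String group : groups) {
      bindings.add(new AclBinding(
          new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL),
          new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
    }
    return bindings;
  }
}

With topics ["t1", "t2"] and groups ["cg1", "cg2"] this yields exactly the six bindings listed in the first new test.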

+ 4 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java

@@ -81,7 +81,7 @@ class AuditServiceTest {

      @Test
      void noWriterIfNoAuditPropsSet() {
-      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
       assertThat(maybeWriter).isEmpty();
     }

@@ -91,7 +91,7 @@ class AuditServiceTest {
       auditProps.setConsoleAuditEnabled(true);
       clustersProperties.setAudit(auditProps);

-      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
       assertThat(maybeWriter).isPresent();

       var writer = maybeWriter.get();
@@ -116,7 +116,7 @@ class AuditServiceTest {
         when(adminClientMock.listTopics(true))
             .thenReturn(Mono.just(Set.of("test_audit_topic")));

-        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
         assertThat(maybeWriter).isPresent();

         //checking there was no topic creation request
@@ -136,7 +136,7 @@ class AuditServiceTest {
         when(adminClientMock.createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap()))
             .thenReturn(Mono.empty());

-        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
         assertThat(maybeWriter).isPresent();

         //verifying topic created
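
The only change in these tests is that createAuditWriter now receives the admin client through a supplier (() -> adminClientMock) instead of the client itself, presumably so the client is resolved lazily, only when a writer that actually needs it is built. That motivation is an inference from the test change rather than from AuditService; a minimal sketch of the lazy-supplier pattern, with hypothetical names:

import java.util.function.Supplier;

class LazySupplierSketch {

  // The supplier body only runs if the branch that needs the dependency is taken.
  static String describe(boolean topicWriterNeeded, Supplier<String> adminClientSupplier) {
    return topicWriterNeeded ? "writer using " + adminClientSupplier.get() : "no admin client created";
  }

  public static void main(String[] args) {
    // Prints "no admin client created"; the expensive supplier is never invoked.
    System.out.println(describe(false, () -> {
      System.out.println("creating admin client...");
      return "adminClient";
    }));
  }
}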

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -61,7 +61,7 @@ class ConnectorsExporterTest {
     when(kafkaConnectService.getConnects(CLUSTER))
         .thenReturn(Flux.just(connect));

-    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
         .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));

     when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))

+ 127 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1868,6 +1868,69 @@ paths:
         404:
           description: Acl not found

+  /api/clusters/{clusterName}/acl/consumer:
+    post:
+      tags:
+        - Acls
+      summary: createConsumerAcl
+      operationId: createConsumerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateConsumerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/producer:
+    post:
+      tags:
+        - Acls
+      summary: createProducerAcl
+      operationId: createProducerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateProducerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/streamApp:
+    post:
+      tags:
+        - Acls
+      summary: createStreamAppAcl
+      operationId: createStreamAppAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateStreamAppAcl'
+      responses:
+        200:
+          description: OK
+
   /api/authorization:
     get:
       tags:
@@ -3551,7 +3614,7 @@ components:
         principal:
           type: string
         host:
-          type: string  # "*" if acl can be applied to any resource of given type
+          type: string
         operation:
           type: string
           enum:
@@ -3575,6 +3638,69 @@ components:
             - ALLOW
             - DENY

+    CreateConsumerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        consumerGroups:
+          type: array
+          items:
+            type: string
+        consumerGroupsPrefix:
+          type: string
+
+    CreateProducerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        transactionalId:
+          type: string
+        transactionsIdPrefix:
+          type: string
+        idempotent:
+          type: boolean
+          default: false
+
+    CreateStreamAppAcl:
+      type: object
+      required: [principal, host, applicationId, inputTopics, outputTopics]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        inputTopics:
+          type: array
+          items:
+            type: string
+        outputTopics:
+          type: array
+          items:
+            type: string
+        applicationId:
+          nullable: false
+          type: string
+
     KafkaAclResourceType:
       type: string
       enum:
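
To make the new contract concrete, here is a hedged usage sketch that posts a CreateConsumerAcl payload to the endpoint declared above using Spring WebClient. The base URL, cluster name and payload values are placeholders for illustration; only fields defined in the CreateConsumerAcl schema are used, and per the spec the endpoint just returns 200 OK on success.

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

class CreateConsumerAclExample {

  public static void main(String[] args) {
    // Placeholder base URL and cluster name.
    WebClient client = WebClient.create("http://localhost:8080");
    client.post()
        .uri("/api/clusters/{clusterName}/acl/consumer", "local")
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue("""
            {
              "principal": "User:app-1",
              "host": "*",
              "topics": ["t1", "t2"],
              "consumerGroups": ["cg1"]
            }
            """)
        .retrieve()
        .toBodilessEntity()
        .block();
  }
}

The producer and streamApp endpoints follow the same pattern with their respective CreateProducerAcl and CreateStreamAppAcl request bodies.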

+ 1 - 1
kafka-ui-react-app/src/components/ConsumerGroups/Details/ResetOffsets/Form.tsx

@@ -186,7 +186,7 @@ const Form: React.FC<FormProps> = ({ defaultValues, partitions, topics }) => {
             type="submit"
             type="submit"
             disabled={partitionsValue.length === 0}
             disabled={partitionsValue.length === 0}
           >
           >
-            Submit
+            Reset Offsets
           </Button>
         </div>
       </StyledForm>

+ 2 - 1
kafka-ui-react-app/src/components/ConsumerGroups/Details/ResetOffsets/ResetOffsets.tsx

@@ -15,6 +15,7 @@ import Form from './Form';
 const ResetOffsets: React.FC = () => {
   const routerParams = useAppParams<ClusterGroupParam>();

+  const { consumerGroupID } = routerParams;
   const consumerGroup = useConsumerGroupDetails(routerParams);

   if (consumerGroup.isLoading || !consumerGroup.isSuccess)
@@ -37,7 +38,7 @@ const ResetOffsets: React.FC = () => {
   return (
     <>
       <PageHeading
-        text="Reset offsets"
+        text={consumerGroupID}
         backTo={clusterConsumerGroupsPath(routerParams.clusterName)}
         backText="Consumers"
       />

+ 2 - 2
kafka-ui-react-app/src/components/NavBar/UserInfo/UserInfo.tsx

@@ -19,8 +19,8 @@ const UserInfo = () => {
         </S.Wrapper>
       }
     >
-      <DropdownItem>
-        <S.LogoutLink href={`${window.basePath}/logout`}>Log out</S.LogoutLink>
+      <DropdownItem href={`${window.basePath}/logout`}>
+        <S.LogoutLink>Log out</S.LogoutLink>
       </DropdownItem>
     </Dropdown>
   ) : null;

+ 0 - 2
kafka-ui-react-app/src/components/NavBar/UserInfo/__tests__/UserInfo.spec.tsx

@@ -34,7 +34,6 @@ describe('UserInfo', () => {

     const logout = screen.getByText('Log out');
     expect(logout).toBeInTheDocument();
-    expect(logout).toHaveAttribute('href', '/logout');
   });

   it('should render correct url during basePath initialization', async () => {
@@ -50,7 +49,6 @@

     const logout = screen.getByText('Log out');
     expect(logout).toBeInTheDocument();
-    expect(logout).toHaveAttribute('href', `${baseUrl}/logout`);
   });

   it('should not render anything if the username does not exists', () => {