File overview

Merge branch 'master' into ISSUE_3820

Roman Zabaluev 2 years ago
parent
commit
f0665b4275
22 changed files with 844 additions and 169 deletions
  1. +11 -11   kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/BasicAuthSecurityConfig.java
  2. +6 -4     kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/DisabledAuthSecurityConfig.java
  3. +10 -15   kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java
  4. +12 -17   kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java
  5. +54 -0    kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
  6. +1 -1     kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java
  7. +2 -0     kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java
  8. +0 -6     kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java
  9. +9 -23    kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java
  10. +80 -0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java
  11. +2 -6    kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java
  12. +3 -9    kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java
  13. +22 -28  kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java
  14. +21 -35  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java
  15. +1 -1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
  16. +181 -4  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java
  17. +1 -1    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java
  18. +2 -0    kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
  19. +84 -0   kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java
  20. +214 -6  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java
  21. +1 -1    kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java
  22. +127 -1  kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 11 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/BasicAuthSecurityConfig.java

@@ -7,12 +7,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
-import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
 import org.springframework.security.web.server.SecurityWebFilterChain;
 import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
 import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
-import org.springframework.security.web.server.ui.LogoutPageGeneratingWebFilter;
 
 @Configuration
 @EnableWebFluxSecurity
@@ -33,15 +31,17 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
     final var logoutSuccessHandler = new RedirectServerLogoutSuccessHandler();
     logoutSuccessHandler.setLogoutSuccessUrl(URI.create(LOGOUT_URL));
 
-    return http
-        .addFilterAfter(new LogoutPageGeneratingWebFilter(), SecurityWebFiltersOrder.REACTOR_CONTEXT)
-        .csrf().disable()
-        .authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST).permitAll()
-        .anyExchange().authenticated()
-        .and().formLogin().loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler)
-        .and().logout().logoutSuccessHandler(logoutSuccessHandler)
-        .and().build();
+
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
+        .logout(spec -> spec.logoutSuccessHandler(logoutSuccessHandler))
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
+        .build();
   }
 
 }
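
All four auth configs in this merge make the same migration: from the chained .and() DSL, which newer Spring Security versions deprecate, to the lambda-based customizer DSL. A minimal sketch of the target style, assuming a standalone WebFlux config; the class name and path matchers here are illustrative, not part of this change:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;

@Configuration
@EnableWebFluxSecurity
class LambdaDslExampleConfig {

  @Bean
  SecurityWebFilterChain exampleFilterChain(ServerHttpSecurity http) {
    return http
        // each section of the DSL takes a customizer lambda; no .and() chaining
        .authorizeExchange(spec -> spec
            .pathMatchers("/login", "/resources/**").permitAll()
            .anyExchange().authenticated())
        .formLogin(Customizer.withDefaults())
        .csrf(ServerHttpSecurity.CsrfSpec::disable)
        .build();
  }
}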

+ 6 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/DisabledAuthSecurityConfig.java

@@ -27,10 +27,12 @@ public class DisabledAuthSecurityConfig extends AbstractAuthSecurityConfig {
       System.exit(1);
     }
     log.warn("Authentication is disabled. Access will be unrestricted.");
-    return http.authorizeExchange()
-        .anyExchange().permitAll()
-        .and()
-        .csrf().disable()
+
+    return http.authorizeExchange(spec -> spec
+            .anyExchange()
+            .permitAll()
+        )
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 

+ 10 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java

@@ -24,6 +24,7 @@ import org.springframework.security.authentication.AuthenticationManager;
 import org.springframework.security.authentication.ProviderManager;
 import org.springframework.security.authentication.ReactiveAuthenticationManager;
 import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
+import org.springframework.security.config.Customizer;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
 import org.springframework.security.core.GrantedAuthority;
@@ -126,21 +127,15 @@ public class LdapSecurityConfig {
       log.info("Active Directory support for LDAP has been enabled.");
     }
 
-    return http
-        .authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST)
-        .permitAll()
-        .anyExchange()
-        .authenticated()
-
-        .and()
-        .formLogin()
-
-        .and()
-        .logout()
-
-        .and()
-        .csrf().disable()
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .formLogin(Customizer.withDefaults())
+        .logout(Customizer.withDefaults())
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 

+ 12 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java

@@ -12,10 +12,11 @@ import lombok.extern.log4j.Log4j2;
 import org.jetbrains.annotations.Nullable;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties;
-import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesRegistrationAdapter;
+import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesMapper;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.Customizer;
 import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
@@ -49,21 +50,15 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
   public SecurityWebFilterChain configure(ServerHttpSecurity http, OAuthLogoutSuccessHandler logoutHandler) {
     log.info("Configuring OAUTH2 authentication.");
 
-    return http.authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST)
-        .permitAll()
-        .anyExchange()
-        .authenticated()
-
-        .and()
-        .oauth2Login()
-
-        .and()
-        .logout()
-        .logoutSuccessHandler(logoutHandler)
-
-        .and()
-        .csrf().disable()
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .oauth2Login(Customizer.withDefaults())
+        .logout(spec -> spec.logoutSuccessHandler(logoutHandler))
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 
@@ -103,7 +98,7 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
   public InMemoryReactiveClientRegistrationRepository clientRegistrationRepository() {
     final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
     final List<ClientRegistration> registrations =
-        new ArrayList<>(OAuth2ClientPropertiesRegistrationAdapter.getClientRegistrations(props).values());
+        new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
     return new InMemoryReactiveClientRegistrationRepository(registrations);
   }
 

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.AclsApi;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
 import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
@@ -123,4 +126,55 @@ public class AclsController extends AbstractController implements AclsApi {
         .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
+                                                      Mono<CreateConsumerAclDTO> createConsumerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createConsumerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createConsumerAclDto)
+        .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
+                                                      Mono<CreateProducerAclDTO> createProducerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createProducerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createProducerAclDto)
+        .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
+                                                       Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
+                                                       ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createStreamAppAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createStreamAppAclDto)
+        .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -34,7 +34,7 @@ public interface KafkaConnectMapper {
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
+  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
     ConnectorDTO connector = connectInfo.getConnector();
     List<TaskDTO> tasks = connectInfo.getTasks();
     int failedTasksCount = (int) tasks.stream()

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
+import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -47,6 +48,7 @@ public class SerdesInitializer {
             .put(UInt64Serde.name(), UInt64Serde.class)
             .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
+            .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .build(),
         new CustomSerdeLoader()

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java

@@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
     return "Avro (Embedded)";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 9 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java

@@ -1,8 +1,6 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Base64;
@@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
     return "Base64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();
@@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        input = input.trim();
-        // it is actually a hack to provide ability to sent empty array as a key/value
-        if (input.length() == 0) {
-          return new byte[]{};
-        }
-        return Base64.getDecoder().decode(input);
+    var decoder = Base64.getDecoder();
+    return inputString -> {
+      inputString = inputString.trim();
+      // it is actually a hack to provide the ability to send an empty array as a key/value
+      if (inputString.length() == 0) {
+        return new byte[] {};
       }
+      return decoder.decode(inputString);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
     var encoder = Base64.getEncoder();
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             encoder.encodeToString(data),
             DeserializeResult.Type.STRING,
             Map.of()
         );
-      }
-    };
   }
 }
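
This refactor, repeated below for Int64Serde, UInt64Serde, and UuidBinarySerde, collapses anonymous Serializer/Deserializer classes into lambdas; that compiles because both are single-abstract-method interfaces. A generic sketch of the conversion, with a hypothetical ByteSink interface standing in for Serde.Serializer:

// SAM (single abstract method) conversion, the mechanism behind these refactors.
// ByteSink is a hypothetical stand-in for Serde.Serializer, not a real type.
@FunctionalInterface
interface ByteSink {
  byte[] serialize(String input);
}

class SamConversionDemo {
  // pre-refactor shape: explicit anonymous class
  static ByteSink anonymous() {
    return new ByteSink() {
      @Override
      public byte[] serialize(String input) {
        return input.getBytes();
      }
    };
  }

  // post-refactor shape: same behavior as a lambda
  static ByteSink lambda() {
    return input -> input.getBytes();
  }
}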

+ 80 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java

@@ -0,0 +1,80 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+
+public class HexSerde implements BuiltInSerde {
+
+  private HexFormat deserializeHexFormat;
+
+  public static String name() {
+    return "Hex";
+  }
+
+  @Override
+  public void configure(PropertyResolver serdeProperties,
+                        PropertyResolver kafkaClusterProperties,
+                        PropertyResolver globalProperties) {
+    String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
+    boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
+    deserializeHexFormat = HexFormat.ofDelimiter(delim);
+    if (uppercase) {
+      deserializeHexFormat = deserializeHexFormat.withUpperCase();
+    }
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    return input -> {
+      input = input.trim();
+      // it is a hack to provide the ability to send an empty array as a key/value
+      if (input.length() == 0) {
+        return new byte[] {};
+      }
+      return HexFormat.of().parseHex(prepareInputForParse(input));
+    };
+  }
+
+  // removing most-common delimiters and prefixes
+  private static String prepareInputForParse(String input) {
+    return input
+        .replaceAll(" ", "")
+        .replaceAll("#", "")
+        .replaceAll(":", "");
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return (headers, data) ->
+        new DeserializeResult(
+            deserializeHexFormat.formatHex(data),
+            DeserializeResult.Type.STRING,
+            Map.of()
+        );
+  }
+}
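
HexSerde is built on java.util.HexFormat (JDK 17+). A small sketch of the two formats in play, assuming the default serde properties above (space delimiter, uppercase):

import java.util.HexFormat;

class HexFormatDemo {
  public static void main(String[] args) {
    byte[] data = "hello world".getBytes();

    // deserialization side: delimited, uppercase (the configured defaults)
    HexFormat pretty = HexFormat.ofDelimiter(" ").withUpperCase();
    System.out.println(pretty.formatHex(data)); // 68 65 6C 6C 6F 20 77 6F 72 6C 64

    // serialization side: HexFormat.of().parseHex() accepts only an undelimited
    // (case-insensitive) string, hence prepareInputForParse() strips " ", "#", ":"
    byte[] parsed = HexFormat.of().parseHex("68656c6c6f20776f726c64");
    System.out.println(new String(parsed)); // hello world
  }
}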

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java

@@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             String.valueOf(Longs.fromByteArray(data)),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 3 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java

@@ -1,10 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedInteger;
 import com.google.common.primitives.UnsignedLong;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
@@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
                     + "  \"minimum\" : 0, "
                     + "  \"maximum\" : %s "
                     + "}",
-                UnsignedInteger.MAX_VALUE
+                UnsignedLong.MAX_VALUE
             ),
             Map.of()
         )
@@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 22 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java

@@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        UUID uuid = UUID.fromString(input);
-        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-        if (mostSignificantBitsFirst) {
-          bb.putLong(uuid.getMostSignificantBits());
-          bb.putLong(uuid.getLeastSignificantBits());
-        } else {
-          bb.putLong(uuid.getLeastSignificantBits());
-          bb.putLong(uuid.getMostSignificantBits());
-        }
-        return bb.array();
+    return input -> {
+      UUID uuid = UUID.fromString(input);
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      if (mostSignificantBitsFirst) {
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+      } else {
+        bb.putLong(uuid.getLeastSignificantBits());
+        bb.putLong(uuid.getMostSignificantBits());
       }
+      return bb.array();
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        if (data.length != 16) {
-          throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
-        }
-        ByteBuffer bb = ByteBuffer.wrap(data);
-        long msb = bb.getLong();
-        long lsb = bb.getLong();
-        UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
-        return new DeserializeResult(
-            uuid.toString(),
-            DeserializeResult.Type.STRING,
-            Map.of()
-        );
+    return (headers, data) -> {
+      if (data.length != 16) {
+        throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
       }
+      ByteBuffer bb = ByteBuffer.wrap(data);
+      long msb = bb.getLong();
+      long lsb = bb.getLong();
+      UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
+      return new DeserializeResult(
+          uuid.toString(),
+          DeserializeResult.Type.STRING,
+          Map.of()
+      );
     };
   }
 }

+ 21 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
@@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
 
 @Service
 @Slf4j
@@ -61,39 +59,22 @@ public class KafkaConnectService {
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
     return getConnects(cluster)
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
-        .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
-        .flatMap(connector ->
-            getConnectorConfig(cluster, connector.getConnect(), connector.getName())
-                .map(config -> InternalConnectInfo.builder()
-                    .connector(connector)
-                    .config(config)
-                    .build()
-                )
-        )
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
-              .collectList()
-              .map(tasks -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(tasks)
-                  .build()
-              );
-        })
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
-              .map(ct -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(connectInfo.getTasks())
-                  .topics(ct.getTopics())
-                  .build()
-              );
-        })
-        .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
+        .flatMap(connect ->
+            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
+                .flatMap(connectorName ->
+                    Mono.zip(
+                        getConnector(cluster, connect.getName(), connectorName),
+                        getConnectorConfig(cluster, connect.getName(), connectorName),
+                        getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
+                        getConnectorTopics(cluster, connect.getName(), connectorName)
+                    ).map(tuple ->
+                        InternalConnectInfo.builder()
+                            .connector(tuple.getT1())
+                            .config(tuple.getT2())
+                            .tasks(tuple.getT3())
+                            .topics(tuple.getT4().getTopics())
+                            .build())))
+        .map(kafkaConnectMapper::fullConnectorInfo)
         .filter(matchesSearchTerm(search));
   }
 
@@ -132,6 +113,11 @@ public class KafkaConnectService {
         .flatMapMany(Flux::fromIterable);
   }
 
+  // returns empty flux if there was an error communicating with Connect
+  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
+    return getConnectorNames(cluster, connectName).onErrorComplete();
+  }
+
   @SneakyThrows
   private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {
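
Two ideas in this refactor: Mono.zip gathers the four per-connector lookups in one step instead of rebuilding InternalConnectInfo in cascading flatMaps, and onErrorComplete() turns a failing Connect instance into an empty Flux so one dead instance no longer breaks getAllConnectors. A condensed sketch of both, with placeholder fetch methods and a placeholder record that are not from this PR:

import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

record ConnectorView(String connector, Map<String, String> config, List<String> tasks) {}

class ZipDemo {
  Mono<String> fetchConnector() { return Mono.just("sink-1"); }
  Mono<Map<String, String>> fetchConfig() { return Mono.just(Map.of("tasks.max", "1")); }
  Flux<String> fetchTasks() { return Flux.just("task-0", "task-1"); }

  Mono<ConnectorView> load() {
    // zip subscribes to all sources and emits a single Tuple3 once all complete
    return Mono.zip(fetchConnector(), fetchConfig(), fetchTasks().collectList())
        .map(t -> new ConnectorView(t.getT1(), t.getT2(), t.getT3()));
  }

  Flux<String> namesSuppressingErrors(Flux<String> names) {
    // onErrorComplete() swallows the error signal and completes instead, so a
    // broken Connect instance contributes nothing rather than failing the stream
    return names.onErrorComplete();
  }
}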

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -125,7 +125,7 @@ public class SchemaRegistryService {
         .onErrorMap(WebClientResponseException.Conflict.class,
             th -> new SchemaCompatibilityException())
         .onErrorMap(WebClientResponseException.UnprocessableEntity.class,
-            th -> new ValidationException("Invalid schema"))
+            th -> new ValidationException("Invalid schema. Error from registry: " + th.getResponseBodyAsString()))
         .then(getLatestSchemaVersionBySubject(cluster, subject));
   }
 

+ 181 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java

@@ -1,16 +1,44 @@
 package com.provectus.kafka.ui.service.acl;
 
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+
 import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -22,11 +50,14 @@ public class AclsService {
   private final AdminClientService adminClientService;
 
   public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
-    var aclString = AclCsv.createAclString(aclBinding);
-    log.info("CREATING ACL: [{}]", aclString);
     return adminClientService.get(cluster)
-        .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
-        .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+        .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
+  }
+
+  private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
+    bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
+    return ac.createAcls(bindings)
+        .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
   }
 
   public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
@@ -92,4 +123,150 @@ public class AclsService {
     }
   }
 
+  // creates allow bindings for resources, by prefix or by a list of specific names
+  private List<AclBinding> createAllowBindings(ResourceType resourceType,
+                                               List<AclOperation> opsToAllow,
+                                               String principal,
+                                               String host,
+                                               @Nullable String resourcePrefix,
+                                               @Nullable Collection<String> resourceNames) {
+    List<AclBinding> bindings = new ArrayList<>();
+    if (resourcePrefix != null) {
+      for (var op : opsToAllow) {
+        bindings.add(
+            new AclBinding(
+                new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
+                new AccessControlEntry(principal, host, op, ALLOW)));
+      }
+    }
+    if (!CollectionUtils.isEmpty(resourceNames)) {
+      resourceNames.stream()
+          .distinct()
+          .forEach(resource ->
+              opsToAllow.forEach(op ->
+                  bindings.add(
+                      new AclBinding(
+                          new ResourcePattern(resourceType, resource, LITERAL),
+                          new AccessControlEntry(principal, host, op, ALLOW)))));
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
+        .then();
+  }
+
+  //Read, Describe on topics, Read on consumerGroups
+  private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(TOPIC,
+            List.of(READ, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getConsumerGroupsPrefix(),
+            request.getConsumerGroups()));
+    return bindings;
+  }
+
+  public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
+        .then();
+  }
+
+  //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+  //IDEMPOTENT_WRITE on cluster if idempotent is enabled
+  private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE, DESCRIBE, CREATE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TRANSACTIONAL_ID,
+            List.of(WRITE, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTransactionsIdPrefix(),
+            Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
+
+    if (Boolean.TRUE.equals(request.getIdempotent())) {
+      bindings.addAll(
+          createAllowBindings(
+              CLUSTER,
+              List.of(IDEMPOTENT_WRITE),
+              request.getPrincipal(),
+              request.getHost(),
+              null,
+              List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
+        .then();
+  }
+
+  // Read on input topics, Write on output topics
+  // ALL on applicationId-prefixed Groups and Topics
+  private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getInputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getOutputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+    return bindings;
+  }
+
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -25,7 +25,7 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster)
-        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
                 kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

+ 2 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -77,6 +77,8 @@ public abstract class AbstractIntegrationTest {
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
       System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
       System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
       System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");

+ 84 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java

@@ -0,0 +1,84 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class HexSerdeTest {
+
+  private static final byte[] TEST_BYTES = "hello world".getBytes();
+  private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";
+
+  private Serde hexSerde;
+
+  @BeforeEach
+  void init() {
+    hexSerde = new HexSerde();
+    hexSerde.configure(
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty()
+    );
+  }
+
+
+  @ParameterizedTest
+  @CsvSource({
+      "68656C6C6F20776F726C64", // uppercase
+      "68656c6c6f20776f726c64", // lowercase
+      "68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
+      "68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
+      "68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
+      "#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64"  // '#' prefix, space delim
+  })
+  void serializesInputAsHexString(String hexString) {
+    for (Serde.Target type : Serde.Target.values()) {
+      var serializer = hexSerde.serializer("anyTopic", type);
+      byte[] bytes = serializer.serialize(hexString);
+      assertThat(bytes).isEqualTo(TEST_BYTES);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
+    var serializer = hexSerde.serializer("anyTopic", type);
+    byte[] bytes = serializer.serialize("");
+    assertThat(bytes).isEqualTo(new byte[] {});
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void deserializesDataAsHexBytes(Serde.Target type) {
+    var deserializer = hexSerde.deserializer("anyTopic", type);
+    var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
+    assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
+    assertThat(result.getAdditionalProperties()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void getSchemaReturnsEmpty(Serde.Target type) {
+    assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
+    assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canSerializeReturnsTrueForAllInput(Serde.Target type) {
+    assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
+  }
+}

+ 214 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java

@@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -53,12 +58,12 @@ class AclsServiceTest {
     when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
         .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
 
-    ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
         .thenReturn(Mono.empty());
 
-    ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.deleteAcls(deletedCaptor.capture()))
         .thenReturn(Mono.empty());
 
     aclsService.syncAclWithAclCsv(
@@ -68,15 +73,218 @@ class AclsServiceTest {
             + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
     ).block();
 
-    Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
     assertThat(createdBindings)
         .hasSize(1)
         .contains(newBindingToBeAdded);
 
-    Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
+    Collection<AclBinding> deletedBindings = deletedCaptor.getValue();
     assertThat(deletedBindings)
         .hasSize(1)
         .contains(existingBinding2);
   }
 
+
+  @Test
+  void createsConsumerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroups(List.of("cg1", "cg2"))
+            .topics(List.of("t1", "t2"))
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroupsPrefix("cgPref")
+            .topicsPrefix("topicPref")
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(3)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsProducerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topics(List.of("t1"))
+            .idempotent(true)
+            .transactionalId("txId1")
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (true)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topicsPrefix("topicPref")
+            .transactionsIdPrefix("txIdPref")
+            .idempotent(false)
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (false)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsStreamAppDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createStreamAppAcl(
+        CLUSTER,
+        new CreateStreamAppAclDTO()
+            .principal(principal)
+            .host(host)
+            .inputTopics(List.of("t1"))
+            .outputTopics(List.of("t2", "t3"))
+            .applicationId("appId1")
+    ).block();
+
+    // Read on input topics, Write on output topics
+    // ALL on applicationId-prefixed Groups and Topics
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
+  }
 }

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -61,7 +61,7 @@ class ConnectorsExporterTest {
     when(kafkaConnectService.getConnects(CLUSTER))
         .thenReturn(Flux.just(connect));
 
-    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
         .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
 
     when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))

+ 127 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1868,6 +1868,69 @@ paths:
         404:
           description: Acl not found
 
+  /api/clusters/{clusterName}/acl/consumer:
+    post:
+      tags:
+        - Acls
+      summary: createConsumerAcl
+      operationId: createConsumerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateConsumerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/producer:
+    post:
+      tags:
+        - Acls
+      summary: createProducerAcl
+      operationId: createProducerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateProducerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/streamApp:
+    post:
+      tags:
+        - Acls
+      summary: createStreamAppAcl
+      operationId: createStreamAppAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateStreamAppAcl'
+      responses:
+        200:
+          description: OK
+
   /api/authorization:
     get:
       tags:
@@ -3551,7 +3614,7 @@ components:
         principal:
           type: string
         host:
-          type: string  # "*" if acl can be applied to any resource of given type
+          type: string
         operation:
           type: string
           enum:
@@ -3575,6 +3638,69 @@ components:
             - ALLOW
             - DENY
 
+    CreateConsumerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        consumerGroups:
+          type: array
+          items:
+            type: string
+        consumerGroupsPrefix:
+          type: string
+
+    CreateProducerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        transactionalId:
+          type: string
+        transactionsIdPrefix:
+          type: string
+        idempotent:
+          type: boolean
+          default: false
+
+    CreateStreamAppAcl:
+      type: object
+      required: [principal, host, applicationId, inputTopics, outputTopics]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        inputTopics:
+          type: array
+          items:
+            type: string
+        outputTopics:
+          type: array
+          items:
+            type: string
+        applicationId:
+          nullable: false
+          type: string
+
     KafkaAclResourceType:
       type: string
       enum:
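
For reference, the three new endpoints take a JSON body matching the schemas above and return a bare 200. A hypothetical call to createConsumerAcl; the base URL and cluster name are assumptions, while the path and body fields follow the spec:

// Hypothetical client call against the new consumer-ACL endpoint.
import java.util.List;
import java.util.Map;
import org.springframework.web.reactive.function.client.WebClient;

class CreateConsumerAclCall {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // assumed base URL
    client.post()
        .uri("/api/clusters/{clusterName}/acl/consumer", "local")  // assumed cluster name
        .bodyValue(Map.of(
            "principal", "User:app1",
            "host", "*",
            "topics", List.of("t1", "t2"),
            "consumerGroups", List.of("cg1")))
        .retrieve()
        .toBodilessEntity() // the spec above defines a 200 response with no body
        .block();
  }
}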