Merge branch 'master' into rbac_classes_refactoring

Ilya Kuramshin 1 year ago
parent
commit
dcd4fb2ac1

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -22,6 +22,7 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
@@ -316,6 +317,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .doOnEach(sig -> audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
+                                                                                   String topicName,
+                                                                                   ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(VIEW)
+        .operationName("getActiveProducerStates")
+        .build();
+
+    Comparator<TopicProducerStateDTO> ordering =
+        Comparator.comparingInt(TopicProducerStateDTO::getPartition)
+            .thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
+
+    Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
+        .flatMapMany(statesMap ->
+            Flux.fromStream(
+                statesMap.entrySet().stream()
+                    .flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
+                    .sorted(ordering)));
+
+    return validateAccess(context)
+        .thenReturn(states)
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> audit(context, sig));
+  }
+
   private Comparator<InternalTopic> getComparatorForTopic(
       TopicColumnsToSortDTO orderBy) {
     var defaultComparator = Comparator.comparing(InternalTopic::getName);
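A note on the ordering introduced above: producer states are sorted by partition ascending, then by producerId descending within each partition. A minimal standalone sketch of the same comparator chain, using a hypothetical PState record as a stand-in for TopicProducerStateDTO (plain JDK, illustrative values):

import java.util.Comparator;
import java.util.List;

public class OrderingDemo {
  // stand-in for TopicProducerStateDTO, for illustration only
  record PState(int partition, Long producerId) {}

  public static void main(String[] args) {
    Comparator<PState> ordering =
        Comparator.comparingInt(PState::partition)
            .thenComparing(Comparator.comparing(PState::producerId).reversed());
    List<PState> sorted = List.of(
            new PState(1, 100L), new PState(0, 5L), new PState(0, 7L))
        .stream().sorted(ordering).toList();
    // [PState[partition=0, producerId=7], PState[partition=0, producerId=5], PState[partition=1, producerId=100]]
    System.out.println(sorted);
  }
}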

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -30,10 +30,12 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
@@ -117,6 +119,17 @@ public interface ClusterMapper {
     return brokerDiskUsage;
   }
 
+  default TopicProducerStateDTO map(int partition, ProducerState state) {
+    return new TopicProducerStateDTO()
+        .partition(partition)
+        .producerId(state.producerId())
+        .producerEpoch(state.producerEpoch())
+        .lastSequence(state.lastSequence())
+        .lastTimestampMs(state.lastTimestamp())
+        .coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
+        .currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
+  }
+
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
     return switch (operation) {
       case ALL -> KafkaAclDTO.OperationEnum.ALL;
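The two trailing fields of the mapper deserve a note: ProducerState.coordinatorEpoch() returns an OptionalInt and currentTransactionStartOffset() an OptionalLong, and the stream().boxed().findAny().orElse(null) chain simply turns an empty optional into a null DTO field. A minimal sketch of the same conversion in isolation (plain JDK, no Kafka types):

import java.util.OptionalInt;

public class OptionalBoxingDemo {
  public static void main(String[] args) {
    OptionalInt epoch = OptionalInt.empty(); // e.g. no coordinator epoch reported

    // the idiom used in ClusterMapper.map(...) above:
    Integer viaStream = epoch.stream().boxed().findAny().orElse(null);

    // an equivalent, more direct spelling:
    Integer direct = epoch.isPresent() ? epoch.getAsInt() : null;

    System.out.println(viaStream + " / " + direct); // null / null
  }
}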

+ 0 - 46
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

@@ -1,46 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
-
-  AvroSchemaRegistrySerializer(String topic, boolean isKey,
-                               SchemaRegistryClient client,
-                               SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaAvroSerializer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Object serialize(String value, ParsedSchema schema) {
-    try {
-      return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-
-  }
-}

+ 0 - 79
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/JsonSchemaSchemaRegistrySerializer.java

@@ -1,79 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.json.JsonSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class JsonSchemaSchemaRegistrySerializer extends SchemaRegistrySerializer<JsonNode> {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  JsonSchemaSchemaRegistrySerializer(String topic,
-                                            boolean isKey,
-                                            SchemaRegistryClient client,
-                                            SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected JsonNode serialize(String value, ParsedSchema schema) {
-    try {
-      JsonNode json = MAPPER.readTree(value);
-      ((JsonSchema) schema).validate(json);
-      return json;
-    } catch (JsonProcessingException e) {
-      throw new ValidationException(String.format("'%s' is not valid json", value));
-    } catch (org.everit.json.schema.ValidationException e) {
-      throw new ValidationException(
-          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
-    }
-  }
-
-  @KafkaClientInternalsDependant
-  private class KafkaJsonSchemaSerializerWithoutSchemaInfer
-      extends KafkaJsonSchemaSerializer<JsonNode> {
-
-    KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
-      super(client);
-    }
-
-    /**
-     * Need to override original method because it tries to infer schema from input
-     * by checking 'schema' json field or @Schema annotation on input class, which is not
-     * possible in our case. So, we just skip all infer logic and pass schema directly.
-     */
-    @Override
-    public byte[] serialize(String topic, JsonNode rec) {
-      return super.serializeImpl(
-          super.getSubjectName(topic, isKey, rec, schema),
-          rec,
-          (JsonSchema) schema
-      );
-    }
-  }
-
-}

+ 0 - 50
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/ProtobufSchemaRegistrySerializer.java

@@ -1,50 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.util.JsonFormat;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-class ProtobufSchemaRegistrySerializer extends SchemaRegistrySerializer<Message> {
-
-  @SneakyThrows
-  public ProtobufSchemaRegistrySerializer(String topic, boolean isKey,
-                                          SchemaRegistryClient client, SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaProtobufSerializer<>(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Message serialize(String value, ParsedSchema schema) {
-    ProtobufSchema protobufSchema = (ProtobufSchema) schema;
-    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
-    try {
-      JsonFormat.parser().merge(value, builder);
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-  }
-
-}

+ 25 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -1,5 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin.sr;
 
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
 
@@ -7,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
@@ -32,13 +34,15 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.common.config.SslConfigs;
 
 
 public class SchemaRegistrySerde implements BuiltInSerde {
 
+  private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
+  private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
+
   public static String name() {
     return "SchemaRegistry";
   }
@@ -221,8 +225,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
       case JSON ->
-        //need to use confluent JsonSchema since it includes resolved references
-        ((JsonSchema) parsedSchema).rawSchema().toString();
+          //need to use confluent JsonSchema since it includes resolved references
+          ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }
 
@@ -254,35 +258,27 @@ public class SchemaRegistrySerde implements BuiltInSerde {
   @Override
   public Serializer serializer(String topic, Target type) {
     String subject = schemaSubject(topic, type);
-    var schema = getSchemaBySubject(subject)
-        .orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
-    boolean isKey = type == Target.KEY;
-    SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
-        .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
+    SchemaMetadata meta = getSchemaBySubject(subject)
+        .orElseThrow(() -> new ValidationException(
+            String.format("No schema for subject '%s' found", subject)));
+    ParsedSchema schema = getSchemaById(meta.getId())
+        .orElseThrow(() -> new IllegalStateException(
+            String.format("No schema found for id %s, subject '%s'", meta.getId(), subject)));
+    SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
+        .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
     return switch (schemaType) {
-      case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-      case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+      case PROTOBUF -> input ->
+          serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
+      case AVRO -> input ->
+          serializeAvro((AvroSchema) schema, meta.getId(), input);
+      case JSON -> input ->
+          serializeJson((JsonSchema) schema, meta.getId(), input);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new SrDeserializer(topic);
-  }
-
-  ///--------------------------------------------------------------
-
-  private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
-  private static final int SR_RECORD_PREFIX_LENGTH = 5;
-
-  @RequiredArgsConstructor
-  private class SrDeserializer implements Deserializer {
-
-    private final String topic;
-
-    @Override
-    public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+    return (headers, data) -> {
       var schemaId = extractSchemaIdFromMsg(data);
       SchemaType format = getMessageFormatBySchemaId(schemaId);
       MessageFormatter formatter = schemaRegistryFormatters.get(format);
@@ -294,7 +290,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
               "type", format.name()
           )
       );
-    }
+    };
   }
 
   private SchemaType getMessageFormatBySchemaId(int schemaId) {
@@ -306,7 +302,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
 
   private int extractSchemaIdFromMsg(byte[] data) {
     ByteBuffer buffer = ByteBuffer.wrap(data);
-    if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
+    if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
       return buffer.getInt();
     }
     throw new ValidationException(

+ 0 - 34
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerializer.java

@@ -1,34 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.serde.api.Serde;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-abstract class SchemaRegistrySerializer<T> implements Serde.Serializer {
-  protected final Serializer<T> serializer;
-  protected final String topic;
-  protected final boolean isKey;
-  protected final ParsedSchema schema;
-
-  @SneakyThrows
-  protected SchemaRegistrySerializer(String topic, boolean isKey, SchemaRegistryClient client,
-                                     SchemaMetadata schema) {
-    this.topic = topic;
-    this.isKey = isKey;
-    this.serializer = createSerializer(client);
-    this.schema = client.getSchemaById(schema.getId());
-  }
-
-  protected abstract Serializer<T> createSerializer(SchemaRegistryClient client);
-
-  @Override
-  public byte[] serialize(String input) {
-    final T read = this.serialize(input, schema);
-    return this.serializer.serialize(topic, read);
-  }
-
-  protected abstract T serialize(String value, ParsedSchema schema);
-}

+ 126 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/Serialize.java

@@ -0,0 +1,126 @@
+package com.provectus.kafka.ui.serdes.builtin.sr;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.json.jackson.Jackson;
+import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
+import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import lombok.SneakyThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+final class Serialize {
+
+  private static final byte MAGIC = 0x0;
+  private static final ObjectMapper JSON_SERIALIZE_MAPPER = Jackson.newObjectMapper(); //from confluent package
+
+  private Serialize() {
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaJsonSchemaSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeJson(JsonSchema schema, int schemaId, String value) {
+    JsonNode json;
+    try {
+      json = JSON_SERIALIZE_MAPPER.readTree(value);
+    } catch (JsonProcessingException e) {
+      throw new ValidationException(String.format("'%s' is not valid json", value));
+    }
+    try {
+      schema.validate(json);
+    } catch (org.everit.json.schema.ValidationException e) {
+      throw new ValidationException(
+          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
+    }
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      out.write(JSON_SERIALIZE_MAPPER.writeValueAsBytes(json));
+      return out.toByteArray();
+    }
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaProtobufSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeProto(SchemaRegistryClient srClient,
+                               String topic,
+                               Serde.Target target,
+                               ProtobufSchema schema,
+                               int schemaId,
+                               String input) {
+    // flags are tuned like in ProtobufSerializer by default
+    boolean normalizeSchema = false;
+    boolean autoRegisterSchema = false;
+    boolean useLatestVersion = true;
+    boolean latestCompatStrict = true;
+    boolean skipKnownTypes = true;
+
+    schema = AbstractKafkaProtobufSerializer.resolveDependencies(
+        srClient, normalizeSchema, autoRegisterSchema, useLatestVersion, latestCompatStrict,
+        new HashMap<>(), skipKnownTypes, new DefaultReferenceSubjectNameStrategy(),
+        topic, target == Serde.Target.KEY, schema
+    );
+
+    DynamicMessage.Builder builder = schema.newMessageBuilder();
+    JsonFormat.parser().merge(input, builder);
+    Message message = builder.build();
+    MessageIndexes indexes = schema.toMessageIndexes(message.getDescriptorForType().getFullName(), normalizeSchema);
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      out.write(indexes.toByteArray());
+      message.writeTo(out);
+      return out.toByteArray();
+    }
+  }
+
+  @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
+  @SneakyThrows
+  static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) {
+    var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema());
+    try (var out = new ByteArrayOutputStream()) {
+      out.write(MAGIC);
+      out.write(schemaId(schemaId));
+      Schema rawSchema = schema.rawSchema();
+      if (rawSchema.getType().equals(Schema.Type.BYTES)) {
+        Preconditions.checkState(
+            avroObject instanceof ByteBuffer,
+            "Unrecognized bytes object of type: " + avroObject.getClass().getName()
+        );
+        out.write(((ByteBuffer) avroObject).array());
+      } else {
+        boolean useLogicalTypeConverters = true;
+        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+        DatumWriter<Object> writer =
+            (DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(avroObject, rawSchema, useLogicalTypeConverters);
+        writer.write(avroObject, encoder);
+        encoder.flush();
+      }
+      return out.toByteArray();
+    }
+  }
+
+  private static byte[] schemaId(int id) {
+    return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
+  }
+}
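All three serializers above emit the standard Confluent wire format: one magic byte (0x0), a 4-byte big-endian schema id, then the encoded payload (protobuf additionally prepends message indexes). A minimal round-trip sketch of that framing, mirroring schemaId(...) here and extractSchemaIdFromMsg(...) in SchemaRegistrySerde (plain JDK, hypothetical values):

import java.nio.ByteBuffer;

public class WireFormatDemo {
  public static void main(String[] args) {
    // frame: magic byte + 4-byte schema id + payload bytes
    byte[] payloadBody = {1, 2, 3}; // stand-in for the encoded record
    ByteBuffer frame = ByteBuffer.allocate(1 + Integer.BYTES + payloadBody.length)
        .put((byte) 0x0)
        .putInt(42) // hypothetical schema id
        .put(payloadBody);

    // read side, matching the >= SR_PAYLOAD_PREFIX_LENGTH guard in SchemaRegistrySerde:
    ByteBuffer buf = ByteBuffer.wrap(frame.array());
    if (buf.remaining() >= 5 && buf.get() == 0x0) {
      System.out.println("schema id = " + buf.getInt()); // schema id = 42
    }
  }
}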

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -31,6 +31,7 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
@@ -55,6 +56,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -658,6 +660,21 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
   }
 
+  // returns tp -> list of active producer states (if any)
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
+    return describeTopic(topic)
+        .map(td -> client.describeProducers(
+                IntStream.range(0, td.partitions().size())
+                    .mapToObj(i -> new TopicPartition(topic, i))
+                    .toList()
+            ).all()
+        )
+        .flatMap(ReactiveAdminClient::toMono)
+        .map(map -> map.entrySet().stream()
+            .filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
+            .collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
+  }
+
   private Mono<Void> incrementalAlterConfig(String topicName,
                                             List<ConfigEntry> currentConfigs,
                                             Map<String, String> newConfigs) {
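getActiveProducersState wraps Admin#describeProducers (available since Kafka 2.8) behind the project's reactive helpers. A minimal sketch of the same call with a plain AdminClient, assuming a broker on localhost:9092 and a hypothetical topic/partition:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
import org.apache.kafka.common.TopicPartition;

public class DescribeProducersDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    try (Admin admin = Admin.create(props)) {
      TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic/partition
      Map<TopicPartition, PartitionProducerState> states =
          admin.describeProducers(List.of(tp)).all().get();
      states.forEach((partition, state) ->
          state.activeProducers().forEach(p ->
              System.out.printf("%s pid=%d epoch=%d lastSeq=%d%n",
                  partition, p.producerId(), p.producerEpoch(), p.lastSequence())));
    }
  }
}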

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -39,6 +39,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -459,6 +460,11 @@ public class TopicsService {
         );
   }
 
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.getActiveProducersState(topic));
+  }
+
   private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.listTopics(true))

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/extractor/CognitoAuthorityExtractor.java

@@ -59,8 +59,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
             .stream()
             .filter(s -> s.getProvider().equals(Provider.OAUTH_COGNITO))
             .filter(s -> s.getType().equals("group"))
-            .anyMatch(subject -> Stream.of(groups)
-                .map(Object::toString)
+            .anyMatch(subject -> groups
+                .stream()
                 .anyMatch(cognitoGroup -> cognitoGroup.equals(subject.getValue()))
             ))
         .map(Role::getName)

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/annotation/KafkaClientInternalsDependant.java

@@ -5,4 +5,5 @@ package com.provectus.kafka.ui.util.annotation;
  * should be marked with this annotation to make further update process easier.
  */
 public @interface KafkaClientInternalsDependant {
+  String value() default "";
 }

+ 52 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -763,6 +763,33 @@ paths:
         404:
           description: Not found
 
+  /api/clusters/{clusterName}/topics/{topicName}/activeproducers:
+    get:
+      tags:
+        - Topics
+      summary: get active producer states for a topic
+      operationId: getActiveProducerStates
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/TopicProducerState'
+
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
     get:
       tags:
@@ -2619,6 +2646,31 @@ components:
         - PROTOBUF
         - UNKNOWN
 
+    TopicProducerState:
+      type: object
+      properties:
+        partition:
+          type: integer
+          format: int32
+        producerId:
+          type: integer
+          format: int64
+        producerEpoch:
+          type: integer
+          format: int32
+        lastSequence:
+          type: integer
+          format: int32
+        lastTimestampMs:
+          type: integer
+          format: int64
+        coordinatorEpoch:
+          type: integer
+          format: int32
+        currentTransactionStartOffset:
+          type: integer
+          format: int64
+
     ConsumerGroup:
       discriminator:
         propertyName: inherit
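With the contract above, the new endpoint is exposed at GET /api/clusters/{clusterName}/topics/{topicName}/activeproducers and returns a JSON array of TopicProducerState objects. A minimal client-side sketch (assuming a running kafka-ui instance at localhost:8080 and Spring WebFlux on the classpath; cluster and topic names are hypothetical):

import java.util.List;
import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;

public class ActiveProducersCall {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // assumed base URL
    List<Map<String, Object>> states = client.get()
        .uri("/api/clusters/{cluster}/topics/{topic}/activeproducers", "local", "my-topic")
        .retrieve()
        .bodyToFlux(new ParameterizedTypeReference<Map<String, Object>>() {})
        .collectList()
        .block();
    System.out.println(states); // e.g. [{partition=0, producerId=1000, ...}]
  }
}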

+ 27 - 12
kafka-ui-react-app/pnpm-lock.yaml

@@ -207,7 +207,7 @@ devDependencies:
     version: 8.5.0(eslint@8.16.0)
   eslint-import-resolver-node:
     specifier: ^0.3.6
-    version: 0.3.6
+    version: 0.3.9
   eslint-import-resolver-typescript:
     specifier: ^3.2.7
     version: 3.2.7(eslint-plugin-import@2.26.0)(eslint@8.16.0)
@@ -3069,11 +3069,12 @@ packages:
       eslint: 8.16.0
     dev: true
 
-  /eslint-import-resolver-node@0.3.6:
-    resolution: {integrity: sha512-0En0w03NRVMn9Uiyn8YRPDKvWjxCWkslUEhGNTdGx15RvPJYQ+lbOlqrlNI2vEAs4pDYK4f/HN2TbDmk5TP0iw==}
+  /eslint-import-resolver-node@0.3.9:
+    resolution: {integrity: sha512-WFj2isz22JahUv+B788TlO3N6zL3nNJGU8CcZbPZvVEkBPaJdCV4vy5wyghty5ROFbCRnm132v8BScu5/1BQ8g==}
     dependencies:
       debug: 3.2.7
-      resolve: 1.22.0
+      is-core-module: 2.13.0
+      resolve: 1.22.4
     transitivePeerDependencies:
       - supports-color
     dev: true
@@ -3098,7 +3099,7 @@ packages:
       - supports-color
     dev: true
 
-  /eslint-module-utils@2.7.3(@typescript-eslint/parser@5.29.0)(eslint-import-resolver-node@0.3.6)(eslint-import-resolver-typescript@3.2.7):
+  /eslint-module-utils@2.7.3(@typescript-eslint/parser@5.29.0)(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.2.7):
     resolution: {integrity: sha512-088JEC7O3lDZM9xGe0RerkOMd0EjFl+Yvd1jPWIkMT5u3H9+HC34mWWPnqPrN13gieT9pBOO+Qt07Nb/6TresQ==}
     engines: {node: '>=4'}
     peerDependencies:
@@ -3118,7 +3119,7 @@ packages:
     dependencies:
       '@typescript-eslint/parser': 5.29.0(eslint@8.16.0)(typescript@4.7.4)
       debug: 3.2.7
-      eslint-import-resolver-node: 0.3.6
+      eslint-import-resolver-node: 0.3.9
       eslint-import-resolver-typescript: 3.2.7(eslint-plugin-import@2.26.0)(eslint@8.16.0)
       find-up: 2.1.0
     transitivePeerDependencies:
@@ -3141,8 +3142,8 @@ packages:
       debug: 2.6.9
       doctrine: 2.1.0
       eslint: 8.16.0
-      eslint-import-resolver-node: 0.3.6
-      eslint-module-utils: 2.7.3(@typescript-eslint/parser@5.29.0)(eslint-import-resolver-node@0.3.6)(eslint-import-resolver-typescript@3.2.7)
+      eslint-import-resolver-node: 0.3.9
+      eslint-module-utils: 2.7.3(@typescript-eslint/parser@5.29.0)(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.2.7)
       has: 1.0.3
       is-core-module: 2.9.0
       is-glob: 4.0.3
@@ -3922,10 +3923,16 @@ packages:
     engines: {node: '>= 0.4'}
     dev: true
 
+  /is-core-module@2.13.0:
+    resolution: {integrity: sha512-Z7dk6Qo8pOCp3l4tsX2C5ZVas4V+UxwQodwZhLopL91TX8UyyHEXafPcyoeeWuLrwzHcr3igO78wNLwHJHsMCQ==}
+    dependencies:
+      has: 1.0.3
+
   /is-core-module@2.9.0:
     resolution: {integrity: sha512-+5FPy5PnwmO3lvfMb0AsoPaBG+5KHUI0wYFXOtYPnVVVspTFUuMZNfNaNVRt3FZadstu2c8x23vykRW/NBoU6A==}
     dependencies:
       has: 1.0.3
+    dev: true
 
   /is-date-object@1.0.4:
     resolution: {integrity: sha512-/b4ZVsG7Z5XVtIxs/h9W8nvfLgSAyKYdtGWQLbqy6jA1icmgjf8WCoTKgeS4wy5tYaPePouzFMANbnj94c2Z+A==}
@@ -4396,7 +4403,7 @@ packages:
       jest-pnp-resolver: 1.2.2(jest-resolve@29.6.4)
       jest-util: 29.6.3
       jest-validate: 29.6.3
-      resolve: 1.22.1
+      resolve: 1.22.4
       resolve.exports: 2.0.1
       slash: 3.0.0
     dev: false
@@ -5683,7 +5690,7 @@ packages:
     resolution: {integrity: sha512-Hhtrw0nLeSrFQ7phPp4OOcVjLPIeMnRlr5mcnVuMe7M/7eBn98A3hmFRLoFo3DLZkivSYwhRUJTyPyWAk56WLw==}
     hasBin: true
     dependencies:
-      is-core-module: 2.9.0
+      is-core-module: 2.13.0
       path-parse: 1.0.7
       supports-preserve-symlinks-flag: 1.0.0
     dev: true
@@ -5692,14 +5699,22 @@ packages:
     resolution: {integrity: sha512-nBpuuYuY5jFsli/JIs1oldw6fOQCBioohqWZg/2hiaOybXOft4lonv85uDOKXdf8rhyK159cxU5cDcK/NKk8zw==}
     hasBin: true
     dependencies:
-      is-core-module: 2.9.0
+      is-core-module: 2.13.0
+      path-parse: 1.0.7
+      supports-preserve-symlinks-flag: 1.0.0
+
+  /resolve@1.22.4:
+    resolution: {integrity: sha512-PXNdCiPqDqeUou+w1C2eTQbNfxKSuMxqTCuvlmmMsk1NWHL5fRrhY6Pl0qEYYc6+QqGClco1Qj8XnjPego4wfg==}
+    hasBin: true
+    dependencies:
+      is-core-module: 2.13.0
       path-parse: 1.0.7
       supports-preserve-symlinks-flag: 1.0.0
 
   /resolve@2.0.0-next.3:
     resolution: {integrity: sha512-W8LucSynKUIDu9ylraa7ueVZ7hc0uAgJBxVsQSKOXOyle8a93qXhcz+XAXZ8bIq2d6i4Ehddn6Evt+0/UwKk6Q==}
     dependencies:
-      is-core-module: 2.9.0
+      is-core-module: 2.13.0
       path-parse: 1.0.7
     dev: true