Browse Source

Merge branch 'master' of github.com:provectus/kafka-ui into audit_be

 Conflicts:
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
iliax 2 years ago
parent
commit
4b103cd6f4
22 changed files with 1538 additions and 86 deletions
  1. 1 0
      .github/workflows/release.yaml
  2. 1 1
      README.md
  3. 25 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
  4. 7 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonToAvroConversionException.java
  5. 4 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java
  6. 14 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/MessageFormatter.java
  7. 5 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java
  8. 503 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java
  9. 171 5
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java
  10. 621 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java
  11. 5 0
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/BasePage.java
  12. 54 2
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/brokers/BrokersConfigTab.java
  13. 9 23
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
  14. 74 7
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/brokers/BrokersTest.java
  15. 2 0
      kafka-ui-react-app/package.json
  16. 19 5
      kafka-ui-react-app/pnpm-lock.yaml
  17. 3 3
      kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx
  18. 10 4
      kafka-ui-react-app/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx
  19. 3 3
      kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx
  20. 4 8
      kafka-ui-react-app/src/components/Connect/New/New.tsx
  21. 2 2
      kafka-ui-react-app/src/components/common/EditorViewer/EditorViewer.tsx
  22. 1 1
      pom.xml

+ 1 - 0
.github/workflows/release.yaml

@@ -77,6 +77,7 @@ jobs:
           builder: ${{ steps.buildx.outputs.name }}
           context: kafka-ui-api
           platforms: linux/amd64,linux/arm64
+          provenance: false
           push: true
           tags: |
             provectuslabs/kafka-ui:${{ steps.build.outputs.version }}

+ 1 - 1
README.md

@@ -99,7 +99,7 @@ services:
     ports:
       - 8080:8080
     environment:
-      DYNAMIC_CONFIG_ENABLED: true
+      DYNAMIC_CONFIG_ENABLED: 'true'
     volumes:
       - ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
 ```

+ 25 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -1,5 +1,9 @@
 package com.provectus.kafka.ui.controller;
 
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART;
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
+
 import com.provectus.kafka.ui.api.KafkaConnectApi;
 import com.provectus.kafka.ui.model.ConnectDTO;
 import com.provectus.kafka.ui.model.ConnectorActionDTO;
@@ -18,6 +22,7 @@ import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.Map;
+import java.util.Set;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -31,6 +36,9 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 @Slf4j
 public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
+  private static final Set<ConnectorActionDTO> RESTART_ACTIONS
+      = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
+
   private final KafkaConnectService kafkaConnectService;
   private final AccessControlService accessControlService;
   private final AuditService auditService;
@@ -134,11 +142,13 @@ public class KafkaConnectController extends AbstractController implements KafkaC
     var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
         ? getConnectorsComparator(orderBy)
         : getConnectorsComparator(orderBy).reversed();
+
     Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
         .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
-        .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
+        .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
+        .sort(comparator);
 
-    return Mono.just(ResponseEntity.ok(job.sort(comparator)))
+    return Mono.just(ResponseEntity.ok(job))
         .doOnEach(sig -> auditService.audit(context, sig));
   }
 
@@ -187,11 +197,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                          String connectorName,
                                                          ConnectorActionDTO action,
                                                          ServerWebExchange exchange) {
+    ConnectAction[] connectActions;
+    if (RESTART_ACTIONS.contains(action)) {
+      connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
+    } else {
+      connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
+    }
 
     var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
-        .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
+        .connectActions(connectActions)
         .auditOperation("updateConnectorState")
         .build();
 
@@ -273,16 +289,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
     if (orderBy == null) {
       return defaultComparator;
     }
-    switch (orderBy) {
-      case CONNECT:
-        return Comparator.comparing(FullConnectorInfoDTO::getConnect);
-      case TYPE:
-        return Comparator.comparing(FullConnectorInfoDTO::getType);
-      case STATUS:
-        return Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
-      case NAME:
-      default:
-        return defaultComparator;
-    }
+    return switch (orderBy) {
+      case CONNECT -> Comparator.comparing(FullConnectorInfoDTO::getConnect);
+      case TYPE -> Comparator.comparing(FullConnectorInfoDTO::getType);
+      case STATUS -> Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
+      default -> defaultComparator;
+    };
   }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonToAvroConversionException.java

@@ -0,0 +1,7 @@
+package com.provectus.kafka.ui.exception;
+
+public class JsonToAvroConversionException extends ValidationException {
+  public JsonToAvroConversionException(String message) {
+    super(message);
+  }
+}

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

@@ -1,12 +1,13 @@
 package com.provectus.kafka.ui.serdes.builtin.sr;
 
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -25,6 +26,7 @@ class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
         Map.of(
             "schema.registry.url", "wontbeused",
             AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
             AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
         ),
         isKey
@@ -35,7 +37,7 @@ class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
   @Override
   protected Object serialize(String value, ParsedSchema schema) {
     try {
-      return AvroSchemaUtils.toObject(value, (AvroSchema) schema);
+      return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
     } catch (Throwable e) {
       throw new RuntimeException("Failed to serialize record for topic " + topic, e);
     }

+ 14 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/MessageFormatter.java

@@ -3,9 +3,12 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.protobuf.Message;
 import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import java.util.Map;
@@ -28,16 +31,22 @@ interface MessageFormatter {
 
     AvroMessageFormatter(SchemaRegistryClient client) {
       this.avroDeserializer = new KafkaAvroDeserializer(client);
+      this.avroDeserializer.configure(
+          Map.of(
+              AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
+              KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
+              KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
+              KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
+          ),
+          false
+      );
     }
 
     @Override
-    @SneakyThrows
     public String format(String topic, byte[] value) {
-      // deserialized will have type, that depends on schema type (record or primitive),
-      // AvroSchemaUtils.toJson(...) method will take it into account
       Object deserialized = avroDeserializer.deserialize(topic, value);
-      byte[] jsonBytes = AvroSchemaUtils.toJson(deserialized);
-      return new String(jsonBytes);
+      var schema = AvroSchemaUtils.getSchema(deserialized);
+      return JsonAvroConversion.convertAvroToJson(deserialized, schema).toString();
     }
   }
 

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java

@@ -5,6 +5,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import reactor.util.function.Tuple2;
@@ -40,6 +41,10 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
 
   private FieldSchema convertSchema(Schema schema,
                                     Map<String, FieldSchema> definitions, boolean isRoot) {
+    Optional<FieldSchema> logicalTypeSchema = JsonAvroConversion.LogicalTypeConversion.getJsonSchema(schema);
+    if (logicalTypeSchema.isPresent()) {
+      return logicalTypeSchema.get();
+    }
     if (!schema.isUnion()) {
       JsonType type = convertType(schema);
       switch (type.getType()) {
@@ -66,7 +71,6 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
     }
   }
 
-
   // this method formats json-schema field in a way
   // to fit avro-> json encoding rules (https://avro.apache.org/docs/1.11.1/specification/_print/#json-encoding)
   private FieldSchema createUnionSchema(Schema schema, Map<String, FieldSchema> definitions) {

+ 503 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java

@@ -0,0 +1,503 @@
+package com.provectus.kafka.ui.util.jsonschema;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.collect.Lists;
+import com.provectus.kafka.ui.exception.JsonToAvroConversionException;
+import io.confluent.kafka.serializers.AvroData;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+// json <-> avro
+public class JsonAvroConversion {
+
+  private static final JsonMapper MAPPER = new JsonMapper();
+
+  // converts json into Object that is expected input for KafkaAvroSerializer
+  // (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
+  @SneakyThrows
+  public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
+    JsonNode rootNode = MAPPER.readTree(jsonString);
+    return convert(rootNode, avroSchema);
+  }
+
+  private static Object convert(JsonNode node, Schema avroSchema) {
+    return switch (avroSchema.getType()) {
+      case RECORD -> {
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var rec = new GenericData.Record(avroSchema);
+        for (Schema.Field field : avroSchema.getFields()) {
+          if (node.has(field.name()) && !node.get(field.name()).isNull()) {
+            rec.put(field.name(), convert(node.get(field.name()), field.schema()));
+          }
+        }
+        yield rec;
+      }
+      case MAP -> {
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var map = new LinkedHashMap<String, Object>();
+        var valueSchema = avroSchema.getValueType();
+        node.fields().forEachRemaining(f -> map.put(f.getKey(), convert(f.getValue(), valueSchema)));
+        yield map;
+      }
+      case ARRAY -> {
+        assertJsonType(node, JsonNodeType.ARRAY);
+        var lst = new ArrayList<>();
+        node.elements().forEachRemaining(e -> lst.add(convert(e, avroSchema.getElementType())));
+        yield lst;
+      }
+      case ENUM -> {
+        assertJsonType(node, JsonNodeType.STRING);
+        String symbol = node.textValue();
+        if (!avroSchema.getEnumSymbols().contains(symbol)) {
+          throw new JsonToAvroConversionException("%s is not a part of enum symbols [%s]"
+              .formatted(symbol, avroSchema.getEnumSymbols()));
+        }
+        yield new GenericData.EnumSymbol(avroSchema, symbol);
+      }
+      case UNION -> {
+        // for types from enum (other than null) payload should be an object with single key == name of type
+        // ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" },  { "int": 123 }
+        if (node.isNull() && avroSchema.getTypes().contains(Schema.create(Schema.Type.NULL))) {
+          yield null;
+        }
+
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var elements = Lists.newArrayList(node.fields());
+        if (elements.size() != 1) {
+          throw new JsonToAvroConversionException(
+              "UNION field value should be an object with single field == type name");
+        }
+        var typeNameToValue = elements.get(0);
+        for (Schema unionType : avroSchema.getTypes()) {
+          if (typeNameToValue.getKey().equals(unionType.getFullName())) {
+            yield convert(typeNameToValue.getValue(), unionType);
+          }
+        }
+        throw new JsonToAvroConversionException(
+            "json value '%s' is cannot be converted to any of union types [%s]"
+                .formatted(node, avroSchema.getTypes()));
+      }
+      case STRING -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        yield node.textValue();
+      }
+      case LONG -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.LONG, JsonParser.NumberType.INT);
+        yield node.longValue();
+      }
+      case INT -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.INT);
+        yield node.intValue();
+      }
+      case FLOAT -> {
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.DOUBLE, JsonParser.NumberType.FLOAT);
+        yield node.floatValue();
+      }
+      case DOUBLE -> {
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.DOUBLE, JsonParser.NumberType.FLOAT);
+        yield node.doubleValue();
+      }
+      case BOOLEAN -> {
+        assertJsonType(node, JsonNodeType.BOOLEAN);
+        yield node.booleanValue();
+      }
+      case NULL -> {
+        assertJsonType(node, JsonNodeType.NULL);
+        yield null;
+      }
+      case BYTES -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        // logic copied from JsonDecoder::readBytes
+        yield ByteBuffer.wrap(node.textValue().getBytes(StandardCharsets.ISO_8859_1));
+      }
+      case FIXED -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
+        if (bytes.length != avroSchema.getFixedSize()) {
+          throw new JsonToAvroConversionException(
+              "Fixed field has unexpected size %d (should be %d)"
+                  .formatted(bytes.length, avroSchema.getFixedSize()));
+        }
+        yield new GenericData.Fixed(avroSchema, bytes);
+      }
+    };
+  }
+
+  // converts output of KafkaAvroDeserializer (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!) into json.
+  // Note: conversion should be compatible with AvroJsonSchemaConverter logic!
+  public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) {
+    if (obj == null) {
+      return NullNode.getInstance();
+    }
+    return switch (avroSchema.getType()) {
+      case RECORD -> {
+        var rec = (GenericData.Record) obj;
+        ObjectNode node = MAPPER.createObjectNode();
+        for (Schema.Field field : avroSchema.getFields()) {
+          var fieldVal = rec.get(field.name());
+          if (fieldVal != null) {
+            node.set(field.name(), convertAvroToJson(fieldVal, field.schema()));
+          }
+        }
+        yield node;
+      }
+      case MAP -> {
+        ObjectNode node = MAPPER.createObjectNode();
+        ((Map) obj).forEach((k, v) -> node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType())));
+        yield node;
+      }
+      case ARRAY -> {
+        var list = (List<Object>) obj;
+        ArrayNode node = MAPPER.createArrayNode();
+        list.forEach(e -> node.add(convertAvroToJson(e, avroSchema.getElementType())));
+        yield node;
+      }
+      case ENUM -> {
+        yield new TextNode(obj.toString());
+      }
+      case UNION -> {
+        ObjectNode node = MAPPER.createObjectNode();
+        int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
+        Schema unionType = avroSchema.getTypes().get(unionIdx);
+        node.set(unionType.getFullName(), convertAvroToJson(obj, unionType));
+        yield node;
+      }
+      case STRING -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new TextNode(obj.toString());
+      }
+      case LONG -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new LongNode((Long) obj);
+      }
+      case INT -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new IntNode((Integer) obj);
+      }
+      case FLOAT -> new FloatNode((Float) obj);
+      case DOUBLE -> new DoubleNode((Double) obj);
+      case BOOLEAN -> BooleanNode.valueOf((Boolean) obj);
+      case NULL -> NullNode.getInstance();
+      case BYTES -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        ByteBuffer bytes = (ByteBuffer) obj;
+        //see JsonEncoder::writeByteArray
+        yield new TextNode(new String(bytes.array(), StandardCharsets.ISO_8859_1));
+      }
+      case FIXED -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        var fixed = (GenericData.Fixed) obj;
+        yield new TextNode(new String(fixed.bytes(), StandardCharsets.ISO_8859_1));
+      }
+    };
+  }
+
+  private static Object processLogicalType(JsonNode node, Schema schema) {
+    return findConversion(schema)
+        .map(c -> c.jsonToAvroConversion.apply(node, schema))
+        .orElseThrow(() ->
+            new JsonToAvroConversionException("'%s' logical type is not supported"
+                .formatted(schema.getLogicalType().getName())));
+  }
+
+  private static JsonNode processLogicalType(Object obj, Schema schema) {
+    return findConversion(schema)
+        .map(c -> c.avroToJsonConversion.apply(obj, schema))
+        .orElseThrow(() ->
+            new JsonToAvroConversionException("'%s' logical type is not supported"
+                .formatted(schema.getLogicalType().getName())));
+  }
+
+  private static Optional<LogicalTypeConversion> findConversion(Schema schema) {
+    String logicalTypeName = schema.getLogicalType().getName();
+    return Stream.of(LogicalTypeConversion.values())
+        .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
+        .findFirst();
+  }
+
+  private static boolean isLogicalType(Schema schema) {
+    return schema.getLogicalType() != null;
+  }
+
+  private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
+    if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
+      throw new JsonToAvroConversionException(
+          "%s node has unexpected type, allowed types %s, actual type %s"
+              .formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
+    }
+  }
+
+  private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
+    if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
+      throw new JsonToAvroConversionException(
+          "%s node has unexpected numeric type, allowed types %s, actual type %s"
+              .formatted(node, Arrays.toString(allowedTypes), node.numberType()));
+    }
+  }
+
+  enum LogicalTypeConversion {
+
+    UUID("uuid",
+        (node, schema) -> {
+          assertJsonType(node, JsonNodeType.STRING);
+          return java.util.UUID.fromString(node.asText());
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("uuid"))))
+    ),
+
+    DECIMAL("decimal",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return new BigDecimal(node.asText());
+          } else if (node.isNumber()) {
+            return new BigDecimal(node.numberValue().toString());
+          }
+          throw new JsonToAvroConversionException(
+              "node '%s' can't be converted to decimal logical type"
+                  .formatted(node));
+        },
+        (obj, schema) -> {
+          return new DecimalNode((BigDecimal) obj);
+        },
+        new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NUMBER))
+    ),
+
+    DATE("date",
+        (node, schema) -> {
+          if (node.isInt()) {
+            return LocalDate.ofEpochDay(node.intValue());
+          } else if (node.isTextual()) {
+            return LocalDate.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to date logical type"
+                    .formatted(node));
+          }
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date"))))
+    ),
+
+    TIME_MILLIS("time-millis",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(node.longValue()));
+          } else if (node.isTextual()) {
+            return LocalTime.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to time-millis logical type"
+                    .formatted(node));
+          }
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("time"))))
+    ),
+
+    TIME_MICROS("time-micros",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(node.longValue()));
+          } else if (node.isTextual()) {
+            return LocalTime.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to time-micros logical type"
+                    .formatted(node));
+          }
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("time"))))
+    ),
+
+    TIMESTAMP_MILLIS("timestamp-millis",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return Instant.ofEpochMilli(node.longValue());
+          } else if (node.isTextual()) {
+            return Instant.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to timestamp-millis logical type"
+                    .formatted(node));
+          }
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    TIMESTAMP_MICROS("timestamp-micros",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            // TimeConversions.TimestampMicrosConversion for impl
+            long microsFromEpoch = node.longValue();
+            long epochSeconds = microsFromEpoch / (1_000_000L);
+            long nanoAdjustment = (microsFromEpoch % (1_000_000L)) * 1_000L;
+            return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+          } else if (node.isTextual()) {
+            return Instant.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to timestamp-millis logical type"
+                    .formatted(node));
+          }
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    LOCAL_TIMESTAMP_MILLIS("local-timestamp-millis",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return LocalDateTime.parse(node.asText());
+          }
+          // TimeConversions.TimestampMicrosConversion for impl
+          Instant instant = (Instant) TIMESTAMP_MILLIS.jsonToAvroConversion.apply(node, schema);
+          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    LOCAL_TIMESTAMP_MICROS("local-timestamp-micros",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return LocalDateTime.parse(node.asText());
+          }
+          Instant instant = (Instant) TIMESTAMP_MICROS.jsonToAvroConversion.apply(node, schema);
+          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+        },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    );
+
+    private final String name;
+    private final BiFunction<JsonNode, Schema, Object> jsonToAvroConversion;
+    private final BiFunction<Object, Schema, JsonNode> avroToJsonConversion;
+    private final FieldSchema jsonSchema;
+
+    LogicalTypeConversion(String name,
+                          BiFunction<JsonNode, Schema, Object> jsonToAvroConversion,
+                          BiFunction<Object, Schema, JsonNode> avroToJsonConversion,
+                          FieldSchema jsonSchema) {
+      this.name = name;
+      this.jsonToAvroConversion = jsonToAvroConversion;
+      this.avroToJsonConversion = avroToJsonConversion;
+      this.jsonSchema = jsonSchema;
+    }
+
+    static Optional<FieldSchema> getJsonSchema(Schema schema) {
+      if (schema.getLogicalType() == null) {
+        return Optional.empty();
+      }
+      String logicalTypeName = schema.getLogicalType().getName();
+      return Stream.of(JsonAvroConversion.LogicalTypeConversion.values())
+          .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
+          .map(c -> c.jsonSchema)
+          .findFirst();
+    }
+  }
+
+
+}

+ 171 - 5
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

@@ -2,13 +2,12 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.ByteArrayOutputStream;
@@ -54,7 +53,8 @@ class SchemaRegistrySerdeTest {
 
     SchemaDescription schemaDescription = schemaOptional.get();
     assertThat(schemaDescription.getSchema())
-        .contains("{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
+        .contains(
+            "{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
     assertThat(schemaDescription.getAdditionalProperties())
         .containsOnlyKeys("subject", "schemaId", "latestVersion", "type")
         .containsEntry("subject", subject)
@@ -189,7 +189,8 @@ class SchemaRegistrySerdeTest {
     assertThat(serde.canSerialize(topic, Serde.Target.VALUE)).isFalse();
   }
 
-  private void assertJsonsEqual(String expected, String actual) throws JsonProcessingException {
+  @SneakyThrows
+  private void assertJsonsEqual(String expected, String actual) {
     var mapper = new JsonMapper();
     assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
   }
@@ -211,9 +212,174 @@ class SchemaRegistrySerdeTest {
     GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema.rawSchema());
     ByteArrayOutputStream output = new ByteArrayOutputStream();
     Encoder encoder = EncoderFactory.get().binaryEncoder(output, null);
-    writer.write(AvroSchemaUtils.toObject(json, schema), encoder);
+    writer.write(JsonAvroConversion.convertJsonToAvro(json, schema.rawSchema()), encoder);
     encoder.flush();
     return output.toByteArray();
   }
 
+  @Test
+  void avroFieldsRepresentationIsConsistentForSerializationAndDeserialization() throws Exception {
+    AvroSchema schema = new AvroSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "f_int",
+                   "type": "int"
+                 },
+                 {
+                   "name": "f_long",
+                   "type": "long"
+                 },
+                 {
+                   "name": "f_string",
+                   "type": "string"
+                 },
+                 {
+                   "name": "f_boolean",
+                   "type": "boolean"
+                 },
+                 {
+                   "name": "f_float",
+                   "type": "float"
+                 },
+                 {
+                   "name": "f_double",
+                   "type": "double"
+                 },
+                 {
+                   "name": "f_enum",
+                   "type" : {
+                    "type": "enum",
+                    "name": "Suit",
+                    "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+                   }
+                 },
+                 {
+                  "name": "f_map",
+                  "type": {
+                     "type": "map",
+                     "values" : "string",
+                     "default": {}
+                   }
+                 },
+                 {
+                  "name": "f_union",
+                  "type": ["null", "string", "int" ]
+                 },
+                 {
+                  "name": "f_optional_to_test_not_filled_case",
+                  "type": [ "null", "string"]
+                 },
+                 {
+                     "name" : "f_fixed",
+                     "type" : { "type" : "fixed" ,"size" : 8, "name": "long_encoded" }
+                   },
+                   {
+                     "name" : "f_bytes",
+                     "type": "bytes"
+                   }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "f_int": 123,
+          "f_long": 4294967294,
+          "f_string": "string here",
+          "f_boolean": true,
+          "f_float": 123.1,
+          "f_double": 123456.123456,
+          "f_enum": "SPADES",
+          "f_map": { "k1": "string value" },
+          "f_union": { "int": 123 },
+          "f_fixed": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0004Ò",
+          "f_bytes": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\t)"
+        }
+        """;
+
+    registryClient.register("test-value", schema);
+    assertSerdeCycle("test", jsonPayload);
+  }
+
+  @Test
+  void avroLogicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception {
+    AvroSchema schema = new AvroSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "lt_date",
+                   "type": { "type": "int", "logicalType": "date" }
+                 },
+                 {
+                   "name": "lt_uuid",
+                   "type": { "type": "string", "logicalType": "uuid" }
+                 },
+                 {
+                   "name": "lt_decimal",
+                   "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
+                 },
+                 {
+                   "name": "lt_time_millis",
+                   "type": { "type": "int", "logicalType": "time-millis"}
+                 },
+                 {
+                   "name": "lt_time_micros",
+                   "type": { "type": "long", "logicalType": "time-micros"}
+                 },
+                 {
+                   "name": "lt_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "timestamp-micros" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "local-timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "local-timestamp-micros" }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "lt_date":"1991-08-14",
+          "lt_decimal": 2.1617413862327545E11,
+          "lt_time_millis": "10:15:30.001",
+          "lt_time_micros": "10:15:30.123456",
+          "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
+          "lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
+          "lt_timestamp_micros": "2007-12-03T10:15:30.123456Z",
+          "lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
+          "lt_local_timestamp_micros": "2017-12-03T10:15:30.123456"
+        }
+        """;
+
+    registryClient.register("test-value", schema);
+    assertSerdeCycle("test", jsonPayload);
+  }
+
+  // 1. serialize input json to binary
+  // 2. deserialize from binary
+  // 3. check that deserialized version equal to input
+  void assertSerdeCycle(String topic, String jsonInput) {
+    byte[] serializedBytes = serde.serializer(topic, Serde.Target.VALUE).serialize(jsonInput);
+    var deserializedJson = serde.deserializer(topic, Serde.Target.VALUE)
+        .deserialize(null, serializedBytes)
+        .getResult();
+    assertJsonsEqual(jsonInput, deserializedJson);
+  }
+
 }

+ 621 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java

@@ -0,0 +1,621 @@
+package com.provectus.kafka.ui.util.jsonschema;
+
+import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertAvroToJson;
+import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.primitives.Longs;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class JsonAvroConversionTest {
+
+  // checking conversion from json to KafkaAvroSerializer-compatible avro objects
+  @Nested
+  class FromJsonToAvro {
+
+    @Test
+    void primitiveRoot() {
+      assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\"")))
+          .isEqualTo("str");
+
+      assertThat(convertJsonToAvro("123", createSchema("\"int\"")))
+          .isEqualTo(123);
+
+      assertThat(convertJsonToAvro("123", createSchema("\"long\"")))
+          .isEqualTo(123L);
+
+      assertThat(convertJsonToAvro("123.123", createSchema("\"float\"")))
+          .isEqualTo(123.123F);
+
+      assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\"")))
+          .isEqualTo(12345.12345);
+    }
+
+    @Test
+    void primitiveTypedFields() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "f_int",
+                     "type": "int"
+                   },
+                   {
+                     "name": "f_long",
+                     "type": "long"
+                   },
+                   {
+                     "name": "f_string",
+                     "type": "string"
+                   },
+                   {
+                     "name": "f_boolean",
+                     "type": "boolean"
+                   },
+                   {
+                     "name": "f_float",
+                     "type": "float"
+                   },
+                   {
+                     "name": "f_double",
+                     "type": "double"
+                   },
+                   {
+                     "name": "f_enum",
+                     "type" : {
+                      "type": "enum",
+                      "name": "Suit",
+                      "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+                     }
+                   },
+                   {
+                     "name" : "f_fixed",
+                     "type" : { "type" : "fixed" ,"size" : 8, "name": "long_encoded" }
+                   },
+                   {
+                     "name" : "f_bytes",
+                     "type": "bytes"
+                   }
+                 ]
+              }"""
+      );
+
+      String jsonPayload = """
+          {
+            "f_int": 123,
+            "f_long": 4294967294,
+            "f_string": "string here",
+            "f_boolean": true,
+            "f_float": 123.1,
+            "f_double": 123456.123456,
+            "f_enum": "SPADES",
+            "f_fixed": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0004Ò",
+            "f_bytes": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\t)"
+          }
+          """;
+
+      var converted = convertJsonToAvro(jsonPayload, schema);
+      assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+      var record = (GenericData.Record) converted;
+      assertThat(record.get("f_int")).isEqualTo(123);
+      assertThat(record.get("f_long")).isEqualTo(4294967294L);
+      assertThat(record.get("f_string")).isEqualTo("string here");
+      assertThat(record.get("f_boolean")).isEqualTo(true);
+      assertThat(record.get("f_float")).isEqualTo(123.1f);
+      assertThat(record.get("f_double")).isEqualTo(123456.123456);
+      assertThat(record.get("f_enum"))
+          .isEqualTo(
+              new GenericData.EnumSymbol(
+                  schema.getField("f_enum").schema(),
+                  "SPADES"
+              )
+          );
+      assertThat(((GenericData.Fixed) record.get("f_fixed")).bytes()).isEqualTo(Longs.toByteArray(1234L));
+      assertThat(((ByteBuffer) record.get("f_bytes")).array()).isEqualTo(Longs.toByteArray(2345L));
+    }
+
+    @Test
+    void unionRoot() {
+      var schema = createSchema("[ \"null\", \"string\", \"int\" ]");
+
+      var converted = convertJsonToAvro("{\"string\":\"string here\"}", schema);
+      assertThat(converted).isEqualTo("string here");
+
+      converted = convertJsonToAvro("{\"int\": 123}", schema);
+      assertThat(converted).isEqualTo(123);
+
+      converted = convertJsonToAvro("null", schema);
+      assertThat(converted).isEqualTo(null);
+    }
+
+    @Test
+    void unionField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "namespace": "com.test",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "f_union",
+                     "type": [ "null", "int", "TestAvroRecord"]
+                   }
+                 ]
+              }"""
+      );
+
+      String jsonPayload = "{ \"f_union\": null }";
+
+      var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_union")).isNull();
+
+      jsonPayload = "{ \"f_union\": { \"int\": 123 } }";
+      record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_union")).isEqualTo(123);
+
+      //inner-record's name should be fully-qualified!
+      jsonPayload = "{ \"f_union\": { \"com.test.TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
+      record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
+      var innerRec = (GenericData.Record) record.get("f_union");
+      assertThat(innerRec.get("f_union")).isEqualTo(123);
+    }
+
+    @Test
+    void mapField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "long_map",
+                     "type": {
+                       "type": "map",
+                       "values" : "long",
+                       "default": {}
+                     }
+                   },
+                   {
+                     "name": "string_map",
+                     "type": {
+                       "type": "map",
+                       "values" : "string",
+                       "default": {}
+                     }
+                   },
+                   {
+                     "name": "self_ref_map",
+                     "type": {
+                       "type": "map",
+                       "values" : "TestAvroRecord",
+                       "default": {}
+                     }
+                   }
+                 ]
+              }"""
+      );
+
+      String jsonPayload = """
+          {
+            "long_map": {
+              "k1": 123,
+              "k2": 456
+            },
+            "string_map": {
+              "k3": "s1",
+              "k4": "s2"
+            },
+            "self_ref_map": {
+              "k5" : {
+                "long_map": { "_k1": 222 },
+                "string_map": { "_k2": "_s1" }
+              }
+            }
+          }
+          """;
+
+      var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("long_map"))
+          .isEqualTo(Map.of("k1", 123L, "k2", 456L));
+      assertThat(record.get("string_map"))
+          .isEqualTo(Map.of("k3", "s1", "k4", "s2"));
+      assertThat(record.get("self_ref_map"))
+          .isNotNull();
+
+      Map<String, Object> selfRefMapField = (Map<String, Object>) record.get("self_ref_map");
+      assertThat(selfRefMapField)
+          .hasSize(1)
+          .hasEntrySatisfying("k5", v -> {
+            assertThat(v).isInstanceOf(GenericData.Record.class);
+            var innerRec = (GenericData.Record) v;
+            assertThat(innerRec.get("long_map"))
+                .isEqualTo(Map.of("_k1", 222L));
+            assertThat(innerRec.get("string_map"))
+                .isEqualTo(Map.of("_k2", "_s1"));
+          });
+    }
+
+    @Test
+    void arrayField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "f_array",
+                     "type": {
+                        "type": "array",
+                        "items" : "string",
+                        "default": []
+                      }
+                   }
+                 ]
+              }"""
+      );
+
+      String jsonPayload = """
+          {
+            "f_array": [ "e1", "e2" ]
+          }
+          """;
+
+      var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2"));
+    }
+
+    @Test
+    void logicalTypesField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "lt_date",
+                     "type": { "type": "int", "logicalType": "date" }
+                   },
+                   {
+                     "name": "lt_uuid",
+                     "type": { "type": "string", "logicalType": "uuid" }
+                   },
+                   {
+                     "name": "lt_decimal",
+                     "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
+                   },
+                   {
+                     "name": "lt_time_millis",
+                     "type": { "type": "int", "logicalType": "time-millis"}
+                   },
+                   {
+                     "name": "lt_time_micros",
+                     "type": { "type": "long", "logicalType": "time-micros"}
+                   },
+                   {
+                     "name": "lt_timestamp_millis",
+                     "type": { "type": "long", "logicalType": "timestamp-millis" }
+                   },
+                   {
+                     "name": "lt_timestamp_micros",
+                     "type": { "type": "long", "logicalType": "timestamp-micros" }
+                   },
+                   {
+                     "name": "lt_local_timestamp_millis",
+                     "type": { "type": "long", "logicalType": "local-timestamp-millis" }
+                   },
+                   {
+                     "name": "lt_local_timestamp_micros",
+                     "type": { "type": "long", "logicalType": "local-timestamp-micros" }
+                   }
+                 ]
+              }"""
+      );
+
+      String jsonPayload = """
+          {
+            "lt_date":"1991-08-14",
+            "lt_decimal": 2.1617413862327545E11,
+            "lt_time_millis": "10:15:30.001",
+            "lt_time_micros": "10:15:30.123456",
+            "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
+            "lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
+            "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
+            "lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
+            "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
+          }
+          """;
+
+      var converted = convertJsonToAvro(jsonPayload, schema);
+      assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+      var record = (GenericData.Record) converted;
+
+      assertThat(record.get("lt_date"))
+          .isEqualTo(LocalDate.of(1991, 8, 14));
+      assertThat(record.get("lt_decimal"))
+          .isEqualTo(new BigDecimal("2.1617413862327545E11"));
+      assertThat(record.get("lt_time_millis"))
+          .isEqualTo(LocalTime.parse("10:15:30.001"));
+      assertThat(record.get("lt_time_micros"))
+          .isEqualTo(LocalTime.parse("10:15:30.123456"));
+      assertThat(record.get("lt_timestamp_millis"))
+          .isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z"));
+      assertThat(record.get("lt_timestamp_micros"))
+          .isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z"));
+      assertThat(record.get("lt_local_timestamp_millis"))
+          .isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123"));
+      assertThat(record.get("lt_local_timestamp_micros"))
+          .isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456"));
+    }
+  }
+
+  // checking conversion of KafkaAvroDeserializer output to JsonNode
+  @Nested
+  class FromAvroToJson {
+
+    @Test
+    void primitiveRoot() {
+      assertThat(convertAvroToJson("str", createSchema("\"string\"")))
+          .isEqualTo(new TextNode("str"));
+
+      assertThat(convertAvroToJson(123, createSchema("\"int\"")))
+          .isEqualTo(new IntNode(123));
+
+      assertThat(convertAvroToJson(123L, createSchema("\"long\"")))
+          .isEqualTo(new LongNode(123));
+
+      assertThat(convertAvroToJson(123.1F, createSchema("\"float\"")))
+          .isEqualTo(new FloatNode(123.1F));
+
+      assertThat(convertAvroToJson(123.1, createSchema("\"double\"")))
+          .isEqualTo(new DoubleNode(123.1));
+
+      assertThat(convertAvroToJson(true, createSchema("\"boolean\"")))
+          .isEqualTo(BooleanNode.valueOf(true));
+
+      assertThat(convertAvroToJson(ByteBuffer.wrap(Longs.toByteArray(123L)), createSchema("\"bytes\"")))
+          .isEqualTo(new TextNode(new String(Longs.toByteArray(123L), StandardCharsets.ISO_8859_1)));
+    }
+
+    @SneakyThrows
+    @Test
+    void primitiveTypedFields() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "f_int",
+                     "type": "int"
+                   },
+                   {
+                     "name": "f_long",
+                     "type": "long"
+                   },
+                   {
+                     "name": "f_string",
+                     "type": "string"
+                   },
+                   {
+                     "name": "f_boolean",
+                     "type": "boolean"
+                   },
+                   {
+                     "name": "f_float",
+                     "type": "float"
+                   },
+                   {
+                     "name": "f_double",
+                     "type": "double"
+                   },
+                   {
+                     "name": "f_enum",
+                     "type" : {
+                      "type": "enum",
+                      "name": "Suit",
+                      "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+                     }
+                   },
+                   {
+                     "name" : "f_fixed",
+                     "type" : { "type" : "fixed" ,"size" : 8, "name": "long_encoded" }
+                   },
+                   {
+                     "name" : "f_bytes",
+                     "type": "bytes"
+                   }
+                 ]
+              }"""
+      );
+
+      byte[] fixedFieldValue = Longs.toByteArray(1234L);
+      byte[] bytesFieldValue = Longs.toByteArray(2345L);
+
+      GenericData.Record inputRecord = new GenericData.Record(schema);
+      inputRecord.put("f_int", 123);
+      inputRecord.put("f_long", 4294967294L);
+      inputRecord.put("f_string", "string here");
+      inputRecord.put("f_boolean", true);
+      inputRecord.put("f_float", 123.1f);
+      inputRecord.put("f_double", 123456.123456);
+      inputRecord.put("f_enum", new GenericData.EnumSymbol(schema.getField("f_enum").schema(), "SPADES"));
+      inputRecord.put("f_fixed", new GenericData.Fixed(schema.getField("f_fixed").schema(), fixedFieldValue));
+      inputRecord.put("f_bytes", ByteBuffer.wrap(bytesFieldValue));
+
+      String expectedJson = """
+          {
+            "f_int": 123,
+            "f_long": 4294967294,
+            "f_string": "string here",
+            "f_boolean": true,
+            "f_float": 123.1,
+            "f_double": 123456.123456,
+            "f_enum": "SPADES",
+            "f_fixed": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0004Ò",
+            "f_bytes": "\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\t)"
+          }
+          """;
+
+      assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema));
+    }
+
+    @Test
+    void logicalTypesField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "lt_date",
+                     "type": { "type": "int", "logicalType": "date" }
+                   },
+                   {
+                     "name": "lt_uuid",
+                     "type": { "type": "string", "logicalType": "uuid" }
+                   },
+                   {
+                     "name": "lt_decimal",
+                     "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
+                   },
+                   {
+                     "name": "lt_time_millis",
+                     "type": { "type": "int", "logicalType": "time-millis"}
+                   },
+                   {
+                     "name": "lt_time_micros",
+                     "type": { "type": "long", "logicalType": "time-micros"}
+                   },
+                   {
+                     "name": "lt_timestamp_millis",
+                     "type": { "type": "long", "logicalType": "timestamp-millis" }
+                   },
+                   {
+                     "name": "lt_timestamp_micros",
+                     "type": { "type": "long", "logicalType": "timestamp-micros" }
+                   },
+                   {
+                     "name": "lt_local_timestamp_millis",
+                     "type": { "type": "long", "logicalType": "local-timestamp-millis" }
+                   },
+                   {
+                     "name": "lt_local_timestamp_micros",
+                     "type": { "type": "long", "logicalType": "local-timestamp-micros" }
+                   }
+                 ]
+              }"""
+      );
+
+      GenericData.Record inputRecord = new GenericData.Record(schema);
+      inputRecord.put("lt_date", LocalDate.of(1991, 8, 14));
+      inputRecord.put("lt_uuid", UUID.fromString("a37b75ca-097c-5d46-6119-f0637922e908"));
+      inputRecord.put("lt_decimal", new BigDecimal("2.16"));
+      inputRecord.put("lt_time_millis", LocalTime.parse("10:15:30.001"));
+      inputRecord.put("lt_time_micros", LocalTime.parse("10:15:30.123456"));
+      inputRecord.put("lt_timestamp_millis", Instant.parse("2007-12-03T10:15:30.123Z"));
+      inputRecord.put("lt_timestamp_micros", Instant.parse("2007-12-13T10:15:30.123456Z"));
+      inputRecord.put("lt_local_timestamp_millis", LocalDateTime.parse("2017-12-03T10:15:30.123"));
+      inputRecord.put("lt_local_timestamp_micros", LocalDateTime.parse("2017-12-13T10:15:30.123456"));
+
+      String expectedJson = """
+          {
+            "lt_date":"1991-08-14",
+            "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
+            "lt_decimal": 2.16,
+            "lt_time_millis": "10:15:30.001",
+            "lt_time_micros": "10:15:30.123456",
+            "lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
+            "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
+            "lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
+            "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
+          }
+          """;
+
+      assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema));
+    }
+
+    @Test
+    void unionField() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "namespace": "com.test",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "f_union",
+                     "type": [ "null", "int", "TestAvroRecord"]
+                   }
+                 ]
+              }"""
+      );
+
+      var r = new GenericData.Record(schema);
+      r.put("f_union", null);
+      assertJsonsEqual(" {}", convertAvroToJson(r, schema));
+
+      r = new GenericData.Record(schema);
+      r.put("f_union", 123);
+      assertJsonsEqual(" { \"f_union\" : { \"int\" : 123 } }", convertAvroToJson(r, schema));
+
+
+      r = new GenericData.Record(schema);
+      var innerRec = new GenericData.Record(schema);
+      innerRec.put("f_union", 123);
+      r.put("f_union", innerRec);
+      assertJsonsEqual(
+          " { \"f_union\" : { \"com.test.TestAvroRecord\" : { \"f_union\" : { \"int\" : 123 } } } }",
+          convertAvroToJson(r, schema)
+      );
+    }
+
+  }
+
+  private Schema createSchema(String schema) {
+    return new AvroSchema(schema).rawSchema();
+  }
+
+  @SneakyThrows
+  private void assertJsonsEqual(String expectedJson, JsonNode actual) {
+    var mapper = new JsonMapper();
+    assertThat(actual.toPrettyString())
+        .isEqualTo(mapper.readTree(expectedJson).toPrettyString());
+  }
+
+}

+ 5 - 0
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/BasePage.java

@@ -28,6 +28,7 @@ public abstract class BasePage extends WebUtils {
   protected SelenideElement confirmBtn = $x("//button[contains(text(),'Confirm')]");
   protected SelenideElement cancelBtn = $x("//button[contains(text(),'Cancel')]");
   protected SelenideElement backBtn = $x("//button[contains(text(),'Back')]");
+  protected SelenideElement previousBtn = $x("//button[contains(text(),'Previous')]");
   protected SelenideElement nextBtn = $x("//button[contains(text(),'Next')]");
   protected ElementsCollection ddlOptions = $$x("//li[@value]");
   protected ElementsCollection gridItems = $$x("//tr[@class]");
@@ -75,6 +76,10 @@ public abstract class BasePage extends WebUtils {
     clickByJavaScript(backBtn);
   }
 
+  protected void clickPreviousBtn() {
+    clickByJavaScript(previousBtn);
+  }
+
   protected void setJsonInputValue(SelenideElement jsonInput, String jsonConfig) {
     sendKeysByActions(jsonInput, jsonConfig.replace("  ", ""));
     new Actions(WebDriverRunner.getWebDriver())

+ 54 - 2
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/brokers/BrokersConfigTab.java

@@ -66,6 +66,13 @@ public class BrokersConfigTab extends BasePage {
     return this;
   }
 
+  @Step
+  public BrokersConfigTab clickPreviousButton() {
+    clickPreviousBtn();
+    waitUntilSpinnerDisappear(1);
+    return this;
+  }
+
   private List<BrokersConfigTab.BrokersConfigItem> initGridItems() {
     List<BrokersConfigTab.BrokersConfigItem> gridItemList = new ArrayList<>();
     gridItems.shouldHave(CollectionCondition.sizeGreaterThan(0))
@@ -104,13 +111,58 @@ public class BrokersConfigTab extends BasePage {
     }
 
     @Step
-    public void edit() {
-      element.$x("./td[2]//button").shouldBe(Condition.enabled).click();
+    public BrokersConfigItem setValue(String value) {
+      sendKeysAfterClear(getValueFld(), value);
+      return this;
+    }
+
+    @Step
+    public SelenideElement getValueFld() {
+      return element.$x("./td[2]//input");
+    }
+
+    @Step
+    public SelenideElement getSaveBtn() {
+      return element.$x("./td[2]//button[@aria-label='confirmAction']");
+    }
+
+    @Step
+    public SelenideElement getCancelBtn() {
+      return element.$x("./td[2]//button[@aria-label='cancelAction']");
+    }
+
+    @Step
+    public SelenideElement getEditBtn() {
+      return element.$x("./td[2]//button[@aria-label='editAction']");
+    }
+
+    @Step
+    public BrokersConfigItem clickSaveBtn() {
+      getSaveBtn().shouldBe(Condition.enabled).click();
+      return this;
+    }
+
+    @Step
+    public BrokersConfigItem clickCancelBtn() {
+      getCancelBtn().shouldBe(Condition.enabled).click();
+      return this;
+    }
+
+    @Step
+    public BrokersConfigItem clickEditBtn() {
+      getEditBtn().shouldBe(Condition.enabled).click();
+      return this;
     }
 
     @Step
     public String getSource() {
       return element.$x("./td[3]").getText().trim();
     }
+
+    @Step
+    public BrokersConfigItem clickConfirm() {
+      clickConfirmButton();
+      return this;
+    }
   }
 }

+ 9 - 23
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java

@@ -15,80 +15,66 @@ import org.testng.annotations.Test;
 
 public class SmokeBacklog extends BaseManualTest {
 
-  @Automation(state = TO_BE_AUTOMATED)
-  @Suite(id = BROKERS_SUITE_ID)
-  @QaseId(332)
-  @Test
-  public void testCaseA() {
-  }
-
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(335)
   @Test
-  public void testCaseB() {
+  public void testCaseA() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(336)
   @Test
-  public void testCaseC() {
+  public void testCaseB() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(343)
   @Test
-  public void testCaseD() {
+  public void testCaseC() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = SCHEMAS_SUITE_ID)
   @QaseId(345)
   @Test
-  public void testCaseE() {
+  public void testCaseD() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = SCHEMAS_SUITE_ID)
   @QaseId(346)
   @Test
-  public void testCaseF() {
+  public void testCaseE() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = TOPICS_PROFILE_SUITE_ID)
   @QaseId(347)
   @Test
-  public void testCaseG() {
+  public void testCaseF() {
   }
 
   @Automation(state = TO_BE_AUTOMATED)
   @Suite(id = BROKERS_SUITE_ID)
   @QaseId(348)
   @Test
-  public void testCaseH() {
-  }
-
-  @Automation(state = TO_BE_AUTOMATED)
-  @Suite(id = BROKERS_SUITE_ID)
-  @QaseId(350)
-  @Test
-  public void testCaseI() {
+  public void testCaseG() {
   }
 
   @Automation(state = NOT_AUTOMATED)
   @Suite(id = TOPICS_SUITE_ID)
   @QaseId(50)
   @Test
-  public void testCaseJ() {
+  public void testCaseH() {
   }
 
   @Automation(state = NOT_AUTOMATED)
   @Suite(id = SCHEMAS_SUITE_ID)
   @QaseId(351)
   @Test
-  public void testCaseK() {
+  public void testCaseI() {
   }
 }

+ 74 - 7
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/brokers/BrokersTest.java

@@ -11,6 +11,7 @@ import io.qase.api.annotation.QaseId;
 import org.testng.Assert;
 import org.testng.annotations.Ignore;
 import org.testng.annotations.Test;
+import org.testng.asserts.SoftAssert;
 
 public class BrokersTest extends BaseTest {
 
@@ -49,11 +50,11 @@ public class BrokersTest extends BaseTest {
   @Issue("https://github.com/provectus/kafka-ui/issues/3347")
   @QaseId(330)
   @Test
-  public void brokersConfigSearchCheck() {
+  public void brokersConfigFirstPageSearchCheck() {
     navigateToBrokersAndOpenDetails(DEFAULT_BROKER_ID);
     brokersDetails
         .openDetailsTab(CONFIGS);
-    String anyConfigKey = brokersConfigTab
+    String anyConfigKeyFirstPage = brokersConfigTab
         .getAllConfigs().stream()
         .findAny().orElseThrow()
         .getKey();
@@ -61,14 +62,42 @@ public class BrokersTest extends BaseTest {
         .clickNextButton();
     Assert.assertFalse(brokersConfigTab.getAllConfigs().stream()
             .map(BrokersConfigTab.BrokersConfigItem::getKey)
-            .toList().contains(anyConfigKey),
-        String.format("getAllConfigs().contains(%s)", anyConfigKey));
+            .toList().contains(anyConfigKeyFirstPage),
+        String.format("getAllConfigs().contains(%s)", anyConfigKeyFirstPage));
     brokersConfigTab
-        .searchConfig(anyConfigKey);
+        .searchConfig(anyConfigKeyFirstPage);
     Assert.assertTrue(brokersConfigTab.getAllConfigs().stream()
             .map(BrokersConfigTab.BrokersConfigItem::getKey)
-            .toList().contains(anyConfigKey),
-        String.format("getAllConfigs().contains(%s)", anyConfigKey));
+            .toList().contains(anyConfigKeyFirstPage),
+        String.format("getAllConfigs().contains(%s)", anyConfigKeyFirstPage));
+  }
+
+  @Ignore
+  @Issue("https://github.com/provectus/kafka-ui/issues/3347")
+  @QaseId(350)
+  @Test
+  public void brokersConfigSecondPageSearchCheck() {
+    navigateToBrokersAndOpenDetails(DEFAULT_BROKER_ID);
+    brokersDetails
+        .openDetailsTab(CONFIGS);
+    brokersConfigTab
+        .clickNextButton();
+    String anyConfigKeySecondPage = brokersConfigTab
+        .getAllConfigs().stream()
+        .findAny().orElseThrow()
+        .getKey();
+    brokersConfigTab
+        .clickPreviousButton();
+    Assert.assertFalse(brokersConfigTab.getAllConfigs().stream()
+            .map(BrokersConfigTab.BrokersConfigItem::getKey)
+            .toList().contains(anyConfigKeySecondPage),
+        String.format("getAllConfigs().contains(%s)", anyConfigKeySecondPage));
+    brokersConfigTab
+        .searchConfig(anyConfigKeySecondPage);
+    Assert.assertTrue(brokersConfigTab.getAllConfigs().stream()
+            .map(BrokersConfigTab.BrokersConfigItem::getKey)
+            .toList().contains(anyConfigKeySecondPage),
+        String.format("getAllConfigs().contains(%s)", anyConfigKeySecondPage));
   }
 
   @QaseId(331)
@@ -82,4 +111,42 @@ public class BrokersTest extends BaseTest {
         .getSourceInfoTooltipText();
     Assert.assertEquals(sourceInfoTooltip, BROKER_SOURCE_INFO_TOOLTIP, "brokerSourceInfoTooltip");
   }
+
+  @QaseId(332)
+  @Test
+  public void brokersConfigEditCheck() {
+    navigateToBrokersAndOpenDetails(DEFAULT_BROKER_ID);
+    brokersDetails
+        .openDetailsTab(CONFIGS);
+    String configKey = "log.cleaner.min.compaction.lag.ms";
+    BrokersConfigTab.BrokersConfigItem configItem = brokersConfigTab
+        .searchConfig(configKey)
+        .getConfig(configKey);
+    int defaultValue = Integer.parseInt(configItem.getValue());
+    configItem
+        .clickEditBtn();
+    SoftAssert softly = new SoftAssert();
+    softly.assertTrue(configItem.getSaveBtn().isDisplayed(), "getSaveBtn().isDisplayed()");
+    softly.assertTrue(configItem.getCancelBtn().isDisplayed(), "getCancelBtn().isDisplayed()");
+    softly.assertTrue(configItem.getValueFld().isEnabled(), "getValueFld().isEnabled()");
+    softly.assertAll();
+    int newValue = defaultValue + 1;
+    configItem
+        .setValue(String.valueOf(newValue))
+        .clickCancelBtn();
+    Assert.assertEquals(Integer.parseInt(configItem.getValue()), defaultValue, "getValue()");
+    configItem
+        .clickEditBtn()
+        .setValue(String.valueOf(newValue))
+        .clickSaveBtn()
+        .clickConfirm();
+    configItem = brokersConfigTab
+        .searchConfig(configKey)
+        .getConfig(configKey);
+    softly.assertFalse(configItem.getSaveBtn().isDisplayed(), "getSaveBtn().isDisplayed()");
+    softly.assertFalse(configItem.getCancelBtn().isDisplayed(), "getCancelBtn().isDisplayed()");
+    softly.assertTrue(configItem.getEditBtn().isDisplayed(), "getEditBtn().isDisplayed()");
+    softly.assertEquals(Integer.parseInt(configItem.getValue()), newValue, "getValue()");
+    softly.assertAll();
+  }
 }

+ 2 - 0
kafka-ui-react-app/package.json

@@ -24,6 +24,7 @@
     "json-schema-faker": "^0.5.0-rcv.44",
     "jsonpath-plus": "^7.2.0",
     "lodash": "^4.17.21",
+    "lossless-json": "^2.0.8",
     "pretty-ms": "7.0.1",
     "react": "^18.1.0",
     "react-ace": "^10.1.0",
@@ -71,6 +72,7 @@
     "@testing-library/user-event": "^14.4.3",
     "@types/eventsource": "^1.1.8",
     "@types/lodash": "^4.14.172",
+    "@types/lossless-json": "^1.0.1",
     "@types/node": "^16.4.13",
     "@types/react": "^18.0.9",
     "@types/react-datepicker": "^4.8.0",

+ 19 - 5
kafka-ui-react-app/pnpm-lock.yaml

@@ -19,6 +19,7 @@ specifiers:
   '@testing-library/user-event': ^14.4.3
   '@types/eventsource': ^1.1.8
   '@types/lodash': ^4.14.172
+  '@types/lossless-json': ^1.0.1
   '@types/node': ^16.4.13
   '@types/react': ^18.0.9
   '@types/react-datepicker': ^4.8.0
@@ -55,6 +56,7 @@ specifiers:
   json-schema-faker: ^0.5.0-rcv.44
   jsonpath-plus: ^7.2.0
   lodash: ^4.17.21
+  lossless-json: ^2.0.8
   prettier: ^2.8.4
   pretty-ms: 7.0.1
   react: ^18.1.0
@@ -96,7 +98,7 @@ dependencies:
   '@types/testing-library__jest-dom': 5.14.5
   ace-builds: 1.7.1
   ajv: 8.8.2
-  ajv-formats: 2.1.1
+  ajv-formats: 2.1.1_ajv@8.8.2
   classnames: 2.3.1
   fetch-mock: 9.11.0
   jest: 29.5.0_6m7kcbkkzjz4ln6z66tlzx44we
@@ -104,6 +106,7 @@ dependencies:
   json-schema-faker: 0.5.0-rcv.44
   jsonpath-plus: 7.2.0
   lodash: 4.17.21
+  lossless-json: 2.0.8
   pretty-ms: 7.0.1
   react: 18.1.0
   react-ace: 10.1.0_ef5jwxihqo6n7gxfmzogljlgcm
@@ -136,6 +139,7 @@ devDependencies:
   '@testing-library/user-event': 14.4.3_@testing-library+dom@9.0.0
   '@types/eventsource': 1.1.8
   '@types/lodash': 4.14.177
+  '@types/lossless-json': 1.0.1
   '@types/node': 16.11.7
   '@types/react': 18.0.9
   '@types/react-datepicker': 4.10.0_react@18.1.0
@@ -1770,6 +1774,10 @@ packages:
     resolution: {integrity: sha512-0fDwydE2clKe9MNfvXHBHF9WEahRuj+msTuQqOmAApNORFvhMYZKNGGJdCzuhheVjMps/ti0Ak/iJPACMaevvw==}
     dev: true
 
+  /@types/lossless-json/1.0.1:
+    resolution: {integrity: sha512-zPE8kmpeL5/6L5gtTQHSOkAW/OSYYNTDRt6/2oEgLO1Zd3Rj5WVDoMloTtLJxQJhZGLGbL4pktKSh3NbzdaWdw==}
+    dev: true
+
   /@types/node/16.11.7:
     resolution: {integrity: sha512-QB5D2sqfSjCmTuWcBWyJ+/44bcjO7VbjSbOE0ucoVbAsSNQc4Lt6QkgkVXkTDwkL4z/beecZNDvVX15D4P8Jbw==}
 
@@ -2050,8 +2058,10 @@ packages:
       - supports-color
     dev: true
 
-  /ajv-formats/2.1.1:
+  /ajv-formats/2.1.1_ajv@8.8.2:
     resolution: {integrity: sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==}
+    peerDependencies:
+      ajv: ^8.0.0
     peerDependenciesMeta:
       ajv:
         optional: true
@@ -2734,8 +2744,8 @@ packages:
       ms: 2.1.2
       supports-color: 5.5.0
 
-  /decimal.js/10.3.1:
-    resolution: {integrity: sha512-V0pfhfr8suzyPGOx3nmq4aHqabehUZn6Ch9kyFpV79TGDTWFmHqUqXdabR7QHqxzrYolF4+tVmJhUG4OURg5dQ==}
+  /decimal.js/10.4.3:
+    resolution: {integrity: sha512-VBBaLc1MgL5XpzgIP7ny5Z6Nx3UrRkIViUkPUdtl9aya5amy3De1gsUUSB1g3+3sExYNjCAsAznmukyxCb1GRA==}
     dev: true
 
   /dedent/0.7.0:
@@ -4649,7 +4659,7 @@ packages:
       cssom: 0.5.0
       cssstyle: 2.3.0
       data-urls: 3.0.2
-      decimal.js: 10.3.1
+      decimal.js: 10.4.3
       domexception: 4.0.0
       escodegen: 2.0.0
       form-data: 4.0.0
@@ -4841,6 +4851,10 @@ packages:
     dependencies:
       js-tokens: 4.0.0
 
+  /lossless-json/2.0.8:
+    resolution: {integrity: sha512-7/GaZldUc7H5oNZlSk6bF06cRbtA7oF8zWXwbfMZm8yrYC2debx0KvWTBbQIbj6fh08LsXTWg+YtHJshXgYKow==}
+    dev: false
+
   /lru-cache/6.0.0:
     resolution: {integrity: sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==}
     engines: {node: '>=10'}

+ 3 - 3
kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx

@@ -102,7 +102,7 @@ const Actions: React.FC = () => {
           disabled={isMutating}
           permission={{
             resource: ResourceType.CONNECT,
-            action: Action.EDIT,
+            action: Action.RESTART,
             value: routerProps.connectorName,
           }}
         >
@@ -113,7 +113,7 @@ const Actions: React.FC = () => {
           disabled={isMutating}
           permission={{
             resource: ResourceType.CONNECT,
-            action: Action.EDIT,
+            action: Action.RESTART,
             value: routerProps.connectorName,
           }}
         >
@@ -124,7 +124,7 @@ const Actions: React.FC = () => {
           disabled={isMutating}
           permission={{
             resource: ResourceType.CONNECT,
-            action: Action.EDIT,
+            action: Action.RESTART,
             value: routerProps.connectorName,
           }}
         >

+ 10 - 4
kafka-ui-react-app/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx

@@ -1,9 +1,10 @@
 import React from 'react';
-import { Task } from 'generated-sources';
+import { Action, ResourceType, Task } from 'generated-sources';
 import { CellContext } from '@tanstack/react-table';
 import useAppParams from 'lib/hooks/useAppParams';
 import { useRestartConnectorTask } from 'lib/hooks/api/kafkaConnect';
-import { Dropdown, DropdownItem } from 'components/common/Dropdown';
+import { Dropdown } from 'components/common/Dropdown';
+import { ActionDropdownItem } from 'components/common/ActionComponent';
 import { RouterParamsClusterConnectConnector } from 'lib/paths';
 
 const ActionsCellTasks: React.FC<CellContext<Task, unknown>> = ({ row }) => {
@@ -18,13 +19,18 @@ const ActionsCellTasks: React.FC<CellContext<Task, unknown>> = ({ row }) => {
 
   return (
     <Dropdown>
-      <DropdownItem
+      <ActionDropdownItem
         onClick={() => restartTaskHandler(id?.task)}
         danger
         confirm="Are you sure you want to restart the task?"
+        permission={{
+          resource: ResourceType.CONNECT,
+          action: Action.RESTART,
+          value: routerProps.connectorName,
+        }}
       >
         <span>Restart task</span>
-      </DropdownItem>
+      </ActionDropdownItem>
     </Dropdown>
   );
 };

+ 3 - 3
kafka-ui-react-app/src/components/Connect/List/ActionsCell.tsx

@@ -78,7 +78,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
         disabled={isMutating}
         permission={{
           resource: ResourceType.CONNECT,
-          action: Action.EDIT,
+          action: Action.RESTART,
           value: name,
         }}
       >
@@ -89,7 +89,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
         disabled={isMutating}
         permission={{
           resource: ResourceType.CONNECT,
-          action: Action.EDIT,
+          action: Action.RESTART,
           value: name,
         }}
       >
@@ -100,7 +100,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
         disabled={isMutating}
         permission={{
           resource: ResourceType.CONNECT,
-          action: Action.EDIT,
+          action: Action.RESTART,
           value: name,
         }}
       >

+ 4 - 8
kafka-ui-react-app/src/components/Connect/New/New.tsx

@@ -38,7 +38,7 @@ const New: React.FC = () => {
   const { clusterName } = useAppParams<ClusterNameRoute>();
   const navigate = useNavigate();
 
-  const { data: connects } = useConnects(clusterName);
+  const { data: connects = [] } = useConnects(clusterName);
   const mutation = useCreateConnector(clusterName);
 
   const methods = useForm<FormValues>({
@@ -88,10 +88,6 @@ const New: React.FC = () => {
     }
   };
 
-  if (!connects || connects.length === 0) {
-    return null;
-  }
-
   const connectOptions = connects.map(({ name: connectName }) => ({
     value: connectName,
     label: connectName,
@@ -108,10 +104,10 @@ const New: React.FC = () => {
         onSubmit={handleSubmit(onSubmit)}
         aria-label="Create connect form"
       >
-        <S.Filed $hidden={connects.length <= 1}>
+        <S.Filed $hidden={connects?.length <= 1}>
           <Heading level={3}>Connect *</Heading>
           <Controller
-            defaultValue={connectOptions[0].value}
+            defaultValue={connectOptions[0]?.value}
             control={control}
             name="connectName"
             render={({ field: { name, onChange } }) => (
@@ -120,7 +116,7 @@ const New: React.FC = () => {
                 name={name}
                 disabled={isSubmitting}
                 onChange={onChange}
-                value={connectOptions[0].value}
+                value={connectOptions[0]?.value}
                 minWidth="100%"
                 options={connectOptions}
               />

+ 2 - 2
kafka-ui-react-app/src/components/common/EditorViewer/EditorViewer.tsx

@@ -1,6 +1,7 @@
 import React from 'react';
 import Editor from 'components/common/Editor/Editor';
 import { SchemaType } from 'generated-sources';
+import { parse, stringify } from 'lossless-json';
 
 import * as S from './EditorViewer.styled';
 
@@ -9,10 +10,9 @@ export interface EditorViewerProps {
   schemaType?: string;
   maxLines?: number;
 }
-
 const getSchemaValue = (data: string, schemaType?: string) => {
   if (schemaType === SchemaType.JSON || schemaType === SchemaType.AVRO) {
-    return JSON.stringify(JSON.parse(data), null, '\t');
+    return stringify(parse(data), undefined, '\t');
   }
   return data;
 };

+ 1 - 1
pom.xml

@@ -59,7 +59,7 @@
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
         <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
         <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
-        <openapi-generator-maven-plugin.version>6.5.0</openapi-generator-maven-plugin.version>
+        <openapi-generator-maven-plugin.version>6.6.0</openapi-generator-maven-plugin.version>
         <springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
     </properties>