Explorar o código

Minor BE fixes and improvements (#3719)

1. Not setting default key serde to SR/Proto if it is not explicitly set.
2. enabling searching connector by connect name
3. switch using serializedKeySize/serializedValueSize methods instead using byte array len check
Ilya Kuramshin %!s(int64=2) %!d(string=hai) anos
pai
achega
601bd6bbf5

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializer.java

@@ -123,11 +123,11 @@ public class ConsumerRecordDeserializer {
   }
 
   private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
-    return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null;
+    return consumerRecord.key() != null ? (long) consumerRecord.serializedKeySize() : null;
   }
 
   private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
-    return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null;
+    return consumerRecord.value() != null ? (long) consumerRecord.serializedValueSize() : null;
   }
 
   private static int headerSize(Header header) {

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -122,8 +122,6 @@ public class SerdesInitializer {
         registeredSerdes,
         Optional.ofNullable(clusterProperties.getDefaultKeySerde())
             .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
-            .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
-            .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
             .orElse(null),
         Optional.ofNullable(clusterProperties.getDefaultValueSerde())
             .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -109,6 +109,7 @@ public class KafkaConnectService {
   private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
     return Stream.of(
         fullConnectorInfo.getName(),
+        fullConnectorInfo.getConnect(),
         fullConnectorInfo.getStatus().getState().getValue(),
         fullConnectorInfo.getType().getValue());
   }

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java

@@ -43,8 +43,7 @@ class TopicAnalysisStats {
     Long max;
     final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
 
-    void apply(byte[] bytes) {
-      int len = bytes.length;
+    void apply(int len) {
       sum += len;
       min = minNullable(min, len);
       max = maxNullable(max, len);
@@ -98,7 +97,7 @@ class TopicAnalysisStats {
 
     if (rec.key() != null) {
       byte[] keyBytes = rec.key().get();
-      keysSize.apply(keyBytes);
+      keysSize.apply(rec.serializedKeySize());
       uniqKeys.update(keyBytes);
     } else {
       nullKeys++;
@@ -106,7 +105,7 @@ class TopicAnalysisStats {
 
     if (rec.value() != null) {
       byte[] valueBytes = rec.value().get();
-      valuesSize.apply(valueBytes);
+      valuesSize.apply(rec.serializedValueSize());
       uniqValues.update(valueBytes);
     } else {
       nullValues++;