Browse Source

Merge branch 'master' into backend-traffic-metrics

Roman Nedzvetskiy 5 years ago
parent
commit
a00489f864
58 changed files with 2151 additions and 975 deletions
  1. 3 1
      .gitignore
  2. 20 1
      docker/kafka-clusters-only.yaml
  3. 17 0
      docker/kafka-ui.yaml
  4. 15 0
      kafka-ui-api/pom.xml
  5. 2 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
  6. 43 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
  7. 9 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java
  8. 131 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
  9. 15 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SimpleRecordDeserializer.java
  10. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
  11. 7 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
  12. 4 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
  13. 2 0
      kafka-ui-api/src/main/resources/application-local.yml
  14. 1 0
      kafka-ui-api/src/main/resources/application-sdp.yml
  15. 1 1
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
  16. 608 411
      kafka-ui-react-app/package-lock.json
  17. 23 20
      kafka-ui-react-app/package.json
  18. 1 1
      kafka-ui-react-app/src/components/App.scss
  19. 5 0
      kafka-ui-react-app/src/components/Topics/Details/Details.tsx
  20. 90 4
      kafka-ui-react-app/src/components/Topics/Details/Messages/Messages.tsx
  21. 25 5
      kafka-ui-react-app/src/components/Topics/Details/Messages/MessagesContainer.ts
  22. 16 0
      kafka-ui-react-app/src/components/Topics/Details/Settings/SettingsEditButton.tsx
  23. 143 0
      kafka-ui-react-app/src/components/Topics/Edit/Edit.tsx
  24. 63 0
      kafka-ui-react-app/src/components/Topics/Edit/EditContainer.tsx
  25. 0 20
      kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamOptions.tsx
  26. 0 87
      kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParams.tsx
  27. 0 18
      kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamsContainer.tsx
  28. 0 96
      kafka-ui-react-app/src/components/Topics/New/CustomParams/customParamsOptions.tsx
  29. 8 172
      kafka-ui-react-app/src/components/Topics/New/New.tsx
  30. 6 0
      kafka-ui-react-app/src/components/Topics/Topics.tsx
  31. 0 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamAction.tsx
  32. 0 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx
  33. 57 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamField.tsx
  34. 27 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamOptions.tsx
  35. 23 13
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamSelect.tsx
  36. 5 4
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamValue.tsx
  37. 105 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParams.tsx
  38. 19 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamsContainer.tsx
  39. 98 0
      kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/customParamsOptions.tsx
  40. 35 0
      kafka-ui-react-app/src/components/Topics/shared/Form/FormBreadcrumbs.tsx
  41. 0 0
      kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetain.tsx
  42. 0 0
      kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetainBtn.tsx
  43. 0 0
      kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetainBtns.tsx
  44. 170 0
      kafka-ui-react-app/src/components/Topics/shared/Form/TopicForm.tsx
  45. 16 3
      kafka-ui-react-app/src/components/common/PageLoader/PageLoader.tsx
  46. 5 0
      kafka-ui-react-app/src/lib/paths.ts
  47. 12 2
      kafka-ui-react-app/src/redux/actionType.ts
  48. 33 14
      kafka-ui-react-app/src/redux/actions/actions.ts
  49. 75 14
      kafka-ui-react-app/src/redux/actions/thunks.ts
  50. 60 22
      kafka-ui-react-app/src/redux/api/topics.ts
  51. 22 3
      kafka-ui-react-app/src/redux/interfaces/topic.ts
  52. 9 6
      kafka-ui-react-app/src/redux/reducers/brokers/reducer.ts
  53. 1 1
      kafka-ui-react-app/src/redux/reducers/clusters/reducer.ts
  54. 19 20
      kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts
  55. 21 18
      kafka-ui-react-app/src/redux/reducers/topics/reducer.ts
  56. 60 13
      kafka-ui-react-app/src/redux/reducers/topics/selectors.ts
  57. 3 0
      package-lock.json
  58. 16 0
      pom.xml

+ 3 - 1
.gitignore

@@ -29,4 +29,6 @@ build/
 
 ### VS Code ###
 .vscode/
-/kafka-ui-api/app/node
+/kafka-ui-api/app/node
+
+.DS_Store

+ 20 - 1
docker/kafka-clusters-only.yaml

@@ -95,4 +95,23 @@ services:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
     networks:
-     - default
+     - default
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+      - kafka01
+    ports:
+      - 8085:8085
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

+ 17 - 0
docker/kafka-ui.yaml

@@ -12,6 +12,7 @@ services:
       - zookeeper1
       - kafka0
       - kafka1
+      - schemaregistry0
     command: [ "java", "-jar", "kafka-ui-api.jar", "--spring.profiles.active=sdp"]
 
   zookeeper0:
@@ -53,3 +54,19 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

+ 15 - 0
kafka-ui-api/pom.xml

@@ -69,6 +69,21 @@
             <artifactId>mapstruct</artifactId>
             <version>${org.mapstruct.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

@@ -19,6 +19,8 @@ public class ClustersProperties {
         String name;
         String bootstrapServers;
         String zookeeper;
+        String schemaRegistry;
+        String schemaNameTemplate = "%s-value";
         int jmxPort;
     }
 }

+ 43 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java

@@ -0,0 +1,43 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Component
+@RequiredArgsConstructor
+public class DeserializationService {
+
+	private final ClustersStorage clustersStorage;
+	private Map<String, RecordDeserializer> clusterDeserializers;
+
+	@PostConstruct
+	public void init() {
+		this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
+				.collect(Collectors.toMap(
+						KafkaCluster::getName,
+						this::createRecordDeserializerForCluster
+				));
+	}
+
+	private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
+		if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
+			return new SimpleRecordDeserializer();
+		} else {
+			return new SchemaRegistryRecordDeserializer(cluster);
+		}
+	}
+
+	public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
+		return clusterDeserializers.get(cluster.getName());
+	}
+}

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java

@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+public interface RecordDeserializer {
+
+	Object deserialize(ConsumerRecord<Bytes, Bytes> record);
+}

+ 131 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

@@ -0,0 +1,131 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Log4j2
+@RequiredArgsConstructor
+public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
+
+	private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
+
+	private final KafkaCluster cluster;
+	private final  SchemaRegistryClient schemaRegistryClient;
+	private KafkaAvroDeserializer avroDeserializer;
+	private ObjectMapper objectMapper;
+	private StringDeserializer stringDeserializer;
+
+	private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
+
+	public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+		this.cluster = cluster;
+
+		List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
+		List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
+		this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
+
+		this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+		this.objectMapper = new ObjectMapper();
+		this.stringDeserializer = new StringDeserializer();
+	}
+
+	public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+		MessageFormat format = getMessageFormat(record);
+
+		try {
+			Object parsedValue;
+			switch (format) {
+				case AVRO:
+					parsedValue = parseAvroRecord(record);
+					break;
+				case JSON:
+					parsedValue = parseJsonRecord(record);
+					break;
+				case STRING:
+					parsedValue = parseStringRecord(record);
+					break;
+				default:
+					throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
+			}
+			return parsedValue;
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
+		}
+	}
+
+	private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
+		return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
+	}
+
+	private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
+		String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
+		try {
+			schemaRegistryClient.getAllVersions(avroSchema);
+			return MessageFormat.AVRO;
+		} catch (RestClientException | IOException e) {
+			log.info("Failed to get Avro schema for topic {}", record.topic());
+		}
+
+		try {
+			parseJsonRecord(record);
+			return MessageFormat.JSON;
+		} catch (IOException e) {
+			log.info("Failed to parse json from topic {}", record.topic());
+		}
+
+		return MessageFormat.STRING;
+	}
+
+	private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+		String topic = record.topic();
+		byte[] valueBytes = record.value().get();
+		GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
+		byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
+		return parseJson(bytes);
+	}
+
+	private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+		byte[] valueBytes = record.value().get();
+		return parseJson(valueBytes);
+	}
+
+	private Object parseJson(byte[] bytes) throws IOException {
+		return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
+		});
+	}
+
+	private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
+		String topic = record.topic();
+		byte[] valueBytes = record.value().get();
+		return stringDeserializer.deserialize(topic, valueBytes);
+	}
+
+	public enum MessageFormat {
+		AVRO,
+		JSON,
+		STRING
+	}
+}

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SimpleRecordDeserializer.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+public class SimpleRecordDeserializer implements RecordDeserializer {
+
+	private final StringDeserializer stringDeserializer = new StringDeserializer();
+
+	@Override
+	public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+		return stringDeserializer.deserialize(record.topic(), record.value().get());
+	}
+}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -14,11 +14,12 @@ public class KafkaCluster {
     private final int jmxPort;
     private final String bootstrapServers;
     private final String zookeeper;
+    private final String schemaRegistry;
+    private final String schemaNameTemplate;
     private final ServerStatus status;
     private final ServerStatus zookeeperStatus;
     private final InternalClusterMetrics metrics;
     private final Map<String, InternalTopic> topics;
     private final Throwable lastKafkaException;
     private final Throwable lastZookeeperException;
-
 }

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -16,6 +16,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
+import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
+import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
@@ -37,15 +39,17 @@ public class ConsumingService {
 	private static final int MAX_POLLS_COUNT = 30;
 
 	private final KafkaService kafkaService;
+	private final DeserializationService deserializationService;
 
 	public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
 		int recordsLimit = Optional.ofNullable(limit)
 				.map(s -> Math.min(s, MAX_RECORD_LIMIT))
 				.orElse(DEFAULT_RECORD_LIMIT);
 		RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
+		RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster);
 		return Flux.create(emitter::emit)
 				.subscribeOn(Schedulers.boundedElastic())
-				.map(ClusterUtil::mapToTopicMessage)
+				.map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
 				.limitRequest(recordsLimit);
 	}
 
@@ -64,12 +68,13 @@ public class ConsumingService {
 				assignPartitions(consumer);
 				seekOffsets(consumer);
 				int pollsCount = 0;
-				while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) {
+				while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
 					ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
 					log.info("{} records polled", records.count());
 					records.iterator()
 							.forEachRemaining(sink::next);
 				}
+				sink.complete();
 			} catch (Exception e) {
 				log.error("Error occurred while consuming records", e);
 				throw new RuntimeException(e);

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.cluster.util;
 
 import com.provectus.kafka.ui.cluster.model.*;
+import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.model.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.*;
@@ -149,7 +150,7 @@ public class ClusterUtil {
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
     }
 
-    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
+    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord, RecordDeserializer recordDeserializer) {
         OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
         TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
         Map<String, String> headers = new HashMap<>();
@@ -166,7 +167,8 @@ public class ClusterUtil {
             topicMessage.setKey(consumerRecord.key().toString());
         }
         topicMessage.setHeaders(headers);
-        topicMessage.setContent(consumerRecord.value().toString());
+        Object parsedValue = recordDeserializer.deserialize(consumerRecord);
+        topicMessage.setContent(parsedValue);
 
         return topicMessage;
     }

+ 2 - 0
kafka-ui-api/src/main/resources/application-local.yml

@@ -4,6 +4,8 @@ kafka:
       name: local
       bootstrapServers: localhost:29091
       zookeeper: localhost:2183
+      schemaRegistry: http://localhost:8085
+#      schemaNameTemplate: "%s-value"
       jmxPort: 9997
     -
       name: secondLocal

+ 1 - 0
kafka-ui-api/src/main/resources/application-sdp.yml

@@ -4,6 +4,7 @@ kafka:
       name: local
       bootstrapServers: kafka0:29092
       zookeeper: zookeeper0:2181
+      schemaRegistry: http://schemaregistry0:8085
     -
       name: secondLocal
       zookeeper: zookeeper1:2181

+ 1 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -462,7 +462,7 @@ components:
           additionalProperties:
             type: string
         content:
-          type: string
+          type: object
       required:
         - partition
         - offset

File diff suppressed because it is too large
+ 608 - 411
kafka-ui-react-app/package-lock.json


+ 23 - 20
kafka-ui-react-app/package.json

@@ -3,37 +3,22 @@
   "version": "0.1.0",
   "private": true,
   "dependencies": {
-    "@testing-library/jest-dom": "^4.2.4",
-    "@testing-library/react": "^9.3.2",
-    "@testing-library/user-event": "^7.1.2",
-    "@types/classnames": "^2.2.9",
-    "@types/jest": "^24.0.25",
-    "@types/lodash": "^4.14.149",
-    "@types/node": "^12.12.24",
-    "@types/react": "^16.9.17",
-    "@types/react-dom": "^16.9.0",
-    "@types/react-redux": "^7.1.5",
-    "@types/react-router-dom": "^5.1.3",
-    "@types/redux": "^3.6.0",
-    "@types/redux-thunk": "^2.1.0",
     "bulma": "^0.8.0",
     "bulma-switch": "^2.0.0",
     "classnames": "^2.2.6",
-    "json-server": "^0.15.1",
+    "immer": "^6.0.5",
+    "date-fns": "^2.14.0",
     "lodash": "^4.17.15",
-    "node-sass": "^4.13.1",
     "pretty-ms": "^6.0.1",
     "react": "^16.12.0",
     "react-dom": "^16.12.0",
     "react-hook-form": "^4.5.5",
     "react-redux": "^7.1.3",
     "react-router-dom": "^5.1.2",
-    "react-scripts": "3.3.0",
     "redux": "^4.0.5",
     "redux-thunk": "^2.3.0",
     "reselect": "^4.0.0",
-    "typesafe-actions": "^5.1.0",
-    "typescript": "~3.7.4"
+    "typesafe-actions": "^5.1.0"
   },
   "lint-staged": {
     "*.{js,ts,jsx,tsx}": [
@@ -70,6 +55,19 @@
     ]
   },
   "devDependencies": {
+    "@testing-library/jest-dom": "^4.2.4",
+    "@testing-library/react": "^9.3.2",
+    "@testing-library/user-event": "^7.1.2",
+    "@types/classnames": "^2.2.9",
+    "@types/jest": "^24.0.25",
+    "@types/lodash": "^4.14.149",
+    "@types/node": "^12.12.24",
+    "@types/react": "^16.9.17",
+    "@types/react-dom": "^16.9.0",
+    "@types/react-redux": "^7.1.5",
+    "@types/react-router-dom": "^5.1.3",
+    "@types/redux": "^3.6.0",
+    "@types/redux-thunk": "^2.1.0",
     "@typescript-eslint/eslint-plugin": "^2.27.0",
     "@typescript-eslint/parser": "^2.27.0",
     "eslint": "^6.8.0",
@@ -82,7 +80,12 @@
     "eslint-plugin-react-hooks": "^2.5.1",
     "esprint": "^0.6.0",
     "husky": "^4.2.5",
+    "json-server": "^0.15.1",
     "lint-staged": ">=10",
-    "prettier": "^2.0.4"
-  }
+    "node-sass": "^4.13.1",
+    "prettier": "^2.0.4",
+    "react-scripts": "3.4.0",
+    "typescript": "~3.7.4"
+  },
+  "proxy": "http://localhost:8080"
 }

+ 1 - 1
kafka-ui-react-app/src/components/App.scss

@@ -25,6 +25,6 @@ $navbar-width: 250px;
     left: 0;
     bottom: 0;
     padding: 20px 20px;
-
+    overflow-y: scroll;
   }
 }

+ 5 - 0
kafka-ui-react-app/src/components/Topics/Details/Details.tsx

@@ -7,10 +7,12 @@ import {
   clusterTopicSettingsPath,
   clusterTopicPath,
   clusterTopicMessagesPath,
+  clusterTopicsTopicEditPath,
 } from 'lib/paths';
 import OverviewContainer from './Overview/OverviewContainer';
 import MessagesContainer from './Messages/MessagesContainer';
 import SettingsContainer from './Settings/SettingsContainer';
+import SettingsEditButton from './Settings/SettingsEditButton';
 
 interface Props extends Topic, TopicDetails {
   clusterName: ClusterName;
@@ -30,6 +32,9 @@ const Details: React.FC<Props> = ({ clusterName, topicName }) => {
             {topicName}
           </Breadcrumb>
         </div>
+        <SettingsEditButton
+          to={clusterTopicsTopicEditPath(clusterName, topicName)}
+        />
       </div>
 
       <div className="box">

+ 90 - 4
kafka-ui-react-app/src/components/Topics/Details/Messages/Messages.tsx

@@ -1,19 +1,105 @@
 import React from 'react';
-import { ClusterName, TopicName } from 'redux/interfaces';
+import { ClusterName, TopicMessage, TopicName } from 'redux/interfaces';
+import PageLoader from 'components/common/PageLoader/PageLoader';
+import { format } from 'date-fns';
 
 interface Props {
   clusterName: ClusterName;
   topicName: TopicName;
+  isFetched: boolean;
+  fetchTopicMessages: (clusterName: ClusterName, topicName: TopicName) => void;
+  messages: TopicMessage[];
 }
 
 const Messages: React.FC<Props> = ({
+  isFetched,
   clusterName,
   topicName,
+  messages,
+  fetchTopicMessages,
 }) => {
+  React.useEffect(() => {
+    fetchTopicMessages(clusterName, topicName);
+  }, [fetchTopicMessages, clusterName, topicName]);
+
+  const [searchText, setSearchText] = React.useState<string>('');
+
+  const handleInputChange = (event: React.ChangeEvent<HTMLInputElement>) => {
+    setSearchText(event.target.value);
+  };
+
+  const getTimestampDate = (timestamp: number) => {
+    return format(new Date(timestamp * 1000), 'MM.dd.yyyy HH:mm:ss');
+  };
+
+  const getMessageContentHeaders = () => {
+    const message = messages[0];
+    const headers: JSX.Element[] = [];
+    const content = JSON.parse(message.content);
+    Object.keys(content).forEach((k) =>
+      headers.push(<th>{`content.${k}`}</th>)
+    );
+
+    return headers;
+  };
+
+  const getMessageContentBody = (content: string) => {
+    const c = JSON.parse(content);
+    const columns: JSX.Element[] = [];
+    Object.values(c).map((v) => columns.push(<td>{JSON.stringify(v)}</td>));
+    return columns;
+  };
+
   return (
-    <h1>
-      Messages from {clusterName}{topicName}
-    </h1>
+    // eslint-disable-next-line no-nested-ternary
+    isFetched ? (
+      messages.length > 0 ? (
+        <div>
+          <div className="columns">
+            <div className="column is-half is-offset-half">
+              <input
+                id="searchText"
+                type="text"
+                name="searchText"
+                className="input"
+                placeholder="Search"
+                value={searchText}
+                onChange={handleInputChange}
+              />
+            </div>
+          </div>
+          <table className="table is-striped is-fullwidth">
+            <thead>
+              <tr>
+                <th>Timestamp</th>
+                <th>Offset</th>
+                <th>Partition</th>
+                {getMessageContentHeaders()}
+              </tr>
+            </thead>
+            <tbody>
+              {messages
+                .filter(
+                  (message) =>
+                    !searchText || message?.content?.indexOf(searchText) >= 0
+                )
+                .map((message) => (
+                  <tr key={message.timestamp}>
+                    <td>{getTimestampDate(message.timestamp)}</td>
+                    <td>{message.offset}</td>
+                    <td>{message.partition}</td>
+                    {getMessageContentBody(message.content)}
+                  </tr>
+                ))}
+            </tbody>
+          </table>
+        </div>
+      ) : (
+        <div>No messages at selected topic</div>
+      )
+    ) : (
+      <PageLoader isFullHeight={false} />
+    )
   );
 };
 

+ 25 - 5
kafka-ui-react-app/src/components/Topics/Details/Messages/MessagesContainer.ts

@@ -1,20 +1,40 @@
 import { connect } from 'react-redux';
+import { ClusterName, RootState, TopicName } from 'redux/interfaces';
+import { RouteComponentProps, withRouter } from 'react-router-dom';
+import { fetchTopicMessages } from 'redux/actions';
+import {
+  getIsTopicMessagesFetched,
+  getTopicMessages,
+} from 'redux/reducers/topics/selectors';
+
 import Messages from './Messages';
-import {ClusterName, RootState, TopicName} from 'redux/interfaces';
-import { withRouter, RouteComponentProps } from 'react-router-dom';
 
 interface RouteProps {
   clusterName: ClusterName;
   topicName: TopicName;
 }
 
-interface OwnProps extends RouteComponentProps<RouteProps> { }
+type OwnProps = RouteComponentProps<RouteProps>;
 
-const mapStateToProps = (state: RootState, { match: { params: { topicName, clusterName } } }: OwnProps) => ({
+const mapStateToProps = (
+  state: RootState,
+  {
+    match: {
+      params: { topicName, clusterName },
+    },
+  }: OwnProps
+) => ({
   clusterName,
   topicName,
+  isFetched: getIsTopicMessagesFetched(state),
+  messages: getTopicMessages(state),
 });
 
+const mapDispatchToProps = {
+  fetchTopicMessages: (clusterName: ClusterName, topicName: TopicName) =>
+    fetchTopicMessages(clusterName, topicName),
+};
+
 export default withRouter(
-  connect(mapStateToProps)(Messages)
+  connect(mapStateToProps, mapDispatchToProps)(Messages)
 );

+ 16 - 0
kafka-ui-react-app/src/components/Topics/Details/Settings/SettingsEditButton.tsx

@@ -0,0 +1,16 @@
+import React from 'react';
+import { Link } from 'react-router-dom';
+
+interface Props {
+  to: string;
+}
+
+const SettingsEditButton: React.FC<Props> = ({ to }) => (
+  <Link to={to}>
+    <button type="button" className="button is-small is-warning">
+      Edit settings
+    </button>
+  </Link>
+);
+
+export default SettingsEditButton;

+ 143 - 0
kafka-ui-react-app/src/components/Topics/Edit/Edit.tsx

@@ -0,0 +1,143 @@
+import React from 'react';
+import {
+  ClusterName,
+  TopicFormData,
+  TopicName,
+  TopicConfigByName,
+  TopicWithDetailedInfo,
+  CleanupPolicy,
+} from 'redux/interfaces';
+import { useForm, FormContext } from 'react-hook-form';
+import { camelCase } from 'lodash';
+
+import TopicForm from '../shared/Form/TopicForm';
+import FormBreadcrumbs from '../shared/Form/FormBreadcrumbs';
+
+interface Props {
+  clusterName: ClusterName;
+  topicName: TopicName;
+  topic?: TopicWithDetailedInfo;
+  isFetched: boolean;
+  isTopicDetailsFetched: boolean;
+  isTopicUpdated: boolean;
+  fetchTopicDetails: (clusterName: ClusterName, topicName: TopicName) => void;
+  fetchTopicConfig: (clusterName: ClusterName, topicName: TopicName) => void;
+  updateTopic: (clusterName: ClusterName, form: TopicFormData) => void;
+  redirectToTopicPath: (clusterName: ClusterName, topicName: TopicName) => void;
+  resetUploadedState: () => void;
+}
+
+const DEFAULTS = {
+  partitions: 1,
+  replicationFactor: 1,
+  minInSyncReplicas: 1,
+  cleanupPolicy: CleanupPolicy.Delete,
+  retentionBytes: -1,
+  maxMessageBytes: 1000012,
+};
+
+const topicParams = (topic: TopicWithDetailedInfo | undefined) => {
+  if (!topic) {
+    return DEFAULTS;
+  }
+
+  const { name, replicationFactor } = topic;
+
+  const configs = topic.config?.reduce(
+    (result: { [name: string]: string }, param) => {
+      result[camelCase(param.name)] = param.value || param.defaultValue;
+      return result;
+    },
+    {}
+  );
+
+  return {
+    ...DEFAULTS,
+    name,
+    partitions: topic.partitionCount || DEFAULTS.partitions,
+    replicationFactor,
+    ...configs,
+  };
+};
+
+let formInit = false;
+
+const Edit: React.FC<Props> = ({
+  clusterName,
+  topicName,
+  topic,
+  isFetched,
+  isTopicDetailsFetched,
+  isTopicUpdated,
+  fetchTopicDetails,
+  fetchTopicConfig,
+  updateTopic,
+  redirectToTopicPath,
+}) => {
+  const defaultValues = topicParams(topic);
+
+  const methods = useForm<TopicFormData>({ defaultValues });
+
+  const [isSubmitting, setIsSubmitting] = React.useState<boolean>(false);
+
+  React.useEffect(() => {
+    fetchTopicConfig(clusterName, topicName);
+    fetchTopicDetails(clusterName, topicName);
+  }, [fetchTopicConfig, fetchTopicDetails, clusterName, topicName]);
+
+  React.useEffect(() => {
+    if (isSubmitting && isTopicUpdated) {
+      const { name } = methods.getValues();
+      redirectToTopicPath(clusterName, name);
+    }
+  }, [isSubmitting, isTopicUpdated, redirectToTopicPath, clusterName, methods]);
+
+  if (!isFetched || !isTopicDetailsFetched || !topic || !topic.config) {
+    return null;
+  }
+
+  if (!formInit) {
+    methods.reset(defaultValues);
+    formInit = true;
+  }
+
+  const config: TopicConfigByName = {
+    byName: {},
+  };
+
+  topic.config.forEach((param) => {
+    config.byName[param.name] = param;
+  });
+
+  const onSubmit = async (data: TopicFormData) => {
+    setIsSubmitting(true);
+    updateTopic(clusterName, data);
+  };
+
+  return (
+    <div className="section">
+      <div className="level">
+        <FormBreadcrumbs
+          clusterName={clusterName}
+          topicName={topicName}
+          current="Edit Topic"
+        />
+      </div>
+
+      <div className="box">
+        {/* eslint-disable-next-line react/jsx-props-no-spreading */}
+        <FormContext {...methods}>
+          <TopicForm
+            topicName={topicName}
+            config={config}
+            isSubmitting={isSubmitting}
+            isEditing
+            onSubmit={methods.handleSubmit(onSubmit)}
+          />
+        </FormContext>
+      </div>
+    </div>
+  );
+};
+
+export default Edit;

+ 63 - 0
kafka-ui-react-app/src/components/Topics/Edit/EditContainer.tsx

@@ -0,0 +1,63 @@
+import { connect } from 'react-redux';
+import {
+  RootState,
+  ClusterName,
+  TopicFormData,
+  TopicName,
+  Action,
+} from 'redux/interfaces';
+import { withRouter, RouteComponentProps } from 'react-router-dom';
+import {
+  updateTopic,
+  fetchTopicConfig,
+  fetchTopicDetails,
+} from 'redux/actions';
+import {
+  getTopicConfigFetched,
+  getTopicUpdated,
+  getIsTopicDetailsFetched,
+  getFullTopic,
+} from 'redux/reducers/topics/selectors';
+import { clusterTopicPath } from 'lib/paths';
+import { ThunkDispatch } from 'redux-thunk';
+import Edit from './Edit';
+
+interface RouteProps {
+  clusterName: ClusterName;
+  topicName: TopicName;
+}
+
+type OwnProps = RouteComponentProps<RouteProps>;
+
+const mapStateToProps = (
+  state: RootState,
+  {
+    match: {
+      params: { topicName, clusterName },
+    },
+  }: OwnProps
+) => ({
+  clusterName,
+  topicName,
+  topic: getFullTopic(state, topicName),
+  isFetched: getTopicConfigFetched(state),
+  isTopicDetailsFetched: getIsTopicDetailsFetched(state),
+  isTopicUpdated: getTopicUpdated(state),
+});
+
+const mapDispatchToProps = (
+  dispatch: ThunkDispatch<RootState, undefined, Action>,
+  { history }: OwnProps
+) => ({
+  fetchTopicDetails: (clusterName: ClusterName, topicName: TopicName) =>
+    dispatch(fetchTopicDetails(clusterName, topicName)),
+  fetchTopicConfig: (clusterName: ClusterName, topicName: TopicName) =>
+    dispatch(fetchTopicConfig(clusterName, topicName)),
+  updateTopic: (clusterName: ClusterName, form: TopicFormData) =>
+    dispatch(updateTopic(clusterName, form)),
+  redirectToTopicPath: (clusterName: ClusterName, topicName: TopicName) => {
+    history.push(clusterTopicPath(clusterName, topicName));
+  },
+});
+
+export default withRouter(connect(mapStateToProps, mapDispatchToProps)(Edit));

+ 0 - 20
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamOptions.tsx

@@ -1,20 +0,0 @@
-import React from 'react';
-import { TopicCustomParamOption } from 'redux/interfaces';
-import { CUSTOM_PARAMS_OPTIONS } from './customParamsOptions';
-
-interface Props {};
-
-const CustomParamOptions: React.FC<Props> = () => (
-  <>
-    <option value=''>Select</option>
-    {
-      Object.values(CUSTOM_PARAMS_OPTIONS).map((opt: TopicCustomParamOption) => (
-        <option key={opt.name} value={opt.name}>
-          {opt.name}
-        </option>
-      ))
-    }
-  </>
-);
-
-export default React.memo(CustomParamOptions);

+ 0 - 87
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParams.tsx

@@ -1,87 +0,0 @@
-import React from 'react';
-import { omit, reject } from 'lodash';
-
-import { TopicFormCustomParams } from 'redux/interfaces';
-import CustomParamSelect from './CustomParamSelect';
-import CustomParamValue from './CustomParamValue';
-import CustomParamAction from './CustomParamAction';
-import CustomParamButton, { CustomParamButtonType } from './CustomParamButton';
-
-export const INDEX_PREFIX = 'customParams';
-
-interface Props {
-  isSubmitting: boolean;
-}
-
-const CustomParams: React.FC<Props> = ({ isSubmitting }) => {
-  const [formCustomParams, setFormCustomParams] = React.useState<
-    TopicFormCustomParams
-  >({
-    byIndex: {},
-    allIndexes: [],
-  });
-
-  const onAdd = (event: React.MouseEvent<HTMLButtonElement>) => {
-    event.preventDefault();
-
-    const newIndex = `${INDEX_PREFIX}.${new Date().getTime()}ts`;
-
-    setFormCustomParams({
-      ...formCustomParams,
-      byIndex: {
-        ...formCustomParams.byIndex,
-        [newIndex]: { name: '', value: '' },
-      },
-      allIndexes: [newIndex, ...formCustomParams.allIndexes],
-    });
-  };
-
-  const onRemove = (index: string) => {
-    setFormCustomParams({
-      ...formCustomParams,
-      byIndex: omit(formCustomParams.byIndex, index),
-      allIndexes: reject(formCustomParams.allIndexes, (i) => i === index),
-    });
-  };
-
-  return (
-    <>
-      <div className="columns">
-        <div className="column">
-          <CustomParamButton
-            className="is-success"
-            type={CustomParamButtonType.plus}
-            onClick={onAdd}
-            btnText="Add Custom Parameter"
-          />
-        </div>
-      </div>
-      {formCustomParams.allIndexes.map((index) => (
-        <div className="columns is-centered" key={index}>
-          <div className="column">
-            <CustomParamSelect
-              index={index}
-              isDisabled={isSubmitting}
-              name={formCustomParams.byIndex[index].name}
-            />
-          </div>
-
-          <div className="column">
-            <CustomParamValue
-              index={index}
-              isDisabled={isSubmitting}
-              name={formCustomParams.byIndex[index].name}
-              defaultValue={formCustomParams.byIndex[index].value}
-            />
-          </div>
-
-          <div className="column is-narrow">
-            <CustomParamAction index={index} onRemove={onRemove} />
-          </div>
-        </div>
-      ))}
-    </>
-  );
-};
-
-export default CustomParams;

+ 0 - 18
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamsContainer.tsx

@@ -1,18 +0,0 @@
-import { connect } from 'react-redux';
-import { RootState } from 'redux/interfaces';
-import { withRouter, RouteComponentProps } from 'react-router-dom';
-import CustomParams from './CustomParams';
-
-interface RouteProps {};
-
-interface OwnProps extends RouteComponentProps<RouteProps> {
-  isSubmitting: boolean;
-}
-
-const mapStateToProps = (state: RootState, { isSubmitting }: OwnProps) => ({
-  isSubmitting,
-})
-
-export default withRouter(
-  connect(mapStateToProps)(CustomParams)
-);

+ 0 - 96
kafka-ui-react-app/src/components/Topics/New/CustomParams/customParamsOptions.tsx

@@ -1,96 +0,0 @@
-import { TopicCustomParamOption } from 'redux/interfaces';
-
-interface CustomParamOption {
-  [optionName: string]: TopicCustomParamOption;
-}
-
-export const CUSTOM_PARAMS_OPTIONS: CustomParamOption = {
-  "compression.type": {
-    "name": "compression.type",
-    "defaultValue": "producer"
-  },
-  "leader.replication.throttled.replicas": {
-    "name": "leader.replication.throttled.replicas",
-    "defaultValue": ""
-  },
-  "message.downconversion.enable": {
-    "name": "message.downconversion.enable",
-    "defaultValue": "true"
-  },
-  "segment.jitter.ms": {
-    "name": "segment.jitter.ms",
-    "defaultValue": "0"
-  },
-  "flush.ms": {
-    "name": "flush.ms",
-    "defaultValue": "9223372036854775807"
-  },
-  "follower.replication.throttled.replicas": {
-    "name": "follower.replication.throttled.replicas",
-    "defaultValue": ""
-  },
-  "segment.bytes": {
-    "name": "segment.bytes",
-    "defaultValue": "1073741824"
-  },
-  "flush.messages": {
-    "name": "flush.messages",
-    "defaultValue": "9223372036854775807"
-  },
-  "message.format.version": {
-    "name": "message.format.version",
-    "defaultValue": "2.3-IV1"
-  },
-  "file.delete.delay.ms": {
-    "name": "file.delete.delay.ms",
-    "defaultValue": "60000"
-  },
-  "max.compaction.lag.ms": {
-    "name": "max.compaction.lag.ms",
-    "defaultValue": "9223372036854775807"
-  },
-  "min.compaction.lag.ms": {
-    "name": "min.compaction.lag.ms",
-    "defaultValue": "0"
-  },
-  "message.timestamp.type": {
-    "name": "message.timestamp.type",
-    "defaultValue": "CreateTime"
-  },
-  "preallocate": {
-    "name": "preallocate",
-    "defaultValue": "false"
-  },
-  "min.cleanable.dirty.ratio": {
-    "name": "min.cleanable.dirty.ratio",
-    "defaultValue": "0.5"
-  },
-  "index.interval.bytes": {
-    "name": "index.interval.bytes",
-    "defaultValue": "4096"
-  },
-  "unclean.leader.election.enable": {
-    "name": "unclean.leader.election.enable",
-    "defaultValue": "true"
-  },
-  "retention.bytes": {
-    "name": "retention.bytes",
-    "defaultValue": "-1"
-  },
-  "delete.retention.ms": {
-    "name": "delete.retention.ms",
-    "defaultValue": "86400000"
-  },
-  "segment.ms": {
-    "name": "segment.ms",
-    "defaultValue": "604800000"
-  },
-  "message.timestamp.difference.max.ms": {
-    "name": "message.timestamp.difference.max.ms",
-    "defaultValue": "9223372036854775807"
-  },
-  "segment.index.bytes": {
-    "name": "segment.index.bytes",
-    "defaultValue": "10485760"
-  }
-}

+ 8 - 172
kafka-ui-react-app/src/components/Topics/New/New.tsx

@@ -1,17 +1,10 @@
 import React from 'react';
-import {
-  ClusterName,
-  CleanupPolicy,
-  TopicFormData,
-  TopicName,
-} from 'redux/interfaces';
-import { useForm, FormContext, ErrorMessage } from 'react-hook-form';
+import { ClusterName, TopicFormData, TopicName } from 'redux/interfaces';
+import { useForm, FormContext } from 'react-hook-form';
 
 import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
 import { clusterTopicsPath } from 'lib/paths';
-import { TOPIC_NAME_VALIDATION_PATTERN, BYTES_IN_GB } from 'lib/constants';
-import CustomParamsContainer from './CustomParams/CustomParamsContainer';
-import TimeToRetain from './TimeToRetain';
+import TopicForm from 'components/Topics/shared/Form/TopicForm';
 
 interface Props {
   clusterName: ClusterName;
@@ -36,13 +29,7 @@ const New: React.FC<Props> = ({
       const { name } = methods.getValues();
       redirectToTopicPath(clusterName, name);
     }
-  }, [
-    isSubmitting,
-    isTopicCreated,
-    redirectToTopicPath,
-    clusterName,
-    methods.getValues,
-  ]);
+  }, [isSubmitting, isTopicCreated, redirectToTopicPath, clusterName, methods]);
 
   const onSubmit = async (data: TopicFormData) => {
     // TODO: need to fix loader. After success loading the first time, we won't wait for creation any more, because state is
@@ -70,161 +57,10 @@ const New: React.FC<Props> = ({
       <div className="box">
         {/* eslint-disable react/jsx-props-no-spreading */}
         <FormContext {...methods}>
-          <form onSubmit={methods.handleSubmit(onSubmit)}>
-            <div className="columns">
-              <div className="column is-three-quarters">
-                <label className="label">Topic Name *</label>
-                <input
-                  className="input"
-                  placeholder="Topic Name"
-                  ref={methods.register({
-                    required: 'Topic Name is required.',
-                    pattern: {
-                      value: TOPIC_NAME_VALIDATION_PATTERN,
-                      message: 'Only alphanumeric, _, -, and . allowed',
-                    },
-                  })}
-                  name="name"
-                  autoComplete="off"
-                  disabled={isSubmitting}
-                />
-                <p className="help is-danger">
-                  <ErrorMessage errors={methods.errors} name="name" />
-                </p>
-              </div>
-
-              <div className="column">
-                <label className="label">Number of partitions *</label>
-                <input
-                  className="input"
-                  type="number"
-                  placeholder="Number of partitions"
-                  defaultValue="1"
-                  ref={methods.register({
-                    required: 'Number of partitions is required.',
-                  })}
-                  name="partitions"
-                  disabled={isSubmitting}
-                />
-                <p className="help is-danger">
-                  <ErrorMessage errors={methods.errors} name="partitions" />
-                </p>
-              </div>
-            </div>
-
-            <div className="columns">
-              <div className="column">
-                <label className="label">Replication Factor *</label>
-                <input
-                  className="input"
-                  type="number"
-                  placeholder="Replication Factor"
-                  defaultValue="1"
-                  ref={methods.register({
-                    required: 'Replication Factor is required.',
-                  })}
-                  name="replicationFactor"
-                  disabled={isSubmitting}
-                />
-                <p className="help is-danger">
-                  <ErrorMessage
-                    errors={methods.errors}
-                    name="replicationFactor"
-                  />
-                </p>
-              </div>
-
-              <div className="column">
-                <label className="label">Min In Sync Replicas *</label>
-                <input
-                  className="input"
-                  type="number"
-                  placeholder="Replication Factor"
-                  defaultValue="1"
-                  ref={methods.register({
-                    required: 'Min In Sync Replicas is required.',
-                  })}
-                  name="minInSyncReplicas"
-                  disabled={isSubmitting}
-                />
-                <p className="help is-danger">
-                  <ErrorMessage
-                    errors={methods.errors}
-                    name="minInSyncReplicas"
-                  />
-                </p>
-              </div>
-            </div>
-
-            <div className="columns">
-              <div className="column is-one-third">
-                <label className="label">Cleanup policy</label>
-                <div className="select is-block">
-                  <select
-                    defaultValue={CleanupPolicy.Delete}
-                    name="cleanupPolicy"
-                    ref={methods.register}
-                    disabled={isSubmitting}
-                  >
-                    <option value={CleanupPolicy.Delete}>Delete</option>
-                    <option value={CleanupPolicy.Compact}>Compact</option>
-                  </select>
-                </div>
-              </div>
-
-              <div className="column is-one-third">
-                <TimeToRetain isSubmitting={isSubmitting} />
-              </div>
-
-              <div className="column is-one-third">
-                <label className="label">Max size on disk in GB</label>
-                <div className="select is-block">
-                  <select
-                    defaultValue={-1}
-                    name="retentionBytes"
-                    ref={methods.register}
-                    disabled={isSubmitting}
-                  >
-                    <option value={-1}>Not Set</option>
-                    <option value={BYTES_IN_GB}>1 GB</option>
-                    <option value={BYTES_IN_GB * 10}>10 GB</option>
-                    <option value={BYTES_IN_GB * 20}>20 GB</option>
-                    <option value={BYTES_IN_GB * 50}>50 GB</option>
-                  </select>
-                </div>
-              </div>
-            </div>
-
-            <div className="columns">
-              <div className="column">
-                <label className="label">Maximum message size in bytes *</label>
-                <input
-                  className="input"
-                  type="number"
-                  defaultValue="1000012"
-                  ref={methods.register({
-                    required: 'Maximum message size in bytes is required',
-                  })}
-                  name="maxMessageBytes"
-                  disabled={isSubmitting}
-                />
-                <p className="help is-danger">
-                  <ErrorMessage
-                    errors={methods.errors}
-                    name="maxMessageBytes"
-                  />
-                </p>
-              </div>
-            </div>
-
-            <CustomParamsContainer isSubmitting={isSubmitting} />
-
-            <input
-              type="submit"
-              className="button is-primary"
-              disabled={isSubmitting}
-            />
-          </form>
+          <TopicForm
+            isSubmitting={isSubmitting}
+            onSubmit={methods.handleSubmit(onSubmit)}
+          />
         </FormContext>
       </div>
     </div>

+ 6 - 0
kafka-ui-react-app/src/components/Topics/Topics.tsx

@@ -2,6 +2,7 @@ import React from 'react';
 import { ClusterName } from 'redux/interfaces';
 import { Switch, Route } from 'react-router-dom';
 import PageLoader from 'components/common/PageLoader/PageLoader';
+import EditContainer from 'components/Topics/Edit/EditContainer';
 import ListContainer from './List/ListContainer';
 import DetailsContainer from './Details/DetailsContainer';
 import NewContainer from './New/NewContainer';
@@ -35,6 +36,11 @@ const Topics: React.FC<Props> = ({
           path="/ui/clusters/:clusterName/topics/new"
           component={NewContainer}
         />
+        <Route
+          exact
+          path="/ui/clusters/:clusterName/topics/:topicName/edit"
+          component={EditContainer}
+        />
         <Route
           path="/ui/clusters/:clusterName/topics/:topicName"
           component={DetailsContainer}

+ 0 - 0
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamAction.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamAction.tsx


+ 0 - 0
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamButton.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx


+ 57 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamField.tsx

@@ -0,0 +1,57 @@
+import React from 'react';
+import { useFormContext, ErrorMessage } from 'react-hook-form';
+import { TopicFormCustomParam } from 'redux/interfaces';
+import CustomParamSelect from 'components/Topics/shared/Form/CustomParams/CustomParamSelect';
+import CustomParamValue from 'components/Topics/shared/Form/CustomParams/CustomParamValue';
+import CustomParamAction from 'components/Topics/shared/Form/CustomParams/CustomParamAction';
+import { INDEX_PREFIX } from './CustomParams';
+import CustomParamOptions from './CustomParamOptions';
+
+interface Props {
+  isDisabled: boolean;
+  index: string;
+  name: string;
+  existingFields: string[];
+  defaultValue: string;
+  onNameChange: (inputName: string, name: string) => void;
+  onRemove: (index: string) => void;
+}
+
+const CustomParamField: React.FC<Props> = ({
+  isDisabled,
+  index,
+  name,
+  existingFields,
+  defaultValue,
+  onNameChange,
+  onRemove,
+}) => {
+  return (
+    <div className="columns is-centered">
+      <div className="column">
+        <CustomParamSelect
+          index={index}
+          isDisabled={isDisabled}
+          name={name}
+          existingFields={existingFields}
+          onNameChange={onNameChange}
+        />
+      </div>
+
+      <div className="column">
+        <CustomParamValue
+          index={index}
+          isDisabled={isDisabled}
+          name={name}
+          defaultValue={defaultValue}
+        />
+      </div>
+
+      <div className="column is-narrow">
+        <CustomParamAction index={index} onRemove={onRemove} />
+      </div>
+    </div>
+  );
+};
+
+export default React.memo(CustomParamField);

+ 27 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamOptions.tsx

@@ -0,0 +1,27 @@
+import React from 'react';
+import { TopicCustomParamOption } from 'redux/interfaces';
+import { omitBy } from 'lodash';
+import CUSTOM_PARAMS_OPTIONS from './customParamsOptions';
+
+interface Props {
+  existingFields: string[];
+}
+
+const CustomParamOptions: React.FC<Props> = ({ existingFields }) => {
+  const fields = omitBy(Object.values(CUSTOM_PARAMS_OPTIONS), (field) =>
+    existingFields.includes(field.name)
+  );
+
+  return (
+    <>
+      <option value="">Select</option>
+      {Object.values(fields).map((opt: TopicCustomParamOption) => (
+        <option key={opt.name} value={opt.name}>
+          {opt.name}
+        </option>
+      ))}
+    </>
+  );
+};
+
+export default React.memo(CustomParamOptions);

+ 23 - 13
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamSelect.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamSelect.tsx

@@ -8,9 +8,17 @@ interface Props {
   isDisabled: boolean;
   index: string;
   name: string;
+  existingFields: string[];
+  onNameChange: (inputName: string, name: string) => void;
 }
 
-const CustomParamSelect: React.FC<Props> = ({ isDisabled, index, name }) => {
+const CustomParamSelect: React.FC<Props> = ({
+  isDisabled,
+  index,
+  name,
+  existingFields,
+  onNameChange,
+}) => {
   const { register, errors, getValues, triggerValidation } = useFormContext();
   const optInputName = `${index}[name]`;
 
@@ -18,18 +26,20 @@ const CustomParamSelect: React.FC<Props> = ({ isDisabled, index, name }) => {
     const values = getValues({ nest: true });
     const customParamsValues: TopicFormCustomParam = values.customParams;
 
-    let valid = true;
-
-    for (const [key, customParam] of Object.entries(customParamsValues)) {
-      if (`${INDEX_PREFIX}.${key}` !== index) {
-        if (selected === customParam.name) {
-          valid = false;
-          break;
-        }
+    const valid = !Object.entries(customParamsValues).some(
+      ([key, customParam]) => {
+        return (
+          `${INDEX_PREFIX}.${key}` !== index && selected === customParam.name
+        );
       }
-    }
+    );
+
+    return valid || 'Custom Parameter must be unique';
+  };
 
-    return valid ? true : 'Custom Parameter must be unique';
+  const onChange = (inputName: string) => (event: any) => {
+    triggerValidation(inputName);
+    onNameChange(index, event.target.value);
   };
 
   return (
@@ -42,11 +52,11 @@ const CustomParamSelect: React.FC<Props> = ({ isDisabled, index, name }) => {
             required: 'Custom Parameter is required.',
             validate: { unique: (selected) => selectedMustBeUniq(selected) },
           })}
-          onChange={() => triggerValidation(optInputName)}
+          onChange={onChange(optInputName)}
           disabled={isDisabled}
           defaultValue={name}
         >
-          <CustomParamOptions />
+          <CustomParamOptions existingFields={existingFields} />
         </select>
         <p className="help is-danger">
           <ErrorMessage errors={errors} name={optInputName} />

+ 5 - 4
kafka-ui-react-app/src/components/Topics/New/CustomParams/CustomParamValue.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamValue.tsx

@@ -1,6 +1,7 @@
 import React from 'react';
 import { useFormContext, ErrorMessage } from 'react-hook-form';
-import { CUSTOM_PARAMS_OPTIONS } from './customParamsOptions';
+import { camelCase } from 'lodash';
+import CUSTOM_PARAMS_OPTIONS from './customParamsOptions';
 
 interface Props {
   isDisabled: boolean;
@@ -21,14 +22,14 @@ const CustomParamValue: React.FC<Props> = ({
   const selectedParamName = watch(selectInputName, name);
 
   React.useEffect(() => {
-    if (selectedParamName) {
+    if (selectedParamName && !defaultValue) {
       setValue(
         valInputName,
         CUSTOM_PARAMS_OPTIONS[selectedParamName].defaultValue,
         true
       );
     }
-  }, [selectedParamName]);
+  }, [selectedParamName, setValue, valInputName]);
 
   return (
     <>
@@ -45,7 +46,7 @@ const CustomParamValue: React.FC<Props> = ({
         disabled={isDisabled}
       />
       <p className="help is-danger">
-        <ErrorMessage errors={errors} name={valInputName} />
+        <ErrorMessage errors={errors} name={name} />
       </p>
     </>
   );

+ 105 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParams.tsx

@@ -0,0 +1,105 @@
+import React from 'react';
+import { omit, reject, reduce, remove } from 'lodash';
+
+import { TopicFormCustomParams, TopicConfigByName } from 'redux/interfaces';
+import CustomParamButton, { CustomParamButtonType } from './CustomParamButton';
+import CustomParamField from './CustomParamField';
+
+export const INDEX_PREFIX = 'customParams';
+
+interface Props {
+  isSubmitting: boolean;
+  config?: TopicConfigByName;
+}
+
+interface Param {
+  [index: string]: {
+    name: string;
+    value: string;
+  };
+}
+
+const existingFields: string[] = [];
+
+const CustomParams: React.FC<Props> = ({ isSubmitting, config }) => {
+  const byIndex = config
+    ? reduce(
+        config.byName,
+        (result: Param, param, paramName) => {
+          result[`${INDEX_PREFIX}.${new Date().getTime()}ts`] = {
+            name: paramName,
+            value: param.value,
+          };
+          return result;
+        },
+        {}
+      )
+    : {};
+
+  const [formCustomParams, setFormCustomParams] = React.useState<
+    TopicFormCustomParams
+  >({
+    byIndex,
+    allIndexes: Object.keys(byIndex),
+  });
+
+  const onAdd = (event: React.MouseEvent<HTMLButtonElement>) => {
+    event.preventDefault();
+
+    const newIndex = `${INDEX_PREFIX}.${new Date().getTime()}ts`;
+
+    setFormCustomParams({
+      ...formCustomParams,
+      byIndex: {
+        ...formCustomParams.byIndex,
+        [newIndex]: { name: '', value: '' },
+      },
+      allIndexes: [newIndex, ...formCustomParams.allIndexes],
+    });
+  };
+
+  const onRemove = (index: string) => {
+    const fieldName = formCustomParams.byIndex[index].name;
+    remove(existingFields, (el) => el === fieldName);
+    setFormCustomParams({
+      ...formCustomParams,
+      byIndex: omit(formCustomParams.byIndex, index),
+      allIndexes: reject(formCustomParams.allIndexes, (i) => i === index),
+    });
+  };
+
+  const onFieldNameChange = (index: string, name: string) => {
+    formCustomParams.byIndex[index].name = name;
+    existingFields.push(name);
+  };
+
+  return (
+    <>
+      <div className="columns">
+        <div className="column">
+          <CustomParamButton
+            className="is-success"
+            type={CustomParamButtonType.plus}
+            onClick={onAdd}
+            btnText="Add Custom Parameter"
+          />
+        </div>
+      </div>
+
+      {formCustomParams.allIndexes.map((index) => (
+        <CustomParamField
+          key={index}
+          index={index}
+          isDisabled={isSubmitting}
+          name={formCustomParams.byIndex[index].name}
+          defaultValue={formCustomParams.byIndex[index].value}
+          existingFields={existingFields}
+          onNameChange={onFieldNameChange}
+          onRemove={onRemove}
+        />
+      ))}
+    </>
+  );
+};
+
+export default CustomParams;

+ 19 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamsContainer.tsx

@@ -0,0 +1,19 @@
+import { connect } from 'react-redux';
+import { RootState, TopicConfigByName } from 'redux/interfaces';
+import { withRouter, RouteComponentProps } from 'react-router-dom';
+import CustomParams from './CustomParams';
+
+interface OwnProps extends RouteComponentProps {
+  isSubmitting: boolean;
+  config?: TopicConfigByName;
+}
+
+const mapStateToProps = (
+  state: RootState,
+  { isSubmitting, config }: OwnProps
+) => ({
+  isSubmitting,
+  config,
+});
+
+export default withRouter(connect(mapStateToProps)(CustomParams));

+ 98 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/customParamsOptions.tsx

@@ -0,0 +1,98 @@
+import { TopicCustomParamOption } from 'redux/interfaces';
+
+interface CustomParamOption {
+  [optionName: string]: TopicCustomParamOption;
+}
+
+const CUSTOM_PARAMS_OPTIONS: CustomParamOption = {
+  'compression.type': {
+    name: 'compression.type',
+    defaultValue: 'producer',
+  },
+  'leader.replication.throttled.replicas': {
+    name: 'leader.replication.throttled.replicas',
+    defaultValue: '',
+  },
+  'message.downconversion.enable': {
+    name: 'message.downconversion.enable',
+    defaultValue: 'true',
+  },
+  'segment.jitter.ms': {
+    name: 'segment.jitter.ms',
+    defaultValue: '0',
+  },
+  'flush.ms': {
+    name: 'flush.ms',
+    defaultValue: '9223372036854775807',
+  },
+  'follower.replication.throttled.replicas': {
+    name: 'follower.replication.throttled.replicas',
+    defaultValue: '',
+  },
+  'segment.bytes': {
+    name: 'segment.bytes',
+    defaultValue: '1073741824',
+  },
+  'flush.messages': {
+    name: 'flush.messages',
+    defaultValue: '9223372036854775807',
+  },
+  'message.format.version': {
+    name: 'message.format.version',
+    defaultValue: '2.3-IV1',
+  },
+  'file.delete.delay.ms': {
+    name: 'file.delete.delay.ms',
+    defaultValue: '60000',
+  },
+  'max.compaction.lag.ms': {
+    name: 'max.compaction.lag.ms',
+    defaultValue: '9223372036854775807',
+  },
+  'min.compaction.lag.ms': {
+    name: 'min.compaction.lag.ms',
+    defaultValue: '0',
+  },
+  'message.timestamp.type': {
+    name: 'message.timestamp.type',
+    defaultValue: 'CreateTime',
+  },
+  preallocate: {
+    name: 'preallocate',
+    defaultValue: 'false',
+  },
+  'min.cleanable.dirty.ratio': {
+    name: 'min.cleanable.dirty.ratio',
+    defaultValue: '0.5',
+  },
+  'index.interval.bytes': {
+    name: 'index.interval.bytes',
+    defaultValue: '4096',
+  },
+  'unclean.leader.election.enable': {
+    name: 'unclean.leader.election.enable',
+    defaultValue: 'true',
+  },
+  'retention.bytes': {
+    name: 'retention.bytes',
+    defaultValue: '-1',
+  },
+  'delete.retention.ms': {
+    name: 'delete.retention.ms',
+    defaultValue: '86400000',
+  },
+  'segment.ms': {
+    name: 'segment.ms',
+    defaultValue: '604800000',
+  },
+  'message.timestamp.difference.max.ms': {
+    name: 'message.timestamp.difference.max.ms',
+    defaultValue: '9223372036854775807',
+  },
+  'segment.index.bytes': {
+    name: 'segment.index.bytes',
+    defaultValue: '10485760',
+  },
+};
+
+export default CUSTOM_PARAMS_OPTIONS;

+ 35 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/FormBreadcrumbs.tsx

@@ -0,0 +1,35 @@
+import React from 'react';
+import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
+import { clusterTopicsPath, clusterTopicPath } from 'lib/paths';
+import { ClusterName, TopicName } from 'redux/interfaces';
+
+interface Props {
+  clusterName: ClusterName;
+  topicName?: TopicName;
+  current: string;
+}
+
+const FormBreadcrumbs: React.FC<Props> = ({
+  clusterName,
+  topicName,
+  current,
+}) => {
+  const allTopicsLink = {
+    href: clusterTopicsPath(clusterName),
+    label: 'All Topics',
+  };
+  const links = topicName
+    ? [
+        allTopicsLink,
+        { href: clusterTopicPath(clusterName, topicName), label: topicName },
+      ]
+    : [allTopicsLink];
+
+  return (
+    <div className="level-item level-left">
+      <Breadcrumb links={links}>{current}</Breadcrumb>
+    </div>
+  );
+};
+
+export default FormBreadcrumbs;

+ 0 - 0
kafka-ui-react-app/src/components/Topics/New/TimeToRetain.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetain.tsx


+ 0 - 0
kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtn.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetainBtn.tsx


+ 0 - 0
kafka-ui-react-app/src/components/Topics/New/TimeToRetainBtns.tsx → kafka-ui-react-app/src/components/Topics/shared/Form/TimeToRetainBtns.tsx


+ 170 - 0
kafka-ui-react-app/src/components/Topics/shared/Form/TopicForm.tsx

@@ -0,0 +1,170 @@
+import React from 'react';
+import { useFormContext, ErrorMessage } from 'react-hook-form';
+import { TOPIC_NAME_VALIDATION_PATTERN, BYTES_IN_GB } from 'lib/constants';
+import { CleanupPolicy, TopicName, TopicConfigByName } from 'redux/interfaces';
+import CustomParamsContainer from './CustomParams/CustomParamsContainer';
+import TimeToRetain from './TimeToRetain';
+
+interface Props {
+  topicName?: TopicName;
+  config?: TopicConfigByName;
+  isEditing?: boolean;
+  isSubmitting: boolean;
+  onSubmit: (e: React.BaseSyntheticEvent) => Promise<void>;
+}
+
+const TopicForm: React.FC<Props> = ({
+  topicName,
+  config,
+  isEditing,
+  isSubmitting,
+  onSubmit,
+}) => {
+  const { register, errors } = useFormContext();
+
+  return (
+    <form onSubmit={onSubmit}>
+      <div className="columns">
+        <div className="column is-three-quarters">
+          <label className="label">Topic Name *</label>
+          <input
+            className="input"
+            placeholder="Topic Name"
+            ref={register({
+              required: 'Topic Name is required.',
+              pattern: {
+                value: TOPIC_NAME_VALIDATION_PATTERN,
+                message: 'Only alphanumeric, _, -, and . allowed',
+              },
+            })}
+            defaultValue={topicName}
+            name="name"
+            autoComplete="off"
+            disabled={isEditing || isSubmitting}
+          />
+          <p className="help is-danger">
+            <ErrorMessage errors={errors} name="name" />
+          </p>
+        </div>
+
+        <div className="column">
+          <label className="label">Number of partitions *</label>
+          <input
+            className="input"
+            type="number"
+            placeholder="Number of partitions"
+            defaultValue="1"
+            ref={register({ required: 'Number of partitions is required.' })}
+            name="partitions"
+            disabled={isEditing || isSubmitting}
+          />
+          <p className="help is-danger">
+            <ErrorMessage errors={errors} name="partitions" />
+          </p>
+        </div>
+      </div>
+
+      <div className="columns">
+        <div className="column">
+          <label className="label">Replication Factor *</label>
+          <input
+            className="input"
+            type="number"
+            placeholder="Replication Factor"
+            defaultValue="1"
+            ref={register({ required: 'Replication Factor is required.' })}
+            name="replicationFactor"
+            disabled={isEditing || isSubmitting}
+          />
+          <p className="help is-danger">
+            <ErrorMessage errors={errors} name="replicationFactor" />
+          </p>
+        </div>
+
+        <div className="column">
+          <label className="label">Min In Sync Replicas *</label>
+          <input
+            className="input"
+            type="number"
+            placeholder="Min In Sync Replicas"
+            defaultValue="1"
+            ref={register({ required: 'Min In Sync Replicas is required.' })}
+            name="minInSyncReplicas"
+            disabled={isSubmitting}
+          />
+          <p className="help is-danger">
+            <ErrorMessage errors={errors} name="minInSyncReplicas" />
+          </p>
+        </div>
+      </div>
+
+      <div className="columns">
+        <div className="column is-one-third">
+          <label className="label">Cleanup policy</label>
+          <div className="select is-block">
+            <select
+              defaultValue={CleanupPolicy.Delete}
+              name="cleanupPolicy"
+              ref={register}
+              disabled={isSubmitting}
+            >
+              <option value={CleanupPolicy.Delete}>Delete</option>
+              <option value={CleanupPolicy.Compact}>Compact</option>
+            </select>
+          </div>
+        </div>
+
+        <div className="column is-one-third">
+          <TimeToRetain isSubmitting={isSubmitting} />
+        </div>
+
+        <div className="column is-one-third">
+          <label className="label">Max size on disk in GB</label>
+          <div className="select is-block">
+            <select
+              defaultValue={-1}
+              name="retentionBytes"
+              ref={register}
+              disabled={isSubmitting}
+            >
+              <option value={-1}>Not Set</option>
+              <option value={BYTES_IN_GB}>1 GB</option>
+              <option value={BYTES_IN_GB * 10}>10 GB</option>
+              <option value={BYTES_IN_GB * 20}>20 GB</option>
+              <option value={BYTES_IN_GB * 50}>50 GB</option>
+            </select>
+          </div>
+        </div>
+      </div>
+
+      <div className="columns">
+        <div className="column">
+          <label className="label">Maximum message size in bytes *</label>
+          <input
+            className="input"
+            type="number"
+            defaultValue="1000012"
+            ref={register({
+              required: 'Maximum message size in bytes is required',
+            })}
+            name="maxMessageBytes"
+            disabled={isSubmitting}
+          />
+          <p className="help is-danger">
+            <ErrorMessage errors={errors} name="maxMessageBytes" />
+          </p>
+        </div>
+      </div>
+
+      <CustomParamsContainer isSubmitting={isSubmitting} config={config} />
+
+      <input
+        type="submit"
+        className="button is-primary"
+        disabled={isSubmitting}
+      />
+    </form>
+  );
+};
+
+export default TopicForm;

+ 16 - 3
kafka-ui-react-app/src/components/common/PageLoader/PageLoader.tsx

@@ -1,8 +1,21 @@
 import React from 'react';
+import cx from 'classnames';
 
-const PageLoader: React.FC = () => (
-  <section className="hero is-fullheight-with-navbar">
-    <div className="hero-body has-text-centered" style={{ justifyContent: 'center' }}>
+interface Props {
+  isFullHeight: boolean;
+}
+
+const PageLoader: React.FC<Partial<Props>> = ({ isFullHeight = true }) => (
+  <section
+    className={cx(
+      'hero',
+      isFullHeight ? 'is-fullheight-with-navbar' : 'is-halfheight'
+    )}
+  >
+    <div
+      className="hero-body has-text-centered"
+      style={{ justifyContent: 'center' }}
+    >
       <div style={{ width: 300 }}>
         <div className="subtitle">Loading...</div>
         <progress

+ 5 - 0
kafka-ui-react-app/src/lib/paths.ts

@@ -23,3 +23,8 @@ export const clusterTopicMessagesPath = (
   clusterName: ClusterName,
   topicName: TopicName
 ) => `${clusterTopicsPath(clusterName)}/${topicName}/messages`;
+
+export const clusterTopicsTopicEditPath = (
+  clusterName: ClusterName,
+  topicName: TopicName
+) => `${clusterTopicsPath(clusterName)}/${topicName}/edit`;

+ 12 - 2
kafka-ui-react-app/src/redux/actionType.ts

@@ -1,4 +1,4 @@
-export enum ActionType {
+enum ActionType {
   GET_CLUSTERS__REQUEST = 'GET_CLUSTERS__REQUEST',
   GET_CLUSTERS__SUCCESS = 'GET_CLUSTERS__SUCCESS',
   GET_CLUSTERS__FAILURE = 'GET_CLUSTERS__FAILURE',
@@ -15,6 +15,10 @@ export enum ActionType {
   GET_TOPICS__SUCCESS = 'GET_TOPICS__SUCCESS',
   GET_TOPICS__FAILURE = 'GET_TOPICS__FAILURE',
 
+  GET_TOPIC_MESSAGES__REQUEST = 'GET_TOPIC_MESSAGES__REQUEST',
+  GET_TOPIC_MESSAGES__SUCCESS = 'GET_TOPIC_MESSAGES__SUCCESS',
+  GET_TOPIC_MESSAGES__FAILURE = 'GET_TOPIC_MESSAGES__FAILURE',
+
   GET_TOPIC_DETAILS__REQUEST = 'GET_TOPIC_DETAILS__REQUEST',
   GET_TOPIC_DETAILS__SUCCESS = 'GET_TOPIC_DETAILS__SUCCESS',
   GET_TOPIC_DETAILS__FAILURE = 'GET_TOPIC_DETAILS__FAILURE',
@@ -27,6 +31,10 @@ export enum ActionType {
   POST_TOPIC__SUCCESS = 'POST_TOPIC__SUCCESS',
   POST_TOPIC__FAILURE = 'POST_TOPIC__FAILURE',
 
+  PATCH_TOPIC__REQUEST = 'PATCH_TOPIC__REQUEST',
+  PATCH_TOPIC__SUCCESS = 'PATCH_TOPIC__SUCCESS',
+  PATCH_TOPIC__FAILURE = 'PATCH_TOPIC__FAILURE',
+
   GET_CONSUMER_GROUPS__REQUEST = 'GET_CONSUMER_GROUPS__REQUEST',
   GET_CONSUMER_GROUPS__SUCCESS = 'GET_CONSUMER_GROUPS__SUCCESS',
   GET_CONSUMER_GROUPS__FAILURE = 'GET_CONSUMER_GROUPS__FAILURE',
@@ -34,4 +42,6 @@ export enum ActionType {
   GET_CONSUMER_GROUP_DETAILS__REQUEST = 'GET_CONSUMER_GROUP_DETAILS__REQUEST',
   GET_CONSUMER_GROUP_DETAILS__SUCCESS = 'GET_CONSUMER_GROUP_DETAILS__SUCCESS',
   GET_CONSUMER_GROUP_DETAILS__FAILURE = 'GET_CONSUMER_GROUP_DETAILS__FAILURE',
-};
+}
+
+export default ActionType;

+ 33 - 14
kafka-ui-react-app/src/redux/actions/actions.ts

@@ -1,6 +1,5 @@
 import { createAsyncAction } from 'typesafe-actions';
-import { ActionType } from 'redux/actionType';
-import { ConsumerGroup, ConsumerGroupID, ConsumerGroupDetails } from '../interfaces/consumerGroup';
+import ActionType from 'redux/actionType';
 import {
   Broker,
   BrokerMetrics,
@@ -8,59 +7,79 @@ import {
   Topic,
   TopicConfig,
   TopicDetails,
+  TopicMessage,
   TopicName,
+  ConsumerGroup,
+  ConsumerGroupDetails,
+  ConsumerGroupID,
 } from 'redux/interfaces';
 
 export const fetchBrokersAction = createAsyncAction(
   ActionType.GET_BROKERS__REQUEST,
   ActionType.GET_BROKERS__SUCCESS,
-  ActionType.GET_BROKERS__FAILURE,
+  ActionType.GET_BROKERS__FAILURE
 )<undefined, Broker[], undefined>();
 
 export const fetchBrokerMetricsAction = createAsyncAction(
   ActionType.GET_BROKER_METRICS__REQUEST,
   ActionType.GET_BROKER_METRICS__SUCCESS,
-  ActionType.GET_BROKER_METRICS__FAILURE,
+  ActionType.GET_BROKER_METRICS__FAILURE
 )<undefined, BrokerMetrics, undefined>();
 
 export const fetchClusterListAction = createAsyncAction(
   ActionType.GET_CLUSTERS__REQUEST,
   ActionType.GET_CLUSTERS__SUCCESS,
-  ActionType.GET_CLUSTERS__FAILURE,
+  ActionType.GET_CLUSTERS__FAILURE
 )<undefined, Cluster[], undefined>();
 
 export const fetchTopicListAction = createAsyncAction(
   ActionType.GET_TOPICS__REQUEST,
   ActionType.GET_TOPICS__SUCCESS,
-  ActionType.GET_TOPICS__FAILURE,
+  ActionType.GET_TOPICS__FAILURE
 )<undefined, Topic[], undefined>();
 
+export const fetchTopicMessagesAction = createAsyncAction(
+  ActionType.GET_TOPIC_MESSAGES__REQUEST,
+  ActionType.GET_TOPIC_MESSAGES__SUCCESS,
+  ActionType.GET_TOPIC_MESSAGES__FAILURE
+)<undefined, TopicMessage[], undefined>();
+
 export const fetchTopicDetailsAction = createAsyncAction(
   ActionType.GET_TOPIC_DETAILS__REQUEST,
   ActionType.GET_TOPIC_DETAILS__SUCCESS,
-  ActionType.GET_TOPIC_DETAILS__FAILURE,
-)<undefined, { topicName: TopicName, details: TopicDetails }, undefined>();
+  ActionType.GET_TOPIC_DETAILS__FAILURE
+)<undefined, { topicName: TopicName; details: TopicDetails }, undefined>();
 
 export const fetchTopicConfigAction = createAsyncAction(
   ActionType.GET_TOPIC_CONFIG__REQUEST,
   ActionType.GET_TOPIC_CONFIG__SUCCESS,
-  ActionType.GET_TOPIC_CONFIG__FAILURE,
-)<undefined, { topicName: TopicName, config: TopicConfig[] }, undefined>();
+  ActionType.GET_TOPIC_CONFIG__FAILURE
+)<undefined, { topicName: TopicName; config: TopicConfig[] }, undefined>();
 
 export const createTopicAction = createAsyncAction(
   ActionType.POST_TOPIC__REQUEST,
   ActionType.POST_TOPIC__SUCCESS,
-  ActionType.POST_TOPIC__FAILURE,
+  ActionType.POST_TOPIC__FAILURE
+)<undefined, Topic, undefined>();
+
+export const updateTopicAction = createAsyncAction(
+  ActionType.PATCH_TOPIC__REQUEST,
+  ActionType.PATCH_TOPIC__SUCCESS,
+  ActionType.PATCH_TOPIC__FAILURE
 )<undefined, Topic, undefined>();
 
 export const fetchConsumerGroupsAction = createAsyncAction(
   ActionType.GET_CONSUMER_GROUPS__REQUEST,
   ActionType.GET_CONSUMER_GROUPS__SUCCESS,
-  ActionType.GET_CONSUMER_GROUPS__FAILURE,
+  ActionType.GET_CONSUMER_GROUPS__FAILURE
 )<undefined, ConsumerGroup[], undefined>();
 
 export const fetchConsumerGroupDetailsAction = createAsyncAction(
   ActionType.GET_CONSUMER_GROUP_DETAILS__REQUEST,
   ActionType.GET_CONSUMER_GROUP_DETAILS__SUCCESS,
-  ActionType.GET_CONSUMER_GROUP_DETAILS__FAILURE,
-)<undefined, { consumerGroupID: ConsumerGroupID, details: ConsumerGroupDetails }, undefined>();
+  ActionType.GET_CONSUMER_GROUP_DETAILS__FAILURE
+)<
+  undefined,
+  { consumerGroupID: ConsumerGroupID; details: ConsumerGroupDetails },
+  undefined
+>();

+ 75 - 14
kafka-ui-react-app/src/redux/actions/thunks.ts

@@ -1,15 +1,19 @@
 import * as api from 'redux/api';
-import * as actions from './actions';
-import { ConsumerGroupID } from '../interfaces/consumerGroup';
 import {
+  ConsumerGroupID,
   PromiseThunk,
   Cluster,
   ClusterName,
   TopicFormData,
-  TopicName, Topic,
+  TopicName,
+  Topic,
 } from 'redux/interfaces';
 
-export const fetchBrokers = (clusterName: ClusterName): PromiseThunk<void> => async (dispatch) => {
+import * as actions from './actions';
+
+export const fetchBrokers = (
+  clusterName: ClusterName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchBrokersAction.request());
   try {
     const payload = await api.getBrokers(clusterName);
@@ -19,7 +23,9 @@ export const fetchBrokers = (clusterName: ClusterName): PromiseThunk<void> => as
   }
 };
 
-export const fetchBrokerMetrics = (clusterName: ClusterName): PromiseThunk<void> => async (dispatch) => {
+export const fetchBrokerMetrics = (
+  clusterName: ClusterName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchBrokerMetricsAction.request());
   try {
     const payload = await api.getBrokerMetrics(clusterName);
@@ -39,7 +45,9 @@ export const fetchClustersList = (): PromiseThunk<void> => async (dispatch) => {
   }
 };
 
-export const fetchTopicList = (clusterName: ClusterName): PromiseThunk<void> => async (dispatch) => {
+export const fetchTopicList = (
+  clusterName: ClusterName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchTopicListAction.request());
   try {
     const topics = await api.getTopics(clusterName);
@@ -49,17 +57,41 @@ export const fetchTopicList = (clusterName: ClusterName): PromiseThunk<void> =>
   }
 };
 
-export const fetchTopicDetails = (clusterName: ClusterName, topicName: TopicName): PromiseThunk<void> => async (dispatch) => {
+export const fetchTopicMessages = (
+  clusterName: ClusterName,
+  topicName: TopicName
+): PromiseThunk<void> => async (dispatch) => {
+  dispatch(actions.fetchTopicMessagesAction.request());
+  try {
+    const messages = await api.getTopicMessages(clusterName, topicName);
+    dispatch(actions.fetchTopicMessagesAction.success(messages));
+  } catch (e) {
+    dispatch(actions.fetchTopicMessagesAction.failure());
+  }
+};
+
+export const fetchTopicDetails = (
+  clusterName: ClusterName,
+  topicName: TopicName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchTopicDetailsAction.request());
   try {
     const topicDetails = await api.getTopicDetails(clusterName, topicName);
-    dispatch(actions.fetchTopicDetailsAction.success({ topicName, details: topicDetails }));
+    dispatch(
+      actions.fetchTopicDetailsAction.success({
+        topicName,
+        details: topicDetails,
+      })
+    );
   } catch (e) {
     dispatch(actions.fetchTopicDetailsAction.failure());
   }
 };
 
-export const fetchTopicConfig = (clusterName: ClusterName, topicName: TopicName): PromiseThunk<void> => async (dispatch) => {
+export const fetchTopicConfig = (
+  clusterName: ClusterName,
+  topicName: TopicName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchTopicConfigAction.request());
   try {
     const config = await api.getTopicConfig(clusterName, topicName);
@@ -69,7 +101,10 @@ export const fetchTopicConfig = (clusterName: ClusterName, topicName: TopicName)
   }
 };
 
-export const createTopic = (clusterName: ClusterName, form: TopicFormData): PromiseThunk<void> => async (dispatch) => {
+export const createTopic = (
+  clusterName: ClusterName,
+  form: TopicFormData
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.createTopicAction.request());
   try {
     const topic: Topic = await api.postTopic(clusterName, form);
@@ -79,7 +114,22 @@ export const createTopic = (clusterName: ClusterName, form: TopicFormData): Prom
   }
 };
 
-export const fetchConsumerGroupsList = (clusterName: ClusterName): PromiseThunk<void> => async (dispatch) => {
+export const updateTopic = (
+  clusterName: ClusterName,
+  form: TopicFormData
+): PromiseThunk<void> => async (dispatch) => {
+  dispatch(actions.updateTopicAction.request());
+  try {
+    const topic: Topic = await api.patchTopic(clusterName, form);
+    dispatch(actions.updateTopicAction.success(topic));
+  } catch (e) {
+    dispatch(actions.updateTopicAction.failure());
+  }
+};
+
+export const fetchConsumerGroupsList = (
+  clusterName: ClusterName
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchConsumerGroupsAction.request());
   try {
     const consumerGroups = await api.getConsumerGroups(clusterName);
@@ -89,11 +139,22 @@ export const fetchConsumerGroupsList = (clusterName: ClusterName): PromiseThunk<
   }
 };
 
-export const fetchConsumerGroupDetails = (clusterName: ClusterName, consumerGroupID: ConsumerGroupID): PromiseThunk<void> => async (dispatch) => {
+export const fetchConsumerGroupDetails = (
+  clusterName: ClusterName,
+  consumerGroupID: ConsumerGroupID
+): PromiseThunk<void> => async (dispatch) => {
   dispatch(actions.fetchConsumerGroupDetailsAction.request());
   try {
-    const consumerGroupDetails = await api.getConsumerGroupDetails(clusterName, consumerGroupID);
-    dispatch(actions.fetchConsumerGroupDetailsAction.success({ consumerGroupID, details: consumerGroupDetails }));
+    const consumerGroupDetails = await api.getConsumerGroupDetails(
+      clusterName,
+      consumerGroupID
+    );
+    dispatch(
+      actions.fetchConsumerGroupDetailsAction.success({
+        consumerGroupID,
+        details: consumerGroupDetails,
+      })
+    );
   } catch (e) {
     dispatch(actions.fetchConsumerGroupDetailsAction.failure());
   }

+ 60 - 22
kafka-ui-react-app/src/redux/api/topics.ts

@@ -1,14 +1,30 @@
-import { reduce } from 'lodash';
 import {
-  ClusterName,
+  TopicName,
+  TopicMessage,
   Topic,
-  TopicConfig,
+  ClusterName,
   TopicDetails,
-  TopicFormCustomParam,
+  TopicConfig,
   TopicFormData,
-  TopicName,
+  TopicFormCustomParam,
+  TopicFormFormattedParams,
+  TopicFormCustomParams,
 } from 'redux/interfaces';
-import { BASE_PARAMS, BASE_URL } from 'lib/constants';
+import { BASE_URL, BASE_PARAMS } from 'lib/constants';
+
+const formatCustomParams = (
+  customParams: TopicFormCustomParams
+): TopicFormFormattedParams => {
+  return Object.values(customParams || {}).reduce(
+    (result: TopicFormFormattedParams, customParam: TopicFormCustomParam) => {
+      return {
+        ...result,
+        [customParam.name]: customParam.value,
+      };
+    },
+    {} as TopicFormFormattedParams
+  );
+};
 
 export const getTopicConfig = (
   clusterName: ClusterName,
@@ -31,9 +47,13 @@ export const getTopics = (clusterName: ClusterName): Promise<Topic[]> =>
     ...BASE_PARAMS,
   }).then((res) => res.json());
 
-interface Result {
-  [index: string]: string;
-}
+export const getTopicMessages = (
+  clusterName: ClusterName,
+  topicName: TopicName
+): Promise<TopicMessage[]> =>
+  fetch(`${BASE_URL}/clusters/${clusterName}/topics/${topicName}/messages`, {
+    ...BASE_PARAMS,
+  }).then((res) => res.json());
 
 export const postTopic = (
   clusterName: ClusterName,
@@ -50,18 +70,6 @@ export const postTopic = (
     minInSyncReplicas,
   } = form;
 
-  const customParams =
-    (form.customParams &&
-      reduce(
-        Object.values(form.customParams),
-        (result: Result, customParam: TopicFormCustomParam) => {
-          result[customParam.name] = customParam.value;
-          return result;
-        },
-        {}
-      )) ||
-    {};
-
   const body = JSON.stringify({
     name,
     partitions,
@@ -72,7 +80,7 @@ export const postTopic = (
       'retention.bytes': retentionBytes,
       'max.message.bytes': maxMessageBytes,
       'min.insync.replicas': minInSyncReplicas,
-      ...customParams,
+      ...formatCustomParams(form.customParams),
     },
   });
 
@@ -82,3 +90,33 @@ export const postTopic = (
     body,
   }).then((res) => res.json());
 };
+
+export const patchTopic = (
+  clusterName: ClusterName,
+  form: TopicFormData
+): Promise<Topic> => {
+  const {
+    cleanupPolicy,
+    retentionBytes,
+    retentionMs,
+    maxMessageBytes,
+    minInSyncReplicas,
+  } = form;
+
+  const body = JSON.stringify({
+    configs: {
+      'cleanup.policy': cleanupPolicy,
+      'retention.ms': retentionMs,
+      'retention.bytes': retentionBytes,
+      'max.message.bytes': maxMessageBytes,
+      'min.insync.replicas': minInSyncReplicas,
+      ...formatCustomParams(form.customParams),
+    },
+  });
+
+  return fetch(`${BASE_URL}/clusters/${clusterName}/topics/${form.name}`, {
+    ...BASE_PARAMS,
+    method: 'PATCH',
+    body,
+  }).then((res) => res.json());
+};

+ 22 - 3
kafka-ui-react-app/src/redux/interfaces/topic.ts

@@ -11,6 +11,12 @@ export interface TopicConfig {
   defaultValue: string;
 }
 
+export interface TopicConfigByName {
+  byName: {
+    [paramName: string]: TopicConfig;
+  };
+}
+
 export interface TopicReplica {
   broker: number;
   leader: boolean;
@@ -44,6 +50,16 @@ export interface Topic {
   partitions: TopicPartition[];
 }
 
+export interface TopicMessage {
+  partition: number;
+  offset: number;
+  timestamp: number;
+  timestampType: string;
+  key: string;
+  headers: Record<string, string>;
+  content: string;
+}
+
 export interface TopicFormCustomParam {
   name: string;
   value: string;
@@ -61,6 +77,11 @@ export interface TopicWithDetailedInfo extends Topic, TopicDetails {
 export interface TopicsState {
   byName: { [topicName: string]: TopicWithDetailedInfo };
   allNames: TopicName[];
+  messages: TopicMessage[];
+}
+
+export interface TopicFormFormattedParams {
+  [name: string]: string;
 }
 
 export interface TopicFormData {
@@ -72,7 +93,5 @@ export interface TopicFormData {
   retentionMs: number;
   retentionBytes: number;
   maxMessageBytes: number;
-  customParams: {
-    [index: string]: TopicFormCustomParam;
-  };
+  customParams: TopicFormCustomParams;
 }

+ 9 - 6
kafka-ui-react-app/src/redux/reducers/brokers/reducer.ts

@@ -4,11 +4,9 @@ import {
   ZooKeeperStatus,
   BrokerMetrics,
 } from 'redux/interfaces';
-import {
-  ActionType,
-} from 'redux/actionType';
+import ActionType from 'redux/actionType';
 
-export const initialState: BrokersState =  {
+export const initialState: BrokersState = {
   items: [],
   brokerCount: 0,
   zooKeeperStatus: ZooKeeperStatus.offline,
@@ -21,12 +19,17 @@ export const initialState: BrokersState =  {
   diskUsage: [],
 };
 
-const updateBrokerSegmentSize = (state: BrokersState, payload: BrokerMetrics) => {
+const updateBrokerSegmentSize = (
+  state: BrokersState,
+  payload: BrokerMetrics
+) => {
   const brokers = state.items;
   const { diskUsage } = payload;
 
   const items = brokers.map((broker) => {
-    const brokerMetrics = diskUsage.find(({ brokerId }) => brokerId === broker.brokerId);
+    const brokerMetrics = diskUsage.find(
+      ({ brokerId }) => brokerId === broker.brokerId
+    );
     if (brokerMetrics !== undefined) {
       return { ...broker, ...brokerMetrics };
     }

+ 1 - 1
kafka-ui-react-app/src/redux/reducers/clusters/reducer.ts

@@ -1,5 +1,5 @@
 import { Cluster, Action } from 'redux/interfaces';
-import { ActionType } from 'redux/actionType';
+import ActionType from 'redux/actionType';
 
 export const initialState: Cluster[] = [];
 

+ 19 - 20
kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts

@@ -1,31 +1,30 @@
-import { Action, ConsumerGroup } from 'redux/interfaces';
-import { ActionType } from 'redux/actionType';
-import { ConsumerGroupsState } from '../../interfaces/consumerGroup';
+import { Action, ConsumerGroup, ConsumerGroupsState } from 'redux/interfaces';
+import ActionType from 'redux/actionType';
 
 export const initialState: ConsumerGroupsState = {
   byID: {},
-  allIDs: []
+  allIDs: [],
 };
 
-const updateConsumerGroupsList = (state: ConsumerGroupsState, payload: ConsumerGroup[]): ConsumerGroupsState => {
+const updateConsumerGroupsList = (
+  state: ConsumerGroupsState,
+  payload: ConsumerGroup[]
+): ConsumerGroupsState => {
   const initialMemo: ConsumerGroupsState = {
     ...state,
-    allIDs: []
+    allIDs: [],
   };
 
-  return payload.reduce(
-    (memo: ConsumerGroupsState, consumerGroup) => {
-      const {consumerGroupId} = consumerGroup;
-      memo.byID[consumerGroupId] = {
-        ...memo.byID[consumerGroupId],
-        ...consumerGroup,
-      };
-      memo.allIDs.push(consumerGroupId);
+  return payload.reduce((memo: ConsumerGroupsState, consumerGroup) => {
+    const { consumerGroupId } = consumerGroup;
+    memo.byID[consumerGroupId] = {
+      ...memo.byID[consumerGroupId],
+      ...consumerGroup,
+    };
+    memo.allIDs.push(consumerGroupId);
 
-      return memo;
-    },
-    initialMemo,
-  );
+    return memo;
+  }, initialMemo);
 };
 
 const reducer = (state = initialState, action: Action): ConsumerGroupsState => {
@@ -40,8 +39,8 @@ const reducer = (state = initialState, action: Action): ConsumerGroupsState => {
           [action.payload.consumerGroupID]: {
             ...state.byID[action.payload.consumerGroupID],
             ...action.payload.details,
-          }
-        }
+          },
+        },
       };
     default:
       return state;

+ 21 - 18
kafka-ui-react-app/src/redux/reducers/topics/reducer.ts

@@ -1,9 +1,10 @@
 import { Action, TopicsState, Topic } from 'redux/interfaces';
-import { ActionType } from 'redux/actionType';
+import ActionType from 'redux/actionType';
 
 export const initialState: TopicsState = {
   byName: {},
   allNames: [],
+  messages: [],
 };
 
 const updateTopicList = (state: TopicsState, payload: Topic[]): TopicsState => {
@@ -12,24 +13,21 @@ const updateTopicList = (state: TopicsState, payload: Topic[]): TopicsState => {
     allNames: [],
   };
 
-  return payload.reduce(
-    (memo: TopicsState, topic) => {
-      const {name} = topic;
-      memo.byName[name] = {
-        ...memo.byName[name],
-        ...topic,
-      };
-      memo.allNames.push(name);
+  return payload.reduce((memo: TopicsState, topic) => {
+    const { name } = topic;
+    memo.byName[name] = {
+      ...memo.byName[name],
+      ...topic,
+    };
+    memo.allNames.push(name);
 
-      return memo;
-    },
-    initialMemo,
-  );
+    return memo;
+  }, initialMemo);
 };
 
 const addToTopicList = (state: TopicsState, payload: Topic): TopicsState => {
   const newState: TopicsState = {
-    ...state
+    ...state,
   };
   newState.allNames.push(payload.name);
   newState.byName[payload.name] = payload;
@@ -48,8 +46,13 @@ const reducer = (state = initialState, action: Action): TopicsState => {
           [action.payload.topicName]: {
             ...state.byName[action.payload.topicName],
             ...action.payload.details,
-          }
-        }
+          },
+        },
+      };
+    case ActionType.GET_TOPIC_MESSAGES__SUCCESS:
+      return {
+        ...state,
+        messages: action.payload,
       };
     case ActionType.GET_TOPIC_CONFIG__SUCCESS:
       return {
@@ -59,8 +62,8 @@ const reducer = (state = initialState, action: Action): TopicsState => {
           [action.payload.topicName]: {
             ...state.byName[action.payload.topicName],
             config: action.payload.config,
-          }
-        }
+          },
+        },
       };
     case ActionType.POST_TOPIC__SUCCESS:
       return addToTopicList(state, action.payload);

+ 60 - 13
kafka-ui-react-app/src/redux/reducers/topics/selectors.ts

@@ -1,35 +1,59 @@
 import { createSelector } from 'reselect';
-import { RootState, TopicName, FetchStatus, TopicsState } from 'redux/interfaces';
+import {
+  RootState,
+  TopicName,
+  FetchStatus,
+  TopicsState,
+  TopicConfigByName,
+} from 'redux/interfaces';
 import { createFetchingSelector } from 'redux/reducers/loader/selectors';
 
 const topicsState = ({ topics }: RootState): TopicsState => topics;
 
 const getAllNames = (state: RootState) => topicsState(state).allNames;
 const getTopicMap = (state: RootState) => topicsState(state).byName;
+export const getTopicMessages = (state: RootState) =>
+  topicsState(state).messages;
 
 const getTopicListFetchingStatus = createFetchingSelector('GET_TOPICS');
-const getTopicDetailsFetchingStatus = createFetchingSelector('GET_TOPIC_DETAILS');
+const getTopicDetailsFetchingStatus = createFetchingSelector(
+  'GET_TOPIC_DETAILS'
+);
+const getTopicMessagesFetchingStatus = createFetchingSelector(
+  'GET_TOPIC_MESSAGES'
+);
 const getTopicConfigFetchingStatus = createFetchingSelector('GET_TOPIC_CONFIG');
 const getTopicCreationStatus = createFetchingSelector('POST_TOPIC');
+const getTopicUpdateStatus = createFetchingSelector('PATCH_TOPIC');
 
 export const getIsTopicListFetched = createSelector(
   getTopicListFetchingStatus,
-  (status) => status === FetchStatus.fetched,
+  (status) => status === FetchStatus.fetched
 );
 
 export const getIsTopicDetailsFetched = createSelector(
   getTopicDetailsFetchingStatus,
-  (status) => status === FetchStatus.fetched,
+  (status) => status === FetchStatus.fetched
+);
+
+export const getIsTopicMessagesFetched = createSelector(
+  getTopicMessagesFetchingStatus,
+  (status) => status === FetchStatus.fetched
 );
 
 export const getTopicConfigFetched = createSelector(
   getTopicConfigFetchingStatus,
-  (status) => status === FetchStatus.fetched,
+  (status) => status === FetchStatus.fetched
 );
 
 export const getTopicCreated = createSelector(
   getTopicCreationStatus,
-  (status) => status === FetchStatus.fetched,
+  (status) => status === FetchStatus.fetched
+);
+
+export const getTopicUpdated = createSelector(
+  getTopicUpdateStatus,
+  (status) => status === FetchStatus.fetched
 );
 
 export const getTopicList = createSelector(
@@ -40,13 +64,12 @@ export const getTopicList = createSelector(
     if (!isFetched) {
       return [];
     }
-    return allNames.map((name) => byName[name])
-  },
+    return allNames.map((name) => byName[name]);
+  }
 );
 
-export const getExternalTopicList = createSelector(
-  getTopicList,
-  (topics) => topics.filter(({ internal }) => !internal),
+export const getExternalTopicList = createSelector(getTopicList, (topics) =>
+  topics.filter(({ internal }) => !internal)
 );
 
 const getTopicName = (_: RootState, topicName: TopicName) => topicName;
@@ -54,7 +77,31 @@ const getTopicName = (_: RootState, topicName: TopicName) => topicName;
 export const getTopicByName = createSelector(
   getTopicMap,
   getTopicName,
-  (topics, topicName) => topics[topicName],
+  (topics, topicName) => topics[topicName]
+);
+
+export const getFullTopic = createSelector(getTopicByName, (topic) =>
+  topic && topic.config && !!topic.partitionCount ? topic : undefined
+);
+
+export const getTopicConfig = createSelector(
+  getTopicByName,
+  ({ config }) => config
 );
 
-export const getTopicConfig = createSelector(getTopicByName, ({ config }) => config);
+export const getTopicConfigByParamName = createSelector(
+  getTopicConfig,
+  (config) => {
+    const byParamName: TopicConfigByName = {
+      byName: {},
+    };
+
+    if (config) {
+      config.forEach((param) => {
+        byParamName.byName[param.name] = param;
+      });
+    }
+
+    return byParamName;
+  }
+);

+ 3 - 0
package-lock.json

@@ -0,0 +1,3 @@
+{
+  "lockfileVersion": 1
+}

+ 16 - 0
pom.xml

@@ -28,9 +28,25 @@
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
 		<kafka.version>2.4.1</kafka.version>
+		<avro.version>1.9.2</avro.version>
+		<confluent.version>5.5.0</confluent.version>
 		<apache.commons.version>2.2</apache.commons.version>
 	</properties>
 
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<pluginRepositories>
+		<pluginRepository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</pluginRepository>
+	</pluginRepositories>
+
 	<groupId>com.provectus</groupId>
 	<artifactId>kafka-ui</artifactId>
 	<version>0.0.1-SNAPSHOT</version>

Some files were not shown because too many files changed in this diff