Added custom properties & fixed reset policy (#134)

* Added custom properties support
Author: German Osin (committed by GitHub), 2020-12-02 21:47:57 +03:00
parent dd26014be1
commit cbc54f8416
8 changed files with 29 additions and 9 deletions

ClustersProperties.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.config;
+import java.util.Properties;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
@@ -24,5 +25,6 @@ public class ClustersProperties {
         String protobufFile;
         String protobufMessageName;
         int jmxPort;
+        Properties properties;
     }
 }
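
For context, a short sketch of how the new per-cluster properties block could look in the application YAML once Spring binds this field. The keys under properties are illustrative, not part of this commit; Spring Boot flattens the nested keys into java.util.Properties entries such as security.protocol.

kafka:
  clusters:
    - name: local
      bootstrapServers: localhost:9092
      properties:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN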

SchemaRegistryRecordDeserializer.java

@@ -97,7 +97,8 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
             final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
             if (!versions.isEmpty()) {
                 final Integer version = versions.iterator().next();
-                final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
+                final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic());
+                final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
                 if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
                     try {
                         protobufDeserializer.deserialize(record.topic(), record.value().get());
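
A tiny illustration of what the new subject lookup does, assuming the cluster's schemaNameTemplate is the common "%s-value" pattern; the template value and topic name here are assumptions, the class name is hypothetical and not part of the commit.

public class SubjectNameDemo {
    public static void main(String[] args) {
        String schemaNameTemplate = "%s-value";            // assumed template value, configurable per cluster
        String topic = "users";                            // hypothetical topic
        String subjectName = String.format(schemaNameTemplate, topic);
        System.out.println(subjectName);                   // prints "users-value"
    }
}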

ClusterMapper.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.mapper;
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
+import java.util.Properties;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
@@ -22,6 +23,7 @@ public interface ClusterMapper {
     @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
     Cluster toCluster(KafkaCluster cluster);
     @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath")
+    @Mapping(target = "properties", source = "properties", qualifiedByName="setProperties")
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
     @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
     ClusterStats toClusterStats(InternalClusterMetrics metrics);
@@ -73,4 +75,12 @@ public interface ClusterMapper {
         }
     }
+
+    default Properties setProperties(Properties properties) {
+        Properties copy = new Properties();
+        if (properties != null) {
+            copy.putAll(properties);
+        }
+        return copy;
+    }
 }
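
A short, self-contained sketch (not part of the commit, class name hypothetical) of the copy semantics the new setProperties helper gives the mapper: a null input becomes an empty Properties, and a non-null input is copied so the mapped KafkaCluster does not alias the Spring-bound configuration object.

import java.util.Properties;

public class SetPropertiesDemo {
    // Same logic as the mapper's default method above.
    static Properties setProperties(Properties properties) {
        Properties copy = new Properties();
        if (properties != null) {
            copy.putAll(properties);
        }
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(setProperties(null).isEmpty());           // true: null maps to an empty copy

        Properties source = new Properties();
        source.put("client.dns.lookup", "use_all_dns_ips");          // hypothetical custom property
        Properties copy = setProperties(source);
        source.clear();                                               // mutating the source afterwards...
        System.out.println(copy.getProperty("client.dns.lookup"));   // ...leaves the copy intact
    }
}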

KafkaCluster.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.cluster.model;
 import com.provectus.kafka.ui.model.ServerStatus;
+import java.util.Properties;
 import lombok.Builder;
 import lombok.Data;
@@ -24,4 +25,5 @@ public class KafkaCluster {
     private final Throwable lastZookeeperException;
     private final Path protobufFile;
     private final String protobufMessageName;
+    private final Properties properties;
 }

ClusterService.java

@@ -128,6 +128,7 @@ public class ClusterService {
     public Map<TopicPartition, Long> topicPartitionsEndOffsets(KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
         Properties properties = new Properties();
+        properties.putAll(cluster.getProperties());
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ConsumingService.java

@@ -88,7 +88,7 @@ public class ConsumingService {
     @RequiredArgsConstructor
     private static class RecordEmitter {
-        private static final int MAX_POLLS_COUNT = 30;
+        private static final int MAX_EMPTY_POLLS_COUNT = 3;
         private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
         private final KafkaService kafkaService;
@@ -98,15 +98,16 @@ public class ConsumingService {
         public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
             try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-                // assignPartitions(consumer);
-                // seekOffsets(consumer);
                 assignAndSeek(consumer);
-                int pollsCount = 0;
-                while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
+                int emptyPollsCount = 0;
+                log.info("assignment: {}", consumer.assignment());
+                while (!sink.isCancelled()) {
                     ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
                     log.info("{} records polled", records.count());
-                    if (records.count() == 0) {
+                    if (records.count() == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) {
                         break;
+                    } else {
+                        emptyPollsCount++;
                     }
                     records.iterator()
                         .forEachRemaining(sink::next);
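
The emitter now tolerates a handful of empty polls instead of capping the total poll count at 30. A minimal, isolated sketch of that termination rule; the consumer is simulated, the class name is hypothetical, and only the constant value and counter logic come from the diff above.

public class EmptyPollDemo {
    private static final int MAX_EMPTY_POLLS_COUNT = 3;

    public static void main(String[] args) {
        int emptyPollsCount = 0;
        int polls = 0;
        while (true) {
            polls++;
            int recordCount = 0;                                   // simulate a topic with no more data
            if (recordCount == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) {
                break;                                             // give up only after several empty polls
            } else {
                emptyPollsCount++;
            }
        }
        System.out.println("stopped after " + polls + " empty polls");  // stopped after 5 empty polls
    }
}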

KafkaService.java

@@ -209,6 +209,7 @@ public class KafkaService {
     public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
         Properties properties = new Properties();
+        properties.putAll(kafkaCluster.getProperties());
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
         properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
         AdminClient adminClient = AdminClient.create(properties);
@@ -245,10 +246,12 @@ public class KafkaService {
     public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
         Properties props = new Properties();
+        props.putAll(cluster.getProperties());
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return new KafkaConsumer<>(props);
     }
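
One detail worth noting: in both createAdminClient and createConsumer the custom properties go in with putAll before the explicit settings, so user-supplied values cannot replace the bootstrap servers or deserializers, while anything else (SASL settings, timeouts, and so on) passes straight through. The new auto.offset.reset=earliest entry is presumably the "fixed reset policy" part of the commit title, keeping the consumer from falling back to latest when it has no valid offset. A minimal sketch of that java.util.Properties override behaviour; the class name and key values are hypothetical.

import java.util.Properties;

public class OverrideOrderDemo {
    public static void main(String[] args) {
        Properties custom = new Properties();
        custom.put("bootstrap.servers", "ignored:9092");       // hypothetical user-supplied value
        custom.put("security.protocol", "SASL_SSL");            // hypothetical user-supplied value

        Properties effective = new Properties();
        effective.putAll(custom);                                // cluster properties go in first
        effective.put("bootstrap.servers", "localhost:9092");    // explicit setting wins on conflict

        System.out.println(effective.getProperty("bootstrap.servers"));  // localhost:9092
        System.out.println(effective.getProperty("security.protocol"));  // SASL_SSL
    }
}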

application YAML config (local cluster defaults)

@@ -1,8 +1,8 @@
 kafka:
   clusters:
     - name: local
-      bootstrapServers: b-1.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:9092
-      zookeeper: z-2.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:2181
+      bootstrapServers: localhost:9092
+      zookeeper: localhost:2181
       schemaRegistry: http://localhost:8081
   admin-client-timeout: 5000
 zookeeper: