Added custom properies & fixed reset policy

This commit is contained in:
German Osin 2020-12-02 21:36:34 +03:00
parent 4ec0422357
commit c02e3ac4e0
8 changed files with 30 additions and 9 deletions

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.cluster.config;
import java.util.Properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ -24,5 +25,6 @@ public class ClustersProperties {
String protobufFile;
String protobufMessageName;
int jmxPort;
Properties properties;
}
}

View file

@ -97,7 +97,8 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
if (!versions.isEmpty()) {
final Integer version = versions.iterator().next();
final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic());
final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
try {
protobufDeserializer.deserialize(record.topic(), record.value().get());

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.mapper;
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.model.*;
import java.util.Properties;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@ -11,6 +12,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.sys.Prop;
@Mapper(componentModel = "spring")
public interface ClusterMapper {
@ -22,6 +24,7 @@ public interface ClusterMapper {
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
Cluster toCluster(KafkaCluster cluster);
@Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath")
@Mapping(target = "properties", source = "properties", qualifiedByName="setProperties")
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
ClusterStats toClusterStats(InternalClusterMetrics metrics);
@ -73,4 +76,12 @@ public interface ClusterMapper {
}
}
default Properties setProperties(Properties properties) {
Properties copy = new Properties();
if (properties!=null) {
copy.putAll(properties);
}
return copy;
}
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.ServerStatus;
import java.util.Properties;
import lombok.Builder;
import lombok.Data;
@ -24,4 +25,5 @@ public class KafkaCluster {
private final Throwable lastZookeeperException;
private final Path protobufFile;
private final String protobufMessageName;
private final Properties properties;
}

View file

@ -128,6 +128,7 @@ public class ClusterService {
public Map<TopicPartition, Long> topicPartitionsEndOffsets(KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
Properties properties = new Properties();
properties.putAll(cluster.getProperties());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

View file

@ -88,7 +88,7 @@ public class ConsumingService {
@RequiredArgsConstructor
private static class RecordEmitter {
private static final int MAX_POLLS_COUNT = 30;
private static final int MAX_EMPTY_POLLS_COUNT = 3;
private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
private final KafkaService kafkaService;
@ -98,15 +98,16 @@ public class ConsumingService {
public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
// assignPartitions(consumer);
// seekOffsets(consumer);
assignAndSeek(consumer);
int pollsCount = 0;
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
int emptyPollsCount = 0;
log.info("assignment: {}", consumer.assignment());
while (!sink.isCancelled()) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
log.info("{} records polled", records.count());
if (records.count() == 0) {
if (records.count() == 0 && emptyPollsCount < MAX_EMPTY_POLLS_COUNT) {
break;
} else {
emptyPollsCount++;
}
records.iterator()
.forEachRemaining(sink::next);

View file

@ -209,6 +209,7 @@ public class KafkaService {
public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
Properties properties = new Properties();
properties.putAll(kafkaCluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
AdminClient adminClient = AdminClient.create(properties);
@ -245,10 +246,12 @@ public class KafkaService {
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
Properties props = new Properties();
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(props);
}

View file

@ -1,8 +1,8 @@
kafka:
clusters:
- name: local
bootstrapServers: b-1.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:9092
zookeeper: z-2.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:2181
bootstrapServers: localhost:9092
zookeeper: localhost:2181
schemaRegistry: http://localhost:8081
admin-client-timeout: 5000
zookeeper: