Added begin and end offset param (#78)

* Added begin and end offset param

* moved consumer to try with resources block

* Fixed some problems

* Moved to gettopicdetails

* Cleanup code

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
Co-authored-by: German Osin <german.osin@gmail.com>
This commit is contained in:
Roman Nedzvetskiy 2020-07-24 14:16:49 +03:00 committed by GitHub
parent 5ad3f5ce79
commit 66afaa4971
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 52 deletions

View file

@ -23,35 +23,39 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
kafka01:
image: confluentinc/cp-kafka:5.1.0
depends_on:
- zookeeper0
ports:
- 29093:29093
- 9999:9999
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
JMX_PORT: 9997 JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
#
# kafka01:
# image: confluentinc/cp-kafka:5.1.0
# depends_on:
# - zookeeper0
# ports:
# - 29093:29093
# - 9999:9999
# environment:
# KAFKA_BROKER_ID: 2
# KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
# JMX_PORT: 9997
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
kafka-init-topics0: kafka-init-topics0:
image: confluentinc/cp-kafka:5.1.0 image: confluentinc/cp-kafka:5.1.0
depends_on: depends_on:
- kafka0 - kafka0
command: "bash -c 'echo Waiting for Kafka to be ready... && \ - kafka1
cub kafka-ready -b kafka0:9092 1 20 && \ command:
kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \ "kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \
kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'" This is message 1 && \
This is message 2 && \
This is message 3 && \
Message 4 && \
Message 5"
environment: environment:
KAFKA_BROKER_ID: ignored KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored KAFKA_ZOOKEEPER_CONNECT: ignored
@ -79,7 +83,7 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
JMX_PORT: 9998 JMX_PORT: 9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
@ -97,21 +101,21 @@ services:
networks: networks:
- default - default
schemaregistry0: # schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0 # image: confluentinc/cp-schema-registry:5.1.0
depends_on: # depends_on:
- zookeeper0 # - zookeeper0
- kafka0 # - kafka0
- kafka01 # - kafka01
ports: # ports:
- 8085:8085 # - 8085:8085
environment: # environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092 # SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183 # SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT # SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry # SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
#
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" # SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO # SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas # SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.cluster.model; package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.TopicPartitionDto;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;

View file

@ -55,9 +55,12 @@ public class ClusterService {
public Optional<TopicDetails> getTopicDetails(String name, String topicName) { public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
return clustersStorage.getClusterByName(name) return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getTopics) .map(c -> {
.map(t -> t.get(topicName)) var topic = c.getTopics().get(topicName);
.map(clusterMapper::toTopicDetails); return clusterMapper
.toTopicDetails(topic)
.partitions(kafkaService.partitionDtoList(topic, c));
});
} }
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) { public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {

View file

@ -3,10 +3,7 @@ package com.provectus.kafka.ui.kafka;
import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil; import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicFormData;
import com.provectus.kafka.ui.zookeeper.ZookeeperService; import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -26,7 +23,6 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
import java.math.BigDecimal;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -351,4 +347,26 @@ public class KafkaService {
}) })
); );
} }
public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
return getTopicPartitionOffset(cluster, topicPartitions);
}
private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
try (var consumer = createConsumer(c)) {
final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
final Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
return topicPartitions.stream()
.map( tp -> new TopicPartitionDto()
.topic(tp.topic())
.partition(tp.partition())
.offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
.offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
).collect(Collectors.toList());
} catch (Exception e) {
return Collections.emptyList();
}
}
} }

View file

@ -5,7 +5,6 @@ import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*; import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -14,12 +13,11 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.validation.Valid;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.Function; import java.util.function.Function;
import javax.validation.Valid;
@RestController @RestController
@RequiredArgsConstructor @RequiredArgsConstructor
public class MetricsRestController implements ApiClustersApi { public class MetricsRestController implements ApiClustersApi {

View file

@ -383,6 +383,10 @@ components:
TopicDetails: TopicDetails:
type: object type: object
properties: properties:
partitions:
type: array
items:
$ref: "#/components/schemas/TopicPartitionDto"
partitionCount: partitionCount:
type: integer type: integer
replicationFactor: replicationFactor:
@ -486,9 +490,17 @@ components:
type: string type: string
partition: partition:
type: integer type: integer
offsetMax:
type: integer
format: int64
offsetMin:
type: integer
format: int64
required: required:
- topic - topic
- partition - partition
- offsetMax
- offsetMin
ConsumerTopicPartitionDetail: ConsumerTopicPartitionDetail:
type: object type: object