@@ -12,9 +12,12 @@ import com.google.common.collect.Table;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.util.KafkaVersion;
+import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -22,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +59,8 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -77,6 +83,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -178,18 +186,18 @@ public class ReactiveAdminClient implements Closeable {
   // (see MonoSink.success(..) javadoc for details)
   public static <T> Mono<T> toMono(KafkaFuture<T> future) {
     return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
-          if (ex != null) {
-            // KafkaFuture doc is unclear about what exception wrapper will be used
-            // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
-            if (ex instanceof CompletionException || ex instanceof ExecutionException) {
-              sink.error(ex.getCause()); //unwrapping exception
-            } else {
-              sink.error(ex);
-            }
-          } else {
-            sink.success(res);
-          }
-        })).doOnCancel(() -> future.cancel(true))
+      if (ex != null) {
+        // KafkaFuture doc is unclear about what exception wrapper will be used
+        // (from docs it should be ExecutionException, but we actually see CompletionException, so checking both)
+        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+          sink.error(ex.getCause()); // unwrapping the exception
+        } else {
+          sink.error(ex);
+        }
+      } else {
+        sink.success(res);
+      }
+    })).doOnCancel(() -> future.cancel(true))
         // AdminClient is using single thread for kafka communication
         // and by default all downstream operations (like map(..)) on created Mono will be executed on this thread.
         // If some of downstream operation are blocking (by mistake) this can lead to
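
A minimal sketch of how toMono(..) is typically consumed, for context on the hunk above. It is illustrative only, not part of this diff: the bootstrap address, topic name, and sketch class name are assumptions, and the ReactiveAdminClient import path is assumed.

// import com.provectus.kafka.ui.service.ReactiveAdminClient; // package assumed
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import reactor.core.publisher.Mono;

class ToMonoUsageSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
    try (AdminClient client = AdminClient.create(props)) {
      // KafkaFuture -> Mono: errors arrive unwrapped (see the instanceof checks above),
      // and cancelling the subscription cancels the underlying KafkaFuture
      Mono<Map<String, TopicDescription>> described =
          ReactiveAdminClient.toMono(client.describeTopics(List.of("test")).all());
      described.block()
          .forEach((name, d) -> System.out.println(name + ": " + d.partitions().size() + " partitions"));
    }
  }
}
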
@@ -401,12 +409,12 @@ public class ReactiveAdminClient implements Closeable {
         result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
     return toMono(allOfFuture).then(
         Mono.fromCallable(() ->
-                new ClusterDescription(
-                    result.controller().get(),
-                    result.clusterId().get(),
-                    result.nodes().get(),
-                    result.authorizedOperations().get()
-                )
+            new ClusterDescription(
+                result.controller().get(),
+                result.clusterId().get(),
+                result.nodes().get(),
+                result.authorizedOperations().get()
+            )
         )
     );
   }
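
The hunk above re-indents the allOf-then-read pattern: KafkaFuture.allOf(..) completes only once every underlying future has completed, so the get() calls inside Mono.fromCallable(..) can no longer block. A compact sketch of the same pattern; the DescribeClusterResult variable `result` is assumed to be in scope, with KafkaFuture, Node, Mono, and ReactiveAdminClient imported as in the file above.

// result is an org.apache.kafka.clients.admin.DescribeClusterResult (assumed in scope)
KafkaFuture<String> idFuture = result.clusterId();
KafkaFuture<Node> controllerFuture = result.controller();
KafkaFuture<Void> allOf = KafkaFuture.allOf(idFuture, controllerFuture);
Mono<String> summary = ReactiveAdminClient.toMono(allOf)
    .then(Mono.fromCallable(() ->
        // safe: allOf guarantees both futures are already complete here
        "cluster " + idFuture.get() + ", controller " + controllerFuture.get()));
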
@@ -560,8 +568,8 @@ public class ReactiveAdminClient implements Closeable {

   @VisibleForTesting
   static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
-                                                               Predicate<TopicPartition> partitionPredicate,
-                                                               boolean failOnUnknownLeader) {
+                                                             Predicate<TopicPartition> partitionPredicate,
+                                                             boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
       var goodTopicPartitions = new ArrayList<TopicPartition>();
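
For reference, a test-style sketch of calling filterPartitionsWithLeaderCheck. The method is package-private and @VisibleForTesting, so this assumes the caller sits in the same package (as a test would); the broker node and topic name are made up, and List, Set, Node, TopicPartition, TopicPartitionInfo, and TopicDescription are assumed imported.

Node leader = new Node(1, "broker-1", 9092); // hypothetical broker node
TopicPartitionInfo p0 = new TopicPartitionInfo(0, leader, List.of(leader), List.of(leader));
TopicDescription description = new TopicDescription("test", false, List.of(p0));
// keep every partition that matches the predicate and has a known leader
Set<TopicPartition> filtered = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
    List.of(description), tp -> true, /* failOnUnknownLeader */ true);
// expected: filtered contains new TopicPartition("test", 0)
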
@@ -727,4 +735,26 @@ public class ReactiveAdminClient implements Closeable {
   public void close() {
     client.close();
   }
+
+
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group_1");
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
+
+    try (var consumer = new KafkaConsumer<Bytes, Bytes>(props)) {
+      consumer.subscribe(List.of("test"));
+      while (true) {
+        consumer.poll(Duration.ofMillis(500));
+        //consumer.commitSync();
+      }
+    }
+  }
+
 }
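
The main(..) added above polls in a loop but discards the returned records, with commitSync() left commented out, so it appears to be local debug code. A minimal sketch of the same loop actually handling what it polls; the broker address, group id, and topic are taken from the diff itself, while the record handling is illustrative.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;

class ConsumerLoopSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group_1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    try (var consumer = new KafkaConsumer<Bytes, Bytes>(props)) {
      consumer.subscribe(List.of("test"));
      while (true) {
        for (ConsumerRecord<Bytes, Bytes> rec : consumer.poll(Duration.ofMillis(500))) {
          // handle each record instead of dropping the poll result
          System.out.printf("partition=%d offset=%d value=%s%n",
              rec.partition(), rec.offset(), rec.value());
        }
        consumer.commitSync(); // with auto-commit disabled, commit explicitly
      }
    }
  }
}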