@@ -15,7 +15,6 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.KafkaVersion;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -23,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -58,8 +56,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -81,8 +77,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -184,18 +178,18 @@ public class ReactiveAdminClient implements Closeable {
   // (see MonoSink.success(..) javadoc for details)
   public static <T> Mono<T> toMono(KafkaFuture<T> future) {
     return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
-      if (ex != null) {
-        // KafkaFuture doc is unclear about what exception wrapper will be used
-        // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
-        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
-          sink.error(ex.getCause()); //unwrapping exception
-        } else {
-          sink.error(ex);
-        }
-      } else {
-        sink.success(res);
-      }
-    })).doOnCancel(() -> future.cancel(true))
+          if (ex != null) {
+            // KafkaFuture doc is unclear about what exception wrapper will be used
+            // (from docs it should be ExecutionException, but we actually see CompletionException, so checking both)
+            if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+              sink.error(ex.getCause()); //unwrapping exception
+            } else {
+              sink.error(ex);
+            }
+          } else {
+            sink.success(res);
+          }
+        })).doOnCancel(() -> future.cancel(true))
         // AdminClient is using single thread for kafka communication
         // and by default all downstream operations (like map(..)) on created Mono will be executed on this thread.
         // If some of downstream operation are blocking (by mistake) this can lead to
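Note on the hunk above: toMono(..) is the class's bridge from KafkaFuture to Mono. It unwraps CompletionException/ExecutionException to surface the real cause, propagates downstream cancellation into future.cancel(true), and, per the trailing comment, downstream operators should be moved off the AdminClient's single network thread. A minimal usage sketch, assuming an AdminClient instance and the toMono(..) above; the topicNames helper is illustrative, not part of this change:

    // Illustrative helper (not in this diff): list topic names reactively.
    Mono<Set<String>> topicNames(AdminClient admin) {
      return toMono(admin.listTopics().names())  // KafkaFuture<Set<String>> -> Mono
          .publishOn(Schedulers.parallel());     // hop off the admin network thread
    }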
@@ -400,12 +394,12 @@ public class ReactiveAdminClient implements Closeable {
         result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
     return toMono(allOfFuture).then(
         Mono.fromCallable(() ->
-            new ClusterDescription(
-                result.controller().get(),
-                result.clusterId().get(),
-                result.nodes().get(),
-                result.authorizedOperations().get()
-            )
+                new ClusterDescription(
+                    result.controller().get(),
+                    result.clusterId().get(),
+                    result.nodes().get(),
+                    result.authorizedOperations().get()
+                )
         )
     );
   }
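The describeCluster hunk above keeps the wait-then-read pattern: KafkaFuture.allOf(..) builds allOfFuture, and only after it completes does the Mono.fromCallable(..) body call get() on the individual futures, so those get() calls return already-completed values instead of blocking. A hedged sketch of the same pattern in isolation (adminClient and the single clusterId read are illustrative):

    // Sketch: wait for all futures to complete, then read their values.
    DescribeClusterResult r = adminClient.describeCluster();
    KafkaFuture<Void> allOf = KafkaFuture.allOf(r.controller(), r.clusterId(), r.nodes());
    Mono<String> clusterId = toMono(allOf)
        .then(Mono.fromCallable(() -> r.clusterId().get())); // already completed, so get() does not block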
@@ -559,8 +553,8 @@ public class ReactiveAdminClient implements Closeable {
 
   @VisibleForTesting
   static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
-                                                             Predicate<TopicPartition> partitionPredicate,
-                                                             boolean failOnUnknownLeader) {
+                                                              Predicate<TopicPartition> partitionPredicate,
+                                                              boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
       var goodTopicPartitions = new ArrayList<TopicPartition>();
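filterPartitionsWithLeaderCheck(..) only has its parameter alignment changed here. The contract the signature implies: a partition is kept when it passes partitionPredicate and has a known leader, and failOnUnknownLeader escalates a missing leader into an error instead of a silent skip. A sketch of that per-partition check, offered as an assumption about the method body below this hunk (TopicPartitionInfo.leader() returns null when no leader is known; ValidationException is the project's exception imported at the top of the file):

    // Assumed shape of the leader check (the actual body is outside this hunk).
    for (TopicPartitionInfo p : description.partitions()) {
      var tp = new TopicPartition(description.name(), p.partition());
      if (!partitionPredicate.test(tp)) {
        continue;                        // filtered out by the caller's predicate
      }
      if (p.leader() != null) {
        goodTopicPartitions.add(tp);     // leader known -> partition is usable
      } else if (failOnUnknownLeader) {
        throw new ValidationException("No leader for partition " + tp);
      }
    }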
@@ -726,26 +720,4 @@ public class ReactiveAdminClient implements Closeable {
   public void close() {
     client.close();
   }
-
-
-  public static void main(String[] args) {
-    Properties props = new Properties();
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group_1");
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-
-    try (var consumer = new KafkaConsumer<Bytes, Bytes>(props)) {
-      consumer.subscribe(List.of("test"));
-      while (true) {
-        consumer.poll(Duration.ofMillis(500));
-        //consumer.commitSync();
-      }
-    }
-  }
-
 }