diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java index 608f10120d..35e0f8397b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java @@ -1,6 +1,6 @@ package com.provectus.kafka.ui.config; -import com.provectus.kafka.ui.exception.NotFoundException; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ReadOnlyModeException; import com.provectus.kafka.ui.service.ClustersStorage; import java.util.regex.Pattern; @@ -39,7 +39,8 @@ public class ReadOnlyModeFilter implements WebFilter { var clusterName = matcher.group("clusterName"); var kafkaCluster = clustersStorage.getClusterByName(clusterName) .orElseThrow( - () -> new NotFoundException(String.format("No cluster for name '%s'", clusterName))); + () -> new ClusterNotFoundException( + String.format("No cluster for name '%s'", clusterName))); if (!kafkaCluster.getReadOnly()) { return chain.filter(exchange); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index 0a3b55b997..cd5336cf2b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -32,7 +32,6 @@ public class ConsumerGroupsController implements ConsumerGroupsApi { return clusterService.getConsumerGroups(clusterName) .map(Flux::fromIterable) .map(ResponseEntity::ok) - .switchIfEmpty(Mono.just(ResponseEntity.notFound() - .build())); // TODO: check behaviour on cluster not found and empty groups list + .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ClusterNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ClusterNotFoundException.java new file mode 100644 index 0000000000..5ba5e5df9c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ClusterNotFoundException.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.exception; + +public class ClusterNotFoundException extends CustomBaseException { + + public ClusterNotFoundException() { + super("Cluster not found"); + } + + public ClusterNotFoundException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.CLUSTER_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ConnectNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ConnectNotFoundException.java new file mode 100644 index 0000000000..f4fcc069fa --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ConnectNotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class ConnectNotFoundException extends CustomBaseException { + + public ConnectNotFoundException() { + super("Connect not found"); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.CONNECT_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java index ab1e3cf205..9b43e699b7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java @@ -2,22 +2,23 @@ package com.provectus.kafka.ui.exception; public abstract class CustomBaseException extends RuntimeException { - public CustomBaseException() { + protected CustomBaseException() { + super(); } - public CustomBaseException(String message) { + protected CustomBaseException(String message) { super(message); } - public CustomBaseException(String message, Throwable cause) { + protected CustomBaseException(String message, Throwable cause) { super(message, cause); } - public CustomBaseException(Throwable cause) { + protected CustomBaseException(Throwable cause) { super(cause); } - public CustomBaseException(String message, Throwable cause, boolean enableSuppression, + protected CustomBaseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 7432c293cd..4517507764 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -10,11 +10,14 @@ public enum ErrorCode { UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR), BINDING_FAIL(4001, HttpStatus.BAD_REQUEST), VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST), - ENTITY_NOT_FOUND(4003, HttpStatus.NOT_FOUND), - READ_ONLY_MODE_ENABLE(4004, HttpStatus.METHOD_NOT_ALLOWED), - REBALANCE_IN_PROGRESS(4005, HttpStatus.CONFLICT), - DUPLICATED_ENTITY(4006, HttpStatus.CONFLICT), - UNPROCESSABLE_ENTITY(4007, HttpStatus.UNPROCESSABLE_ENTITY); + READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED), + REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT), + DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT), + UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY), + CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND), + TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND), + SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND), + CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java deleted file mode 100644 index 773c398b30..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/NotFoundException.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.provectus.kafka.ui.exception; - - -public class NotFoundException extends CustomBaseException { - - public NotFoundException(String message) { - super(message); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.ENTITY_NOT_FOUND; - } -} \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaNotFoundException.java new file mode 100644 index 0000000000..9faa18e970 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaNotFoundException.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.exception; + +public class SchemaNotFoundException extends CustomBaseException { + + public SchemaNotFoundException() { + super("Schema not found"); + } + + public SchemaNotFoundException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.SCHEMA_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicNotFoundException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicNotFoundException.java new file mode 100644 index 0000000000..b516037e7e --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicNotFoundException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class TopicNotFoundException extends CustomBaseException { + + public TopicNotFoundException() { + super("Topic not found"); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.TOPIC_NOT_FOUND; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 3676d71b8c..18afa02d49 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.exception.NotFoundException; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.BrokerMetrics; @@ -88,7 +89,7 @@ public class ClusterService { int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; var cluster = clustersStorage.getClusterByName(name) - .orElseThrow(() -> new NotFoundException("No such cluster")); + .orElseThrow(ClusterNotFoundException::new); var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1); return new TopicsResponse() @@ -178,11 +179,10 @@ public class ClusterService { } } - @SneakyThrows public Mono> getConsumerGroups(String clusterName) { - return clustersStorage.getClusterByName(clusterName) - .map(kafkaService::getConsumerGroups) - .orElse(Mono.empty()); + return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) + .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) + .flatMap(kafkaService::getConsumerGroups); } public Flux getBrokers(String clusterName) { @@ -211,10 +211,10 @@ public class ClusterService { public Mono deleteTopic(String clusterName, String topicName) { var cluster = clustersStorage.getClusterByName(clusterName) - .orElseThrow(() -> new NotFoundException("No such cluster")); - getTopicDetails(clusterName, topicName) - .orElseThrow(() -> new NotFoundException("No such topic")); - return kafkaService.deleteTopic(cluster, topicName) + .orElseThrow(ClusterNotFoundException::new); + var topic = getTopicDetails(clusterName, topicName) + .orElseThrow(TopicNotFoundException::new); + return kafkaService.deleteTopic(cluster, topic.getName()) .doOnNext(t -> updateCluster(topicName, clusterName, cluster)); } @@ -243,9 +243,9 @@ public class ClusterService { public Mono deleteTopicMessages(String clusterName, String topicName, List partitions) { var cluster = clustersStorage.getClusterByName(clusterName) - .orElseThrow(() -> new NotFoundException("No such cluster")); + .orElseThrow(ClusterNotFoundException::new); if (!cluster.getTopics().containsKey(topicName)) { - throw new NotFoundException("No such topic"); + throw new TopicNotFoundException(); } return consumingService.offsetsForDeletion(cluster, topicName, partitions) .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 6559acce24..85e163f174 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -1,7 +1,8 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.client.KafkaConnectClients; -import com.provectus.kafka.ui.exception.NotFoundException; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.ConnectNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.KafkaConnectMapper; import com.provectus.kafka.ui.model.Connect; @@ -181,7 +182,7 @@ public class KafkaConnectService { private Mono getCluster(String clusterName) { return clustersStorage.getClusterByName(clusterName) .map(Mono::just) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(ClusterNotFoundException::new)); } private Mono getConnectAddress(String clusterName, String connectName) { @@ -194,7 +195,7 @@ public class KafkaConnectService { ) .flatMap(connect -> connect .map(Mono::just) - .orElse(Mono.error(new NotFoundException("No such connect cluster"))) + .orElse(Mono.error(ConnectNotFoundException::new)) ); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index 1a5e8c48c9..3483218725 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -351,6 +351,7 @@ public class KafkaService { .all(), topicCr.name()); } + @SuppressWarnings("deprecation") private Mono alterConfig(TopicFormData topicFormData, ConfigResource topicCr, ExtendedAdminClient ac) { List configEntries = topicFormData.getConfigs().entrySet().stream() @@ -359,7 +360,6 @@ public class KafkaService { Config config = new Config(configEntries); Map map = Collections.singletonMap(topicCr, config); return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name()); - } private InternalTopic mergeWithStats(InternalTopic topic, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index e66b5cdf2f..f8eee0ca9d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -3,8 +3,9 @@ package com.provectus.kafka.ui.service; import static org.springframework.http.HttpStatus.NOT_FOUND; import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.DuplicateEntityException; -import com.provectus.kafka.ui.exception.NotFoundException; +import com.provectus.kafka.ui.exception.SchemaNotFoundException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.CompatibilityCheckResponse; @@ -39,7 +40,6 @@ import reactor.core.publisher.Mono; public class SchemaRegistryService { public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s"; public static final String NO_SUCH_SCHEMA = "No such schema %s"; - public static final String NO_SUCH_CLUSTER = "No such cluster"; private static final String URL_SUBJECTS = "/subjects"; private static final String URL_SUBJECT = "/subjects/{schemaName}"; @@ -66,7 +66,7 @@ public class SchemaRegistryService { .bodyToMono(String[].class) .doOnError(log::error) ) - .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + .orElse(Mono.error(ClusterNotFoundException::new)); } public Flux getAllVersionsBySubject(String clusterName, String subject) { @@ -82,7 +82,7 @@ public class SchemaRegistryService { .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA)) ).bodyToFlux(Integer.class) - ).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER))); + ).orElse(Flux.error(ClusterNotFoundException::new)); } public Mono getSchemaSubjectByVersion(String clusterName, String schemaName, @@ -113,7 +113,7 @@ public class SchemaRegistryService { return schema; }) ) - .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + .orElse(Mono.error(ClusterNotFoundException::new)); } /** @@ -145,7 +145,7 @@ public class SchemaRegistryService { .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) ).toBodilessEntity() - ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + ).orElse(Mono.error(ClusterNotFoundException::new)); } public Mono> deleteSchemaSubjectEntirely(String clusterName, @@ -158,7 +158,7 @@ public class SchemaRegistryService { throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) ) .toBodilessEntity()) - .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + .orElse(Mono.error(ClusterNotFoundException::new)); } /** @@ -181,7 +181,7 @@ public class SchemaRegistryService { .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) ) - .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + .orElse(Mono.error(ClusterNotFoundException::new)); }); } @@ -219,7 +219,7 @@ public class SchemaRegistryService { @NotNull private Function> throwIfNotFoundStatus( String formatted) { - return resp -> Mono.error(new NotFoundException(formatted)); + return resp -> Mono.error(new SchemaNotFoundException(formatted)); } /** @@ -241,7 +241,7 @@ public class SchemaRegistryService { .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) .bodyToMono(Void.class); - }).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + }).orElse(Mono.error(ClusterNotFoundException::new)); } public Mono updateSchemaCompatibility(String clusterName, @@ -287,7 +287,7 @@ public class SchemaRegistryService { .bodyToMono(InternalCompatibilityCheck.class) .map(mapper::toCompatibilityCheckResponse) .log() - ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + ).orElse(Mono.error(ClusterNotFoundException::new)); } public String formatted(String str, Object... args) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 341a4fbf8b..3bad2dcea4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -158,9 +158,9 @@ public class ClusterUtil { topic.inSyncReplicas(inSyncReplicasCount); topic.replicationFactor( - topicDescription.partitions().size() > 0 - ? topicDescription.partitions().get(0).replicas().size() - : 0 + topicDescription.partitions().isEmpty() + ? 0 + : topicDescription.partitions().get(0).replicas().size() ); topic.underReplicatedPartitions(urpCount); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java index 3939666b2a..c0680708a0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java @@ -77,10 +77,9 @@ public class JmxClusterUtil { var attrNames = msc.getMBeanInfo(name).getAttributes(); for (MBeanAttributeInfo attrName : attrNames) { var value = msc.getAttribute(name, attrName.getName()); - if (value instanceof Number) { - if (!(value instanceof Double) || !((Double) value).isInfinite()) { - resultAttr.put(attrName.getName(), new BigDecimal(value.toString())); - } + if ((value instanceof Number) + && (!(value instanceof Double) || !((Double) value).isInfinite())) { + resultAttr.put(attrName.getName(), new BigDecimal(value.toString())); } } } catch (MalformedURLException url) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java index 8f5ec97ecc..550b2ce558 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java @@ -57,7 +57,7 @@ class OffsetsSeekTest { new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L))); seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1); - assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp0)).isZero(); assertThat(consumer.position(tp1)).isEqualTo(10L); } @@ -68,9 +68,9 @@ class OffsetsSeekTest { new ConsumerPosition(SeekType.BEGINNING, Map.of())); seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); - assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp0)).isZero(); assertThat(consumer.position(tp1)).isEqualTo(10L); - assertThat(consumer.position(tp2)).isEqualTo(0L); + assertThat(consumer.position(tp2)).isZero(); assertThat(consumer.position(tp3)).isEqualTo(25L); } @@ -81,7 +81,7 @@ class OffsetsSeekTest { new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L))); seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2); - assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp0)).isZero(); assertThat(consumer.position(tp1)).isEqualTo(1L); assertThat(consumer.position(tp2)).isEqualTo(2L); }