diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java index c39a53497a..2dd2ed585f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java @@ -6,6 +6,10 @@ import java.util.concurrent.ConcurrentHashMap; public final class KafkaConnectClients { + private KafkaConnectClients() { + + } + private static final Map CACHE = new ConcurrentHashMap<>(); public static KafkaConnectClientApi withBaseUrl(String basePath) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java index 39d0741865..9cdca46a3b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java @@ -20,7 +20,6 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; -import reactor.util.retry.RetryBackoffSpec; @Slf4j public class RetryingKafkaConnectClient extends KafkaConnectClientApi { @@ -32,7 +31,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { } private static Retry conflictCodeRetry() { - return RetryBackoffSpec + return Retry .fixedDelay(MAX_RETRIES, RETRIES_DELAY) .filter(e -> e instanceof WebClientResponseException.Conflict) .onRetryExhaustedThrow((spec, signal) -> diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 8befe58efc..cf00c9cd81 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -61,14 +61,14 @@ public class ClustersProperties { private void validateClusterNames() { // if only one cluster provided it is ok not to set name - if (clusters.size() == 1 && StringUtils.isEmpty(clusters.get(0).getName())) { + if (clusters.size() == 1 && !StringUtils.hasText(clusters.get(0).getName())) { clusters.get(0).setName("Default"); return; } Set clusterNames = new HashSet<>(); for (Cluster clusterProperties : clusters) { - if (StringUtils.isEmpty(clusterProperties.getName())) { + if (!StringUtils.hasText(clusterProperties.getName())) { throw new IllegalStateException( "Application config isn't valid. " + "Cluster names should be provided in case of multiple clusters present"); @@ -79,5 +79,4 @@ public class ClustersProperties { } } } - } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java index 2b256d6eb0..92174d2a47 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java @@ -1,6 +1,5 @@ package com.provectus.kafka.ui.config; -import com.fasterxml.jackson.databind.Module; import com.provectus.kafka.ui.model.JmxConnectionInfo; import com.provectus.kafka.ui.util.JmxPoolFactory; import java.util.Collections; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java index 41b6cc408f..8a8085cad6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ReadOnlyModeFilter.java @@ -45,7 +45,7 @@ public class ReadOnlyModeFilter implements WebFilter { () -> new ClusterNotFoundException( String.format("No cluster for name '%s'", clusterName))); - if (!kafkaCluster.getReadOnly()) { + if (!kafkaCluster.isReadOnly()) { return chain.filter(exchange); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java index 5b835106f0..3d8757cada 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java @@ -2,7 +2,11 @@ package com.provectus.kafka.ui.config.auth; abstract class AbstractAuthSecurityConfig { - public static final String[] AUTH_WHITELIST = { + protected AbstractAuthSecurityConfig() { + + } + + protected static final String[] AUTH_WHITELIST = { "/css/**", "/js/**", "/media/**", diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java index ebd7e09c24..0d629a8836 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java @@ -43,11 +43,11 @@ public class LdapSecurityConfig extends AbstractAuthSecurityConfig { public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource) { BindAuthenticator ba = new BindAuthenticator(contextSource); if (ldapUserDnPattern != null) { - ba.setUserDnPatterns(new String[]{ldapUserDnPattern}); + ba.setUserDnPatterns(new String[] {ldapUserDnPattern}); } if (userFilterSearchFilter != null) { LdapUserSearch userSearch = - new FilterBasedLdapUserSearch(userFilterSearchBase, userFilterSearchFilter, contextSource); + new FilterBasedLdapUserSearch(userFilterSearchBase, userFilterSearchFilter, contextSource); ba.setUserSearch(userSearch); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java index d2cbda6ece..657cab2644 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java @@ -39,14 +39,14 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig { .authenticated(); if (IS_OAUTH2_PRESENT && OAuth2ClasspathGuard.shouldConfigure(this.context)) { - OAuth2ClasspathGuard.configure(this.context, http); + OAuth2ClasspathGuard.configure(http); } return http.csrf().disable().build(); } private static class OAuth2ClasspathGuard { - static void configure(ApplicationContext context, ServerHttpSecurity http) { + static void configure(ServerHttpSecurity http) { http .oauth2Login() .and() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AuthController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AuthController.java index 558b4dcf09..2dbe3593b9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AuthController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AuthController.java @@ -15,8 +15,8 @@ import reactor.core.publisher.Mono; @Slf4j public class AuthController { - @GetMapping(value = "/auth", produces = { "text/html" }) - private Mono getAuth(ServerWebExchange exchange) { + @GetMapping(value = "/auth", produces = {"text/html"}) + public Mono getAuth(ServerWebExchange exchange) { Mono token = exchange.getAttributeOrDefault(CsrfToken.class.getName(), Mono.empty()); return token .map(AuthController::csrfToken) @@ -30,25 +30,25 @@ public class AuthController { String contextPath = exchange.getRequest().getPath().contextPath().value(); String page = "\n" + "\n" + " \n" - + " \n" - + " \n" - + " \n" - + " \n" - + " Please sign in\n" - + " \n" - + " \n" - + " \n" - + " \n" - + "
\n" - + formLogin(queryParams, contextPath, csrfTokenHtmlInput) - + "
\n" - + " \n" - + ""; + + " \n" + + " \n" + + " \n" + + " \n" + + " Please sign in\n" + + " \n" + + " \n" + + " \n" + + " \n" + + "
\n" + + formLogin(queryParams, contextPath, csrfTokenHtmlInput) + + "
\n" + + " \n" + + ""; return page.getBytes(Charset.defaultCharset()); } @@ -61,21 +61,21 @@ public class AuthController { boolean isLogoutSuccess = queryParams.containsKey("logout"); return "
\n" - + " \n" - + createError(isError) - + createLogoutSuccess(isLogoutSuccess) - + "

\n" - + " \n" - + " \n" - + "

\n" + "

\n" - + " \n" - + " \n" - + "

\n" + csrfTokenHtmlInput - + " \n" - + "
\n"; + + " \n" + + createError(isError) + + createLogoutSuccess(isLogoutSuccess) + + "

\n" + + " \n" + + " \n" + + "

\n" + "

\n" + + " \n" + + " \n" + + "

\n" + csrfTokenHtmlInput + + " \n" + + " \n"; } private static String csrfToken(CsrfToken token) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java index cae7f9fffe..e0e1d5dc78 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java @@ -25,7 +25,7 @@ public class BrokersController extends AbstractController implements BrokersApi @Override public Mono> getBrokersMetrics(String clusterName, Integer id, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return brokerService.getBrokerMetrics(getCluster(clusterName), id) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); @@ -33,14 +33,14 @@ public class BrokersController extends AbstractController implements BrokersApi @Override public Mono>> getBrokers(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(brokerService.getBrokers(getCluster(clusterName)))); } @Override public Mono>> getAllBrokersLogdirs(String clusterName, - List brokers, - ServerWebExchange exchange + List brokers, + ServerWebExchange exchange ) { return Mono.just(ResponseEntity.ok( brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers))); @@ -48,7 +48,7 @@ public class BrokersController extends AbstractController implements BrokersApi @Override public Mono>> getBrokerConfig(String clusterName, Integer id, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok( brokerService.getBrokerConfig(getCluster(clusterName), id))); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java index c06947308b..38b2b1dc65 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java @@ -21,7 +21,7 @@ public class ClustersController extends AbstractController implements ClustersAp @Override public Mono> getClusterMetrics(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return clusterService.getClusterMetrics(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); @@ -29,7 +29,7 @@ public class ClustersController extends AbstractController implements ClustersAp @Override public Mono> getClusterStats(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return clusterService.getClusterStats(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); @@ -42,7 +42,7 @@ public class ClustersController extends AbstractController implements ClustersAp @Override public Mono> updateClusterInfo(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index 28ce1bbada..c0d4d0296c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -56,7 +56,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons @Override public Mono>> getConsumerGroups(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return consumerGroupService.getAllConsumerGroups(getCluster(clusterName)) .map(Flux::fromIterable) .map(f -> f.map(ConsumerGroupMapper::toDto)) @@ -96,7 +96,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons } private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage - consumerGroupConsumerGroupsPage) { + consumerGroupConsumerGroupsPage) { return new ConsumerGroupsPageResponseDTO() .pageCount(consumerGroupConsumerGroupsPage.getTotalPages()) .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java index 9abc3884f9..8011fe8e5f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java @@ -28,7 +28,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC @Override public Mono>> getConnects(String clusterName, - ServerWebExchange exchange) { + ServerWebExchange exchange) { return kafkaConnectService.getConnects(getCluster(clusterName)).map(ResponseEntity::ok); } @@ -41,16 +41,16 @@ public class KafkaConnectController extends AbstractController implements KafkaC @Override public Mono> createConnector(String clusterName, String connectName, - @Valid Mono connector, - ServerWebExchange exchange) { + @Valid Mono connector, + ServerWebExchange exchange) { return kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector) .map(ResponseEntity::ok); } @Override public Mono> getConnector(String clusterName, String connectName, - String connectorName, - ServerWebExchange exchange) { + String connectorName, + ServerWebExchange exchange) { return kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName) .map(ResponseEntity::ok); } @@ -87,9 +87,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC @Override public Mono> setConnectorConfig(String clusterName, String connectName, - String connectorName, - @Valid Mono requestBody, - ServerWebExchange exchange) { + String connectorName, + @Valid Mono requestBody, + ServerWebExchange exchange) { return kafkaConnectService .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody) .map(ResponseEntity::ok); @@ -135,8 +135,8 @@ public class KafkaConnectController extends AbstractController implements KafkaC @Override public Mono> validateConnectorPluginConfig( - String clusterName, String connectName, String pluginName, @Valid Mono requestBody, - ServerWebExchange exchange) { + String clusterName, String connectName, String pluginName, @Valid Mono requestBody, + ServerWebExchange exchange) { return kafkaConnectService .validateConnectorPluginConfig( getCluster(clusterName), connectName, pluginName, requestBody) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index 152113d29d..face9206bc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -27,9 +27,9 @@ public class KsqlController extends AbstractController implements KsqlApi { @Override public Mono> executeKsqlCommand(String clusterName, - Mono - ksqlCommand, - ServerWebExchange exchange) { + Mono + ksqlCommand, + ServerWebExchange exchange) { return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand) .map(ResponseEntity::ok); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java index c00500fdea..c2d6a10b36 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java @@ -1,13 +1,11 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.util.ResourceUtil; -import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.core.io.Resource; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -24,7 +22,7 @@ public class StaticController { private Resource indexFile; private final AtomicReference renderedIndexFile = new AtomicReference<>(); - @GetMapping(value = "/index.html", produces = { "text/html" }) + @GetMapping(value = "/index.html", produces = {"text/html"}) public Mono> getIndex(ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(getRenderedIndexFile(exchange))); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index 6d58f5eaf6..b05d19d853 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -43,7 +43,7 @@ public class TopicsController extends AbstractController implements TopicsApi { public Mono> recreateTopic(String clusterName, String topicName, ServerWebExchange serverWebExchange) { return topicsService.recreateTopic(getCluster(clusterName), topicName) - .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED)); + .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED)); } @Override @@ -70,12 +70,12 @@ public class TopicsController extends AbstractController implements TopicsApi { @Override public Mono> getTopics(String clusterName, @Valid Integer page, - @Valid Integer perPage, - @Valid Boolean showInternal, - @Valid String search, - @Valid TopicColumnsToSortDTO orderBy, - @Valid SortOrderDTO sortOrder, - ServerWebExchange exchange) { + @Valid Integer perPage, + @Valid Boolean showInternal, + @Valid String search, + @Valid TopicColumnsToSortDTO orderBy, + @Valid SortOrderDTO sortOrder, + ServerWebExchange exchange) { return topicsService .getTopics( getCluster(clusterName), @@ -101,10 +101,9 @@ public class TopicsController extends AbstractController implements TopicsApi { String clusterName, String topicName, Mono partitionsIncrease, ServerWebExchange exchange) { - return partitionsIncrease.flatMap( - partitions -> - topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)) - .map(ResponseEntity::ok); + return partitionsIncrease.flatMap(partitions -> + topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions) + ).map(ResponseEntity::ok); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index 76e46138b6..9b7905fe3e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -19,7 +19,7 @@ public abstract class AbstractEmitter { private final RecordSerDe recordDeserializer; private final ConsumingStats consumingStats = new ConsumingStats(); - public AbstractEmitter(RecordSerDe recordDeserializer) { + protected AbstractEmitter(RecordSerDe recordDeserializer) { this.recordDeserializer = recordDeserializer; } @@ -33,7 +33,7 @@ public abstract class AbstractEmitter { } protected FluxSink sendMessage(FluxSink sink, - ConsumerRecord msg) { + ConsumerRecord msg) { final TopicMessageDTO topicMessage = ClusterUtil.mapToTopicMessage(msg, recordDeserializer); return sink.next( new TopicMessageEventDTO() @@ -45,8 +45,8 @@ public abstract class AbstractEmitter { protected void sendPhase(FluxSink sink, String name) { sink.next( new TopicMessageEventDTO() - .type(TopicMessageEventDTO.TypeEnum.PHASE) - .phase(new TopicMessagePhaseDTO().name(name)) + .type(TopicMessageEventDTO.TypeEnum.PHASE) + .phase(new TopicMessagePhaseDTO().name(name)) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index f21c1533d0..302cb9879b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -87,7 +87,7 @@ public class BackwardRecordEmitter // This is workaround for case when partition begin offset is less than // real minimal offset, usually appear in compcated topics - if (records.count() > 0 && partitionRecords.isEmpty()) { + if (records.count() > 0 && partitionRecords.isEmpty()) { waitingOffsets.markPolled(entry.getKey().partition()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java index 359d49386a..830eb87320 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java @@ -15,15 +15,15 @@ class ConsumingStats { private long elapsed = 0; void sendConsumingEvt(FluxSink sink, - ConsumerRecords polledRecords, - long elapsed) { - for (ConsumerRecord record : polledRecords) { - for (Header header : record.headers()) { + ConsumerRecords polledRecords, + long elapsed) { + for (ConsumerRecord rec : polledRecords) { + for (Header header : rec.headers()) { bytes += (header.key() != null ? header.key().getBytes().length : 0L) + (header.value() != null ? header.value().length : 0L); } - bytes += record.serializedKeySize() + record.serializedValueSize(); + bytes += rec.serializedKeySize() + rec.serializedValueSize(); } this.records += polledRecords.count(); this.elapsed += elapsed; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java index ee14f1b66d..446f166bfe 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java @@ -18,6 +18,9 @@ public class MessageFilters { private static GroovyScriptEngineImpl GROOVY_ENGINE; + private MessageFilters() { + } + public static Predicate createMsgFilter(String query, MessageFilterTypeDTO type) { switch (type) { case STRING_CONTAINS: diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java index 9b43e699b7..550243b82b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/CustomBaseException.java @@ -19,7 +19,7 @@ public abstract class CustomBaseException extends RuntimeException { } protected CustomBaseException(String message, Throwable cause, boolean enableSuppression, - boolean writableStackTrace) { + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index d6ed8b8a93..2c801d3d05 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -34,7 +34,7 @@ public enum ErrorCode { for (ErrorCode value : ErrorCode.values()) { if (!codes.add(value.code())) { LoggerFactory.getLogger(ErrorCode.class) - .warn("Multiple {} values refer to code {}", ErrorCode.class, value.code); + .warn("Multiple {} values refer to code {}", ErrorCode.class, value.code); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java deleted file mode 100644 index eabaaf97e5..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.provectus.kafka.ui.exception; - -public class SchemaTypeIsNotSupportedException extends UnprocessableEntityException { - - private static final String REQUIRED_SCHEMA_REGISTRY_VERSION = "5.5.0"; - - public SchemaTypeIsNotSupportedException() { - super(String.format("Current version of Schema Registry does " - + "not support provided schema type," - + " version %s or later is required here.", REQUIRED_SCHEMA_REGISTRY_VERSION)); - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeNotSupportedException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeNotSupportedException.java new file mode 100644 index 0000000000..9fd7a06af8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeNotSupportedException.java @@ -0,0 +1,12 @@ +package com.provectus.kafka.ui.exception; + +public class SchemaTypeNotSupportedException extends UnprocessableEntityException { + + private static final String REQUIRED_SCHEMA_REGISTRY_VERSION = "5.5.0"; + + public SchemaTypeNotSupportedException() { + super(String.format("Current version of Schema Registry does " + + "not support provided schema type," + + " version %s or later is required here.", REQUIRED_SCHEMA_REGISTRY_VERSION)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java index 0eca1fb62d..0425ed1157 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicRecreationException.java @@ -8,6 +8,6 @@ public class TopicRecreationException extends CustomBaseException { public TopicRecreationException(String topicName, int seconds) { super(String.format("Can't create topic '%s' in %d seconds: " - + "topic deletion is still in progress", topicName, seconds)); + + "topic deletion is still in progress", topicName, seconds)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java index 192058449f..d04484b381 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java @@ -17,6 +17,9 @@ import org.apache.kafka.common.TopicPartition; public class ConsumerGroupMapper { + private ConsumerGroupMapper() { + } + public static ConsumerGroupDTO toDto(InternalConsumerGroup c) { return convertToConsumerGroup(c, new ConsumerGroupDTO()); } @@ -47,7 +50,7 @@ public class ConsumerGroupMapper { for (TopicPartition topicPartition : member.getAssignment()) { final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent( topicPartition, - (tp) -> new ConsumerGroupTopicPartitionDTO() + tp -> new ConsumerGroupTopicPartitionDTO() .topic(tp.topic()) .partition(tp.partition()) ); @@ -99,12 +102,18 @@ public class ConsumerGroupMapper { private static ConsumerGroupStateDTO mapConsumerGroupState( org.apache.kafka.common.ConsumerGroupState state) { switch (state) { - case DEAD: return ConsumerGroupStateDTO.DEAD; - case EMPTY: return ConsumerGroupStateDTO.EMPTY; - case STABLE: return ConsumerGroupStateDTO.STABLE; - case PREPARING_REBALANCE: return ConsumerGroupStateDTO.PREPARING_REBALANCE; - case COMPLETING_REBALANCE: return ConsumerGroupStateDTO.COMPLETING_REBALANCE; - default: return ConsumerGroupStateDTO.UNKNOWN; + case DEAD: + return ConsumerGroupStateDTO.DEAD; + case EMPTY: + return ConsumerGroupStateDTO.EMPTY; + case STABLE: + return ConsumerGroupStateDTO.STABLE; + case PREPARING_REBALANCE: + return ConsumerGroupStateDTO.PREPARING_REBALANCE; + case COMPLETING_REBALANCE: + return ConsumerGroupStateDTO.COMPLETING_REBALANCE; + default: + return ConsumerGroupStateDTO.UNKNOWN; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java index 3b029fa2d3..c7e66d0f45 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java @@ -25,7 +25,7 @@ public class DescribeLogDirsMapper { } private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName, - DescribeLogDirsResponse.LogDirInfo logDirInfo) { + DescribeLogDirsResponse.LogDirInfo logDirInfo) { BrokersLogdirsDTO result = new BrokersLogdirsDTO(); result.setName(dirName); if (logDirInfo.error != null) { @@ -40,8 +40,8 @@ public class DescribeLogDirsMapper { } private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name, - List> partitions) { + List> partitions) { BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO(); topic.setName(name); topic.setPartitions( @@ -53,8 +53,8 @@ public class DescribeLogDirsMapper { } private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition, - DescribeLogDirsResponse.ReplicaInfo - replicaInfo) { + DescribeLogDirsResponse.ReplicaInfo + replicaInfo) { BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO(); logDir.setBroker(broker); logDir.setPartition(partition); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java index 3f7cdfca4c..809dffa4c1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java @@ -1,6 +1,5 @@ package com.provectus.kafka.ui.model; -import com.provectus.kafka.ui.exception.IllegalEntityStateException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -11,24 +10,24 @@ public enum CleanupPolicy { COMPACT_DELETE(Arrays.asList("compact,delete", "delete,compact")), UNKNOWN("unknown"); - private final List cleanUpPolicy; + private final List policies; - CleanupPolicy(String cleanUpPolicy) { - this(Collections.singletonList(cleanUpPolicy)); + CleanupPolicy(String policy) { + this(Collections.singletonList(policy)); } - CleanupPolicy(List cleanUpPolicy) { - this.cleanUpPolicy = cleanUpPolicy; + CleanupPolicy(List policies) { + this.policies = policies; } - public String getCleanUpPolicy() { - return cleanUpPolicy.get(0); + public String getPolicy() { + return policies.get(0); } public static CleanupPolicy fromString(String string) { return Arrays.stream(CleanupPolicy.values()) .filter(v -> - v.cleanUpPolicy.stream().anyMatch( + v.policies.stream().anyMatch( s -> s.equals(string.replace(" ", "") ) ) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java index b46b756e9c..756a2740c6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java @@ -38,7 +38,7 @@ public class InternalClusterMetrics { // zk stats @Deprecated //use 'zookeeperStatus' field with enum type instead - private final int zooKeeperStatus; + private final int zooKeeperStatusEnum; private final ServerStatusDTO zookeeperStatus; private final Throwable lastZookeeperException; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java index 8626047c4c..3fe6ebe3dc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java @@ -67,7 +67,7 @@ public class InternalClusterState { inSyncReplicasCount = partitionsStats.getInSyncReplicasCount(); outOfSyncReplicasCount = partitionsStats.getOutOfSyncReplicasCount(); underReplicatedPartitionCount = partitionsStats.getUnderReplicatedPartitionCount(); - readOnly = cluster.getReadOnly(); + readOnly = cluster.isReadOnly(); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java index 2a16bea48a..df5df2ef1f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java @@ -36,7 +36,7 @@ public class InternalConsumerGroup { } public static InternalConsumerGroup create( - ConsumerGroupDescription description, + ConsumerGroupDescription description, Map groupOffsets, Map topicEndOffsets) { var builder = InternalConsumerGroup.builder(); @@ -65,21 +65,21 @@ public class InternalConsumerGroup { // removes data for all partitions that are not fit filter public InternalConsumerGroup retainDataForPartitions(Predicate partitionsFilter) { - var offsets = getOffsets().entrySet().stream() + var offsetsMap = getOffsets().entrySet().stream() .filter(e -> partitionsFilter.test(e.getKey())) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue )); - var members = getMembers().stream() + var nonEmptyMembers = getMembers().stream() .map(m -> filterConsumerMemberTopic(m, partitionsFilter)) .filter(m -> !m.getAssignment().isEmpty()) .collect(Collectors.toList()); return toBuilder() - .offsets(offsets) - .members(members) + .offsets(offsetsMap) + .members(nonEmptyMembers) .build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java index 0606579d78..5516c05a6d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java @@ -12,7 +12,7 @@ public class InternalSchemaRegistry { private final List url; public String getFirstUrl() { - return url != null && !url.isEmpty() ? url.iterator().next() : null; + return url != null && !url.isEmpty() ? url.iterator().next() : null; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 56bbabdf89..1a97fe13a0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -30,6 +30,6 @@ public class KafkaCluster { private final String protobufMessageName; private final Map protobufMessageNameByTopic; private final Properties properties; - private final Boolean readOnly; - private final Boolean disableLogDirsCollection; + private final boolean readOnly; + private final boolean disableLogDirsCollection; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/schemaregistry/InternalNewSchema.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/schemaregistry/InternalNewSchema.java index 92fe3f5b9e..ba5721724a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/schemaregistry/InternalNewSchema.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/schemaregistry/InternalNewSchema.java @@ -2,7 +2,6 @@ package com.provectus.kafka.ui.model.schemaregistry; import com.fasterxml.jackson.annotation.JsonInclude; import com.provectus.kafka.ui.model.SchemaTypeDTO; -import com.provectus.kafka.ui.model.SchemaTypeDTO; import lombok.Data; @Data diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index 2aa93bfa4d..7553535999 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -87,8 +87,8 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { @SneakyThrows private String parse(byte[] value, Descriptor descriptor) { DynamicMessage protoMsg = DynamicMessage.parseFrom( - descriptor, - new ByteArrayInputStream(value) + descriptor, + new ByteArrayInputStream(value) ); byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg); return new String(jsonFromProto); @@ -121,8 +121,8 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { public TopicMessageSchemaDTO getTopicSchema(String topic) { final JsonSchema jsonSchema = schemaConverter.convert( - protobufSchemaPath.toUri(), - getDescriptor(topic) + protobufSchemaPath.toUri(), + getDescriptor(topic) ); final MessageSchemaDTO keySchema = new MessageSchemaDTO() .name(protobufSchema.fullName()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java index f3792179e0..af5a188e3d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java @@ -14,12 +14,18 @@ public interface RecordSerDe { @Value @Builder class DeserializedKeyValue { - @Nullable String key; - @Nullable String value; - @Nullable MessageFormat keyFormat; - @Nullable MessageFormat valueFormat; - @Nullable String keySchemaId; - @Nullable String valueSchemaId; + @Nullable + String key; + @Nullable + String value; + @Nullable + MessageFormat keyFormat; + @Nullable + MessageFormat valueFormat; + @Nullable + String keySchemaId; + @Nullable + String valueSchemaId; } DeserializedKeyValue deserialize(ConsumerRecord msg); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java index fcf5173a27..7f2efbbd64 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageReader.java @@ -6,8 +6,8 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import java.io.IOException; import java.util.Map; import org.apache.kafka.common.serialization.Serializer; @@ -27,8 +27,8 @@ public class AvroMessageReader extends MessageReader { serializer.configure( Map.of( "schema.registry.url", "wontbeused", - KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false, - KafkaAvroSerializerConfig.USE_LATEST_VERSION, true + AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true ), isKey ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java index 1cecdf7fd5..de56ed462e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageReader.java @@ -10,8 +10,8 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; -import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig; import java.io.IOException; import java.util.Map; import org.apache.kafka.common.serialization.Serializer; @@ -33,8 +33,8 @@ public class JsonSchemaMessageReader extends MessageReader { serializer.configure( Map.of( "schema.registry.url", "wontbeused", - KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, false, - KafkaJsonSchemaSerializerConfig.USE_LATEST_VERSION, true + AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true ), isKey ); @@ -69,10 +69,10 @@ public class JsonSchemaMessageReader extends MessageReader { * possible in our case. So, we just skip all infer logic and pass schema directly. */ @Override - public byte[] serialize(String topic, JsonNode record) { + public byte[] serialize(String topic, JsonNode rec) { return super.serializeImpl( - super.getSubjectName(topic, isKey, record, schema), - record, + super.getSubjectName(topic, isKey, rec, schema), + rec, (JsonSchema) schema ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java index ce3150467c..faa9cde049 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java @@ -8,8 +8,8 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; -import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig; import java.io.IOException; import java.util.Map; import org.apache.kafka.common.serialization.Serializer; @@ -28,8 +28,8 @@ public class ProtobufMessageReader extends MessageReader { serializer.configure( Map.of( "schema.registry.url", "wontbeused", - KafkaProtobufSerializerConfig.AUTO_REGISTER_SCHEMAS, false, - KafkaProtobufSerializerConfig.USE_LATEST_VERSION, true + AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true ), isKey ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java index ab3c7a0213..4a903598c4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java @@ -113,7 +113,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { DeserializedKeyValueBuilder builder) { Optional schemaId = extractSchemaIdFromMsg(rec, isKey); Optional format = schemaId.flatMap(this::getMessageFormatBySchemaId); - if (format.isPresent() && schemaRegistryFormatters.containsKey(format.get())) { + if (schemaId.isPresent() && format.isPresent() && schemaRegistryFormatters.containsKey(format.get())) { var formatter = schemaRegistryFormatters.get(format.get()); try { var deserialized = formatter.format(rec.topic(), isKey ? rec.key().get() : rec.value().get()); @@ -135,12 +135,13 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { // fallback if (isKey) { - builder.key(FALLBACK_FORMATTER.format(rec.topic(), isKey ? rec.key().get() : rec.value().get())); + builder.key(FALLBACK_FORMATTER.format(rec.topic(), rec.key().get())); builder.keyFormat(FALLBACK_FORMATTER.getFormat()); } else { - builder.value(FALLBACK_FORMATTER.format(rec.topic(), isKey ? rec.key().get() : rec.value().get())); + builder.value(FALLBACK_FORMATTER.format(rec.topic(), rec.value().get())); builder.valueFormat(FALLBACK_FORMATTER.getFormat()); } + } @Override @@ -202,14 +203,14 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { final MessageSchemaDTO keySchema = new MessageSchemaDTO() .name(maybeKeySchema.map( - (s) -> schemaSubject(topic, true) + s -> schemaSubject(topic, true) ).orElse("unknown")) .source(MessageSchemaDTO.SourceEnum.SCHEMA_REGISTRY) .schema(sourceKeySchema); final MessageSchemaDTO valueSchema = new MessageSchemaDTO() .name(maybeValueSchema.map( - (s) -> schemaSubject(topic, false) + s -> schemaSubject(topic, false) ).orElse("unknown")) .source(MessageSchemaDTO.SourceEnum.SCHEMA_REGISTRY) .schema(sourceValueSchema); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java index 0f5085eb33..b5b3aa1680 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java @@ -105,7 +105,7 @@ public class BrokerService { Map req = Map.of( new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker), b.getLogDir()); - return admin.alterReplicaLogDirs(req) + return admin.alterReplicaLogDirs(req) .onErrorResume(UnknownTopicOrPartitionException.class, e -> Mono.error(new TopicOrPartitionNotFoundException())) .onErrorResume(LogDirNotFoundException.class, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index 790dcd0e61..cba51a921a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -11,7 +11,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; -import java.util.function.Function; +import java.util.function.ToIntFunction; import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -141,18 +141,25 @@ public class ConsumerGroupService { case NAME: return Comparator.comparing(ConsumerGroupDescription::groupId); case STATE: - Function statesPriorities = cg -> { + ToIntFunction statesPriorities = cg -> { switch (cg.state()) { - case STABLE: return 0; - case COMPLETING_REBALANCE: return 1; - case PREPARING_REBALANCE: return 2; - case EMPTY: return 3; - case DEAD: return 4; - case UNKNOWN: return 5; - default: return 100; + case STABLE: + return 0; + case COMPLETING_REBALANCE: + return 1; + case PREPARING_REBALANCE: + return 2; + case EMPTY: + return 3; + case DEAD: + return 4; + case UNKNOWN: + return 5; + default: + return 100; } }; - return Comparator.comparingInt(statesPriorities::apply); + return Comparator.comparingInt(statesPriorities); case MEMBERS: return Comparator.comparingInt(cg -> -cg.members().size()); default: diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index b7d4c0881a..24a824fb16 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -44,7 +44,7 @@ public class FeatureService { if (controller != null) { features.add( isTopicDeletionEnabled(cluster, controller) - .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) + .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java index a09ae48161..30daa1ca57 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java @@ -30,7 +30,7 @@ class KafkaConfigSanitizer extends Sanitizer { var keysToSanitize = new HashSet<>( patternsToSanitize.isEmpty() ? DEFAULT_PATTERNS_TO_SANITIZE : patternsToSanitize); keysToSanitize.addAll(kafkaConfigKeysToSanitize()); - setKeysToSanitize(keysToSanitize.toArray(new String[]{})); + setKeysToSanitize(keysToSanitize.toArray(new String[] {})); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 1c3d247492..393a219d8e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -103,19 +103,19 @@ public class KafkaConnectService { } private Predicate matchesSearchTerm(final String search) { - return (connector) -> getSearchValues(connector) + return connector -> getSearchValues(connector) .anyMatch(value -> value.contains( StringUtils.defaultString( - search, - StringUtils.EMPTY) + search, + StringUtils.EMPTY) .toUpperCase())); } private Stream getSearchValues(FullConnectorInfoDTO fullConnectorInfo) { return Stream.of( - fullConnectorInfo.getName(), - fullConnectorInfo.getStatus().getState().getValue(), - fullConnectorInfo.getType().getValue()) + fullConnectorInfo.getName(), + fullConnectorInfo.getStatus().getState().getValue(), + fullConnectorInfo.getType().getValue()) .map(String::toUpperCase); } @@ -158,7 +158,7 @@ public class KafkaConnectService { connector .flatMap(c -> connectorExists(cluster, connectName, c.getName()) .map(exists -> { - if (exists) { + if (Boolean.TRUE.equals(exists)) { throw new ValidationException( String.format("Connector with name %s already exists", c.getName())); } @@ -179,7 +179,7 @@ public class KafkaConnectService { } public Mono getConnector(KafkaCluster cluster, String connectName, - String connectorName) { + String connectorName) { return withConnectClient(cluster, connectName) .flatMap(client -> client.getConnector(connectorName) .map(kafkaConnectMapper::fromClient) @@ -240,8 +240,8 @@ public class KafkaConnectService { } public Mono setConnectorConfig(KafkaCluster cluster, String connectName, - String connectorName, Mono requestBody) { - return withConnectClient(cluster, connectName) + String connectorName, Mono requestBody) { + return withConnectClient(cluster, connectName) .flatMap(c -> requestBody .flatMap(body -> c.setConnectorConfig(connectorName, (Map) body)) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java index 7fe54bf03b..4e970dc0c8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -20,7 +20,7 @@ public class KsqlService { private final List ksqlStatementStrategies; public Mono executeKsqlCommand(KafkaCluster cluster, - Mono ksqlCommand) { + Mono ksqlCommand) { return Mono.justOrEmpty(cluster) .map(KafkaCluster::getKsqldbServer) .onErrorResume(e -> { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 0a5dc14737..34eca2ac3f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -84,7 +84,7 @@ public class MessagesService { CreateTopicMessageDTO msg) { if (msg.getPartition() != null && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions() - .get(topic).partitions().size() - 1) { + .get(topic).partitions().size() - 1) { throw new ValidationException("Invalid partition"); } RecordSerDe serde = diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java index f6bf6b552d..565d6e74bc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java @@ -60,7 +60,7 @@ public class MetricsService { } private Mono getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) { - if (cluster.getDisableLogDirsCollection() == null || !cluster.getDisableLogDirsCollection()) { + if (!cluster.isDisableLogDirsCollection()) { return c.describeLogDirs().map(InternalLogDirStats::new); } return Mono.just(InternalLogDirStats.empty()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index eda4a4dc36..8e04b1ebac 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -5,7 +5,7 @@ import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException; import com.provectus.kafka.ui.exception.SchemaNotFoundException; -import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException; +import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -212,7 +212,7 @@ public class SchemaRegistryService { .onStatus(UNPROCESSABLE_ENTITY::equals, r -> r.bodyToMono(ErrorResponse.class) .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage()) - ? new SchemaTypeIsNotSupportedException() + ? new SchemaTypeNotSupportedException() : new UnprocessableEntityException(x.getMessage())))) .bodyToMono(SubjectIdResponse.class); } @@ -294,7 +294,9 @@ public class SchemaRegistryService { } public String formatted(String str, Object... args) { - return new Formatter().format(str, args).toString(); + try (Formatter formatter = new Formatter()) { + return formatter.format(str, args).toString(); + } } private void setBasicAuthIfEnabled(InternalSchemaRegistry schemaRegistry, HttpHeaders headers) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index f0929b1ac3..ae57a668a9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -80,14 +80,14 @@ public class TopicsService { Optional sortBy, Optional sortOrder) { return adminClientService.get(cluster).flatMap(ac -> - new Pagination(ac, metricsCache.get(cluster)) - .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder) - .flatMap(page -> - loadTopics(cluster, page.getTopics()) - .map(topics -> - new TopicsResponseDTO() - .topics(topics.stream().map(clusterMapper::toTopic).collect(toList())) - .pageCount(page.getTotalPages())))); + new Pagination(ac, metricsCache.get(cluster)) + .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder) + .flatMap(page -> + loadTopics(cluster, page.getTopics()) + .map(topics -> + new TopicsResponseDTO() + .topics(topics.stream().map(clusterMapper::toTopic).collect(toList())) + .pageCount(page.getTotalPages())))); } private Mono> loadTopics(KafkaCluster c, List topics) { @@ -193,31 +193,31 @@ public class TopicsService { public Mono recreateTopic(KafkaCluster cluster, String topicName) { return loadTopic(cluster, topicName) - .flatMap(t -> deleteTopic(cluster, topicName) - .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds)) - .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(), - topic.getPartitionCount(), - (short) topic.getReplicationFactor(), - topic.getTopicConfigs() - .stream() - .collect(Collectors - .toMap(InternalTopicConfig::getName, - InternalTopicConfig::getValue))) - .thenReturn(topicName)) - .retryWhen(Retry.fixedDelay(recreateMaxRetries, - Duration.ofSeconds(recreateDelayInSeconds)) - .filter(throwable -> throwable instanceof TopicExistsException) - .onRetryExhaustedThrow((a, b) -> - new TopicRecreationException(topicName, - recreateMaxRetries * recreateDelayInSeconds))) - .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic) - ) - ); + .flatMap(t -> deleteTopic(cluster, topicName) + .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds)) + .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(), + topic.getPartitionCount(), + (short) topic.getReplicationFactor(), + topic.getTopicConfigs() + .stream() + .collect(Collectors + .toMap(InternalTopicConfig::getName, + InternalTopicConfig::getValue))) + .thenReturn(topicName)) + .retryWhen(Retry.fixedDelay(recreateMaxRetries, + Duration.ofSeconds(recreateDelayInSeconds)) + .filter(TopicExistsException.class::isInstance) + .onRetryExhaustedThrow((a, b) -> + new TopicRecreationException(topicName, + recreateMaxRetries * recreateDelayInSeconds))) + .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic) + ) + ); } private Mono updateTopic(KafkaCluster cluster, - String topicName, - TopicUpdateDTO topicUpdate) { + String topicName, + TopicUpdateDTO topicUpdate) { return adminClientService.get(cluster) .flatMap(ac -> ac.updateTopicConfig(topicName, topicUpdate.getConfigs()) @@ -403,10 +403,11 @@ public class TopicsService { ); return ac.createPartitions(newPartitionsMap) .then(loadTopic(cluster, topicName)); - }) - .map(t -> new PartitionsIncreaseResponseDTO() + }).map(t -> new PartitionsIncreaseResponseDTO() .topicName(t.getName()) - .totalPartitionsCount(t.getPartitionCount()))); + .totalPartitionsCount(t.getPartitionCount()) + ) + ); } public Mono deleteTopic(KafkaCluster cluster, String topicName) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java index d7563fc458..03638afc7f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java @@ -15,7 +15,6 @@ import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; @Service @RequiredArgsConstructor @@ -82,10 +81,11 @@ public class ZookeeperService { private ZooKeeper createClient(KafkaCluster cluster) { try { - return new ZooKeeper(cluster.getZookeeper(), 60 * 1000, watchedEvent -> {}); + return new ZooKeeper(cluster.getZookeeper(), 60 * 1000, watchedEvent -> { + }); } catch (IOException e) { log.error("Error while creating a zookeeper client for cluster [{}]", - cluster.getName()); + cluster.getName()); throw new ZooKeeperException(e); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java index 0604b9a9ff..ebd5bd86a5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -101,7 +101,7 @@ public class KsqlApiClient { if (parsed.getStatements().size() > 1) { throw new ValidationException("Only single statement supported now"); } - if (parsed.getStatements().size() == 0) { + if (parsed.getStatements().isEmpty()) { throw new ValidationException("No valid ksql statement found"); } if (KsqlGrammar.isSelect(parsed.getStatements().get(0))) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java index efb759258c..a98ac20bb5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java @@ -18,6 +18,9 @@ import org.antlr.v4.runtime.atn.PredictionMode; class KsqlGrammar { + private KsqlGrammar() { + } + @Value static class KsqlStatements { List statements; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java index c86a3293ff..384a73af38 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java @@ -12,6 +12,9 @@ import java.util.stream.StreamSupport; class DynamicParser { + private DynamicParser() { + } + static KsqlResponseTable parseArray(String tableName, JsonNode array) { return parseArray(tableName, getFieldNamesFromArray(array), array); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java index 7e0d8cb483..eccc19f0f6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java @@ -14,6 +14,9 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti public class ResponseParser { + private ResponseParser() { + } + public static Optional parseSelectResponse(JsonNode jsonNode) { // in response we getting either header record or row data if (arrayFieldNonEmpty(jsonNode, "header")) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index c39135c49e..48315dd0ce 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -18,6 +18,9 @@ import org.apache.kafka.common.utils.Bytes; @Slf4j public class ClusterUtil { + private ClusterUtil() { + } + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); public static int convertToIntServerStatus(ServerStatusDTO serverStatus) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java index 834e346d88..b0b2b36d81 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java @@ -86,14 +86,14 @@ public class JmxClusterUtil { @SneakyThrows private List getJmxMetrics(String host, int port, boolean jmxSsl, - @Nullable String username, @Nullable String password) { + @Nullable String username, @Nullable String password) { String jmxUrl = JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE; final var connectionInfo = JmxConnectionInfo.builder() - .url(jmxUrl) - .ssl(jmxSsl) - .username(username) - .password(password) - .build(); + .url(jmxUrl) + .ssl(jmxSsl) + .username(username) + .password(password) + .build(); JMXConnector srv; try { srv = pool.borrowObject(connectionInfo); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxPoolFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxPoolFactory.java index c5e7f91fe8..49e73a58f2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxPoolFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxPoolFactory.java @@ -21,7 +21,7 @@ public class JmxPoolFactory extends BaseKeyedPooledObjectFactory env = new HashMap<>(); if (StringUtils.isNotEmpty(info.getUsername()) && StringUtils.isNotEmpty(info.getPassword())) { - env.put("jmx.remote.credentials", new String[]{info.getUsername(), info.getPassword()}); + env.put("jmx.remote.credentials", new String[] {info.getUsername(), info.getPassword()}); } if (info.isSsl()) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java index faeb40179d..7b044fe087 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java @@ -30,20 +30,21 @@ import java.util.Map; public final class KafkaConstants { + private static final String LONG_MAX_STRING = Long.valueOf(Long.MAX_VALUE).toString(); + public static final Map TOPIC_DEFAULT_CONFIGS = Map.ofEntries( new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"), new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"), new AbstractMap.SimpleEntry<>(FILE_DELETE_DELAY_MS_CONFIG, "60000"), - new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, "9223372036854775807"), - new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, "9223372036854775807"), + new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, LONG_MAX_STRING), + new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, LONG_MAX_STRING), new AbstractMap.SimpleEntry<>("follower.replication.throttled.replicas", ""), new AbstractMap.SimpleEntry<>(INDEX_INTERVAL_BYTES_CONFIG, "4096"), new AbstractMap.SimpleEntry<>("leader.replication.throttled.replicas", ""), - new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, "9223372036854775807"), + new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, LONG_MAX_STRING), new AbstractMap.SimpleEntry<>(MAX_MESSAGE_BYTES_CONFIG, "1000012"), - new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, - "9223372036854775807"), + new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG_MAX_STRING), new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"), new AbstractMap.SimpleEntry<>(MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"), new AbstractMap.SimpleEntry<>(MIN_COMPACTION_LAG_MS_CONFIG, "0"), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java index b282a9f4c0..fa3ff78e02 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java @@ -107,8 +107,8 @@ public abstract class OffsetsSeek { .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); this.beginOffsets = this.endOffsets.keySet().stream() - .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p)))) - .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); + .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p)))) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); } public List topicPartitions() { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java index c05791c293..e3d8f1b5b8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java @@ -36,13 +36,13 @@ public class OffsetsSeekBackward extends OffsetsSeek { protected Map offsetsFromPositions(Consumer consumer, - List partitions) { + List partitions) { return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions); } protected Map offsetsFromBeginning(Consumer consumer, - List partitions) { + List partitions) { return findOffsets(consumer, Map.of(), partitions); } @@ -51,7 +51,7 @@ public class OffsetsSeekBackward extends OffsetsSeek { consumerPosition.getSeekTo().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - e -> e.getValue() + Map.Entry::getValue )); Map offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) .entrySet().stream() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java index 733726eae8..6b6ea735fc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java @@ -19,7 +19,7 @@ public class OffsetsSeekForward extends OffsetsSeek { } protected Map offsetsFromPositions(Consumer consumer, - List partitions) { + List partitions) { final Map offsets = offsetsFromBeginning(consumer, partitions); @@ -54,7 +54,7 @@ public class OffsetsSeekForward extends OffsetsSeek { } protected Map offsetsFromBeginning(Consumer consumer, - List partitions) { + List partitions) { return consumer.beginningOffsets(partitions); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java index e6cff30893..61f742e721 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java @@ -59,7 +59,8 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter { } case ARRAY: return createArraySchema(name, schema, definitions); - default: throw new RuntimeException("Unknown type"); + default: + throw new RuntimeException("Unknown type"); } } else { return createUnionSchema(schema, definitions); @@ -87,9 +88,9 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter { if (nullable) { return new OneOfFieldSchema( List.of( - new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NULL)), - new ObjectFieldSchema(fields, Collections.emptyList()) - ) + new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NULL)), + new ObjectFieldSchema(fields, Collections.emptyList()) + ) ); } else { return new ObjectFieldSchema(fields, Collections.emptyList()); @@ -138,14 +139,18 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter { case BYTES: case STRING: return new SimpleJsonType(JsonType.Type.STRING); - case NULL: return new SimpleJsonType(JsonType.Type.NULL); - case ARRAY: return new SimpleJsonType(JsonType.Type.ARRAY); + case NULL: + return new SimpleJsonType(JsonType.Type.NULL); + case ARRAY: + return new SimpleJsonType(JsonType.Type.ARRAY); case FIXED: case FLOAT: case DOUBLE: return new SimpleJsonType(JsonType.Type.NUMBER); - case BOOLEAN: return new SimpleJsonType(JsonType.Type.BOOLEAN); - default: return new SimpleJsonType(JsonType.Type.STRING); + case BOOLEAN: + return new SimpleJsonType(JsonType.Type.BOOLEAN); + default: + return new SimpleJsonType(JsonType.Type.STRING); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonType.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonType.java index 23aa4b7790..79d73c6813 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonType.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonType.java @@ -8,7 +8,7 @@ public abstract class JsonType { protected final Type type; - public JsonType(Type type) { + protected JsonType(Type type) { this.type = type; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ObjectFieldSchema.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ObjectFieldSchema.java index 296c5e0715..7a279e465e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ObjectFieldSchema.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ObjectFieldSchema.java @@ -2,10 +2,7 @@ package com.provectus.kafka.ui.util.jsonschema; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.BooleanNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; import java.util.List; import java.util.Map; import java.util.stream.Collectors; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverter.java index a442705047..57b4936323 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverter.java @@ -41,9 +41,9 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter allOneOfFields = schema.getOneofs().stream().flatMap(o -> - o.getFields().stream().map(Descriptors.FieldDescriptor::getName) - ).collect(Collectors.toList()); + o.getFields().stream().map(Descriptors.FieldDescriptor::getName) + ).collect(Collectors.toList()); final Map excludedOneOf = fields.entrySet().stream() .filter(f -> !allOneOfFields.contains(f.getKey())) @@ -79,7 +79,7 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter definitions) { + Map definitions) { final JsonType jsonType = convertType(field); FieldSchema fieldSchema;