Fixed Zookeeper connecion leak
This commit is contained in:
parent
40d85643bb
commit
e65e490bb7
2 changed files with 42 additions and 14 deletions
|
@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
|
|||
import com.provectus.kafka.ui.model.CompatibilityLevel;
|
||||
import com.provectus.kafka.ui.model.NewSchemaSubject;
|
||||
import com.provectus.kafka.ui.model.SchemaSubject;
|
||||
import java.util.Formatter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
@ -51,9 +52,12 @@ public class SchemaRegistryService {
|
|||
.map(cluster -> webClient.get()
|
||||
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
||||
.bodyToFlux(Integer.class))
|
||||
.orElse(Flux.error(new NotFoundException("No such cluster")));
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(
|
||||
new NotFoundException(formatted("No such schema %s"))
|
||||
)
|
||||
).bodyToFlux(Integer.class)
|
||||
).orElse(Flux.error(new NotFoundException("No such cluster")));
|
||||
}
|
||||
|
||||
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
|
||||
|
@ -70,8 +74,12 @@ public class SchemaRegistryService {
|
|||
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
|
||||
.bodyToMono(SchemaSubject.class)
|
||||
resp -> Mono.error(
|
||||
new NotFoundException(
|
||||
formatted("No such schema %s with version %s", schemaName, version)
|
||||
)
|
||||
)
|
||||
).bodyToMono(SchemaSubject.class)
|
||||
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
|
||||
.map(tuple -> {
|
||||
SchemaSubject schema = tuple.getT1();
|
||||
|
@ -97,9 +105,13 @@ public class SchemaRegistryService {
|
|||
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
|
||||
.toBodilessEntity())
|
||||
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
resp -> Mono.error(
|
||||
new NotFoundException(
|
||||
formatted("No such schema %s with version %s", schemaName, version)
|
||||
)
|
||||
)
|
||||
).toBodilessEntity()
|
||||
).orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
}
|
||||
|
||||
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
|
||||
|
@ -107,7 +119,13 @@ public class SchemaRegistryService {
|
|||
.map(cluster -> webClient.delete()
|
||||
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(
|
||||
new NotFoundException(
|
||||
formatted("No such schema %s", schemaName)
|
||||
)
|
||||
)
|
||||
)
|
||||
.toBodilessEntity())
|
||||
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
}
|
||||
|
@ -120,7 +138,9 @@ public class SchemaRegistryService {
|
|||
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
||||
resp -> Mono.error(
|
||||
new NotFoundException(formatted("No such schema %s", schemaName)))
|
||||
)
|
||||
.toEntity(SchemaSubject.class)
|
||||
.log())
|
||||
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
|
@ -142,7 +162,7 @@ public class SchemaRegistryService {
|
|||
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
||||
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
||||
.bodyToMono(Void.class);
|
||||
}).orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
}
|
||||
|
@ -181,10 +201,14 @@ public class SchemaRegistryService {
|
|||
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
||||
.retrieve()
|
||||
.onStatus(HttpStatus.NOT_FOUND::equals,
|
||||
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
||||
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
||||
.bodyToMono(InternalCompatibilityCheck.class)
|
||||
.map(mapper::toCompatibilityCheckResponse)
|
||||
.log()
|
||||
).orElse(Mono.error(new NotFoundException("No such cluster")));
|
||||
}
|
||||
|
||||
public String formatted(String str, Object... args) {
|
||||
return new Formatter().format(str, args).toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.provectus.kafka.ui.zookeeper;
|
||||
|
||||
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
|
@ -14,7 +15,7 @@ import java.util.Map;
|
|||
@Log4j2
|
||||
public class ZookeeperService {
|
||||
|
||||
private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
|
||||
private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();
|
||||
|
||||
public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
|
||||
var isConnected = false;
|
||||
|
@ -33,7 +34,10 @@ public class ZookeeperService {
|
|||
|
||||
private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
|
||||
try {
|
||||
return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
|
||||
return cachedZkClient.computeIfAbsent(
|
||||
cluster.getName(),
|
||||
(n) -> new ZkClient(cluster.getZookeeper(), 1000)
|
||||
);
|
||||
} catch (Exception e) {
|
||||
log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
|
||||
return null;
|
||||
|
|
Loading…
Add table
Reference in a new issue