|
@@ -15,6 +15,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
+import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
@@ -180,4 +181,31 @@ public class ClusterService {
|
|
|
.orElse(Flux.empty());
|
|
|
}
|
|
|
|
|
|
+ public Mono<Long> expireToken(String clusterName, String tokenId) {
|
|
|
+ final byte[] decodedId = Base64.getDecoder().decode(tokenId);
|
|
|
+ return kafkaService.getOrCreateAdminClient(
|
|
|
+ clustersStorage.getClusterByName(clusterName).orElseThrow()
|
|
|
+ ).flatMap(cl -> ClusterUtil.toMono(
|
|
|
+ cl.getAdminClient().expireDelegationToken(decodedId).expiryTimestamp()
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Long> renewToken(String clusterName, String id) {
|
|
|
+ final byte[] decodedId = Base64.getDecoder().decode(id);
|
|
|
+ return kafkaService.getOrCreateAdminClient(
|
|
|
+ clustersStorage.getClusterByName(clusterName).orElseThrow()
|
|
|
+ ).flatMap(cl -> ClusterUtil.toMono(
|
|
|
+ cl.getAdminClient().renewDelegationToken(decodedId).expiryTimestamp()
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Flux<DelegationToken> getTokens(String clusterName) {
|
|
|
+ return kafkaService.getOrCreateAdminClient(
|
|
|
+ clustersStorage.getClusterByName(clusterName).orElseThrow()
|
|
|
+ ).flatMap(cl -> ClusterUtil.toMono(
|
|
|
+ cl.getAdminClient().describeDelegationToken().delegationTokens()
|
|
|
+ )).map(list ->
|
|
|
+ list.stream().map(clusterMapper::mapToken).collect(Collectors.toList())
|
|
|
+ ).flatMapMany(Flux::fromIterable);
|
|
|
+ }
|
|
|
}
|