Browse Source

topics page construction fix

iliax 2 years ago
parent
commit
949a94ef1f

+ 3 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -8,8 +8,6 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.VIEW;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toList;
 
 
 import com.provectus.kafka.ui.api.TopicsApi;
 import com.provectus.kafka.ui.api.TopicsApi;
-import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
-import com.provectus.kafka.ui.config.auth.RbacUser;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
@@ -34,21 +32,16 @@ import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.Set;
 import javax.validation.Valid;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
-import org.springframework.security.core.context.ReactiveSecurityContextHolder;
-import org.springframework.security.core.context.SecurityContext;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
 
 
 @RestController
 @RestController
 @RequiredArgsConstructor
 @RequiredArgsConstructor
@@ -192,12 +185,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .build();
         .build();
 
 
     return topicsService.getTopicsForPagination(getCluster(clusterName))
     return topicsService.getTopicsForPagination(getCluster(clusterName))
-        .flatMap(existingTopics -> {
+        .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
+        .flatMap(topics -> {
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
           var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
           var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
           var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
           var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
               ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
               ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
-          List<InternalTopic> filtered = existingTopics.stream()
+          List<InternalTopic> filtered = topics.stream()
               .filter(topic -> !topic.isInternal()
               .filter(topic -> !topic.isInternal()
                   || showInternal != null && showInternal)
                   || showInternal != null && showInternal)
               .filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search))
               .filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search))
@@ -213,9 +207,6 @@ public class TopicsController extends AbstractController implements TopicsApi {
               .collect(toList());
               .collect(toList());
 
 
           return topicsService.loadTopics(getCluster(clusterName), topicsPage)
           return topicsService.loadTopics(getCluster(clusterName), topicsPage)
-              .flatMapMany(Flux::fromIterable)
-              .filterWhen(dto -> accessControlService.isTopicAccessible(dto, clusterName))
-              .collectList()
               .map(topicsToRender ->
               .map(topicsToRender ->
                   new TopicsResponseDTO()
                   new TopicsResponseDTO()
                       .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
                       .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))

+ 14 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -203,19 +203,22 @@ public class AccessControlService {
     return isAccessible(Resource.TOPIC, context.getTopic(), user, context, requiredActions);
     return isAccessible(Resource.TOPIC, context.getTopic(), user, context, requiredActions);
   }
   }
 
 
-  public Mono<Boolean> isTopicAccessible(InternalTopic dto, String clusterName) {
+  public Mono<List<InternalTopic>> filterViewableTopics(List<InternalTopic> topics, String clusterName) {
     if (!rbacEnabled) {
     if (!rbacEnabled) {
-      return Mono.just(true);
+      return Mono.just(topics);
     }
     }
-
-    AccessContext accessContext = AccessContext
-        .builder()
-        .cluster(clusterName)
-        .topic(dto.getName())
-        .topicActions(TopicAction.VIEW)
-        .build();
-
-    return getUser().map(u -> isTopicAccessible(accessContext, u));
+    return getUser()
+        .map(user -> topics.stream()
+            .filter(topic -> {
+                  var accessContext = AccessContext
+                      .builder()
+                      .cluster(clusterName)
+                      .topic(topic.getName())
+                      .topicActions(TopicAction.VIEW)
+                      .build();
+                  return isTopicAccessible(accessContext, user);
+                }
+            ).toList());
   }
   }
 
 
   private boolean isConsumerGroupAccessible(AccessContext context, AuthenticatedUser user) {
   private boolean isConsumerGroupAccessible(AccessContext context, AuthenticatedUser user) {

+ 1 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -34,7 +34,6 @@ import java.util.stream.IntStream;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;
-import org.springframework.test.util.ReflectionTestUtils;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 class TopicsServicePaginationTest {
 class TopicsServicePaginationTest {
@@ -60,7 +59,7 @@ class TopicsServicePaginationTest {
           List<String> lst = a.getArgument(1);
           List<String> lst = a.getArgument(1);
           return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
           return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
         });
         });
-    ReflectionTestUtils.setField(topicsController, "clustersStorage", clustersStorage);
+    topicsController.setClustersStorage(clustersStorage);
   }
   }
 
 
   @Test
   @Test

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/AccessControlServiceMock.java

@@ -16,7 +16,7 @@ public class AccessControlServiceMock {
     when(mock.validateAccess(any())).thenReturn(Mono.empty());
     when(mock.validateAccess(any())).thenReturn(Mono.empty());
     when(mock.isSchemaAccessible(anyString(), anyString())).thenReturn(Mono.just(true));
     when(mock.isSchemaAccessible(anyString(), anyString())).thenReturn(Mono.just(true));
 
 
-    when(mock.isTopicAccessible(any(), anyString())).thenReturn(Mono.just(true));
+    when(mock.filterViewableTopics(any(), any())).then(i -> Mono.just(i.getArgument(0)));
 
 
     return mock;
     return mock;
   }
   }