Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE-3732_quotas_apis

 Conflicts:
	kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java
This commit is contained in:
iliax 2023-08-11 18:57:08 +04:00
commit f69c484dd7
17 changed files with 213 additions and 20 deletions

View file

@ -152,7 +152,13 @@ public class ClustersProperties {
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
LogLevel level;
Map<String, String> auditTopicProperties;
public enum LogLevel {
ALL,
ALTER_ONLY //default
}
}
@PostConstruct

View file

@ -1,15 +1,25 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum AclAction implements PermissibleAction {
VIEW,
EDIT;
EDIT
;
public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);
@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -10,9 +11,15 @@ public enum ApplicationConfigAction implements PermissibleAction {
;
public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);
@Nullable
public static ApplicationConfigAction fromString(String name) {
return EnumUtils.getEnum(ApplicationConfigAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,14 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum AuditAction implements PermissibleAction {
VIEW;
VIEW
;
private static final Set<AuditAction> ALTER_ACTIONS = Set.of();
@Nullable
public static AuditAction fromString(String name) {
return EnumUtils.getEnum(AuditAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -10,9 +11,15 @@ public enum ClusterConfigAction implements PermissibleAction {
;
public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);
@Nullable
public static ClusterConfigAction fromString(String name) {
return EnumUtils.getEnum(ClusterConfigAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -12,9 +13,15 @@ public enum ConnectAction implements PermissibleAction {
;
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);
@Nullable
public static ConnectAction fromString(String name) {
return EnumUtils.getEnum(ConnectAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -7,14 +8,19 @@ public enum ConsumerGroupAction implements PermissibleAction {
VIEW,
DELETE,
RESET_OFFSETS
;
public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);
@Nullable
public static ConsumerGroupAction fromString(String name) {
return EnumUtils.getEnum(ConsumerGroupAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,15 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum KsqlAction implements PermissibleAction {
EXECUTE;
EXECUTE
;
public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);
@Nullable
public static KsqlAction fromString(String name) {
return EnumUtils.getEnum(KsqlAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -5,4 +5,9 @@ public sealed interface PermissibleAction permits
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {
String name();
boolean isAlter();
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -13,9 +14,15 @@ public enum SchemaAction implements PermissibleAction {
;
public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);
@Nullable
public static SchemaAction fromString(String name) {
return EnumUtils.getEnum(SchemaAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
@ -9,16 +10,21 @@ public enum TopicAction implements PermissibleAction {
CREATE,
EDIT,
DELETE,
MESSAGES_READ,
MESSAGES_PRODUCE,
MESSAGES_DELETE,
;
public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);
@Nullable
public static TopicAction fromString(String name) {
return EnumUtils.getEnum(TopicAction.class, name);
}
@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}

View file

@ -6,6 +6,7 @@ import com.provectus.kafka.ui.exception.CustomBaseException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@ -33,16 +34,20 @@ record AuditRecord(String timestamp,
return MAPPER.writeValueAsString(this);
}
record AuditResource(String accessType, Resource type, @Nullable Object id) {
record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {
private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
return new AuditResource(action.name(), action.isAlter(), type, id);
}
static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
.forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
.forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
.forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
Map<String, String> resourceId = new LinkedHashMap<>();
@ -50,16 +55,16 @@ record AuditRecord(String timestamp,
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
resources.add(create(a, Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
.forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
.forEach(a -> resources.add(create(a, Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
return resources;
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.service.audit;
import static com.provectus.kafka.ui.config.ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY;
import static com.provectus.kafka.ui.service.MessagesService.createProducer;
import com.google.common.annotations.VisibleForTesting;
@ -80,12 +81,13 @@ public class AuditService implements Closeable {
}
boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel()).map(lvl -> lvl == ALTER_ONLY).orElse(true);
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
if (!topicAudit) {
log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
@ -95,7 +97,7 @@ public class AuditService implements Closeable {
"Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
cluster.getName()
);
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
return Optional.empty();
}
@ -103,6 +105,7 @@ public class AuditService implements Closeable {
return Optional.of(
new AuditWriter(
cluster.getName(),
alterLogOnly,
auditTopicName,
producerFactory.get(),
consoleAudit ? AUDIT_LOGGER : null
@ -110,8 +113,8 @@ public class AuditService implements Closeable {
);
}
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
}
/**

View file

@ -18,6 +18,7 @@ import org.slf4j.Logger;
@Slf4j
record AuditWriter(String clusterName,
boolean logAlterOperationsOnly,
@Nullable String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {
@ -39,6 +40,10 @@ record AuditWriter(String clusterName,
}
private void write(AuditRecord rec) {
if (logAlterOperationsOnly && rec.resources().stream().noneMatch(AuditResource::alter)) {
//we should only log alter operations, but this is read-only op
return;
}
String json = rec.toJson();
if (consoleLogger != null) {
consoleLogger.info(json);

View file

@ -30,8 +30,8 @@ class AuditServiceTest {
@Test
void isAuditTopicChecksIfAuditIsEnabledForCluster() {
Map<String, AuditWriter> writers = Map.of(
"c1", new AuditWriter("с1", "c1topic", null, null),
"c2", new AuditWriter("c2", "c2topic", mock(KafkaProducer.class), null)
"c1", new AuditWriter("с1", true, "c1topic", null, null),
"c2", new AuditWriter("c2", false, "c2topic", mock(KafkaProducer.class), null)
);
var auditService = new AuditService(writers);
@ -79,6 +79,17 @@ class AuditServiceTest {
.thenReturn(mock(KafkaProducer.class));
}
@Test
void logOnlyAlterOpsByDefault() {
var auditProps = new ClustersProperties.AuditProperties();
auditProps.setConsoleAuditEnabled(true);
clustersProperties.setAudit(auditProps);
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
assertThat(maybeWriter)
.hasValueSatisfying(w -> assertThat(w.logAlterOperationsOnly()).isTrue());
}
@Test
void noWriterIfNoAuditPropsSet() {
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);

View file

@ -0,0 +1,86 @@
package com.provectus.kafka.ui.service.audit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.slf4j.Logger;
class AuditWriterTest {
final KafkaProducer<byte[], byte[]> producerMock = Mockito.mock(KafkaProducer.class);
final Logger loggerMock = Mockito.mock(Logger.class);
final AuthenticatedUser user = new AuthenticatedUser("someone", List.of());
@Nested
class AlterOperationsOnlyWriter {
final AuditWriter alterOnlyWriter = new AuditWriter("test", true, "test-topic", producerMock, loggerMock);
@ParameterizedTest
@MethodSource
void onlyLogsWhenAlterOperationIsPresentForOneOfResources(AccessContext ctxWithAlterOperation) {
alterOnlyWriter.write(ctxWithAlterOperation, user, null);
verify(producerMock).send(any(), any());
verify(loggerMock).info(any());
}
static Stream<AccessContext> onlyLogsWhenAlterOperationIsPresentForOneOfResources() {
Stream<UnaryOperator<AccessContextBuilder>> topicEditActions =
TopicAction.ALTER_ACTIONS.stream().map(a -> c -> c.topic("test").topicActions(a));
Stream<UnaryOperator<AccessContextBuilder>> clusterConfigEditActions =
ClusterConfigAction.ALTER_ACTIONS.stream().map(a -> c -> c.clusterConfigActions(a));
Stream<UnaryOperator<AccessContextBuilder>> aclEditActions =
AclAction.ALTER_ACTIONS.stream().map(a -> c -> c.aclActions(a));
Stream<UnaryOperator<AccessContextBuilder>> cgEditActions =
ConsumerGroupAction.ALTER_ACTIONS.stream().map(a -> c -> c.consumerGroup("cg").consumerGroupActions(a));
Stream<UnaryOperator<AccessContextBuilder>> schemaEditActions =
SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a));
Stream<UnaryOperator<AccessContextBuilder>> connEditActions =
ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a));
return Stream.of(
topicEditActions, clusterConfigEditActions, aclEditActions,
cgEditActions, connEditActions, schemaEditActions
)
.flatMap(c -> c)
.map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
}
@ParameterizedTest
@MethodSource
void doesNothingIfNoResourceHasAlterAction(AccessContext readOnlyCxt) {
alterOnlyWriter.write(readOnlyCxt, user, null);
verifyNoInteractions(producerMock);
verifyNoInteractions(loggerMock);
}
static Stream<AccessContext> doesNothingIfNoResourceHasAlterAction() {
return Stream.<UnaryOperator<AccessContextBuilder>>of(
c -> c.topic("test").topicActions(TopicAction.VIEW),
c -> c.clusterConfigActions(ClusterConfigAction.VIEW),
c -> c.aclActions(AclAction.VIEW),
c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW),
c -> c.schema("sc").schemaActions(SchemaAction.VIEW),
c -> c.connect("conn").connectActions(ConnectAction.VIEW)
).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
}
}
}

View file

@ -4091,6 +4091,9 @@ components:
audit:
type: object
properties:
level:
type: string
enum: [ "ALL", "ALTER_ONLY" ]
topic:
type: string
auditTopicsPartitions: