
Merge branch 'master' into acl_specific_endpoints

Roman Zabaluev, 2 years ago
parent
commit
be574b05b2
89 changed files with 1566 additions and 372 deletions
  1. .github/workflows/aws_publisher.yaml (+1 -1)
  2. .github/workflows/backend.yml (+5 -2)
  3. .github/workflows/block_merge.yml (+1 -1)
  4. .github/workflows/branch-deploy.yml (+1 -1)
  5. .github/workflows/branch-remove.yml (+1 -1)
  6. .github/workflows/build-public-image.yml (+2 -2)
  7. .github/workflows/delete-public-image.yml (+1 -1)
  8. .github/workflows/documentation.yaml (+1 -1)
  9. .github/workflows/e2e-automation.yml (+1 -1)
  10. .github/workflows/e2e-checks.yaml (+5 -3)
  11. .github/workflows/e2e-manual.yml (+1 -1)
  12. .github/workflows/e2e-weekly.yml (+1 -1)
  13. .github/workflows/frontend.yaml (+5 -2)
  14. .github/workflows/master.yaml (+2 -1)
  15. .github/workflows/pr-checks.yaml (+3 -2)
  16. .github/workflows/release-serde-api.yaml (+1 -1)
  17. .github/workflows/release.yaml (+1 -1)
  18. .github/workflows/release_drafter.yml (+1 -1)
  19. .github/workflows/separate_env_public_create.yml (+1 -1)
  20. .github/workflows/separate_env_public_remove.yml (+1 -1)
  21. .github/workflows/stale.yaml (+1 -1)
  22. .github/workflows/terraform-deploy.yml (+1 -1)
  23. .github/workflows/triage_issues.yml (+1 -1)
  24. .github/workflows/triage_prs.yml (+1 -1)
  25. .github/workflows/welcome-first-time-contributors.yml (+3 -1)
  26. .github/workflows/workflow_linter.yaml (+1 -1)
  27. kafka-ui-api/pom.xml (+1 -1)
  28. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java (+31 -8)
  29. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/BasicAuthSecurityConfig.java (+11 -11)
  30. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/DisabledAuthSecurityConfig.java (+6 -4)
  31. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapProperties.java (+2 -0)
  32. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java (+35 -25)
  33. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java (+12 -17)
  34. kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java (+5 -3)
  35. kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonAvroConversionException.java (+7 -0)
  36. kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonToAvroConversionException.java (+0 -7)
  37. kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java (+7 -1)
  38. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java (+26 -0)
  39. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java (+0 -6)
  40. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java (+9 -23)
  41. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java (+294 -0)
  42. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java (+80 -0)
  43. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java (+2 -6)
  44. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java (+3 -9)
  45. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java (+22 -28)
  46. kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java (+4 -1)
  47. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java (+3 -4)
  48. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporter.java (+3 -6)
  49. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java (+55 -0)
  50. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java (+19 -8)
  51. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java (+3 -3)
  52. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java (+14 -4)
  53. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java (+2 -2)
  54. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java (+2 -3)
  55. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java (+78 -0)
  56. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java (+5 -7)
  57. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java (+11 -25)
  58. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java (+61 -22)
  59. kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java (+7 -1)
  60. kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java (+54 -1)
  61. kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java (+185 -0)
  62. kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java (+84 -0)
  63. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java (+86 -0)
  64. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java (+7 -5)
  65. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java (+2 -3)
  66. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java (+2 -2)
  67. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java (+2 -3)
  68. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java (+21 -35)
  69. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java (+0 -6)
  70. kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java (+94 -2)
  71. kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml (+8 -0)
  72. kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml (+23 -2)
  73. kafka-ui-e2e-checks/pom.xml (+3 -3)
  74. kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/utilities/StringUtils.java (+15 -0)
  75. kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java (+2 -10)
  76. kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/brokers/BrokersTest.java (+34 -0)
  77. kafka-ui-react-app/public/robots.txt (+1 -1)
  78. kafka-ui-react-app/src/components/Brokers/Broker/Broker.tsx (+4 -1)
  79. kafka-ui-react-app/src/components/Brokers/Broker/__test__/Broker.spec.tsx (+1 -1)
  80. kafka-ui-react-app/src/components/Brokers/BrokersList/BrokersList.tsx (+1 -0)
  81. kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx (+16 -4)
  82. kafka-ui-react-app/src/components/ConsumerGroups/List.tsx (+18 -3)
  83. kafka-ui-react-app/src/components/Topics/Topic/Messages/getDefaultSerdeName.ts (+2 -2)
  84. kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx (+2 -2)
  85. kafka-ui-react-app/src/components/Topics/Topic/SendMessage/utils.ts (+10 -10)
  86. kafka-ui-react-app/src/components/common/NewTable/SizeCell.tsx (+3 -3)
  87. kafka-ui-react-app/src/lib/constants.ts (+12 -1)
  88. kafka-ui-react-app/src/lib/hooks/api/topics.ts (+5 -0)
  89. pom.xml (+5 -5)

+ 1 - 1
.github/workflows/aws_publisher.yaml

@@ -1,4 +1,4 @@
-name: AWS Marketplace Publisher
+name: "Infra: Release: AWS Marketplace Publisher"
 on:
   workflow_dispatch:
     inputs:

+ 5 - 2
.github/workflows/backend.yml

@@ -1,4 +1,4 @@
-name: Backend build and test
+name: "Backend: PR/master build & test"
 on:
   push:
     branches:
@@ -8,6 +8,9 @@ on:
     paths:
       - "kafka-ui-api/**"
       - "pom.xml"
+permissions:
+  checks: write
+  pull-requests: write
 jobs:
   build-and-test:
     runs-on: ubuntu-latest
@@ -29,7 +32,7 @@ jobs:
           key: ${{ runner.os }}-sonar
           restore-keys: ${{ runner.os }}-sonar
       - name: Build and analyze pull request target
-        if: ${{ github.event_name == 'pull_request_target' }}
+        if: ${{ github.event_name == 'pull_request' }}
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_BACKEND }}

+ 1 - 1
.github/workflows/block_merge.yml

@@ -1,4 +1,4 @@
-name: Pull Request Labels
+name: "Infra: PR block merge"
 on:
   pull_request:
     types: [opened, labeled, unlabeled, synchronize]

+ 1 - 1
.github/workflows/branch-deploy.yml

@@ -1,4 +1,4 @@
-name: Feature testing init
+name: "Infra: Feature Testing: Init env"
 on:
   workflow_dispatch:
 

+ 1 - 1
.github/workflows/branch-remove.yml

@@ -1,4 +1,4 @@
-name: Feature testing destroy
+name: "Infra: Feature Testing: Destroy env"
 on:
   workflow_dispatch:
   pull_request:

+ 2 - 2
.github/workflows/build-public-image.yml

@@ -1,4 +1,4 @@
-name: Build Docker image and push
+name: "Infra: Image Testing: Deploy"
 on:
   workflow_dispatch:
   pull_request:
@@ -65,7 +65,7 @@ jobs:
           cache-from: type=local,src=/tmp/.buildx-cache
           cache-to: type=local,dest=/tmp/.buildx-cache
       - name: make comment with private deployment link
-        uses: peter-evans/create-or-update-comment@v2
+        uses: peter-evans/create-or-update-comment@v3
         with:
           issue-number: ${{ github.event.pull_request.number }}
           body: |

+ 1 - 1
.github/workflows/delete-public-image.yml

@@ -1,4 +1,4 @@
-name: Delete Public ECR Image
+name: "Infra: Image Testing: Delete"
 on:
   workflow_dispatch:
   pull_request:

+ 1 - 1
.github/workflows/documentation.yaml

@@ -1,4 +1,4 @@
-name: Documentation URLs linter
+name: "Infra: Docs: URL linter"
 on:
   pull_request:
     types:

+ 1 - 1
.github/workflows/e2e-automation.yml

@@ -1,4 +1,4 @@
-name: E2E Automation suite
+name: "E2E: Automation suite"
 on:
   workflow_dispatch:
     inputs:

+ 5 - 3
.github/workflows/e2e-checks.yaml

@@ -1,4 +1,4 @@
-name: E2E PR health check
+name: "E2E: PR healthcheck"
 on:
   pull_request_target:
     types: [ "opened", "edited", "reopened", "synchronize" ]
@@ -8,6 +8,8 @@ on:
       - "kafka-ui-react-app/**"
       - "kafka-ui-e2e-checks/**"
       - "pom.xml"
+permissions:
+  statuses: write
 jobs:
   build-and-test:
     runs-on: ubuntu-latest
@@ -18,8 +20,8 @@ jobs:
       - name: Configure AWS credentials
         uses: aws-actions/configure-aws-credentials@v2
         with:
-          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
           aws-region: eu-central-1
       - name: Set up environment
         id: set_env_values

+ 1 - 1
.github/workflows/e2e-manual.yml

@@ -1,4 +1,4 @@
-name: E2E Manual suite
+name: "E2E: Manual suite"
 on:
   workflow_dispatch:
     inputs:

+ 1 - 1
.github/workflows/e2e-weekly.yml

@@ -1,4 +1,4 @@
-name: E2E Weekly suite
+name: "E2E: Weekly suite"
 on:
   schedule:
     - cron: '0 1 * * 1'

+ 5 - 2
.github/workflows/frontend.yaml

@@ -1,4 +1,4 @@
-name: Frontend build and test
+name: "Frontend: PR/master build & test"
 on:
   push:
     branches:
@@ -8,6 +8,9 @@ on:
     paths:
       - "kafka-ui-contract/**"
       - "kafka-ui-react-app/**"
+permissions:
+  checks: write
+  pull-requests: write
 jobs:
   build-and-test:
     env:
@@ -24,7 +27,7 @@ jobs:
         with:
           version: 7.4.0
       - name: Install node
-        uses: actions/setup-node@v3.6.0
+        uses: actions/setup-node@v3.7.0
         with:
           node-version: "16.15.0"
           cache: "pnpm"

+ 2 - 1
.github/workflows/master.yaml

@@ -1,4 +1,4 @@
-name: Master branch build & deploy
+name: "Master: Build & deploy"
 on:
   workflow_dispatch:
   push:
@@ -58,6 +58,7 @@ jobs:
           builder: ${{ steps.buildx.outputs.name }}
           context: kafka-ui-api
           platforms: linux/amd64,linux/arm64
+          provenance: false
           push: true
           tags: |
             provectuslabs/kafka-ui:${{ steps.build.outputs.version }}

+ 3 - 2
.github/workflows/pr-checks.yaml

@@ -1,8 +1,9 @@
-name: "PR Checklist checked"
+name: "PR: Checklist linter"
 on:
   pull_request_target:
     types: [opened, edited, synchronize, reopened]
-
+permissions:
+  checks: write
 jobs:
   task-check:
     runs-on: ubuntu-latest

+ 1 - 1
.github/workflows/release-serde-api.yaml

@@ -1,4 +1,4 @@
-name: Release serde api
+name: "Infra: Release: Serde API"
 on: workflow_dispatch
 
 jobs:

+ 1 - 1
.github/workflows/release.yaml

@@ -1,4 +1,4 @@
-name: Release
+name: "Infra: Release"
 on:
   release:
     types: [published]

+ 1 - 1
.github/workflows/release_drafter.yml

@@ -1,4 +1,4 @@
-name: Release Drafter
+name: "Infra: Release Drafter run"
 
 on:
   push:

+ 1 - 1
.github/workflows/separate_env_public_create.yml

@@ -1,4 +1,4 @@
-name: Separate environment create
+name: "Infra: Feature Testing Public: Init env"
 on:
   workflow_dispatch:
     inputs:

+ 1 - 1
.github/workflows/separate_env_public_remove.yml

@@ -1,4 +1,4 @@
-name: Separate environment remove
+name: "Infra: Feature Testing Public: Destroy env"
 on:
   workflow_dispatch:
     inputs:

+ 1 - 1
.github/workflows/stale.yaml

@@ -1,4 +1,4 @@
-name: 'Close stale issues'
+name: 'Infra: Close stale issues'
 on:
   schedule:
     - cron: '30 1 * * *'

+ 1 - 1
.github/workflows/terraform-deploy.yml

@@ -1,4 +1,4 @@
-name: Terraform deploy
+name: "Infra: Terraform deploy"
 on:
   workflow_dispatch:
     inputs:

+ 1 - 1
.github/workflows/triage_issues.yml

@@ -1,4 +1,4 @@
-name: Add triage label to new issues
+name: "Infra: Triage: Apply triage label for issues"
 on:
   issues:
     types:

+ 1 - 1
.github/workflows/triage_prs.yml

@@ -1,4 +1,4 @@
-name: Add triage label to new PRs
+name: "Infra: Triage: Apply triage label for PRs"
 on:
   pull_request:
     types:

+ 3 - 1
.github/workflows/welcome-first-time-contributors.yml

@@ -7,7 +7,9 @@ on:
   issues:
     types:
       - opened
-
+permissions:
+  issues: write
+  pull-requests: write
 jobs:
   welcome:
     runs-on: ubuntu-latest

+ 1 - 1
.github/workflows/workflow_linter.yaml

@@ -1,4 +1,4 @@
-name: "Workflow linter"
+name: "Infra: Workflow linter"
 on:
   pull_request:
     types:

+ 1 - 1
kafka-ui-api/pom.xml

@@ -91,7 +91,7 @@
         <dependency>
             <groupId>software.amazon.msk</groupId>
             <artifactId>aws-msk-iam-auth</artifactId>
-            <version>1.1.6</version>
+            <version>1.1.7</version>
         </dependency>
 
         <dependency>

+ 31 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java

@@ -1,18 +1,41 @@
 package com.provectus.kafka.ui.config;
 
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpResponse;
 import org.springframework.web.reactive.config.CorsRegistry;
 import org.springframework.web.reactive.config.WebFluxConfigurer;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilter;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
 
 @Configuration
-public class CorsGlobalConfiguration implements WebFluxConfigurer {
+public class CorsGlobalConfiguration {
 
-  @Override
-  public void addCorsMappings(CorsRegistry registry) {
-    registry.addMapping("/**")
-        .allowedOrigins("*")
-        .allowedMethods("*")
-        .allowedHeaders("*")
-        .allowCredentials(false);
+  @Bean
+  public WebFilter corsFilter() {
+    return (final ServerWebExchange ctx, final WebFilterChain chain) -> {
+      final ServerHttpRequest request = ctx.getRequest();
+
+      final ServerHttpResponse response = ctx.getResponse();
+      final HttpHeaders headers = response.getHeaders();
+      headers.add("Access-Control-Allow-Origin", "*");
+      headers.add("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE, OPTIONS");
+      headers.add("Access-Control-Max-Age", "3600");
+      headers.add("Access-Control-Allow-Headers", "Content-Type");
+
+      if (request.getMethod() == HttpMethod.OPTIONS) {
+        response.setStatusCode(HttpStatus.OK);
+        return Mono.empty();
+      }
+
+      return chain.filter(ctx);
+    };
   }
+
 }

+ 11 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/BasicAuthSecurityConfig.java

@@ -7,12 +7,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
-import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
 import org.springframework.security.web.server.SecurityWebFilterChain;
 import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
 import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
-import org.springframework.security.web.server.ui.LogoutPageGeneratingWebFilter;
 
 @Configuration
 @EnableWebFluxSecurity
@@ -33,15 +31,17 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
     final var logoutSuccessHandler = new RedirectServerLogoutSuccessHandler();
     logoutSuccessHandler.setLogoutSuccessUrl(URI.create(LOGOUT_URL));
 
-    return http
-        .addFilterAfter(new LogoutPageGeneratingWebFilter(), SecurityWebFiltersOrder.REACTOR_CONTEXT)
-        .csrf().disable()
-        .authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST).permitAll()
-        .anyExchange().authenticated()
-        .and().formLogin().loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler)
-        .and().logout().logoutSuccessHandler(logoutSuccessHandler)
-        .and().build();
+
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
+        .logout(spec -> spec.logoutSuccessHandler(logoutSuccessHandler))
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
+        .build();
   }
 
 }

+ 6 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/DisabledAuthSecurityConfig.java

@@ -27,10 +27,12 @@ public class DisabledAuthSecurityConfig extends AbstractAuthSecurityConfig {
       System.exit(1);
     }
     log.warn("Authentication is disabled. Access will be unrestricted.");
-    return http.authorizeExchange()
-        .anyExchange().permitAll()
-        .and()
-        .csrf().disable()
+
+    return http.authorizeExchange(spec -> spec
+            .anyExchange()
+            .permitAll()
+        )
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapProperties.java

@@ -15,6 +15,8 @@ public class LdapProperties {
   private String userFilterSearchBase;
   private String userFilterSearchFilter;
   private String groupFilterSearchBase;
+  private String groupFilterSearchFilter;
+  private String groupRoleAttribute;
 
   @Value("${oauth2.ldap.activeDirectory:false}")
   private boolean isActiveDirectory;

+ 35 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/LdapSecurityConfig.java

@@ -3,14 +3,16 @@ package com.provectus.kafka.ui.config.auth;
 import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
 
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.service.rbac.extractor.RbacLdapAuthoritiesExtractor;
 import java.util.Collection;
 import java.util.List;
-import javax.annotation.Nullable;
+import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
@@ -22,6 +24,7 @@ import org.springframework.security.authentication.AuthenticationManager;
 import org.springframework.security.authentication.ProviderManager;
 import org.springframework.security.authentication.ReactiveAuthenticationManager;
 import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
+import org.springframework.security.config.Customizer;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
 import org.springframework.security.core.GrantedAuthority;
@@ -50,9 +53,9 @@ public class LdapSecurityConfig {
 
   @Bean
   public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
-                                                             LdapAuthoritiesPopulator ldapAuthoritiesPopulator,
-                                                             @Nullable AccessControlService acs) {
-    var rbacEnabled = acs != null && acs.isRbacEnabled();
+                                                             LdapAuthoritiesPopulator authoritiesExtractor,
+                                                             AccessControlService acs) {
+    var rbacEnabled = acs.isRbacEnabled();
     BindAuthenticator ba = new BindAuthenticator(contextSource);
     if (props.getBase() != null) {
       ba.setUserDnPatterns(new String[] {props.getBase()});
@@ -67,7 +70,7 @@ public class LdapSecurityConfig {
     AbstractLdapAuthenticationProvider authenticationProvider;
     if (!props.isActiveDirectory()) {
       authenticationProvider = rbacEnabled
-          ? new LdapAuthenticationProvider(ba, ldapAuthoritiesPopulator)
+          ? new LdapAuthenticationProvider(ba, authoritiesExtractor)
           : new LdapAuthenticationProvider(ba);
     } else {
       authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
@@ -97,11 +100,24 @@ public class LdapSecurityConfig {
 
   @Bean
   @Primary
-  public LdapAuthoritiesPopulator ldapAuthoritiesPopulator(BaseLdapPathContextSource contextSource) {
-    var authoritiesPopulator = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
-    authoritiesPopulator.setRolePrefix("");
-    authoritiesPopulator.setConvertToUpperCase(false);
-    return authoritiesPopulator;
+  public DefaultLdapAuthoritiesPopulator ldapAuthoritiesExtractor(ApplicationContext context,
+                                                                  BaseLdapPathContextSource contextSource,
+                                                                  AccessControlService acs) {
+    var rbacEnabled = acs != null && acs.isRbacEnabled();
+
+    DefaultLdapAuthoritiesPopulator extractor;
+
+    if (rbacEnabled) {
+      extractor = new RbacLdapAuthoritiesExtractor(context, contextSource, props.getGroupFilterSearchBase());
+    } else {
+      extractor = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
+    }
+
+    Optional.ofNullable(props.getGroupFilterSearchFilter()).ifPresent(extractor::setGroupSearchFilter);
+    extractor.setRolePrefix("");
+    extractor.setConvertToUpperCase(false);
+    extractor.setSearchSubtree(true);
+    return extractor;
   }
 
   @Bean
@@ -111,21 +127,15 @@ public class LdapSecurityConfig {
       log.info("Active Directory support for LDAP has been enabled.");
     }
 
-    return http
-        .authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST)
-        .permitAll()
-        .anyExchange()
-        .authenticated()
-
-        .and()
-        .formLogin()
-
-        .and()
-        .logout()
-
-        .and()
-        .csrf().disable()
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .formLogin(Customizer.withDefaults())
+        .logout(Customizer.withDefaults())
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 

+ 12 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/OAuthSecurityConfig.java

@@ -12,10 +12,11 @@ import lombok.extern.log4j.Log4j2;
 import org.jetbrains.annotations.Nullable;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties;
-import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesRegistrationAdapter;
+import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesMapper;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.Customizer;
 import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
 import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
 import org.springframework.security.config.web.server.ServerHttpSecurity;
@@ -49,21 +50,15 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
   public SecurityWebFilterChain configure(ServerHttpSecurity http, OAuthLogoutSuccessHandler logoutHandler) {
     log.info("Configuring OAUTH2 authentication.");
 
-    return http.authorizeExchange()
-        .pathMatchers(AUTH_WHITELIST)
-        .permitAll()
-        .anyExchange()
-        .authenticated()
-
-        .and()
-        .oauth2Login()
-
-        .and()
-        .logout()
-        .logoutSuccessHandler(logoutHandler)
-
-        .and()
-        .csrf().disable()
+    return http.authorizeExchange(spec -> spec
+            .pathMatchers(AUTH_WHITELIST)
+            .permitAll()
+            .anyExchange()
+            .authenticated()
+        )
+        .oauth2Login(Customizer.withDefaults())
+        .logout(spec -> spec.logoutSuccessHandler(logoutHandler))
+        .csrf(ServerHttpSecurity.CsrfSpec::disable)
         .build();
   }
 
@@ -103,7 +98,7 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
   public InMemoryReactiveClientRegistrationRepository clientRegistrationRepository() {
     final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
     final List<ClientRegistration> registrations =
-        new ArrayList<>(OAuth2ClientPropertiesRegistrationAdapter.getClientRegistrations(props).values());
+        new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
     return new InMemoryReactiveClientRegistrationRepository(registrations);
   }
 

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -123,9 +123,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("deleteTopic")
         .build();
 
-    return accessControlService.validateAccess(context).then(
-        topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    return accessControlService.validateAccess(context)
+        .then(
+            topicsService.deleteTopic(getCluster(clusterName), topicName)
+                .thenReturn(ResponseEntity.ok().<Void>build())
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
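
A side note on the `thenReturn` change above, since the diff alone doesn't show the motivation: in Reactor, `map` is never invoked on an empty `Mono` (and `deleteTopic` returns `Mono<Void>`), so the previous `.map(ResponseEntity::ok)` completed without emitting a response, whereas `.thenReturn(...)` emits once the upstream completes. A minimal standalone sketch of that behaviour (the class name and sample values are illustrative, not part of the commit):

```java
import reactor.core.publisher.Mono;

public class ThenReturnSketch {
  public static void main(String[] args) {
    Mono<Void> deletion = Mono.empty(); // stands in for topicsService.deleteTopic(...)

    // map is skipped on an empty Mono, so only the fallback value is printed
    deletion.map(v -> "mapped").defaultIfEmpty("empty").subscribe(System.out::println); // "empty"

    // thenReturn emits its value after the upstream completes
    deletion.thenReturn("ok").subscribe(System.out::println); // "ok"
  }
}
```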
 
 

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonAvroConversionException.java

@@ -0,0 +1,7 @@
+package com.provectus.kafka.ui.exception;
+
+public class JsonAvroConversionException extends ValidationException {
+  public JsonAvroConversionException(String message) {
+    super(message);
+  }
+}

+ 0 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/JsonToAvroConversionException.java

@@ -1,7 +0,0 @@
-package com.provectus.kafka.ui.exception;
-
-public class JsonToAvroConversionException extends ValidationException {
-  public JsonToAvroConversionException(String message) {
-    super(message);
-  }
-}

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java

@@ -3,18 +3,21 @@ package com.provectus.kafka.ui.mapper;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
 import com.provectus.kafka.ui.sr.model.Compatibility;
 import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.sr.model.NewSubject;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import java.util.List;
 import java.util.Optional;
 import org.mapstruct.Mapper;
 
 
-@Mapper(componentModel = "spring")
+@Mapper
 public interface KafkaSrMapper {
 
   default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
@@ -24,9 +27,12 @@ public interface KafkaSrMapper {
         .subject(s.getSubject())
         .schema(s.getSchema())
         .schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
+        .references(toDto(s.getReferences()))
         .compatibilityLevel(s.getCompatibility().toString());
   }
 
+  List<SchemaReferenceDTO> toDto(List<SchemaReference> references);
+
   CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
 
   CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -11,6 +11,8 @@ import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
+import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
+import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -46,6 +48,7 @@ public class SerdesInitializer {
             .put(UInt64Serde.name(), UInt64Serde.class)
             .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
+            .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .build(),
         new CustomSerdeLoader()
@@ -118,6 +121,8 @@ public class SerdesInitializer {
       }
     });
 
+    registerTopicRelatedSerde(registeredSerdes);
+
     return new ClusterSerdes(
         registeredSerdes,
         Optional.ofNullable(clusterProperties.getDefaultKeySerde())
@@ -132,6 +137,27 @@ public class SerdesInitializer {
     );
   }
 
+  /**
+   * Registers serdes that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
+   */
+  private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
+    registerConsumerOffsetsSerde(serdes);
+  }
+
+  private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
+    var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
+    serdes.put(
+        ConsumerOffsetsSerde.name(),
+        new SerdeInstance(
+            ConsumerOffsetsSerde.name(),
+            new ConsumerOffsetsSerde(),
+            pattern,
+            pattern,
+            null
+        )
+    );
+  }
+
   private SerdeInstance createFallbackSerde() {
     StringSerde serde = new StringSerde();
     serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java

@@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
     return "Avro (Embedded)";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 9 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Base64Serde.java

@@ -1,8 +1,6 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Base64;
@@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
     return "Base64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();
@@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        input = input.trim();
-        // it is actually a hack to provide ability to sent empty array as a key/value
-        if (input.length() == 0) {
-          return new byte[]{};
-        }
-        return Base64.getDecoder().decode(input);
+    var decoder = Base64.getDecoder();
+    return inputString -> {
+      inputString = inputString.trim();
+      // it is actually a hack to provide the ability to send an empty array as a key/value
+      if (inputString.length() == 0) {
+        return new byte[] {};
       }
+      return decoder.decode(inputString);
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
     var encoder = Base64.getEncoder();
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             encoder.encodeToString(data),
             DeserializeResult.Type.STRING,
             Map.of()
         );
-      }
-    };
   }
 }

+ 294 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java

@@ -0,0 +1,294 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.BoundField;
+import org.apache.kafka.common.protocol.types.CompactArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+// Deserialization logic and message schemas can be found in
+// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
+public class ConsumerOffsetsSerde implements BuiltInSerde {
+
+  private static final JsonMapper JSON_MAPPER = createMapper();
+
+  public static final String TOPIC = "__consumer_offsets";
+
+  public static String name() {
+    return "__consumer_offsets";
+  }
+
+  private static JsonMapper createMapper() {
+    var module = new SimpleModule();
+    module.addSerializer(Struct.class, new JsonSerializer<>() {
+      @Override
+      public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+        gen.writeStartObject();
+        for (BoundField field : value.schema().fields()) {
+          var fieldVal = value.get(field);
+          gen.writeObjectField(field.def.name, fieldVal);
+        }
+        gen.writeEndObject();
+      }
+    });
+    var mapper = new JsonMapper();
+    mapper.registerModule(module);
+    return mapper;
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return topic.equals(TOPIC);
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return switch (type) {
+      case KEY -> keyDeserializer();
+      case VALUE -> valueDeserializer();
+    };
+  }
+
+  private Deserializer keyDeserializer() {
+    final Schema commitKeySchema = new Schema(
+        new Field("group", Type.STRING, ""),
+        new Field("topic", Type.STRING, ""),
+        new Field("partition", Type.INT32, "")
+    );
+
+    final Schema groupMetadataSchema = new Schema(
+        new Field("group", Type.STRING, "")
+    );
+
+    return (headers, data) -> {
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+      return new DeserializeResult(
+          toJson(
+              switch (version) {
+                case 0, 1 -> commitKeySchema.read(bb);
+                case 2 -> groupMetadataSchema.read(bb);
+                default -> throw new IllegalStateException("Unknown group metadata message version: " + version);
+              }
+          ),
+          DeserializeResult.Type.JSON,
+          Map.of()
+      );
+    };
+  }
+
+  private Deserializer valueDeserializer() {
+    final Schema commitOffsetSchemaV0 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV1 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, ""),
+            new Field("expire_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV2 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV3 =
+        new Schema(
+            new Field("offset", Type.INT64, ""),
+            new Field("leader_epoch", Type.INT32, ""),
+            new Field("metadata", Type.STRING, ""),
+            new Field("commit_timestamp", Type.INT64, "")
+        );
+
+    final Schema commitOffsetSchemaV4 = new Schema(
+        new Field("offset", Type.INT64, ""),
+        new Field("leader_epoch", Type.INT32, ""),
+        new Field("metadata", Type.COMPACT_STRING, ""),
+        new Field("commit_timestamp", Type.INT64, ""),
+        Field.TaggedFieldsSection.of()
+    );
+
+    final Schema metadataSchema0 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema1 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema2 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema3 =
+        new Schema(
+            new Field("protocol_type", Type.STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.NULLABLE_STRING, ""),
+            new Field("leader", Type.NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new ArrayOf(new Schema(
+                new Field("member_id", Type.STRING, ""),
+                new Field("group_instance_id", Type.NULLABLE_STRING, ""),
+                new Field("client_id", Type.STRING, ""),
+                new Field("client_host", Type.STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.BYTES, ""),
+                new Field("assignment", Type.BYTES, "")
+            )), "")
+        );
+
+    final Schema metadataSchema4 =
+        new Schema(
+            new Field("protocol_type", Type.COMPACT_STRING, ""),
+            new Field("generation", Type.INT32, ""),
+            new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
+            new Field("current_state_timestamp", Type.INT64, ""),
+            new Field("members", new CompactArrayOf(new Schema(
+                new Field("member_id", Type.COMPACT_STRING, ""),
+                new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
+                new Field("client_id", Type.COMPACT_STRING, ""),
+                new Field("client_host", Type.COMPACT_STRING, ""),
+                new Field("rebalance_timeout", Type.INT32, ""),
+                new Field("session_timeout", Type.INT32, ""),
+                new Field("subscription", Type.COMPACT_BYTES, ""),
+                new Field("assignment", Type.COMPACT_BYTES, ""),
+                Field.TaggedFieldsSection.of()
+            )), ""),
+            Field.TaggedFieldsSection.of()
+        );
+
+    return (headers, data) -> {
+      String result;
+      var bb = ByteBuffer.wrap(data);
+      short version = bb.getShort();
+      // ideally, we should distinguish whether the value is a commit or metadata message
+      // by checking the record's key, but our current serde structure doesn't allow that,
+      // so we try to parse it as metadata first and then fall back to a commit message
+      try {
+        result = toJson(
+            switch (version) {
+              case 0 -> metadataSchema0.read(bb);
+              case 1 -> metadataSchema1.read(bb);
+              case 2 -> metadataSchema2.read(bb);
+              case 3 -> metadataSchema3.read(bb);
+              case 4 -> metadataSchema4.read(bb);
+              default -> throw new IllegalArgumentException("Unrecognized version: " + version);
+            }
+        );
+      } catch (Throwable e) {
+        bb = bb.rewind();
+        bb.getShort(); // skipping version
+        result = toJson(
+            switch (version) {
+              case 0 -> commitOffsetSchemaV0.read(bb);
+              case 1 -> commitOffsetSchemaV1.read(bb);
+              case 2 -> commitOffsetSchemaV2.read(bb);
+              case 3 -> commitOffsetSchemaV3.read(bb);
+              case 4 -> commitOffsetSchemaV4.read(bb);
+              default -> throw new IllegalArgumentException("Unrecognized version: " + version);
+            }
+        );
+      }
+
+      if (bb.remaining() != 0) {
+        throw new IllegalArgumentException(
+            "Message buffer is not read to the end, which is likely means message is unrecognized");
+      }
+      return new DeserializeResult(
+          result,
+          DeserializeResult.Type.JSON,
+          Map.of()
+      );
+    };
+  }
+
+  @SneakyThrows
+  private String toJson(Struct s) {
+    return JSON_MAPPER.writeValueAsString(s);
+  }
+}
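
For context on the schemas above: the serde reads a version `short` from the buffer and then parses the rest with the matching `org.apache.kafka.common.protocol.types.Schema`. A hypothetical round-trip of a v1 commit key using those same kafka-clients protocol types (the group/topic values and class name are made up for illustration):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

public class OffsetsKeySketch {
  public static void main(String[] args) {
    Schema commitKeySchema = new Schema(
        new Field("group", Type.STRING, ""),
        new Field("topic", Type.STRING, ""),
        new Field("partition", Type.INT32, ""));

    // Write a key the way the broker does: a version short followed by the struct fields.
    Struct key = new Struct(commitKeySchema)
        .set("group", "my-group")
        .set("topic", "orders")
        .set("partition", 0);
    ByteBuffer buf = ByteBuffer.allocate(2 + key.sizeOf());
    buf.putShort((short) 1);
    key.writeTo(buf);
    buf.flip();

    // Read it back the way the key deserializer above does.
    short version = buf.getShort();
    Struct parsed = commitKeySchema.read(buf);
    System.out.println("version " + version + ": " + parsed);
  }
}
```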

+ 80 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java

@@ -0,0 +1,80 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+
+public class HexSerde implements BuiltInSerde {
+
+  private HexFormat deserializeHexFormat;
+
+  public static String name() {
+    return "Hex";
+  }
+
+  @Override
+  public void configure(PropertyResolver serdeProperties,
+                        PropertyResolver kafkaClusterProperties,
+                        PropertyResolver globalProperties) {
+    String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
+    boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
+    deserializeHexFormat = HexFormat.ofDelimiter(delim);
+    if (uppercase) {
+      deserializeHexFormat = deserializeHexFormat.withUpperCase();
+    }
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    return input -> {
+      input = input.trim();
+      // it is a hack to provide the ability to send an empty array as a key/value
+      if (input.length() == 0) {
+        return new byte[] {};
+      }
+      return HexFormat.of().parseHex(prepareInputForParse(input));
+    };
+  }
+
+  // removing most-common delimiters and prefixes
+  private static String prepareInputForParse(String input) {
+    return input
+        .replaceAll(" ", "")
+        .replaceAll("#", "")
+        .replaceAll(":", "");
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return (headers, data) ->
+        new DeserializeResult(
+            deserializeHexFormat.formatHex(data),
+            DeserializeResult.Type.STRING,
+            Map.of()
+        );
+  }
+}
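
The `delimiter` and `uppercase` properties above only affect deserialization output; serialization strips common delimiters before parsing. A minimal standalone sketch of that `HexFormat` round trip, mirroring the serde's defaults (the class name and sample input are illustrative):

```java
import java.util.HexFormat;

public class HexSerdeSketch {
  public static void main(String[] args) {
    // Serialization path: strip the delimiters/prefixes the serde removes, then parse hex into bytes.
    byte[] bytes = HexFormat.of().parseHex("0A #1F :2B".replaceAll("[ #:]", ""));

    // Deserialization path: format bytes with the configured delimiter and case (defaults: " ", uppercase).
    HexFormat output = HexFormat.ofDelimiter(" ").withUpperCase();
    System.out.println(output.formatHex(bytes)); // prints "0A 1F 2B"
  }
}
```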

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java

@@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             String.valueOf(Longs.fromByteArray(data)),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 3 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java

@@ -1,10 +1,8 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedInteger;
 import com.google.common.primitives.UnsignedLong;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
@@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
                     + "  \"minimum\" : 0, "
                     + "  \"maximum\" : %s "
                     + "}",
-                UnsignedInteger.MAX_VALUE
+                UnsignedLong.MAX_VALUE
             ),
             Map.of()
         )
@@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        return new DeserializeResult(
+    return (headers, data) ->
+        new DeserializeResult(
             UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
             DeserializeResult.Type.JSON,
             Map.of()
         );
-      }
-    };
   }
 }

+ 22 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java

@@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
 
   @Override
   public Serializer serializer(String topic, Target type) {
-    return new Serializer() {
-      @Override
-      public byte[] serialize(String input) {
-        UUID uuid = UUID.fromString(input);
-        ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
-        if (mostSignificantBitsFirst) {
-          bb.putLong(uuid.getMostSignificantBits());
-          bb.putLong(uuid.getLeastSignificantBits());
-        } else {
-          bb.putLong(uuid.getLeastSignificantBits());
-          bb.putLong(uuid.getMostSignificantBits());
-        }
-        return bb.array();
+    return input -> {
+      UUID uuid = UUID.fromString(input);
+      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
+      if (mostSignificantBitsFirst) {
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+      } else {
+        bb.putLong(uuid.getLeastSignificantBits());
+        bb.putLong(uuid.getMostSignificantBits());
       }
+      return bb.array();
     };
   }
 
   @Override
   public Deserializer deserializer(String topic, Target type) {
-    return new Deserializer() {
-      @Override
-      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
-        if (data.length != 16) {
-          throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
-        }
-        ByteBuffer bb = ByteBuffer.wrap(data);
-        long msb = bb.getLong();
-        long lsb = bb.getLong();
-        UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
-        return new DeserializeResult(
-            uuid.toString(),
-            DeserializeResult.Type.STRING,
-            Map.of()
-        );
+    return (headers, data) -> {
+      if (data.length != 16) {
+        throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
       }
+      ByteBuffer bb = ByteBuffer.wrap(data);
+      long msb = bb.getLong();
+      long lsb = bb.getLong();
+      UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
+      return new DeserializeResult(
+          uuid.toString(),
+          DeserializeResult.Type.STRING,
+          Map.of()
+      );
     };
   }
 }

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@@ -217,7 +218,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
       case AVRO -> new AvroJsonSchemaConverter()
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
-      case JSON -> schema.getSchema();
+      case JSON ->
+        // need to use Confluent JsonSchema since it includes resolved references
+        ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }
 

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -14,8 +14,7 @@ import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
 import com.provectus.kafka.ui.sr.model.NewSubject;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.ReactiveFailover;
-import com.provectus.kafka.ui.util.WebClientConfigurator;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public class SchemaRegistryService {
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                                String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
@@ -126,7 +125,7 @@ public class SchemaRegistryService {
         .onErrorMap(WebClientResponseException.Conflict.class,
             th -> new SchemaCompatibilityException())
         .onErrorMap(WebClientResponseException.UnprocessableEntity.class,
-            th -> new ValidationException("Invalid schema"))
+            th -> new ValidationException("Invalid schema. Error from registry: " + th.getResponseBodyAsString()))
         .then(getLatestSchemaVersionBySubject(cluster, subject));
   }
 

+ 3 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporter.java

@@ -5,13 +5,10 @@ import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.KafkaConnectService;
 import com.provectus.kafka.ui.service.StatisticsCache;
-import java.util.List;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
-import lombok.SneakyThrows;
 import org.opendatadiscovery.client.ApiClient;
 import org.opendatadiscovery.client.api.OpenDataDiscoveryIngestionApi;
-import org.opendatadiscovery.client.model.DataEntity;
 import org.opendatadiscovery.client.model.DataEntityList;
 import org.opendatadiscovery.client.model.DataSource;
 import org.opendatadiscovery.client.model.DataSourceList;
@@ -68,14 +65,14 @@ class OddExporter {
   private Mono<Void> exportTopics(KafkaCluster c) {
     return createKafkaDataSource(c)
         .thenMany(topicsExporter.export(c))
-        .concatMap(this::sentDataEntities)
+        .concatMap(this::sendDataEntities)
         .then();
   }
 
   private Mono<Void> exportKafkaConnects(KafkaCluster cluster) {
     return createConnectDataSources(cluster)
         .thenMany(connectorsExporter.export(cluster))
-        .concatMap(this::sentDataEntities)
+        .concatMap(this::sendDataEntities)
         .then();
   }
 
@@ -99,7 +96,7 @@ class OddExporter {
     );
   }
 
-  private Mono<Void> sentDataEntities(DataEntityList dataEntityList) {
+  private Mono<Void> sendDataEntities(DataEntityList dataEntityList) {
     return oddApi.postDataEntityList(dataEntityList);
   }
 

+ 55 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java

@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import reactor.core.publisher.Mono;
+
+// logic copied from AbstractSchemaProvider:resolveReferences
+// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
+class SchemaReferencesResolver {
+
+  private final KafkaSrClientApi client;
+
+  SchemaReferencesResolver(KafkaSrClientApi client) {
+    this.client = client;
+  }
+
+  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
+    return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
+        .map(Resolving::resolved);
+  }
+
+  private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
+
+    Resolving visit(String name) {
+      return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
+    }
+
+    Resolving resolve(String ref, String schema) {
+      return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
+    }
+  }
+
+  private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
+    Mono<Resolving> result = Mono.just(initState);
+    for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
+      result = result.flatMap(state -> {
+        if (state.visited().contains(reference.getName())) {
+          return Mono.just(state);
+        } else {
+          final var newState = state.visit(reference.getName());
+          return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
+              .flatMap(subj ->
+                  resolveReferences(subj.getReferences(), newState)
+                      .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
+        }
+      });
+    }
+    return result;
+  }
+}

+ 19 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -38,6 +41,8 @@ class TopicsExporter {
     return Flux.fromIterable(stats.getTopicDescriptions().keySet())
         .filter(topicFilter)
         .flatMap(topic -> createTopicDataEntity(cluster, topic, stats))
+        .onErrorContinue(
+            (th, topic) -> log.warn("Error exporting data for topic {}, cluster {}", topic, cluster.getName(), th))
         .buffer(100)
         .map(topicsEntities ->
             new DataEntityList()
@@ -89,23 +94,29 @@ class TopicsExporter {
         .build();
   }
 
+  // returns an empty list if the schema registry is not configured or the subject is not found
   private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
                                                   String topic,
                                                   KafkaPath topicOddrn,
-                                                  //currently we only retrieve value schema
                                                   boolean isKey) {
     if (cluster.getSchemaRegistryClient() == null) {
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
-    return cluster.getSchemaRegistryClient()
-        .mono(client -> client.getSubjectVersion(subject, "latest"))
-        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+    return getSubjWithResolvedRefs(cluster, subject)
+        .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
-        .onErrorResume(th -> true, th -> {
-          log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th);
-          return Mono.just(List.of());
-        });
+        .onErrorMap(WebClientResponseException.class, err ->
+            new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
+  }
+
+  private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
+                                                                                   String subjectName) {
+    return cluster.getSchemaRegistryClient()
+        .mono(client ->
+            client.getSubjectVersion(subjectName, "latest", false)
+                .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
+                    .map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
   }
 
 }

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import com.google.common.collect.ImmutableSet;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
   private AvroExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    var schema = new Schema.Parser().parse(subject.getSchema());
+  static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
+    var schema = avroSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

+ 14 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java

@@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.opendatadiscovery.client.model.DataSetField;
 import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 
 public final class DataSetFieldsExtractors {
 
-  public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+  public static List<DataSetField> extract(SchemaSubject subject,
+                                           Map<String, String> resolvedRefs,
+                                           KafkaPath topicOddrn,
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }
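
The constructors used in the switch above take the raw schema string, an (empty) list of SchemaReference objects, and a map from reference name to resolved schema text. A minimal sketch with hypothetical inline schemas (same imports as the file above), assuming the Confluent parsers pre-parse the resolved reference values before the main schema, which is what the extractors rely on:

// hypothetical schemas, for illustration only
Map<String, String> resolvedRefs = Map.of(
    "Address",
    "{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"city\",\"type\":\"string\"}]}");

String userSchema =
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"address\",\"type\":\"Address\"}]}";

var avroSchema = new AvroSchema(userSchema, List.of(), resolvedRefs, null);
// rawSchema() returns an org.apache.avro.Schema in which the Address record is resolved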
 

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java

@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");

+ 78 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/extractor/RbacLdapAuthoritiesExtractor.java

@@ -0,0 +1,78 @@
+package com.provectus.kafka.ui.service.rbac.extractor;
+
+import com.provectus.kafka.ui.config.auth.LdapProperties;
+import com.provectus.kafka.ui.model.rbac.Role;
+import com.provectus.kafka.ui.model.rbac.provider.Provider;
+import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.ldap.core.DirContextOperations;
+import org.springframework.ldap.core.support.BaseLdapPathContextSource;
+import org.springframework.security.core.GrantedAuthority;
+import org.springframework.security.core.authority.SimpleGrantedAuthority;
+import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
+import org.springframework.util.Assert;
+
+@Slf4j
+public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
+
+  private final AccessControlService acs;
+  private final LdapProperties props;
+
+  public RbacLdapAuthoritiesExtractor(ApplicationContext context,
+                                      BaseLdapPathContextSource contextSource, String groupFilterSearchBase) {
+    super(contextSource, groupFilterSearchBase);
+    this.acs = context.getBean(AccessControlService.class);
+    this.props = context.getBean(LdapProperties.class);
+  }
+
+  @Override
+  protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
+    var ldapGroups = getRoles(user.getNameInNamespace(), username);
+
+    return acs.getRoles()
+        .stream()
+        .filter(r -> r.getSubjects()
+            .stream()
+            .filter(subject -> subject.getProvider().equals(Provider.LDAP))
+            .filter(subject -> subject.getType().equals("group"))
+            .anyMatch(subject -> ldapGroups.contains(subject.getValue()))
+        )
+        .map(Role::getName)
+        .peek(role -> log.trace("Mapped role [{}] for user [{}]", role, username))
+        .map(SimpleGrantedAuthority::new)
+        .collect(Collectors.toSet());
+  }
+
+  private Set<String> getRoles(String userDn, String username) {
+    var groupSearchBase = props.getGroupFilterSearchBase();
+    Assert.notNull(groupSearchBase, "groupSearchBase is empty");
+
+    var groupRoleAttribute = props.getGroupRoleAttribute();
+    if (groupRoleAttribute == null) {
+      groupRoleAttribute = "cn";
+    }
+
+    log.trace(
+        "Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
+        username, userDn, groupRoleAttribute, getGroupSearchFilter(), groupSearchBase);
+
+    var ldapTemplate = getLdapTemplate();
+    ldapTemplate.setIgnoreNameNotFoundException(true);
+
+    Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
+        groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
+        new String[] {groupRoleAttribute});
+
+    return userRoles.stream()
+        .map(record -> record.get(getGroupRoleAttribute()).get(0))
+        .peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
+        .collect(Collectors.toSet());
+  }
+
+}
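
The wiring of this populator is not part of this hunk; typically it is handed to Spring Security's LdapAuthenticationProvider inside the LDAP security configuration. A hedged sketch (applicationContext, contextSource, ldapProperties and bindAuthenticator are assumed to exist elsewhere and are not defined by this diff):

// sketch only: assumes an ApplicationContext, a BaseLdapPathContextSource, LdapProperties
// and a configured BindAuthenticator are available from the surrounding configuration
var populator = new RbacLdapAuthoritiesExtractor(applicationContext, contextSource,
    ldapProperties.getGroupFilterSearchBase());
var authenticationProvider = new LdapAuthenticationProvider(bindAuthenticator, populator);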

+ 5 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java

@@ -28,6 +28,9 @@ import reactor.netty.http.client.HttpClient;
 public class WebClientConfigurator {
 
   private final WebClient.Builder builder = WebClient.builder();
+  private HttpClient httpClient = HttpClient
+      .create()
+      .proxyWithSystemProperties();
 
   public WebClientConfigurator() {
     configureObjectMapper(defaultOM());
@@ -90,12 +93,7 @@ public class WebClientConfigurator {
     // Create webclient
     SslContext context = contextBuilder.build();
 
-    var httpClient = HttpClient
-        .create()
-        .secure(t -> t.sslContext(context))
-        .proxyWithSystemProperties();
-
-    builder.clientConnector(new ReactorClientHttpConnector(httpClient));
+    httpClient = httpClient.secure(t -> t.sslContext(context));
     return this;
   }
 
@@ -131,6 +129,6 @@ public class WebClientConfigurator {
   }
 
   public WebClient build() {
-    return builder.build();
+    return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
   }
 }

+ 11 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java

@@ -151,30 +151,16 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
   }
 
   private JsonType convertType(Schema schema) {
-    switch (schema.getType()) {
-      case INT:
-      case LONG:
-        return new SimpleJsonType(JsonType.Type.INTEGER);
-      case MAP:
-      case RECORD:
-        return new SimpleJsonType(JsonType.Type.OBJECT);
-      case ENUM:
-        return new EnumJsonType(schema.getEnumSymbols());
-      case BYTES:
-      case STRING:
-        return new SimpleJsonType(JsonType.Type.STRING);
-      case NULL:
-        return new SimpleJsonType(JsonType.Type.NULL);
-      case ARRAY:
-        return new SimpleJsonType(JsonType.Type.ARRAY);
-      case FIXED:
-      case FLOAT:
-      case DOUBLE:
-        return new SimpleJsonType(JsonType.Type.NUMBER);
-      case BOOLEAN:
-        return new SimpleJsonType(JsonType.Type.BOOLEAN);
-      default:
-        return new SimpleJsonType(JsonType.Type.STRING);
-    }
+    return switch (schema.getType()) {
+      case INT, LONG -> new SimpleJsonType(JsonType.Type.INTEGER);
+      case MAP, RECORD -> new SimpleJsonType(JsonType.Type.OBJECT);
+      case ENUM -> new EnumJsonType(schema.getEnumSymbols());
+      case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING);
+      case NULL -> new SimpleJsonType(JsonType.Type.NULL);
+      case ARRAY -> new SimpleJsonType(JsonType.Type.ARRAY);
+      case FIXED, FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER);
+      case BOOLEAN -> new SimpleJsonType(JsonType.Type.BOOLEAN);
+      default -> new SimpleJsonType(JsonType.Type.STRING);
+    };
   }
 }

+ 61 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.util.jsonschema;
 
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -15,7 +16,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.google.common.collect.Lists;
-import com.provectus.kafka.ui.exception.JsonToAvroConversionException;
+import com.provectus.kafka.ui.exception.JsonAvroConversionException;
 import io.confluent.kafka.serializers.AvroData;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -34,7 +35,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
-import lombok.SneakyThrows;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 
@@ -42,12 +42,17 @@ import org.apache.avro.generic.GenericData;
 public class JsonAvroConversion {
 
   private static final JsonMapper MAPPER = new JsonMapper();
+  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
 
  // converts json into an Object that is the expected input for KafkaAvroSerializer
  // (with the AVRO_USE_LOGICAL_TYPE_CONVERTERS flag enabled!)
-  @SneakyThrows
   public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
-    JsonNode rootNode = MAPPER.readTree(jsonString);
+    JsonNode rootNode = null;
+    try {
+      rootNode = MAPPER.readTree(jsonString);
+    } catch (JsonProcessingException e) {
+      throw new JsonAvroConversionException("String is not valid JSON");
+    }
     return convert(rootNode, avroSchema);
   }
 
@@ -80,7 +85,7 @@ public class JsonAvroConversion {
         assertJsonType(node, JsonNodeType.STRING);
         String symbol = node.textValue();
         if (!avroSchema.getEnumSymbols().contains(symbol)) {
-          throw new JsonToAvroConversionException("%s is not a part of enum symbols [%s]"
+          throw new JsonAvroConversionException("%s is not a part of enum symbols [%s]"
               .formatted(symbol, avroSchema.getEnumSymbols()));
         }
         yield new GenericData.EnumSymbol(avroSchema, symbol);
@@ -88,23 +93,35 @@ public class JsonAvroConversion {
       case UNION -> {
        // for union branches (other than null) the payload should be an object with a single key == type name
         // ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" },  { "int": 123 }
-        if (node.isNull() && avroSchema.getTypes().contains(Schema.create(Schema.Type.NULL))) {
+        if (node.isNull() && avroSchema.getTypes().contains(NULL_SCHEMA)) {
           yield null;
         }
 
         assertJsonType(node, JsonNodeType.OBJECT);
         var elements = Lists.newArrayList(node.fields());
         if (elements.size() != 1) {
-          throw new JsonToAvroConversionException(
+          throw new JsonAvroConversionException(
               "UNION field value should be an object with single field == type name");
         }
-        var typeNameToValue = elements.get(0);
+        Map.Entry<String, JsonNode> typeNameToValue = elements.get(0);
+        List<Schema> candidates = new ArrayList<>();
         for (Schema unionType : avroSchema.getTypes()) {
           if (typeNameToValue.getKey().equals(unionType.getFullName())) {
             yield convert(typeNameToValue.getValue(), unionType);
           }
+          if (typeNameToValue.getKey().equals(unionType.getName())) {
+            candidates.add(unionType);
+          }
+        }
+        if (candidates.size() == 1) {
+          yield convert(typeNameToValue.getValue(), candidates.get(0));
         }
-        throw new JsonToAvroConversionException(
+        if (candidates.size() > 1) {
+          throw new JsonAvroConversionException(
+              "Can't select type within union for value '%s'. Provide full type name.".formatted(node)
+          );
+        }
+        throw new JsonAvroConversionException(
             "json value '%s' is cannot be converted to any of union types [%s]"
                 .formatted(node, avroSchema.getTypes()));
       }
@@ -164,7 +181,7 @@ public class JsonAvroConversion {
         assertJsonType(node, JsonNodeType.STRING);
         byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
         if (bytes.length != avroSchema.getFixedSize()) {
-          throw new JsonToAvroConversionException(
+          throw new JsonAvroConversionException(
               "Fixed field has unexpected size %d (should be %d)"
                   .formatted(bytes.length, avroSchema.getFixedSize()));
         }
@@ -208,8 +225,11 @@ public class JsonAvroConversion {
       case UNION -> {
         ObjectNode node = MAPPER.createObjectNode();
         int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
-        Schema unionType = avroSchema.getTypes().get(unionIdx);
-        node.set(unionType.getFullName(), convertAvroToJson(obj, unionType));
+        Schema selectedType = avroSchema.getTypes().get(unionIdx);
+        node.set(
+            selectUnionTypeFieldName(avroSchema, selectedType, unionIdx),
+            convertAvroToJson(obj, selectedType)
+        );
         yield node;
       }
       case STRING -> {
@@ -252,11 +272,30 @@ public class JsonAvroConversion {
     };
   }
 
+  // selects the name for the key field that represents the union type name.
+  // For records, the short name is used when it is unambiguous.
+  private static String selectUnionTypeFieldName(Schema unionSchema,
+                                                 Schema chosenType,
+                                                 int chosenTypeIdx) {
+    var types = unionSchema.getTypes();
+    if (types.size() == 2 && types.contains(NULL_SCHEMA)) {
+      return chosenType.getName();
+    }
+    for (int i = 0; i < types.size(); i++) {
+      if (i != chosenTypeIdx && chosenType.getName().equals(types.get(i).getName())) {
+        // there is another type inside union with the same name
+        // so, we have to use fullname
+        return chosenType.getFullName();
+      }
+    }
+    return chosenType.getName();
+  }
+
   private static Object processLogicalType(JsonNode node, Schema schema) {
     return findConversion(schema)
         .map(c -> c.jsonToAvroConversion.apply(node, schema))
         .orElseThrow(() ->
-            new JsonToAvroConversionException("'%s' logical type is not supported"
+            new JsonAvroConversionException("'%s' logical type is not supported"
                 .formatted(schema.getLogicalType().getName())));
   }
 
@@ -264,7 +303,7 @@ public class JsonAvroConversion {
     return findConversion(schema)
         .map(c -> c.avroToJsonConversion.apply(obj, schema))
         .orElseThrow(() ->
-            new JsonToAvroConversionException("'%s' logical type is not supported"
+            new JsonAvroConversionException("'%s' logical type is not supported"
                 .formatted(schema.getLogicalType().getName())));
   }
 
@@ -281,7 +320,7 @@ public class JsonAvroConversion {
 
   private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
     if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
-      throw new JsonToAvroConversionException(
+      throw new JsonAvroConversionException(
           "%s node has unexpected type, allowed types %s, actual type %s"
               .formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
     }
@@ -289,7 +328,7 @@ public class JsonAvroConversion {
 
   private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
     if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
-      throw new JsonToAvroConversionException(
+      throw new JsonAvroConversionException(
           "%s node has unexpected numeric type, allowed types %s, actual type %s"
               .formatted(node, Arrays.toString(allowedTypes), node.numberType()));
     }
@@ -318,7 +357,7 @@ public class JsonAvroConversion {
           } else if (node.isNumber()) {
             return new BigDecimal(node.numberValue().toString());
           }
-          throw new JsonToAvroConversionException(
+          throw new JsonAvroConversionException(
               "node '%s' can't be converted to decimal logical type"
                   .formatted(node));
         },
@@ -335,7 +374,7 @@ public class JsonAvroConversion {
           } else if (node.isTextual()) {
             return LocalDate.parse(node.asText());
           } else {
-            throw new JsonToAvroConversionException(
+            throw new JsonAvroConversionException(
                 "node '%s' can't be converted to date logical type"
                     .formatted(node));
           }
@@ -356,7 +395,7 @@ public class JsonAvroConversion {
           } else if (node.isTextual()) {
             return LocalTime.parse(node.asText());
           } else {
-            throw new JsonToAvroConversionException(
+            throw new JsonAvroConversionException(
                 "node '%s' can't be converted to time-millis logical type"
                     .formatted(node));
           }
@@ -377,7 +416,7 @@ public class JsonAvroConversion {
           } else if (node.isTextual()) {
             return LocalTime.parse(node.asText());
           } else {
-            throw new JsonToAvroConversionException(
+            throw new JsonAvroConversionException(
                 "node '%s' can't be converted to time-micros logical type"
                     .formatted(node));
           }
@@ -398,7 +437,7 @@ public class JsonAvroConversion {
           } else if (node.isTextual()) {
             return Instant.parse(node.asText());
           } else {
-            throw new JsonToAvroConversionException(
+            throw new JsonAvroConversionException(
                 "node '%s' can't be converted to timestamp-millis logical type"
                     .formatted(node));
           }
@@ -423,7 +462,7 @@ public class JsonAvroConversion {
           } else if (node.isTextual()) {
             return Instant.parse(node.asText());
           } else {
-            throw new JsonToAvroConversionException(
+            throw new JsonAvroConversionException(
                 "node '%s' can't be converted to timestamp-millis logical type"
                     .formatted(node));
           }
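
To make the union payload convention affected above concrete: for a schema such as [ "null", "int", "com.test.TestAvroRecord" ], the JSON side now accepts a short type name whenever it is unambiguous within the union. Illustrative payload strings (field names and values are hypothetical):

// union schema: [ "null", "int", "com.test.TestAvroRecord" ]
String asNull   = "null";                                    // null branch
String asInt    = "{ \"int\": 123 }";                        // primitive branch, short name
String asRecord = "{ \"TestAvroRecord\": { \"id\": 123 } }"; // short record name, no clash
// if another branch of the union had the same simple name (different namespace),
// the fully-qualified key "com.test.TestAvroRecord" would be required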

+ 7 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui;
 
 import com.provectus.kafka.ui.container.KafkaConnectContainer;
+import com.provectus.kafka.ui.container.KsqlDbContainer;
 import com.provectus.kafka.ui.container.SchemaRegistryContainer;
 import java.nio.file.Path;
 import java.util.List;
@@ -32,7 +33,7 @@ public abstract class AbstractIntegrationTest {
   public static final String LOCAL = "local";
   public static final String SECOND_LOCAL = "secondLocal";
 
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
+  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1"; // Append ".arm64" for a local run
 
   public static final KafkaContainer kafka = new KafkaContainer(
       DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
@@ -49,6 +50,11 @@ public abstract class AbstractIntegrationTest {
           .dependsOn(kafka)
           .dependsOn(schemaRegistry);
 
+  protected static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
+      DockerImageName.parse("confluentinc/cp-ksqldb-server")
+          .withTag(CONFLUENT_PLATFORM_VERSION))
+      .withKafka(kafka);
+
   @TempDir
   public static Path tmpDir;
 

+ 54 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
 
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
     Assertions.assertEquals(schema, actual.getSchema());
   }
 
+
+  @Test
+  void shouldCreateNewProtobufSchemaWithRefs() {
+    NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject + "-ref")
+        .schema("""
+            syntax = "proto3";
+            message MyRecord {
+              int32 id = 1;
+              string name = 2;
+            }
+            """);
+
+    webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject)
+        .schema("""
+            syntax = "proto3";
+            import "MyRecord.proto";
+            message MyRecordWithRef {
+              int32 id = 1;
+              MyRecord my_ref = 2;
+            }
+            """)
+        .references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));
+
+    SchemaSubjectDTO actual = webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBody(SchemaSubjectDTO.class)
+        .returnResult()
+        .getResponseBody();
+
+    Assertions.assertNotNull(actual);
+    Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
+  }
+
   @Test
   public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
     webTestClient
@@ -278,7 +331,7 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
   void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
     String schema =
         "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
-        + "\"{\\\"type\\\": \\\"string\\\"}\"}";
+            + "\"{\\\"type\\\": \\\"string\\\"}\"}";
 
     webTestClient
         .post()

+ 185 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerdeTest.java

@@ -0,0 +1,185 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+class ConsumerOffsetsSerdeTest extends AbstractIntegrationTest {
+
+  private static final int MSGS_TO_GENERATE = 10;
+
+  private static String consumerGroupName;
+  private static String committedTopic;
+
+  @BeforeAll
+  static void createTopicAndCommitItsOffset() {
+    committedTopic = ConsumerOffsetsSerdeTest.class.getSimpleName() + "-" + UUID.randomUUID();
+    consumerGroupName = committedTopic + "-group";
+    createTopic(new NewTopic(committedTopic, 1, (short) 1));
+
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < MSGS_TO_GENERATE; i++) {
+        producer.send(committedTopic, "i=" + i);
+      }
+    }
+    try (var consumer = createConsumer(consumerGroupName)) {
+      consumer.subscribe(List.of(committedTopic));
+      int polled = 0;
+      while (polled < MSGS_TO_GENERATE) {
+        polled += consumer.poll(Duration.ofMillis(100)).count();
+      }
+      consumer.commitSync();
+    }
+  }
+
+  @AfterAll
+  static void cleanUp() {
+    deleteTopic(committedTopic);
+  }
+
+  @Test
+  void canOnlyDeserializeConsumerOffsetsTopic() {
+    var serde = new ConsumerOffsetsSerde();
+    assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.KEY)).isTrue();
+    assertThat(serde.canDeserialize(ConsumerOffsetsSerde.TOPIC, Serde.Target.VALUE)).isTrue();
+    assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.KEY)).isFalse();
+    assertThat(serde.canDeserialize("anyOtherTopic", Serde.Target.VALUE)).isFalse();
+  }
+
+  @Test
+  void deserializesMessagesMadeByConsumerActivity() {
+    var serde = new ConsumerOffsetsSerde();
+    var keyDeserializer = serde.deserializer(TOPIC, Serde.Target.KEY);
+    var valueDeserializer = serde.deserializer(TOPIC, Serde.Target.VALUE);
+
+    try (var consumer = createConsumer(consumerGroupName + "-check")) {
+      consumer.subscribe(List.of(ConsumerOffsetsSerde.TOPIC));
+      List<Tuple2<DeserializeResult, DeserializeResult>> polled = new ArrayList<>();
+
+      Awaitility.await()
+          .pollInSameThread()
+          .atMost(Duration.ofMinutes(1))
+          .untilAsserted(() -> {
+            for (var rec : consumer.poll(Duration.ofMillis(200))) {
+              DeserializeResult key = rec.key() != null
+                  ? keyDeserializer.deserialize(null, rec.key().get())
+                  : null;
+              DeserializeResult val = rec.value() != null
+                  ? valueDeserializer.deserialize(null, rec.value().get())
+                  : null;
+              if (key != null && val != null) {
+                polled.add(Tuples.of(key, val));
+              }
+            }
+            assertThat(polled).anyMatch(t -> isCommitMessage(t.getT1(), t.getT2()));
+            assertThat(polled).anyMatch(t -> isGroupMetadataMessage(t.getT1(), t.getT2()));
+          });
+    }
+  }
+
+  // Sample commit record:
+  //
+  // key: {
+  //  "group": "test_Members_3",
+  //  "topic": "test",
+  //  "partition": 0
+  // }
+  //
+  // value:
+  // {
+  //  "offset": 2,
+  //  "leader_epoch": 0,
+  //  "metadata": "",
+  //  "commit_timestamp": 1683112980588
+  // }
+  private boolean isCommitMessage(DeserializeResult key, DeserializeResult value) {
+    var keyJson = toMapFromJson(key);
+    boolean keyIsOk = consumerGroupName.equals(keyJson.get("group"))
+        && committedTopic.equals(keyJson.get("topic"))
+        && ((Integer) 0).equals(keyJson.get("partition"));
+
+    var valueJson = toMapFromJson(value);
+    boolean valueIsOk = valueJson.containsKey("offset")
+        && valueJson.get("offset").equals(MSGS_TO_GENERATE)
+        && valueJson.containsKey("commit_timestamp");
+
+    return keyIsOk && valueIsOk;
+  }
+
+  // Sample group metadata record:
+  //
+  // key: {
+  //  "group": "test_Members_3"
+  // }
+  //
+  // value:
+  // {
+  //  "protocol_type": "consumer",
+  //  "generation": 1,
+  //  "protocol": "range",
+  //  "leader": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd",
+  //  "current_state_timestamp": 1683112974561,
+  //  "members": [
+  //    {
+  //      "member_id": "consumer-test_Members_3-1-5a37876e-e42f-420e-9c7d-6902889bd5dd",
+  //      "group_instance_id": null,
+  //      "client_id": "consumer-test_Members_3-1",
+  //      "client_host": "/192.168.16.1",
+  //      "rebalance_timeout": 300000,
+  //      "session_timeout": 45000,
+  //      "subscription": "AAEAAAABAAR0ZXN0/////wAAAAA=",
+  //      "assignment": "AAEAAAABAAR0ZXN0AAAAAQAAAAD/////"
+  //    }
+  //  ]
+  // }
+  private boolean isGroupMetadataMessage(DeserializeResult key, DeserializeResult value) {
+    var keyJson = toMapFromJson(key);
+    boolean keyIsOk = consumerGroupName.equals(keyJson.get("group")) && keyJson.size() == 1;
+
+    var valueJson = toMapFromJson(value);
+    boolean valueIsOk = valueJson.keySet()
+        .containsAll(Set.of("protocol_type", "generation", "leader", "members"));
+
+    return keyIsOk && valueIsOk;
+  }
+
+  @SneakyThrows
+  private Map<String, Object> toMapFromJson(DeserializeResult result) {
+    return new JsonMapper().readValue(result.getResult(), Map.class);
+  }
+
+  private static KafkaConsumer<Bytes, Bytes> createConsumer(String groupId) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    return new KafkaConsumer<>(props);
+  }
+}

+ 84 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java

@@ -0,0 +1,84 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class HexSerdeTest {
+
+  private static final byte[] TEST_BYTES = "hello world".getBytes();
+  private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";
+
+  private Serde hexSerde;
+
+  @BeforeEach
+  void init() {
+    hexSerde = new HexSerde();
+    hexSerde.configure(
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty()
+    );
+  }
+
+
+  @ParameterizedTest
+  @CsvSource({
+      "68656C6C6F20776F726C64", // uppercase
+      "68656c6c6f20776f726c64", // lowercase
+      "68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
+      "68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
+      "68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
+      "#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64"  // '#' prefix, space delim
+  })
+  void serializesInputAsHexString(String hexString) {
+    for (Serde.Target type : Serde.Target.values()) {
+      var serializer = hexSerde.serializer("anyTopic", type);
+      byte[] bytes = serializer.serialize(hexString);
+      assertThat(bytes).isEqualTo(TEST_BYTES);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
+    var serializer = hexSerde.serializer("anyTopic", type);
+    byte[] bytes = serializer.serialize("");
+    assertThat(bytes).isEqualTo(new byte[] {});
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void deserializesDataAsHexBytes(Serde.Target type) {
+    var deserializer = hexSerde.deserializer("anyTopic", type);
+    var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
+    assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
+    assertThat(result.getAdditionalProperties()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void getSchemaReturnsEmpty(Serde.Target type) {
+    assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
+    assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canSerializeReturnsTrueForAllInput(Serde.Target type) {
+    assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
+  }
+}
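
The expected value in TEST_BYTES_HEX_ENCODED (upper-case, space-delimited byte pairs) matches what the JDK's java.util.HexFormat produces, which can serve as a quick cross-check; a standalone sketch, independent of HexSerde itself:

import java.util.HexFormat;

class HexFormatSketch {
  public static void main(String[] args) {
    HexFormat hex = HexFormat.ofDelimiter(" ").withUpperCase();
    System.out.println(hex.formatHex("hello world".getBytes()));                      // 68 65 6C 6C 6F 20 77 6F 72 6C 64
    System.out.println(new String(hex.parseHex("68 65 6C 6C 6F 20 77 6F 72 6C 64"))); // hello world
  }
}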

+ 86 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java

@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class SchemaReferencesResolverTest {
+
+  private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);
+
+  private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);
+
+  @Test
+  void resolvesRefsUsingSrClient() {
+    mockSrCall("sub1", 1,
+        new SchemaSubject()
+            .schema("schema1"));
+
+    mockSrCall("sub2", 1,
+        new SchemaSubject()
+            .schema("schema2")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
+                    new SchemaReference().name("ref2_2").subject("sub1").version(1))));
+
+    mockSrCall("sub2_1", 2,
+        new SchemaSubject()
+            .schema("schema2_1")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
+                    new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
+                ))
+    );
+
+    mockSrCall("sub2_1_1", 3,
+        new SchemaSubject()
+            .schema("schema2_1_1"));
+
+    var resolvedRefsMono = schemaReferencesResolver.resolve(
+        List.of(
+            new SchemaReference().name("ref1").subject("sub1").version(1),
+            new SchemaReference().name("ref2").subject("sub2").version(1)));
+
+    StepVerifier.create(resolvedRefsMono)
+        .assertNext(refs ->
+            assertThat(refs)
+                .containsExactlyEntriesOf(
+                    // checking map should be ordered
+                    ImmutableMap.<String, String>builder()
+                        .put("ref1", "schema1")
+                        .put("ref2_1_1", "schema2_1_1")
+                        .put("ref2_1", "schema2_1")
+                        .put("ref2_2", "schema1")
+                        .put("ref2", "schema2")
+                        .build()))
+        .verifyComplete();
+  }
+
+  @Test
+  void returnsEmptyMapOnEmptyInputs() {
+    StepVerifier.create(schemaReferencesResolver.resolve(null))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+
+    StepVerifier.create(schemaReferencesResolver.resolve(List.of()))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+  }
+
+  private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
+    when(srClientMock.getSubjectVersion(subject, version + "", true))
+        .thenReturn(Mono.just(subjectToReturn));
+  }
+
+}

+ 7 - 5
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -22,6 +23,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.opendatadiscovery.client.model.DataEntity;
 import org.opendatadiscovery.client.model.DataEntityType;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -52,9 +55,8 @@ class TopicsExporterTest {
 
   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
-    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
-        .thenReturn(Mono.error(new RuntimeException("Not found")));
-
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
+        .thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));
     stats = Statistics.empty()
         .toBuilder()
         .topicDescriptions(
@@ -83,14 +85,14 @@ class TopicsExporterTest {
 
   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));
 
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")

+ 2 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                     "type": "record",
                     "name": "Message",

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ class JsonSchemaExtractorTest {
         }
         """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

+ 2 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@ class ProtoExtractorTest {
         }""";
 
     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

+ 21 - 35
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java

@@ -3,28 +3,22 @@ package com.provectus.kafka.ui.service.ksql;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
 import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.container.KsqlDbContainer;
+import java.math.BigDecimal;
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
 import reactor.test.StepVerifier;
 
 class KsqlApiClientTest extends AbstractIntegrationTest {
 
-  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
-      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
-      .withKafka(kafka);
-
   @BeforeAll
   static void startContainer() {
     KSQL_DB.start();
@@ -72,7 +66,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
   private void assertLastKsqTutorialQueryResult(KsqlApiClient client) {
     // expected results:
     //{"header":"Schema","columnNames":[...],"values":null}
-    //{"header":"Row","columnNames":null,"values":[[0.0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
+    //{"header":"Row","columnNames":null,"values":[[0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
     //{"header":"Row","columnNames":null,"values":[[10.0,["18f4ea86"],1]]}
     StepVerifier.create(
             client.execute(
@@ -86,34 +80,26 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
           assertThat(header.getValues()).isNull();
         })
         .assertNext(row -> {
-          assertThat(row).isEqualTo(
-              KsqlApiClient.KsqlResponseTable.builder()
-                  .header("Row")
-                  .columnNames(null)
-                  .values(List.of(List.of(
-                      new DoubleNode(0.0),
-                      new ArrayNode(JsonNodeFactory.instance)
-                          .add(new TextNode("4ab5cbad"))
-                          .add(new TextNode("8b6eae59"))
-                          .add(new TextNode("4a7c7b41")),
-                      new IntNode(3)
-                  )))
-                  .build()
-          );
+          var distance = (DecimalNode) row.getValues().get(0).get(0);
+          var riders = (ArrayNode) row.getValues().get(0).get(1);
+          var count = (IntNode) row.getValues().get(0).get(2);
+
+          assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(0)));
+          assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
+              .add(new TextNode("4ab5cbad"))
+              .add(new TextNode("8b6eae59"))
+              .add(new TextNode("4a7c7b41")));
+          assertThat(count).isEqualTo(new IntNode(3));
         })
         .assertNext(row -> {
-          assertThat(row).isEqualTo(
-              KsqlApiClient.KsqlResponseTable.builder()
-                  .header("Row")
-                  .columnNames(null)
-                  .values(List.of(List.of(
-                      new DoubleNode(10.0),
-                      new ArrayNode(JsonNodeFactory.instance)
-                          .add(new TextNode("18f4ea86")),
-                      new IntNode(1)
-                  )))
-                  .build()
-          );
+          var distance = (DecimalNode) row.getValues().get(0).get(0);
+          var riders = (ArrayNode) row.getValues().get(0).get(1);
+          var count = (IntNode) row.getValues().get(0).get(2);
+
+          assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(10)));
+          assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
+              .add(new TextNode("18f4ea86")));
+          assertThat(count).isEqualTo(new IntNode(1));
         })
         .verifyComplete();
   }

+ 0 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service.ksql;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.container.KsqlDbContainer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
@@ -15,14 +14,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.utility.DockerImageName;
 
 class KsqlServiceV2Test extends AbstractIntegrationTest {
 
-  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
-      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
-      .withKafka(kafka);
-
   private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
   private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();
 

+ 94 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.util.jsonschema;
 import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertAvroToJson;
 import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -13,6 +14,7 @@ import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.LongNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.google.common.primitives.Longs;
+import com.provectus.kafka.ui.exception.JsonAvroConversionException;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -181,12 +183,62 @@ class JsonAvroConversionTest {
       record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
       assertThat(record.get("f_union")).isEqualTo(123);
 
-      //inner-record's name should be fully-qualified!
-      jsonPayload = "{ \"f_union\": { \"com.test.TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
+      //short name can be used since there is no clash with other type names
+      jsonPayload = "{ \"f_union\": { \"TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
       record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
       assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
       var innerRec = (GenericData.Record) record.get("f_union");
       assertThat(innerRec.get("f_union")).isEqualTo(123);
+
+      assertThatThrownBy(() ->
+          convertJsonToAvro("{ \"f_union\": { \"NotExistingType\": 123 } }", schema)
+      ).isInstanceOf(JsonAvroConversionException.class);
+    }
+
+    @Test
+    void unionFieldWithTypeNamesClash() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "namespace": "com.test",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "nestedClass",
+                     "type": {
+                       "type": "record",
+                       "namespace": "com.nested",
+                       "name": "TestAvroRecord",
+                       "fields": [
+                         {"name" : "inner_obj_field", "type": "int" }
+                       ]
+                     }
+                   },
+                   {
+                     "name": "f_union",
+                     "type": [ "null", "int", "com.test.TestAvroRecord", "com.nested.TestAvroRecord"]
+                   }
+                 ]
+              }"""
+      );
+      //short name can't be used since there is a clash with other type names, so the full name is required
+      var jsonPayload = "{ \"f_union\": { \"com.test.TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
+      var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
+      var innerRec = (GenericData.Record) record.get("f_union");
+      assertThat(innerRec.get("f_union")).isEqualTo(123);
+
+      //short name can't be used since there is a clash with other type names, so the full name is required
+      jsonPayload = "{ \"f_union\": { \"com.nested.TestAvroRecord\": { \"inner_obj_field\":  234 } } }";
+      record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
+      assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
+      innerRec = (GenericData.Record) record.get("f_union");
+      assertThat(innerRec.get("inner_obj_field")).isEqualTo(234);
+
+      assertThatThrownBy(() ->
+          convertJsonToAvro("{ \"f_union\": { \"TestAvroRecord\": { \"inner_obj_field\":  234 } } }", schema)
+      ).isInstanceOf(JsonAvroConversionException.class);
     }
 
     @Test
@@ -599,6 +651,46 @@ class JsonAvroConversionTest {
       var innerRec = new GenericData.Record(schema);
       innerRec.put("f_union", 123);
       r.put("f_union", innerRec);
+      // short type name is emitted since there is NO clash with other type names
+      assertJsonsEqual(
+          " { \"f_union\" : { \"TestAvroRecord\" : { \"f_union\" : { \"int\" : 123 } } } }",
+          convertAvroToJson(r, schema)
+      );
+    }
+
+    @Test
+    void unionFieldWithInnerTypesNamesClash() {
+      var schema = createSchema(
+          """
+               {
+                 "type": "record",
+                 "namespace": "com.test",
+                 "name": "TestAvroRecord",
+                 "fields": [
+                   {
+                     "name": "nestedClass",
+                     "type": {
+                       "type": "record",
+                       "namespace": "com.nested",
+                       "name": "TestAvroRecord",
+                       "fields": [
+                         {"name" : "inner_obj_field", "type": "int" }
+                       ]
+                     }
+                   },
+                   {
+                     "name": "f_union",
+                     "type": [ "null", "int", "com.test.TestAvroRecord", "com.nested.TestAvroRecord"]
+                   }
+                 ]
+              }"""
+      );
+
+      var r = new GenericData.Record(schema);
+      var innerRec = new GenericData.Record(schema);
+      innerRec.put("f_union", 123);
+      r.put("f_union", innerRec);
+      // full type name is emitted since there is a clash with another type name
       assertJsonsEqual(
           " { \"f_union\" : { \"com.test.TestAvroRecord\" : { \"f_union\" : { \"int\" : 123 } } } }",
           convertAvroToJson(r, schema)
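Taken together, the union tests above pin down the naming rule for union branches in the JSON representation: when a record type's simple name is unique within the union, the branch key may be (and, on the Avro-to-JSON side, is) the short name, e.g. {"TestAvroRecord": {...}}; as soon as two branches share a simple name (com.test.TestAvroRecord vs com.nested.TestAvroRecord above), the fully-qualified name is required and the short form is rejected with a JsonAvroConversionException.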

+ 8 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml

@@ -77,6 +77,10 @@ paths:
                 required: true
                 schema:
                   type: string
+              - name: deleted
+                in: query
+                schema:
+                  type: boolean
             responses:
                 200:
                     description: OK
@@ -317,6 +321,10 @@ components:
                 type: string
               schemaType:
                   $ref: '#/components/schemas/SchemaType'
+              references:
+                type: array
+                items:
+                  $ref: '#/components/schemas/SchemaReference'
             required:
               - id
               - subject

+ 23 - 2
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2979,6 +2979,10 @@ components:
           type: string
         schemaType:
           $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject
@@ -2996,13 +3000,30 @@ components:
         schema:
           type: string
         schemaType:
-          $ref: '#/components/schemas/SchemaType'
-          # upon updating a schema, the type of existing schema can't be changed
+          $ref: '#/components/schemas/SchemaType' # upon updating a schema, the type of existing schema can't be changed
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - subject
         - schema
         - schemaType
 
+    SchemaReference:
+      type: object
+      properties:
+        name:
+          type: string
+        subject:
+          type: string
+        version:
+          type: integer
+      required:
+        - name
+        - subject
+        - version
+
     CompatibilityLevel:
       type: object
       properties:
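For context, the new SchemaReference object mirrors the reference format used by Confluent Schema Registry: judging from the schema above, each entry is a JSON object with required name, subject and version fields, e.g. {"name": "com.acme.Referenced", "subject": "referenced-value", "version": 1} (the concrete values here are only illustrative). The deleted query flag added to kafka-sr-api.yaml above presumably lets callers include soft-deleted schema versions when fetching a subject version.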

+ 3 - 3
kafka-ui-e2e-checks/pom.xml

@@ -19,12 +19,12 @@
         <selenium.version>4.8.1</selenium.version>
         <selenide.version>6.12.3</selenide.version>
         <testng.version>7.7.1</testng.version>
-        <allure.version>2.21.0</allure.version>
-        <qase.io.version>3.0.4</qase.io.version>
+        <allure.version>2.22.2</allure.version>
+        <qase.io.version>3.0.5</qase.io.version>
         <aspectj.version>1.9.9.1</aspectj.version>
         <assertj.version>3.24.2</assertj.version>
         <hamcrest.version>2.2</hamcrest.version>
-        <slf4j.version>2.0.5</slf4j.version>
+        <slf4j.version>2.0.7</slf4j.version>
         <kafka.version>3.3.1</kafka.version>
     </properties>
 

+ 15 - 0
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/utilities/StringUtils.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.utilities;
+
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class StringUtils {
+
+  public static String getMixedCase(String original) {
+    return IntStream.range(0, original.length())
+        .mapToObj(i -> i % 2 == 0 ? Character.toUpperCase(original.charAt(i)) : original.charAt(i))
+        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
+        .toString();
+  }
+}
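A quick usage sketch (the config key below is only an illustrative stand-in for whatever key the broker-config test happens to pick up):

  // Upper-cases every even-indexed character and leaves the rest untouched:
  String mixed = StringUtils.getMixedCase("cleanup.policy");
  // -> "ClEaNuP.PoLiCy"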

+ 2 - 10
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.manualsuite.backlog;
 
-import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
 import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_SUITE_ID;
@@ -57,24 +56,17 @@ public class SmokeBacklog extends BaseManualTest {
   public void testCaseF() {
   }
 
-  @Automation(state = TO_BE_AUTOMATED)
-  @Suite(id = BROKERS_SUITE_ID)
-  @QaseId(348)
-  @Test
-  public void testCaseG() {
-  }
-
   @Automation(state = NOT_AUTOMATED)
   @Suite(id = TOPICS_SUITE_ID)
   @QaseId(50)
   @Test
-  public void testCaseH() {
+  public void testCaseG() {
   }
 
   @Automation(state = NOT_AUTOMATED)
   @Suite(id = SCHEMAS_SUITE_ID)
   @QaseId(351)
   @Test
-  public void testCaseI() {
+  public void testCaseH() {
   }
 }

+ 34 - 0
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/brokers/BrokersTest.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.smokesuite.brokers;
 
 import static com.provectus.kafka.ui.pages.brokers.BrokersDetails.DetailsTab.CONFIGS;
+import static com.provectus.kafka.ui.utilities.StringUtils.getMixedCase;
 import static com.provectus.kafka.ui.variables.Expected.BROKER_SOURCE_INFO_TOOLTIP;
 
 import com.codeborne.selenide.Condition;
@@ -8,6 +9,7 @@ import com.provectus.kafka.ui.BaseTest;
 import com.provectus.kafka.ui.pages.brokers.BrokersConfigTab;
 import io.qameta.allure.Issue;
 import io.qase.api.annotation.QaseId;
+import java.util.List;
 import org.testng.Assert;
 import org.testng.annotations.Ignore;
 import org.testng.annotations.Test;
@@ -100,6 +102,38 @@ public class BrokersTest extends BaseTest {
         String.format("getAllConfigs().contains(%s)", anyConfigKeySecondPage));
   }
 
+  @Ignore
+  @Issue("https://github.com/provectus/kafka-ui/issues/3347")
+  @QaseId(348)
+  @Test
+  public void brokersConfigCaseInsensitiveSearchCheck() {
+    navigateToBrokersAndOpenDetails(DEFAULT_BROKER_ID);
+    brokersDetails
+        .openDetailsTab(CONFIGS);
+    String anyConfigKeyFirstPage = brokersConfigTab
+        .getAllConfigs().stream()
+        .findAny().orElseThrow()
+        .getKey();
+    brokersConfigTab
+        .clickNextButton();
+    Assert.assertFalse(brokersConfigTab.getAllConfigs().stream()
+            .map(BrokersConfigTab.BrokersConfigItem::getKey)
+            .toList().contains(anyConfigKeyFirstPage),
+        String.format("getAllConfigs().contains(%s)", anyConfigKeyFirstPage));
+    SoftAssert softly = new SoftAssert();
+    List.of(anyConfigKeyFirstPage.toLowerCase(), anyConfigKeyFirstPage.toUpperCase(),
+            getMixedCase(anyConfigKeyFirstPage))
+        .forEach(configCase -> {
+          brokersConfigTab
+              .searchConfig(configCase);
+          softly.assertTrue(brokersConfigTab.getAllConfigs().stream()
+                  .map(BrokersConfigTab.BrokersConfigItem::getKey)
+                  .toList().contains(anyConfigKeyFirstPage),
+              String.format("getAllConfigs().contains(%s)", configCase));
+        });
+    softly.assertAll();
+  }
+
   @QaseId(331)
   @Test
   public void brokersSourceInfoCheck() {

+ 1 - 1
kafka-ui-react-app/public/robots.txt

@@ -1,2 +1,2 @@
-# https://www.robotstxt.org/robotstxt.html
 User-agent: *
+Disallow: /

+ 4 - 1
kafka-ui-react-app/src/components/Brokers/Broker/Broker.tsx

@@ -44,7 +44,10 @@ const Broker: React.FC = () => {
       <Metrics.Wrapper>
         <Metrics.Section>
           <Metrics.Indicator label="Segment Size">
-            <BytesFormatted value={brokerDiskUsage?.segmentSize} />
+            <BytesFormatted
+              value={brokerDiskUsage?.segmentSize}
+              precision={2}
+            />
           </Metrics.Indicator>
           <Metrics.Indicator label="Segment Count">
             {brokerDiskUsage?.segmentCount}

+ 1 - 1
kafka-ui-react-app/src/components/Brokers/Broker/__test__/Broker.spec.tsx

@@ -66,7 +66,7 @@ describe('Broker Component', () => {
     expect(
       screen.getByText(brokerDiskUsage?.segmentCount || '')
     ).toBeInTheDocument();
-    expect(screen.getByText('12 MB')).toBeInTheDocument();
+    expect(screen.getByText('11.77 MB')).toBeInTheDocument();
 
     expect(screen.getByText('Segment Count')).toBeInTheDocument();
     expect(
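The expectation changes from '12 MB' to '11.77 MB' because the Segment Size indicator now renders with precision={2}. As a rough check (the fixture's exact segmentSize isn't shown in this diff), a value of about 12,341,000 bytes divided by 1024² is roughly 11.77, which precision 0 rounds up to "12 MB" and precision 2 renders as "11.77 MB".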

+ 1 - 0
kafka-ui-react-app/src/components/Brokers/BrokersList/BrokersList.tsx

@@ -105,6 +105,7 @@ const BrokersList: React.FC = () => {
               getValue={getValue}
               renderValue={renderValue}
               renderSegments
+              precision={2}
             />
           ),
       },

+ 16 - 4
kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx

@@ -16,13 +16,15 @@ import { Table } from 'components/common/table/Table/Table.styled';
 import getTagColor from 'components/common/Tag/getTagColor';
 import { Dropdown } from 'components/common/Dropdown';
 import { ControlPanelWrapper } from 'components/common/ControlPanel/ControlPanel.styled';
-import { Action, ResourceType } from 'generated-sources';
+import { Action, ConsumerGroupState, ResourceType } from 'generated-sources';
 import { ActionDropdownItem } from 'components/common/ActionComponent';
 import TableHeaderCell from 'components/common/table/TableHeaderCell/TableHeaderCell';
 import {
   useConsumerGroupDetails,
   useDeleteConsumerGroupMutation,
 } from 'lib/hooks/api/consumers';
+import Tooltip from 'components/common/Tooltip/Tooltip';
+import { CONSUMER_GROUP_STATE_TOOLTIPS } from 'lib/constants';
 
 import ListItem from './ListItem';
 
@@ -96,9 +98,19 @@ const Details: React.FC = () => {
       <Metrics.Wrapper>
         <Metrics.Section>
           <Metrics.Indicator label="State">
-            <Tag color={getTagColor(consumerGroup.data?.state)}>
-              {consumerGroup.data?.state}
-            </Tag>
+            <Tooltip
+              value={
+                <Tag color={getTagColor(consumerGroup.data?.state)}>
+                  {consumerGroup.data?.state}
+                </Tag>
+              }
+              content={
+                CONSUMER_GROUP_STATE_TOOLTIPS[
+                  consumerGroup.data?.state || ConsumerGroupState.UNKNOWN
+                ]
+              }
+              placement="bottom-start"
+            />
           </Metrics.Indicator>
           <Metrics.Indicator label="Members">
             {consumerGroup.data?.members}

+ 18 - 3
kafka-ui-react-app/src/components/ConsumerGroups/List.tsx

@@ -5,15 +5,17 @@ import { ControlPanelWrapper } from 'components/common/ControlPanel/ControlPanel
 import {
   ConsumerGroupDetails,
   ConsumerGroupOrdering,
+  ConsumerGroupState,
   SortOrder,
 } from 'generated-sources';
 import useAppParams from 'lib/hooks/useAppParams';
 import { clusterConsumerGroupDetailsPath, ClusterNameRoute } from 'lib/paths';
 import { ColumnDef } from '@tanstack/react-table';
-import Table, { TagCell, LinkCell } from 'components/common/NewTable';
+import Table, { LinkCell, TagCell } from 'components/common/NewTable';
 import { useNavigate, useSearchParams } from 'react-router-dom';
-import { PER_PAGE } from 'lib/constants';
+import { CONSUMER_GROUP_STATE_TOOLTIPS, PER_PAGE } from 'lib/constants';
 import { useConsumerGroups } from 'lib/hooks/api/consumers';
+import Tooltip from 'components/common/Tooltip/Tooltip';
 
 const List = () => {
   const { clusterName } = useAppParams<ClusterNameRoute>();
@@ -59,6 +61,9 @@ const List = () => {
         id: ConsumerGroupOrdering.MESSAGES_BEHIND,
         header: 'Consumer Lag',
         accessorKey: 'consumerLag',
+        cell: (args) => {
+          return args.getValue() || 'N/A';
+        },
       },
       {
         header: 'Coordinator',
@@ -69,7 +74,17 @@ const List = () => {
         id: ConsumerGroupOrdering.STATE,
         header: 'State',
         accessorKey: 'state',
-        cell: TagCell,
+        // eslint-disable-next-line react/no-unstable-nested-components
+        cell: (args) => {
+          const value = args.getValue() as ConsumerGroupState;
+          return (
+            <Tooltip
+              value={<TagCell {...args} />}
+              content={CONSUMER_GROUP_STATE_TOOLTIPS[value]}
+              placement="bottom-end"
+            />
+          );
+        },
       },
     ],
     []

+ 2 - 2
kafka-ui-react-app/src/components/Topics/Topic/Messages/getDefaultSerdeName.ts

@@ -1,8 +1,8 @@
 import { SerdeDescription } from 'generated-sources';
-import { getPrefferedDescription } from 'components/Topics/Topic/SendMessage/utils';
+import { getPreferredDescription } from 'components/Topics/Topic/SendMessage/utils';
 
 export const getDefaultSerdeName = (serdes: SerdeDescription[]) => {
-  const preffered = getPrefferedDescription(serdes);
+  const preffered = getPreferredDescription(serdes);
   if (preffered) {
     return preffered.name || '';
   }

+ 2 - 2
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx

@@ -118,8 +118,8 @@ const SendMessage: React.FC<{ closeSidebar: () => void }> = ({
         valueSerde,
       });
       if (!keepContents) {
-        setValue('key', '');
-        setValue('content', '');
+        setValue('key', defaultValues.key || '');
+        setValue('content', defaultValues.content || '');
         closeSidebar();
       }
     } catch (e) {

+ 10 - 10
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/utils.ts

@@ -13,21 +13,21 @@ jsf.option('fillProperties', false);
 jsf.option('alwaysFakeOptionals', true);
 jsf.option('failOnInvalidFormat', false);
 
-const generateValueFromSchema = (preffered?: SerdeDescription) => {
-  if (!preffered?.schema) {
+const generateValueFromSchema = (preferred?: SerdeDescription) => {
+  if (!preferred?.schema) {
     return undefined;
   }
-  const parsedSchema = JSON.parse(preffered.schema);
+  const parsedSchema = JSON.parse(preferred.schema);
   const value = jsf.generate(parsedSchema);
   return JSON.stringify(value);
 };
 
-export const getPrefferedDescription = (serdes: SerdeDescription[]) =>
+export const getPreferredDescription = (serdes: SerdeDescription[]) =>
   serdes.find((s) => s.preferred);
 
 export const getDefaultValues = (serdes: TopicSerdeSuggestion) => {
-  const keySerde = getPrefferedDescription(serdes.key || []);
-  const valueSerde = getPrefferedDescription(serdes.value || []);
+  const keySerde = getPreferredDescription(serdes.key || []);
+  const valueSerde = getPreferredDescription(serdes.value || []);
 
   return {
     key: generateValueFromSchema(keySerde),
@@ -65,15 +65,15 @@ export const validateBySchema = (
     return errors;
   }
 
-  let parcedSchema;
+  let parsedSchema;
   let parsedValue;
 
   try {
-    parcedSchema = JSON.parse(schema);
+    parsedSchema = JSON.parse(schema);
   } catch (e) {
     return [`Error in parsing the "${type}" field schema`];
   }
-  if (parcedSchema.type === 'string') {
+  if (parsedSchema.type === 'string') {
     return [];
   }
   try {
@@ -84,7 +84,7 @@ export const validateBySchema = (
   try {
     const ajv = new Ajv();
     addFormats(ajv);
-    const validate = ajv.compile(parcedSchema);
+    const validate = ajv.compile(parsedSchema);
     validate(parsedValue);
     if (validate.errors) {
       errors = validate.errors.map(

+ 3 - 3
kafka-ui-react-app/src/components/common/NewTable/SizeCell.tsx

@@ -6,10 +6,10 @@ import BytesFormatted from 'components/common/BytesFormatted/BytesFormatted';
 type AsAny = any;
 
 const SizeCell: React.FC<
-  CellContext<AsAny, unknown> & { renderSegments?: boolean }
-> = ({ getValue, row, renderSegments = false }) => (
+  CellContext<AsAny, unknown> & { renderSegments?: boolean; precision?: number }
+> = ({ getValue, row, renderSegments = false, precision = 0 }) => (
   <>
-    <BytesFormatted value={getValue<string | number>()} />
+    <BytesFormatted value={getValue<string | number>()} precision={precision} />
     {renderSegments ? `, ${row?.original.count} segment(s)` : null}
   </>
 );

+ 12 - 1
kafka-ui-react-app/src/lib/constants.ts

@@ -1,5 +1,5 @@
 import { SelectOption } from 'components/common/Select/Select';
-import { ConfigurationParameters } from 'generated-sources';
+import { ConfigurationParameters, ConsumerGroupState } from 'generated-sources';
 
 declare global {
   interface Window {
@@ -96,3 +96,14 @@ export const METRICS_OPTIONS: SelectOption[] = [
   { value: 'JMX', label: 'JMX' },
   { value: 'PROMETHEUS', label: 'PROMETHEUS' },
 ];
+
+export const CONSUMER_GROUP_STATE_TOOLTIPS: Record<ConsumerGroupState, string> =
+  {
+    EMPTY: 'The group exists but has no members.',
+    STABLE: 'Consumers are happily consuming and have assigned partitions.',
+    PREPARING_REBALANCE:
+      'Something has changed, and the reassignment of partitions is required.',
+    COMPLETING_REBALANCE: 'Partition reassignment is in progress.',
+    DEAD: 'The group is going to be removed. It might be due to inactivity, or the group is being migrated to a different group coordinator.',
+    UNKNOWN: '',
+  } as const;

+ 5 - 0
kafka-ui-react-app/src/lib/hooks/api/topics.ts

@@ -304,6 +304,11 @@ export function useTopicAnalysis(
       useErrorBoundary: true,
       retry: false,
       suspense: false,
+      onError: (error: Response) => {
+        if (error.status !== 404) {
+          showServerError(error as Response);
+        }
+      },
     }
   );
 }

+ 5 - 5
pom.xml

@@ -26,17 +26,17 @@
         <assertj.version>3.19.0</assertj.version>
         <avro.version>1.11.1</avro.version>
         <byte-buddy.version>1.12.19</byte-buddy.version>
-        <confluent.version>7.3.3</confluent.version>
+        <confluent.version>7.4.0</confluent.version>
         <datasketches-java.version>3.1.0</datasketches-java.version>
         <groovy.version>3.0.13</groovy.version>
         <jackson.version>2.14.0</jackson.version>
-        <kafka-clients.version>3.3.1</kafka-clients.version>
-        <org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
+        <kafka-clients.version>3.5.0</kafka-clients.version>
+        <org.mapstruct.version>1.5.5.Final</org.mapstruct.version>
         <org.projectlombok.version>1.18.24</org.projectlombok.version>
-        <protobuf-java.version>3.21.9</protobuf-java.version>
+        <protobuf-java.version>3.23.3</protobuf-java.version>
         <scala-lang.library.version>2.13.9</scala-lang.library.version>
         <snakeyaml.version>2.0</snakeyaml.version>
-        <spring-boot.version>3.0.6</spring-boot.version>
+        <spring-boot.version>3.1.1</spring-boot.version>
         <kafka-ui-serde-api.version>1.0.0</kafka-ui-serde-api.version>
         <odd-oddrn-generator.version>0.1.17</odd-oddrn-generator.version>
         <odd-oddrn-client.version>0.1.23</odd-oddrn-client.version>