Browse Source

Implement updating registry compatibility level (#391)

* Implement updating registry compatibility level
Alexander Krivonosov 4 years ago
parent
commit
0deafa9d75

+ 78 - 0
kafka-ui-react-app/src/components/Schemas/List/GlobalSchemaSelector.tsx

@@ -0,0 +1,78 @@
+import ConfirmationModal from 'components/common/ConfirmationModal/ConfirmationModal';
+import PageLoader from 'components/common/PageLoader/PageLoader';
+import { CompatibilityLevelCompatibilityEnum } from 'generated-sources';
+import React from 'react';
+import { useForm } from 'react-hook-form';
+import { useParams } from 'react-router-dom';
+import { ClusterName } from 'redux/interfaces';
+
+export interface GlobalSchemaSelectorProps {
+  globalSchemaCompatibilityLevel?: CompatibilityLevelCompatibilityEnum;
+  updateGlobalSchemaCompatibilityLevel: (
+    clusterName: ClusterName,
+    compatibilityLevel: CompatibilityLevelCompatibilityEnum
+  ) => Promise<void>;
+}
+
+const GlobalSchemaSelector: React.FC<GlobalSchemaSelectorProps> = ({
+  globalSchemaCompatibilityLevel,
+  updateGlobalSchemaCompatibilityLevel,
+}) => {
+  const { clusterName } = useParams<{ clusterName: string }>();
+
+  const {
+    register,
+    handleSubmit,
+    formState: { isSubmitting },
+  } = useForm();
+
+  const [
+    isUpdateCompatibilityConfirmationVisible,
+    setUpdateCompatibilityConfirmationVisible,
+  ] = React.useState(false);
+
+  const onCompatibilityLevelUpdate = async ({
+    compatibilityLevel,
+  }: {
+    compatibilityLevel: CompatibilityLevelCompatibilityEnum;
+  }) => {
+    await updateGlobalSchemaCompatibilityLevel(clusterName, compatibilityLevel);
+    setUpdateCompatibilityConfirmationVisible(false);
+  };
+
+  return (
+    <div className="level-item">
+      <h5 className="is-5 mr-2">Global Compatibility Level: </h5>
+      <div className="select mr-2">
+        <select
+          name="compatibilityLevel"
+          defaultValue={globalSchemaCompatibilityLevel}
+          ref={register()}
+          onChange={() => setUpdateCompatibilityConfirmationVisible(true)}
+        >
+          {Object.keys(CompatibilityLevelCompatibilityEnum).map(
+            (level: string) => (
+              <option key={level} value={level}>
+                {level}
+              </option>
+            )
+          )}
+        </select>
+      </div>
+      <ConfirmationModal
+        isOpen={isUpdateCompatibilityConfirmationVisible}
+        onCancel={() => setUpdateCompatibilityConfirmationVisible(false)}
+        onConfirm={handleSubmit(onCompatibilityLevelUpdate)}
+      >
+        {isSubmitting ? (
+          <PageLoader />
+        ) : (
+          `Are you sure you want to update the global compatibility level?
+                  This may affect the compatibility levels of the schemas.`
+        )}
+      </ConfirmationModal>
+    </div>
+  );
+};
+
+export default GlobalSchemaSelector;

+ 26 - 2
kafka-ui-react-app/src/components/Schemas/List/List.tsx

@@ -1,5 +1,8 @@
 import React from 'react';
-import { SchemaSubject } from 'generated-sources';
+import {
+  CompatibilityLevelCompatibilityEnum,
+  SchemaSubject,
+} from 'generated-sources';
 import { Link, useParams } from 'react-router-dom';
 import { clusterSchemaNewPath } from 'lib/paths';
 import { ClusterName } from 'redux/interfaces';
@@ -8,23 +11,38 @@ import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
 import ClusterContext from 'components/contexts/ClusterContext';
 
 import ListItem from './ListItem';
+import GlobalSchemaSelector from './GlobalSchemaSelector';
 
 export interface ListProps {
   schemas: SchemaSubject[];
   isFetching: boolean;
+  isGlobalSchemaCompatibilityLevelFetched: boolean;
+  globalSchemaCompatibilityLevel?: CompatibilityLevelCompatibilityEnum;
   fetchSchemasByClusterName: (clusterName: ClusterName) => void;
+  fetchGlobalSchemaCompatibilityLevel: (
+    clusterName: ClusterName
+  ) => Promise<void>;
+  updateGlobalSchemaCompatibilityLevel: (
+    clusterName: ClusterName,
+    compatibilityLevel: CompatibilityLevelCompatibilityEnum
+  ) => Promise<void>;
 }
 
 const List: React.FC<ListProps> = ({
   schemas,
   isFetching,
+  globalSchemaCompatibilityLevel,
+  isGlobalSchemaCompatibilityLevelFetched,
   fetchSchemasByClusterName,
+  fetchGlobalSchemaCompatibilityLevel,
+  updateGlobalSchemaCompatibilityLevel,
 }) => {
   const { isReadOnly } = React.useContext(ClusterContext);
   const { clusterName } = useParams<{ clusterName: string }>();
 
   React.useEffect(() => {
     fetchSchemasByClusterName(clusterName);
+    fetchGlobalSchemaCompatibilityLevel(clusterName);
   }, [fetchSchemasByClusterName, clusterName]);
 
   return (
@@ -32,8 +50,14 @@ const List: React.FC<ListProps> = ({
       <Breadcrumb>Schema Registry</Breadcrumb>
       <div className="box">
         <div className="level">
-          {!isReadOnly && (
+          {!isReadOnly && isGlobalSchemaCompatibilityLevelFetched && (
             <div className="level-item level-right">
+              <GlobalSchemaSelector
+                globalSchemaCompatibilityLevel={globalSchemaCompatibilityLevel}
+                updateGlobalSchemaCompatibilityLevel={
+                  updateGlobalSchemaCompatibilityLevel
+                }
+              />
               <Link
                 className="button is-primary"
                 to={clusterSchemaNewPath(clusterName)}

+ 13 - 1
kafka-ui-react-app/src/components/Schemas/List/ListContainer.tsx

@@ -1,9 +1,15 @@
 import { connect } from 'react-redux';
 import { RootState } from 'redux/interfaces';
-import { fetchSchemasByClusterName } from 'redux/actions';
+import {
+  fetchSchemasByClusterName,
+  fetchGlobalSchemaCompatibilityLevel,
+  updateGlobalSchemaCompatibilityLevel,
+} from 'redux/actions';
 import {
   getIsSchemaListFetching,
   getSchemaList,
+  getGlobalSchemaCompatibilityLevel,
+  getGlobalSchemaCompatibilityLevelFetched,
 } from 'redux/reducers/schemas/selectors';
 
 import List from './List';
@@ -11,10 +17,16 @@ import List from './List';
 const mapStateToProps = (state: RootState) => ({
   isFetching: getIsSchemaListFetching(state),
   schemas: getSchemaList(state),
+  globalSchemaCompatibilityLevel: getGlobalSchemaCompatibilityLevel(state),
+  isGlobalSchemaCompatibilityLevelFetched: getGlobalSchemaCompatibilityLevelFetched(
+    state
+  ),
 });
 
 const mapDispatchToProps = {
+  fetchGlobalSchemaCompatibilityLevel,
   fetchSchemasByClusterName,
+  updateGlobalSchemaCompatibilityLevel,
 };
 
 export default connect(mapStateToProps, mapDispatchToProps)(List);

+ 3 - 0
kafka-ui-react-app/src/components/Schemas/List/__test__/List.spec.tsx

@@ -32,6 +32,9 @@ describe('List', () => {
         <List
           isFetching
           fetchSchemasByClusterName={jest.fn()}
+          isGlobalSchemaCompatibilityLevelFetched
+          fetchGlobalSchemaCompatibilityLevel={jest.fn()}
+          updateGlobalSchemaCompatibilityLevel={jest.fn()}
           schemas={[]}
           {...props}
         />

+ 97 - 0
kafka-ui-react-app/src/redux/actions/__test__/thunks/schemas.spec.ts

@@ -188,4 +188,101 @@ describe('Thunks', () => {
       ]);
     });
   });
+
+  describe('fetchGlobalSchemaCompatibilityLevel', () => {
+    it('calls GET_GLOBAL_SCHEMA_COMPATIBILITY__REQUEST on the fucntion call', () => {
+      store.dispatch(thunks.fetchGlobalSchemaCompatibilityLevel(clusterName));
+      expect(store.getActions()).toEqual([
+        actions.fetchGlobalSchemaCompatibilityLevelAction.request(),
+      ]);
+    });
+
+    it('calls GET_GLOBAL_SCHEMA_COMPATIBILITY__SUCCESS on a successful API call', async () => {
+      fetchMock.getOnce(`/api/clusters/${clusterName}/schemas/compatibility`, {
+        compatibility: CompatibilityLevelCompatibilityEnum.FORWARD,
+      });
+      await store.dispatch(
+        thunks.fetchGlobalSchemaCompatibilityLevel(clusterName)
+      );
+      expect(store.getActions()).toEqual([
+        actions.fetchGlobalSchemaCompatibilityLevelAction.request(),
+        actions.fetchGlobalSchemaCompatibilityLevelAction.success(
+          CompatibilityLevelCompatibilityEnum.FORWARD
+        ),
+      ]);
+    });
+
+    it('calls GET_GLOBAL_SCHEMA_COMPATIBILITY__FAILURE on an unsuccessful API call', async () => {
+      fetchMock.getOnce(
+        `/api/clusters/${clusterName}/schemas/compatibility`,
+        404
+      );
+      try {
+        await store.dispatch(
+          thunks.fetchGlobalSchemaCompatibilityLevel(clusterName)
+        );
+      } catch (error) {
+        expect(error.status).toEqual(404);
+        expect(store.getActions()).toEqual([
+          actions.fetchGlobalSchemaCompatibilityLevelAction.request(),
+          actions.fetchGlobalSchemaCompatibilityLevelAction.failure(),
+        ]);
+      }
+    });
+  });
+
+  describe('updateGlobalSchemaCompatibilityLevel', () => {
+    const compatibilityLevel = CompatibilityLevelCompatibilityEnum.FORWARD;
+    it('calls POST_GLOBAL_SCHEMA_COMPATIBILITY__REQUEST on the fucntion call', () => {
+      store.dispatch(
+        thunks.updateGlobalSchemaCompatibilityLevel(
+          clusterName,
+          compatibilityLevel
+        )
+      );
+      expect(store.getActions()).toEqual([
+        actions.updateGlobalSchemaCompatibilityLevelAction.request(),
+      ]);
+    });
+
+    it('calls POST_GLOBAL_SCHEMA_COMPATIBILITY__SUCCESS on a successful API call', async () => {
+      fetchMock.putOnce(
+        `/api/clusters/${clusterName}/schemas/compatibility`,
+        200
+      );
+      await store.dispatch(
+        thunks.updateGlobalSchemaCompatibilityLevel(
+          clusterName,
+          compatibilityLevel
+        )
+      );
+      expect(store.getActions()).toEqual([
+        actions.updateGlobalSchemaCompatibilityLevelAction.request(),
+        actions.updateGlobalSchemaCompatibilityLevelAction.success(
+          CompatibilityLevelCompatibilityEnum.FORWARD
+        ),
+      ]);
+    });
+
+    it('calls POST_GLOBAL_SCHEMA_COMPATIBILITY__FAILURE on an unsuccessful API call', async () => {
+      fetchMock.putOnce(
+        `/api/clusters/${clusterName}/schemas/compatibility`,
+        404
+      );
+      try {
+        await store.dispatch(
+          thunks.updateGlobalSchemaCompatibilityLevel(
+            clusterName,
+            compatibilityLevel
+          )
+        );
+      } catch (error) {
+        expect(error.status).toEqual(404);
+        expect(store.getActions()).toEqual([
+          actions.updateGlobalSchemaCompatibilityLevelAction.request(),
+          actions.updateGlobalSchemaCompatibilityLevelAction.failure(),
+        ]);
+      }
+    });
+  });
 });

+ 13 - 0
kafka-ui-react-app/src/redux/actions/actions.ts

@@ -16,6 +16,7 @@ import {
   ConsumerGroup,
   ConsumerGroupDetails,
   SchemaSubject,
+  CompatibilityLevelCompatibilityEnum,
 } from 'generated-sources';
 
 export const fetchClusterStatsAction = createAsyncAction(
@@ -118,6 +119,18 @@ export const fetchSchemasByClusterNameAction = createAsyncAction(
   'GET_CLUSTER_SCHEMAS__FAILURE'
 )<undefined, SchemaSubject[], undefined>();
 
+export const fetchGlobalSchemaCompatibilityLevelAction = createAsyncAction(
+  'GET_GLOBAL_SCHEMA_COMPATIBILITY__REQUEST',
+  'GET_GLOBAL_SCHEMA_COMPATIBILITY__SUCCESS',
+  'GET_GLOBAL_SCHEMA_COMPATIBILITY__FAILURE'
+)<undefined, CompatibilityLevelCompatibilityEnum, undefined>();
+
+export const updateGlobalSchemaCompatibilityLevelAction = createAsyncAction(
+  'PUT_GLOBAL_SCHEMA_COMPATIBILITY__REQUEST',
+  'PUT_GLOBAL_SCHEMA_COMPATIBILITY__SUCCESS',
+  'PUT_GLOBAL_SCHEMA_COMPATIBILITY__FAILURE'
+)<undefined, CompatibilityLevelCompatibilityEnum, undefined>();
+
 export const fetchSchemaVersionsAction = createAsyncAction(
   'GET_SCHEMA_VERSIONS__REQUEST',
   'GET_SCHEMA_VERSIONS__SUCCESS',

+ 38 - 0
kafka-ui-react-app/src/redux/actions/thunks/schemas.ts

@@ -49,6 +49,44 @@ export const fetchSchemaVersions = (
   }
 };
 
+export const fetchGlobalSchemaCompatibilityLevel = (
+  clusterName: ClusterName
+): PromiseThunkResult<void> => async (dispatch) => {
+  dispatch(actions.fetchGlobalSchemaCompatibilityLevelAction.request());
+  try {
+    const result = await schemasApiClient.getGlobalSchemaCompatibilityLevel({
+      clusterName,
+    });
+    dispatch(
+      actions.fetchGlobalSchemaCompatibilityLevelAction.success(
+        result.compatibility
+      )
+    );
+  } catch (e) {
+    dispatch(actions.fetchGlobalSchemaCompatibilityLevelAction.failure());
+  }
+};
+
+export const updateGlobalSchemaCompatibilityLevel = (
+  clusterName: ClusterName,
+  compatibilityLevel: CompatibilityLevelCompatibilityEnum
+): PromiseThunkResult<void> => async (dispatch) => {
+  dispatch(actions.updateGlobalSchemaCompatibilityLevelAction.request());
+  try {
+    await schemasApiClient.updateGlobalSchemaCompatibilityLevel({
+      clusterName,
+      compatibilityLevel: { compatibility: compatibilityLevel },
+    });
+    dispatch(
+      actions.updateGlobalSchemaCompatibilityLevelAction.success(
+        compatibilityLevel
+      )
+    );
+  } catch (e) {
+    dispatch(actions.updateGlobalSchemaCompatibilityLevelAction.failure());
+  }
+};
+
 export const createSchema = (
   clusterName: ClusterName,
   newSchemaSubject: NewSchemaSubject

+ 6 - 1
kafka-ui-react-app/src/redux/interfaces/schema.ts

@@ -1,4 +1,8 @@
-import { NewSchemaSubject, SchemaSubject } from 'generated-sources';
+import {
+  CompatibilityLevelCompatibilityEnum,
+  NewSchemaSubject,
+  SchemaSubject,
+} from 'generated-sources';
 
 export type SchemaName = string;
 
@@ -6,6 +10,7 @@ export interface SchemasState {
   byName: { [subject: string]: SchemaSubject };
   allNames: SchemaName[];
   currentSchemaVersions: SchemaSubject[];
+  globalSchemaCompatibilityLevel?: CompatibilityLevelCompatibilityEnum;
 }
 
 export interface NewSchemaSubjectRaw extends NewSchemaSubject {

+ 40 - 1
kafka-ui-react-app/src/redux/reducers/schemas/__test__/reducer.spec.ts

@@ -1,7 +1,12 @@
-import { SchemaSubject, SchemaType } from 'generated-sources';
+import {
+  CompatibilityLevelCompatibilityEnum,
+  SchemaSubject,
+  SchemaType,
+} from 'generated-sources';
 import {
   createSchemaAction,
   deleteSchemaAction,
+  fetchGlobalSchemaCompatibilityLevelAction,
   fetchSchemasByClusterNameAction,
   fetchSchemaVersionsAction,
 } from 'redux/actions';
@@ -76,4 +81,38 @@ describe('Schemas reducer', () => {
       currentSchemaVersions: [],
     });
   });
+
+  it('adds global compatibility on successful fetch', () => {
+    expect(
+      reducer(
+        initialState,
+        fetchGlobalSchemaCompatibilityLevelAction.success(
+          CompatibilityLevelCompatibilityEnum.BACKWARD
+        )
+      )
+    ).toEqual({
+      ...initialState,
+      globalSchemaCompatibilityLevel:
+        CompatibilityLevelCompatibilityEnum.BACKWARD,
+    });
+  });
+
+  it('replaces global compatibility on successful update', () => {
+    expect(
+      reducer(
+        {
+          ...initialState,
+          globalSchemaCompatibilityLevel:
+            CompatibilityLevelCompatibilityEnum.FORWARD,
+        },
+        fetchGlobalSchemaCompatibilityLevelAction.success(
+          CompatibilityLevelCompatibilityEnum.BACKWARD
+        )
+      )
+    ).toEqual({
+      ...initialState,
+      globalSchemaCompatibilityLevel:
+        CompatibilityLevelCompatibilityEnum.BACKWARD,
+    });
+  });
 });

+ 16 - 0
kafka-ui-react-app/src/redux/reducers/schemas/__test__/selectors.spec.ts

@@ -1,11 +1,13 @@
 import { orderBy } from 'lodash';
 import {
   createSchemaAction,
+  fetchGlobalSchemaCompatibilityLevelAction,
   fetchSchemasByClusterNameAction,
   fetchSchemaVersionsAction,
 } from 'redux/actions';
 import configureStore from 'redux/store/configureStore';
 import * as selectors from 'redux/reducers/schemas/selectors';
+import { CompatibilityLevelCompatibilityEnum } from 'generated-sources';
 
 import {
   clusterSchemasPayload,
@@ -44,6 +46,11 @@ describe('Schemas selectors', () => {
       );
       store.dispatch(fetchSchemaVersionsAction.success(schemaVersionsPayload));
       store.dispatch(createSchemaAction.success(newSchemaPayload));
+      store.dispatch(
+        fetchGlobalSchemaCompatibilityLevelAction.success(
+          CompatibilityLevelCompatibilityEnum.BACKWARD
+        )
+      );
     });
 
     it('returns fetch status', () => {
@@ -52,6 +59,9 @@ describe('Schemas selectors', () => {
         selectors.getIsSchemaVersionFetched(store.getState())
       ).toBeTruthy();
       expect(selectors.getSchemaCreated(store.getState())).toBeTruthy();
+      expect(
+        selectors.getGlobalSchemaCompatibilityLevelFetched(store.getState())
+      ).toBeTruthy();
     });
 
     it('returns schema list', () => {
@@ -71,5 +81,11 @@ describe('Schemas selectors', () => {
         orderBy(schemaVersionsPayload, 'id', 'desc')
       );
     });
+
+    it('return registry compatibility level', () => {
+      expect(
+        selectors.getGlobalSchemaCompatibilityLevel(store.getState())
+      ).toEqual(CompatibilityLevelCompatibilityEnum.BACKWARD);
+    });
   });
 });

+ 4 - 0
kafka-ui-react-app/src/redux/reducers/schemas/reducer.ts

@@ -70,6 +70,10 @@ const reducer = (state = initialState, action: Action): SchemasState => {
       return addToSchemaList(state, action.payload);
     case getType(actions.deleteSchemaAction.success):
       return deleteFromSchemaList(state, action.payload);
+    case getType(actions.fetchGlobalSchemaCompatibilityLevelAction.success):
+      return { ...state, globalSchemaCompatibilityLevel: action.payload };
+    case getType(actions.updateGlobalSchemaCompatibilityLevelAction.success):
+      return { ...state, globalSchemaCompatibilityLevel: action.payload };
     default:
       return state;
   }

+ 11 - 0
kafka-ui-react-app/src/redux/reducers/schemas/selectors.ts

@@ -7,6 +7,8 @@ const schemasState = ({ schemas }: RootState): SchemasState => schemas;
 
 const getAllNames = (state: RootState) => schemasState(state).allNames;
 const getSchemaMap = (state: RootState) => schemasState(state).byName;
+export const getGlobalSchemaCompatibilityLevel = (state: RootState) =>
+  schemasState(state).globalSchemaCompatibilityLevel;
 
 const getSchemaListFetchingStatus = createFetchingSelector(
   'GET_CLUSTER_SCHEMAS'
@@ -18,11 +20,20 @@ const getSchemaVersionsFetchingStatus = createFetchingSelector(
 
 const getSchemaCreationStatus = createFetchingSelector('POST_SCHEMA');
 
+const getGlobalSchemaCompatibilityLevelFetchingStatus = createFetchingSelector(
+  'GET_GLOBAL_SCHEMA_COMPATIBILITY'
+);
+
 export const getIsSchemaListFetched = createSelector(
   getSchemaListFetchingStatus,
   (status) => status === 'fetched'
 );
 
+export const getGlobalSchemaCompatibilityLevelFetched = createSelector(
+  getGlobalSchemaCompatibilityLevelFetchingStatus,
+  (status) => status === 'fetched'
+);
+
 export const getIsSchemaListFetching = createSelector(
   getSchemaListFetchingStatus,
   (status) => status === 'fetching' || status === 'notFetched'