websocket.provider.dart 6.2 KB


  1. import 'package:flutter/foundation.dart';
  2. import 'package:hooks_riverpod/hooks_riverpod.dart';
  3. import 'package:immich_mobile/modules/login/providers/authentication.provider.dart';
  4. import 'package:immich_mobile/shared/models/asset.dart';
  5. import 'package:immich_mobile/shared/models/store.dart';
  6. import 'package:immich_mobile/shared/providers/asset.provider.dart';
  7. import 'package:immich_mobile/shared/providers/server_info.provider.dart';
  8. import 'package:immich_mobile/shared/services/sync.service.dart';
  9. import 'package:immich_mobile/utils/debounce.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:openapi/api.dart';
  12. import 'package:socket_io_client/socket_io_client.dart';
  13. enum PendingAction {
  14. assetDelete,
  15. }
  16. class PendingChange {
  17. final PendingAction action;
  18. final dynamic value;
  19. const PendingChange(this.action, this.value);
  20. }
  21. class WebsocketState {
  22. final Socket? socket;
  23. final bool isConnected;
  24. final List<PendingChange> pendingChanges;
  25. WebsocketState({
  26. this.socket,
  27. required this.isConnected,
  28. required this.pendingChanges,
  29. });
  30. WebsocketState copyWith({
  31. Socket? socket,
  32. bool? isConnected,
  33. List<PendingChange>? pendingChanges,
  34. }) {
  35. return WebsocketState(
  36. socket: socket ?? this.socket,
  37. isConnected: isConnected ?? this.isConnected,
  38. pendingChanges: pendingChanges ?? this.pendingChanges,
  39. );
  40. }
  41. @override
  42. String toString() =>
  43. 'WebsocketState(socket: $socket, isConnected: $isConnected)';
  44. @override
  45. bool operator ==(Object other) {
  46. if (identical(this, other)) return true;
  47. return other is WebsocketState &&
  48. other.socket == socket &&
  49. other.isConnected == isConnected;
  50. }
  51. @override
  52. int get hashCode => socket.hashCode ^ isConnected.hashCode;
  53. }
  54. class WebsocketNotifier extends StateNotifier<WebsocketState> {
  55. WebsocketNotifier(this.ref)
  56. : super(
  57. WebsocketState(socket: null, isConnected: false, pendingChanges: []),
  58. ) {
  59. debounce = Debounce(
  60. const Duration(milliseconds: 500),
  61. );
  62. }
  63. final log = Logger('WebsocketNotifier');
  64. final Ref ref;
  65. late final Debounce debounce;
  66. connect() {
  67. var authenticationState = ref.read(authenticationProvider);
  68. if (authenticationState.isAuthenticated) {
  69. final accessToken = Store.get(StoreKey.accessToken);
  70. try {
  71. final endpoint = Uri.parse(Store.get(StoreKey.serverEndpoint));
  72. debugPrint("Attempting to connect to websocket");
  73. // Configure socket transports must be specified
  74. Socket socket = io(
  75. endpoint.origin,
  76. OptionBuilder()
  77. .setPath("${endpoint.path}/socket.io")
  78. .setTransports(['websocket'])
  79. .enableReconnection()
  80. .enableForceNew()
  81. .enableForceNewConnection()
  82. .enableAutoConnect()
  83. .setExtraHeaders({"Authorization": "Bearer $accessToken"})
  84. .build(),
  85. );
  86. socket.onConnect((_) {
  87. debugPrint("Established Websocket Connection");
  88. state = WebsocketState(
  89. isConnected: true,
  90. socket: socket,
  91. pendingChanges: state.pendingChanges,
  92. );
  93. });
  94. socket.onDisconnect((_) {
  95. debugPrint("Disconnect to Websocket Connection");
  96. state = WebsocketState(
  97. isConnected: false,
  98. socket: null,
  99. pendingChanges: state.pendingChanges,
  100. );
  101. });
  102. socket.on('error', (errorMessage) {
  103. log.severe("Websocket Error - $errorMessage");
  104. state = WebsocketState(
  105. isConnected: false,
  106. socket: null,
  107. pendingChanges: state.pendingChanges,
  108. );
  109. });
  110. socket.on('on_upload_success', _handleOnUploadSuccess);
  111. socket.on('on_config_update', _handleOnConfigUpdate);
  112. socket.on('on_asset_delete', _handleOnAssetDelete);
  113. socket.on('on_asset_trash', _handleServerUpdates);
  114. socket.on('on_asset_restore', _handleServerUpdates);
  115. socket.on('on_asset_update', _handleServerUpdates);
  116. } catch (e) {
  117. debugPrint("[WEBSOCKET] Catch Websocket Error - ${e.toString()}");
  118. }
  119. }
  120. }
  121. disconnect() {
  122. debugPrint("Attempting to disconnect from websocket");
  123. var socket = state.socket?.disconnect();
  124. if (socket?.disconnected == true) {
  125. state = WebsocketState(
  126. isConnected: false,
  127. socket: null,
  128. pendingChanges: state.pendingChanges,
  129. );
  130. }
  131. }
  132. stopListenToEvent(String eventName) {
  133. debugPrint("Stop listening to event $eventName");
  134. state.socket?.off(eventName);
  135. }
  136. listenUploadEvent() {
  137. debugPrint("Start listening to event on_upload_success");
  138. state.socket?.on('on_upload_success', _handleOnUploadSuccess);
  139. }
  140. addPendingChange(PendingAction action, dynamic value) {
  141. state = state.copyWith(
  142. pendingChanges: [...state.pendingChanges, PendingChange(action, value)],
  143. );
  144. }
  145. handlePendingChanges() {
  146. final deleteChanges = state.pendingChanges
  147. .where((c) => c.action == PendingAction.assetDelete)
  148. .toList();
  149. if (deleteChanges.isNotEmpty) {
  150. List<String> remoteIds =
  151. deleteChanges.map((a) => a.value.toString()).toList();
  152. ref.read(syncServiceProvider).handleRemoteAssetRemoval(remoteIds);
  153. state = state.copyWith(
  154. pendingChanges: state.pendingChanges
  155. .where((c) => c.action != PendingAction.assetDelete)
  156. .toList(),
  157. );
  158. }
  159. }
  160. _handleOnUploadSuccess(dynamic data) {
  161. final dto = AssetResponseDto.fromJson(data);
  162. if (dto != null) {
  163. final newAsset = Asset.remote(dto);
  164. ref.watch(assetProvider.notifier).onNewAssetUploaded(newAsset);
  165. }
  166. }
  167. _handleOnConfigUpdate(dynamic _) {
  168. ref.read(serverInfoProvider.notifier).getServerFeatures();
  169. ref.read(serverInfoProvider.notifier).getServerConfig();
  170. }
  171. // Refresh updated assets
  172. _handleServerUpdates(dynamic _) {
  173. ref.read(assetProvider.notifier).getAllAsset();
  174. }
  175. _handleOnAssetDelete(dynamic data) {
  176. addPendingChange(PendingAction.assetDelete, data);
  177. debounce(handlePendingChanges);
  178. }
  179. }
  180. final websocketProvider =
  181. StateNotifierProvider<WebsocketNotifier, WebsocketState>((ref) {
  182. return WebsocketNotifier(ref);
  183. });