websocket.provider.dart 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import 'package:flutter/foundation.dart';
  2. import 'package:hooks_riverpod/hooks_riverpod.dart';
  3. import 'package:immich_mobile/modules/login/providers/authentication.provider.dart';
  4. import 'package:immich_mobile/shared/models/asset.dart';
  5. import 'package:immich_mobile/shared/models/store.dart';
  6. import 'package:immich_mobile/shared/providers/asset.provider.dart';
  7. import 'package:immich_mobile/shared/providers/server_info.provider.dart';
  8. import 'package:immich_mobile/shared/services/sync.service.dart';
  9. import 'package:immich_mobile/utils/debounce.dart';
  10. import 'package:logging/logging.dart';
  11. import 'package:openapi/api.dart';
  12. import 'package:socket_io_client/socket_io_client.dart';
  13. enum PendingAction {
  14. assetDelete,
  15. }
  16. class PendingChange {
  17. final PendingAction action;
  18. final dynamic value;
  19. const PendingChange(this.action, this.value);
  20. }
  21. class WebsocketState {
  22. final Socket? socket;
  23. final bool isConnected;
  24. final List<PendingChange> pendingChanges;
  25. WebsocketState({
  26. this.socket,
  27. required this.isConnected,
  28. required this.pendingChanges,
  29. });
  30. WebsocketState copyWith({
  31. Socket? socket,
  32. bool? isConnected,
  33. List<PendingChange>? pendingChanges,
  34. }) {
  35. return WebsocketState(
  36. socket: socket ?? this.socket,
  37. isConnected: isConnected ?? this.isConnected,
  38. pendingChanges: pendingChanges ?? this.pendingChanges,
  39. );
  40. }
  41. @override
  42. String toString() =>
  43. 'WebsocketState(socket: $socket, isConnected: $isConnected)';
  44. @override
  45. bool operator ==(Object other) {
  46. if (identical(this, other)) return true;
  47. return other is WebsocketState &&
  48. other.socket == socket &&
  49. other.isConnected == isConnected;
  50. }
  51. @override
  52. int get hashCode => socket.hashCode ^ isConnected.hashCode;
  53. }
  54. class WebsocketNotifier extends StateNotifier<WebsocketState> {
  55. WebsocketNotifier(this._ref)
  56. : super(
  57. WebsocketState(socket: null, isConnected: false, pendingChanges: []),
  58. );
  59. final _log = Logger('WebsocketNotifier');
  60. final Ref _ref;
  61. final Debounce _debounce = Debounce(const Duration(milliseconds: 500));
  62. /// Connects websocket to server unless already connected
  63. void connect() {
  64. if (state.isConnected) return;
  65. final authenticationState = _ref.read(authenticationProvider);
  66. if (authenticationState.isAuthenticated) {
  67. final accessToken = Store.get(StoreKey.accessToken);
  68. try {
  69. final endpoint = Uri.parse(Store.get(StoreKey.serverEndpoint));
  70. debugPrint("Attempting to connect to websocket");
  71. // Configure socket transports must be specified
  72. Socket socket = io(
  73. endpoint.origin,
  74. OptionBuilder()
  75. .setPath("${endpoint.path}/socket.io")
  76. .setTransports(['websocket'])
  77. .enableReconnection()
  78. .enableForceNew()
  79. .enableForceNewConnection()
  80. .enableAutoConnect()
  81. .setExtraHeaders({"Authorization": "Bearer $accessToken"})
  82. .build(),
  83. );
  84. socket.onConnect((_) {
  85. debugPrint("Established Websocket Connection");
  86. state = WebsocketState(
  87. isConnected: true,
  88. socket: socket,
  89. pendingChanges: state.pendingChanges,
  90. );
  91. });
  92. socket.onDisconnect((_) {
  93. debugPrint("Disconnect to Websocket Connection");
  94. state = WebsocketState(
  95. isConnected: false,
  96. socket: null,
  97. pendingChanges: state.pendingChanges,
  98. );
  99. });
  100. socket.on('error', (errorMessage) {
  101. _log.severe("Websocket Error - $errorMessage");
  102. state = WebsocketState(
  103. isConnected: false,
  104. socket: null,
  105. pendingChanges: state.pendingChanges,
  106. );
  107. });
  108. socket.on('on_upload_success', _handleOnUploadSuccess);
  109. socket.on('on_config_update', _handleOnConfigUpdate);
  110. socket.on('on_asset_delete', _handleOnAssetDelete);
  111. socket.on('on_asset_trash', _handleServerUpdates);
  112. socket.on('on_asset_restore', _handleServerUpdates);
  113. socket.on('on_asset_update', _handleServerUpdates);
  114. } catch (e) {
  115. debugPrint("[WEBSOCKET] Catch Websocket Error - ${e.toString()}");
  116. }
  117. }
  118. }
  119. void disconnect() {
  120. debugPrint("Attempting to disconnect from websocket");
  121. var socket = state.socket?.disconnect();
  122. if (socket?.disconnected == true) {
  123. state = WebsocketState(
  124. isConnected: false,
  125. socket: null,
  126. pendingChanges: state.pendingChanges,
  127. );
  128. }
  129. }
  130. void stopListenToEvent(String eventName) {
  131. debugPrint("Stop listening to event $eventName");
  132. state.socket?.off(eventName);
  133. }
  134. void listenUploadEvent() {
  135. debugPrint("Start listening to event on_upload_success");
  136. state.socket?.on('on_upload_success', _handleOnUploadSuccess);
  137. }
  138. void addPendingChange(PendingAction action, dynamic value) {
  139. state = state.copyWith(
  140. pendingChanges: [...state.pendingChanges, PendingChange(action, value)],
  141. );
  142. }
  143. void handlePendingChanges() {
  144. final deleteChanges = state.pendingChanges
  145. .where((c) => c.action == PendingAction.assetDelete)
  146. .toList();
  147. if (deleteChanges.isNotEmpty) {
  148. List<String> remoteIds =
  149. deleteChanges.map((a) => a.value.toString()).toList();
  150. _ref.read(syncServiceProvider).handleRemoteAssetRemoval(remoteIds);
  151. state = state.copyWith(
  152. pendingChanges: state.pendingChanges
  153. .where((c) => c.action != PendingAction.assetDelete)
  154. .toList(),
  155. );
  156. }
  157. }
  158. void _handleOnUploadSuccess(dynamic data) {
  159. final dto = AssetResponseDto.fromJson(data);
  160. if (dto != null) {
  161. final newAsset = Asset.remote(dto);
  162. _ref.watch(assetProvider.notifier).onNewAssetUploaded(newAsset);
  163. }
  164. }
  165. void _handleOnConfigUpdate(dynamic _) {
  166. _ref.read(serverInfoProvider.notifier).getServerFeatures();
  167. _ref.read(serverInfoProvider.notifier).getServerConfig();
  168. }
  169. // Refresh updated assets
  170. void _handleServerUpdates(dynamic _) {
  171. _ref.read(assetProvider.notifier).getAllAsset();
  172. }
  173. void _handleOnAssetDelete(dynamic data) {
  174. addPendingChange(PendingAction.assetDelete, data);
  175. _debounce(handlePendingChanges);
  176. }
  177. }
  178. final websocketProvider =
  179. StateNotifierProvider<WebsocketNotifier, WebsocketState>((ref) {
  180. return WebsocketNotifier(ref);
  181. });