websocket.provider.dart 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. import 'package:collection/collection.dart';
  2. import 'package:flutter/foundation.dart';
  3. import 'package:flutter/widgets.dart';
  4. import 'package:hooks_riverpod/hooks_riverpod.dart';
  5. import 'package:immich_mobile/modules/login/providers/authentication.provider.dart';
  6. import 'package:immich_mobile/shared/models/asset.dart';
  7. import 'package:immich_mobile/shared/models/server_info/server_version.model.dart';
  8. import 'package:immich_mobile/shared/models/store.dart';
  9. import 'package:immich_mobile/shared/providers/asset.provider.dart';
  10. import 'package:immich_mobile/shared/providers/db.provider.dart';
  11. import 'package:immich_mobile/shared/providers/server_info.provider.dart';
  12. import 'package:immich_mobile/shared/services/sync.service.dart';
  13. import 'package:immich_mobile/utils/debounce.dart';
  14. import 'package:logging/logging.dart';
  15. import 'package:openapi/api.dart';
  16. import 'package:socket_io_client/socket_io_client.dart';
  /// Kinds of server-side asset events that are queued for deferred handling.
  enum PendingAction {
    /// An asset was deleted on the server.
    assetDelete,

    /// An asset finished uploading to the server.
    assetUploaded,

    /// An asset was hidden on the server.
    assetHidden,
  }
  /// A server event that has been received but not yet applied locally.
  ///
  /// Two changes compare equal when both [id] and [action] match; [value] is
  /// deliberately left out of equality and hashing.
  class PendingChange {
    /// Identifier of this change. Within this file it is the millisecond
    /// timestamp (as a string) at which the change was queued.
    final String id;

    /// What kind of event this change represents.
    final PendingAction action;

    /// Raw payload delivered with the websocket event.
    final dynamic value;

    const PendingChange(this.id, this.action, this.value);

    @override
    String toString() =>
        'PendingChange(id: $id, action: $action, value: $value)';

    @override
    bool operator ==(Object other) =>
        identical(this, other) ||
        other is PendingChange && other.id == id && other.action == action;

    // Object.hash mixes the fields properly; it covers exactly the fields
    // compared in operator ==.
    @override
    int get hashCode => Object.hash(id, action);
  }
  /// Immutable snapshot of the websocket connection and its queued changes.
  ///
  /// Equality, [hashCode] and [toString] only consider [socket] and
  /// [isConnected]; [pendingChanges] is not part of them.
  class WebsocketState {
    /// The active socket, or `null` when there is none.
    final Socket? socket;

    /// Whether the socket is currently connected.
    final bool isConnected;

    /// Server events received but not yet processed.
    final List<PendingChange> pendingChanges;

    WebsocketState({
      this.socket,
      required this.isConnected,
      required this.pendingChanges,
    });

    /// Returns a copy with the given fields replaced.
    ///
    /// A `null` argument means "keep the current value", so this method cannot
    /// be used to clear [socket]; construct a new [WebsocketState] for that.
    WebsocketState copyWith({
      Socket? socket,
      bool? isConnected,
      List<PendingChange>? pendingChanges,
    }) {
      return WebsocketState(
        socket: socket ?? this.socket,
        isConnected: isConnected ?? this.isConnected,
        pendingChanges: pendingChanges ?? this.pendingChanges,
      );
    }

    @override
    String toString() =>
        'WebsocketState(socket: $socket, isConnected: $isConnected)';

    @override
    bool operator ==(Object other) =>
        identical(this, other) ||
        other is WebsocketState &&
            other.socket == socket &&
            other.isConnected == isConnected;

    // Object.hash mixes the fields properly; it covers exactly the fields
    // compared in operator ==.
    @override
    int get hashCode => Object.hash(socket, isConnected);
  }
  /// Owns the Socket.IO connection to the server and queues the asset events
  /// it receives so they can be applied later via [handlePendingChanges].
  class WebsocketNotifier extends StateNotifier<WebsocketState> {
    WebsocketNotifier(this._ref)
        : super(
            WebsocketState(socket: null, isConnected: false, pendingChanges: []),
          );

    final _log = Logger('WebsocketNotifier');
    final Ref _ref;

    /// Debouncer (500 ms) through which [handlePendingChanges] is scheduled.
    final Debounce _debounce = Debounce(const Duration(milliseconds: 500));

    /// Connects the websocket to the server.
    ///
    /// Does nothing when the state already reports a connection or when the
    /// user is not authenticated. Anything thrown while parsing the endpoint or
    /// setting up the socket is caught and only printed.
    void connect() {
      if (state.isConnected) return;

      final authenticationState = _ref.read(authenticationProvider);
      if (!authenticationState.isAuthenticated) return;

      final accessToken = Store.get(StoreKey.accessToken);
      try {
        final endpoint = Uri.parse(Store.get(StoreKey.serverEndpoint));
        debugPrint("Attempting to connect to websocket");

        // The socket transports must be specified explicitly.
        final socket = io(
          endpoint.origin,
          OptionBuilder()
              .setPath("${endpoint.path}/socket.io")
              .setTransports(['websocket'])
              .enableReconnection()
              .enableForceNew()
              .enableForceNewConnection()
              .enableAutoConnect()
              .setExtraHeaders({"Authorization": "Bearer $accessToken"})
              .build(),
        );

        socket.onConnect((_) {
          debugPrint("Established Websocket Connection");
          state = WebsocketState(
            isConnected: true,
            socket: socket,
            pendingChanges: state.pendingChanges,
          );
        });

        socket.onDisconnect((_) {
          debugPrint("Disconnect to Websocket Connection");
          _markDisconnected();
        });

        socket.on('error', (errorMessage) {
          _log.severe("Websocket Error - $errorMessage");
          _markDisconnected();
        });

        socket.on('on_upload_success', _handleOnUploadSuccess);
        socket.on('on_config_update', _handleOnConfigUpdate);
        socket.on('on_asset_delete', _handleOnAssetDelete);
        socket.on('on_asset_trash', _handleServerUpdates);
        socket.on('on_asset_restore', _handleServerUpdates);
        socket.on('on_asset_update', _handleServerUpdates);
        socket.on('on_asset_hidden', _handleOnAssetHidden);
        socket.on('on_new_release', _handleReleaseUpdates);
      } catch (e) {
        debugPrint("[WEBSOCKET] Catch Websocket Error - $e");
      }
    }

    /// Disconnects the current socket, if any.
    ///
    /// The state is only reset when the socket reports itself as disconnected.
    void disconnect() {
      debugPrint("Attempting to disconnect from websocket");

      final socket = state.socket?.disconnect();
      if (socket?.disconnected == true) {
        _markDisconnected();
      }
    }

    /// Removes the listeners registered for [eventName] on the current socket.
    void stopListenToEvent(String eventName) {
      debugPrint("Stop listening to event $eventName");
      state.socket?.off(eventName);
    }

    /// Registers the upload-success handler on the current socket.
    void listenUploadEvent() {
      debugPrint("Start listening to event on_upload_success");
      state.socket?.on('on_upload_success', _handleOnUploadSuccess);
    }

    /// Queues a change of kind [action] carrying [value] and schedules
    /// [handlePendingChanges] through the debouncer.
    ///
    /// The change id is the current time in milliseconds since the epoch.
    void addPendingChange(PendingAction action, dynamic value) {
      final now = DateTime.now();
      state = state.copyWith(
        pendingChanges: [
          ...state.pendingChanges,
          PendingChange(now.millisecondsSinceEpoch.toString(), action, value),
        ],
      );
      _debounce(handlePendingChanges);
    }

    /// Processes all queued changes: uploads first, then deletes, then hidden
    /// assets.
    ///
    /// Kept as `void` for interface compatibility, so callers cannot await it.
    void handlePendingChanges() async {
      await _handlePendingUploaded();
      await _handlePendingDeletes();
      await _handlingPendingHidden();
    }

    /// Resets the state to "not connected, no socket", keeping the queue.
    ///
    /// The constructor is used because [WebsocketState.copyWith] treats a null
    /// socket as "keep the current one".
    void _markDisconnected() {
      state = WebsocketState(
        isConnected: false,
        socket: null,
        pendingChanges: state.pendingChanges,
      );
    }

    /// Returns a snapshot of the queued changes whose action is [action].
    List<PendingChange> _pendingChangesFor(PendingAction action) =>
        state.pendingChanges.where((c) => c.action == action).toList();

    /// Drops every queued change that is equal to one in [processed].
    ///
    /// Changes queued after the [processed] snapshot was taken stay queued.
    void _removePendingChanges(List<PendingChange> processed) {
      state = state.copyWith(
        pendingChanges:
            state.pendingChanges.whereNot(processed.contains).toList(),
      );
    }

    /// Hands the remote ids of queued deletes to the sync service, then
    /// removes those changes from the queue.
    Future<void> _handlePendingDeletes() async {
      final deleteChanges = _pendingChangesFor(PendingAction.assetDelete);
      if (deleteChanges.isEmpty) return;

      final List<String> remoteIds =
          deleteChanges.map((c) => c.value.toString()).toList();
      await _ref.read(syncServiceProvider).handleRemoteAssetRemoval(remoteIds);
      _removePendingChanges(deleteChanges);
    }

    /// Parses queued upload payloads into assets, passes each one to the
    /// asset notifier, then removes those changes from the queue.
    Future<void> _handlePendingUploaded() async {
      final uploadedChanges = _pendingChangesFor(PendingAction.assetUploaded);
      if (uploadedChanges.isEmpty) return;

      // Parse every payload up front, before any asynchronous work starts.
      final List<AssetResponseDto?> remoteAssets = uploadedChanges
          .map((c) => AssetResponseDto.fromJson(c.value))
          .toList();

      for (final dto in remoteAssets) {
        if (dto == null) continue;
        // read, not watch: this runs outside the provider's build, so it must
        // not register a reactive dependency.
        await _ref
            .read(assetProvider.notifier)
            .onNewAssetUploaded(Asset.remote(dto));
      }
      _removePendingChanges(uploadedChanges);
    }

    /// Deletes the local rows of assets reported as hidden, then removes those
    /// changes from the queue.
    Future<void> _handlingPendingHidden() async {
      final hiddenChanges = _pendingChangesFor(PendingAction.assetHidden);
      if (hiddenChanges.isEmpty) return;

      final List<String> remoteIds =
          hiddenChanges.map((c) => c.value.toString()).toList();
      // read, not watch: see _handlePendingUploaded.
      final db = _ref.read(dbProvider);
      await db.writeTxn(() => db.assets.deleteAllByRemoteId(remoteIds));
      _removePendingChanges(hiddenChanges);
    }

    /// Re-fetches the server features and config after a config update event.
    void _handleOnConfigUpdate(dynamic _) {
      _ref.read(serverInfoProvider.notifier).getServerFeatures();
      _ref.read(serverInfoProvider.notifier).getServerConfig();
    }

    /// Refreshes all assets after a trash, restore or update event.
    void _handleServerUpdates(dynamic _) {
      _ref.read(assetProvider.notifier).getAllAsset();
    }

    void _handleOnUploadSuccess(dynamic data) =>
        addPendingChange(PendingAction.assetUploaded, data);

    void _handleOnAssetDelete(dynamic data) =>
        addPendingChange(PendingAction.assetDelete, data);

    void _handleOnAssetHidden(dynamic data) =>
        addPendingChange(PendingAction.assetHidden, data);

    /// Forwards a new-release event to the server info notifier.
    ///
    /// The event is ignored unless [data] is a map whose `serverVersion` and
    /// `releaseVersion` entries are both present and parse into DTOs.
    void _handleReleaseUpdates(dynamic data) {
      // Json guard
      if (data is! Map) {
        return;
      }

      final json = data.cast<String, dynamic>();
      // A missing key already yields null, so no containsKey check is needed.
      final serverVersionJson = json['serverVersion'];
      final releaseVersionJson = json['releaseVersion'];
      if (serverVersionJson == null || releaseVersionJson == null) {
        return;
      }

      final serverVersionDto =
          ServerVersionResponseDto.fromJson(serverVersionJson);
      final releaseVersionDto =
          ServerVersionResponseDto.fromJson(releaseVersionJson);
      if (serverVersionDto == null || releaseVersionDto == null) {
        return;
      }

      _ref.read(serverInfoProvider.notifier).handleNewRelease(
            ServerVersion.fromDto(serverVersionDto),
            ServerVersion.fromDto(releaseVersionDto),
          );
    }
  }
  /// Provides the [WebsocketNotifier] and exposes its [WebsocketState].
  final websocketProvider =
      StateNotifierProvider<WebsocketNotifier, WebsocketState>(
    WebsocketNotifier.new,
  );