fsqueue.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. """
  2. Filesystem-based queue that uses os.rename as an atomic operation to ensure
  3. that items are handled correctly.
  4. """
  5. import json
  6. import os
  7. from abc import ABC
  8. from enum import Enum
  9. from uuid import uuid4
  10. from pathlib import Path
  11. from zstandard import ZstdCompressor, ZstdDecompressor
  12. class FSState(Enum):
  13. CREATING = 'creating'
  14. READY = 'ready'
  15. LOCKED = 'locked'
  16. DONE = 'done'
  17. class Serializer(ABC):
  18. def serialize(self, item) -> bytes:
  19. pass
  20. def deserialize(self, serialized_item: bytes):
  21. pass
  22. class ZstdJsonSerializer(Serializer):
  23. def __init__(self):
  24. self.compressor = ZstdCompressor()
  25. self.decompressor = ZstdDecompressor()
  26. def serialize(self, item) -> bytes:
  27. return self.compressor.compress(json.dumps(item).encode('utf8'))
  28. def deserialize(self, serialized_item: bytes):
  29. return json.loads(self.decompressor.decompress(serialized_item).decode('utf8'))
  30. class FSQueue:
  31. def __init__(self, directory: str, name: str, serializer: Serializer):
  32. self.directory = directory
  33. self.name = name
  34. self.serializer = serializer
  35. if not os.path.isdir(self.directory):
  36. raise ValueError("Given path is not a directory")
  37. if '/' in name:
  38. raise ValueError("Name should not contain '/'")
  39. os.makedirs(os.path.join(self.directory, self.name), exist_ok=True)
  40. for state in FSState:
  41. os.makedirs(self._get_dir(state), exist_ok=True)
  42. def _get_dir(self, state: FSState):
  43. return os.path.join(self.directory, self.name, state.value)
  44. def _get_path(self, state: FSState, name: str):
  45. return os.path.join(self._get_dir(state), name)
  46. def _move(self, name: str, old_state: FSState, new_state: FSState):
  47. os.rename(self._get_path(old_state, name), self._get_path(new_state, name))
  48. def put(self, item: object):
  49. """
  50. Push a new item into the ready state
  51. """
  52. item_id = str(uuid4())
  53. with open(self._get_path(FSState.CREATING, item_id), 'wb') as output_file:
  54. output_file.write(self.serializer.serialize(item))
  55. self._move(item_id, FSState.CREATING, FSState.READY)
  56. def get(self) -> (str, object):
  57. """
  58. Get the next priority item from the queue, returning the item ID and the object
  59. """
  60. paths = sorted(Path(self._get_dir(FSState.READY)).iterdir(), key=os.path.getmtime)
  61. for path in paths:
  62. # Try and lock the file
  63. self._move(path.name, FSState.READY, FSState.LOCKED)
  64. with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
  65. return path.name, self.serializer.deserialize(item_file.read())
  66. def done(self, item_id: str):
  67. """
  68. Mark a task/file as done
  69. """
  70. self._move(item_id, FSState.LOCKED, FSState.DONE)