fsqueue.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
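  """
  Filesystem-based queue that uses ``os.rename`` as an atomic operation to move
  each item between per-state directories (creating -> ready -> locked -> done),
  so that an item is fully written before it becomes visible and is claimed by
  at most one consumer.
  """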
  import gzip
  import json
  import logging
  import os
  from abc import ABC, abstractmethod
  from enum import Enum
  from pathlib import Path
  from typing import Optional, Union
  from uuid import uuid4

  from zstandard import ZstdCompressor, ZstdDecompressor
  class FSState(Enum):
      """Lifecycle stage of a queued item.

      Each member's value doubles as the name of the sub-directory (inside the
      queue's own directory) that holds the items currently in that stage.
      """

      CREATING = "creating"  # file is still being written by put()
      READY = "ready"  # fully written, waiting to be claimed by get()
      LOCKED = "locked"  # claimed by get(), not yet acknowledged
      DONE = "done"  # acknowledged via done()
  class Serializer(ABC):
      """Interface for converting queue items to bytes and back.

      FSQueue writes exactly the bytes returned by ``serialize`` to disk and later
      passes them to ``deserialize``, so an implementation should reconstruct an
      equivalent item from its own output. Both methods are abstract: a subclass
      that does not implement them cannot be instantiated, instead of silently
      returning ``None``.
      """

      @abstractmethod
      def serialize(self, item) -> bytes:
          """Return the byte representation of *item*."""

      @abstractmethod
      def deserialize(self, serialized_item: bytes):
          """Rebuild and return the item encoded in *serialized_item*."""
  class ZstdJsonSerializer(Serializer):
      """Store one JSON-compatible item as zstd-compressed, UTF-8 encoded JSON."""

      def __init__(self):
          # A single compressor/decompressor pair is built per serializer and
          # reused for every call.
          self.compressor = ZstdCompressor()
          self.decompressor = ZstdDecompressor()

      def serialize(self, item) -> bytes:
          """JSON-encode *item*, encode the text as UTF-8, then zstd-compress it."""
          text = json.dumps(item)
          payload = text.encode('utf8')
          return self.compressor.compress(payload)

      def deserialize(self, serialized_item: bytes):
          """Undo :meth:`serialize`: decompress, decode UTF-8, then parse JSON."""
          payload = self.decompressor.decompress(serialized_item)
          text = payload.decode('utf8')
          return json.loads(text)
  class GzipJsonRowSerializer(Serializer):
      """Store a list of JSON-compatible items as gzip-compressed JSON Lines.

      Each item is written as one ``json.dumps`` line; the lines are joined with
      newlines, encoded as UTF-8 and gzip-compressed as a single payload.
      """

      def serialize(self, items: list[object]) -> bytes:
          """Return *items* as gzip-compressed, newline-delimited JSON."""
          # json.dumps escapes control characters (newlines included) inside
          # strings, so every item occupies exactly one line.
          text = '\n'.join(json.dumps(item) for item in items)
          return gzip.compress(text.encode('utf8'))

      def deserialize(self, serialized_items: bytes) -> list[object]:
          """Decode a payload produced by :meth:`serialize` back into a list.

          Blank lines are skipped. This lets the payload of ``serialize([])`` (an
          empty string) round-trip to ``[]`` instead of failing on
          ``json.loads('')``.
          """
          text = gzip.decompress(serialized_items).decode('utf8')
          return [json.loads(line) for line in text.split('\n') if line.strip()]
  class FSQueue:
      """Work queue stored as files under ``<directory>/<name>/<state>/``.

      Each item is one file named by a random UUID. An item changes state by being
      renamed from one state sub-directory (see ``FSState``) to another with
      ``os.rename``. A given source file can be renamed away only once; later
      attempts raise FileNotFoundError. Consumers racing in :meth:`get` therefore
      never claim the same item. This relies on ``os.rename`` being atomic, which
      Python documents as a POSIX requirement.
      """

      _logger = logging.getLogger(__name__)

      def __init__(self, directory: Union[str, Path], name: str, serializer: Serializer):
          """Open the queue *name* inside *directory*, creating its folders if needed.

          Args:
              directory: Existing directory that holds one sub-directory per queue.
              name: Queue name; must be a single, plain path component.
              serializer: Converts items to and from the bytes stored on disk.

          Raises:
              ValueError: If *directory* is not an existing directory, or *name*
                  contains a path separator or is '', '.' or '..'.
          """
          self.directory = str(directory)
          self.name = name
          self.serializer = serializer
          if not os.path.isdir(self.directory):
              raise ValueError("Given path is not a directory")
          if '/' in name:
              raise ValueError("Name should not contain '/'")
          # Also reject the platform's own separators (e.g. '\\' on Windows) and
          # names that would resolve to the base directory or its parent.
          if os.sep in name or (os.altsep and os.altsep in name):
              raise ValueError("Name should not contain a path separator")
          if name in ('', '.', '..'):
              raise ValueError("Name should be a single, non-empty path component")
          # makedirs also creates the intermediate <directory>/<name> folder.
          for state in FSState:
              os.makedirs(self._get_dir(state), exist_ok=True)

      def _get_dir(self, state: FSState) -> str:
          """Return the directory holding items that are in *state*."""
          return os.path.join(self.directory, self.name, state.value)

      def _get_path(self, state: FSState, name: str) -> str:
          """Return the path of item *name* while it is in *state*."""
          return os.path.join(self._get_dir(state), name)

      def _move(self, name: str, old_state: FSState, new_state: FSState):
          """Rename item *name* from the *old_state* to the *new_state* directory.

          Raises:
              FileNotFoundError: If the item is not in *old_state*, for example
                  because another process moved it first.
          """
          os.rename(self._get_path(old_state, name), self._get_path(new_state, name))

      def put(self, item: object):
          """Serialize *item* and add it to the queue in the ready state."""
          # Serialize before touching the filesystem. A serializer error then
          # leaves no empty, orphaned file behind in the creating directory.
          data = self.serializer.serialize(item)
          item_id = str(uuid4())
          creating_path = self._get_path(FSState.CREATING, item_id)
          try:
              with open(creating_path, 'wb') as output_file:
                  output_file.write(data)
          except BaseException:
              # Best-effort removal of a partially written file; the original
              # error is always re-raised.
              try:
                  os.remove(creating_path)
              except OSError:
                  pass
              raise
          # Publish only once the file is complete, so consumers never read a
          # partially written item.
          self._move(item_id, FSState.CREATING, FSState.READY)

      def get(self) -> Optional[tuple[str, object]]:
          """Claim one ready item and return ``(item_id, item)``.

          The claimed item is moved to the locked state; pass the returned ID to
          :meth:`done` once it has been processed. Candidates are tried in the
          order ``Path.iterdir`` yields them, which is arbitrary (neither FIFO
          nor priority order).

          Returns:
              ``(item_id, item)`` for the claimed item, or None if no ready item
              could be claimed.
          """
          paths = list(Path(self._get_dir(FSState.READY)).iterdir())
          for path in paths:
              try:
                  # The rename *is* the lock: only one consumer can move a file.
                  self._move(path.name, FSState.READY, FSState.LOCKED)
              except FileNotFoundError:
                  # Another consumer claimed this item first; try the next one.
                  self._logger.debug("Item %s already claimed, skipping", path.name)
                  continue
              with open(self._get_path(FSState.LOCKED, path.name), 'rb') as item_file:
                  data = item_file.read()
              return path.name, self.serializer.deserialize(data)
          return None

      def done(self, item_id: str):
          """Mark the locked item *item_id* as done.

          Raises:
              FileNotFoundError: If *item_id* is not currently locked.
          """
          self._move(item_id, FSState.LOCKED, FSState.DONE)

      def unlock_all(self):
          """Move every locked item back to ready, in file-modification-time order.

          Items that disappear while this runs are skipped. For example, another
          process may call :meth:`done` on them concurrently.
          """
          def _mtime(path: Path) -> float:
              try:
                  return path.stat().st_mtime
              except FileNotFoundError:
                  # Vanished since listing; sort it last, the move below skips it.
                  return float('inf')

          paths = sorted(Path(self._get_dir(FSState.LOCKED)).iterdir(), key=_mtime)
          for path in paths:
              try:
                  self._move(path.name, FSState.LOCKED, FSState.READY)
              except FileNotFoundError:
                  # Already finished or unlocked by someone else; nothing to do.
                  continue