SharedCircularQueue.h

/*
 * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#pragma once

#include <AK/Array.h>
#include <AK/Assertions.h>
#include <AK/Atomic.h>
#include <AK/BuiltinWrappers.h>
#include <AK/ByteString.h>
#include <AK/Debug.h>
#include <AK/Error.h>
#include <AK/Format.h>
#include <AK/Function.h>
#include <AK/NonnullRefPtr.h>
#include <AK/NumericLimits.h>
#include <AK/Platform.h>
#include <AK/RefCounted.h>
#include <AK/RefPtr.h>
#include <AK/Types.h>
#include <AK/Variant.h>
#include <AK/Weakable.h>
#include <LibCore/AnonymousBuffer.h>
#include <LibCore/System.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <sys/mman.h>

namespace Core {

// A circular lock-free queue (or buffer) with a single producer,
// residing in shared memory and designed to be accessible to multiple processes.
// This implementation makes use of the fact that any producer-related code can be sure that
// it's the only producer-related code running, which simplifies much of the synchronization code.
// The exclusivity and liveness of the critical sections in this class are proven to be correct
// under the assumption of correct synchronization primitives, i.e. atomics.
// In many circumstances, this is enough for cross-process queues.
// This class is designed to be transferred over IPC and mmap()ed into multiple processes' memory.
// It is a synthetic pointer to the actual shared memory, which is abstracted away from the user.
// FIXME: Make this independent of shared memory, so that we can move it to AK.
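//
// Rough usage sketch: one process creates the queue and acts as the sole producer;
// its file descriptor is then handed to a consumer process over some IPC mechanism
// (elided here; `send_fd_to_consumer` and `receive_fd_from_producer` are hypothetical
// placeholders, not part of this file):
//
//     // Producer process:
//     auto queue = TRY(Core::SharedSingleProducerCircularQueue<int>::create());
//     send_fd_to_consumer(queue.fd()); // hypothetical fd transfer
//     TRY(queue.blocking_enqueue(42, [] { sched_yield(); }));
//
//     // Consumer process:
//     int fd = receive_fd_from_producer(); // hypothetical fd transfer
//     auto queue = TRY(Core::SharedSingleProducerCircularQueue<int>::create(fd));
//     auto value_or_empty = queue.dequeue();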
// Size must be a power of two, which speeds up the modulus operations for indexing.
template<typename T, size_t Size = 32>
requires(popcount(Size) == 1)
class SharedSingleProducerCircularQueue final {
public:
    using ValueType = T;

    enum class QueueStatus : u8 {
        Invalid = 0,
        Full,
        Empty,
    };

    SharedSingleProducerCircularQueue() = default;
    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue<ValueType, Size>& queue) = default;
    SharedSingleProducerCircularQueue(SharedSingleProducerCircularQueue&& queue) = default;
    SharedSingleProducerCircularQueue& operator=(SharedSingleProducerCircularQueue&& queue) = default;

    // Allocates a new circular queue in shared memory.
    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create()
    {
        auto fd = TRY(System::anon_create(sizeof(SharedMemorySPCQ), O_CLOEXEC));
        return create_internal(fd, true);
    }

    // Uses an existing circular queue from the given shared memory.
    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create(int fd)
    {
        return create_internal(fd, false);
    }

    constexpr size_t size() const { return Size; }

    // These functions are provably inconsistent and should only be used as hints to the actual capacity and used count.
    ALWAYS_INLINE size_t weak_remaining_capacity() const { return Size - weak_used(); }

    ALWAYS_INLINE size_t weak_used() const
    {
        auto volatile head = m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed);
        auto volatile tail = m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed);
        return tail - head;
    }
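
    // For example, a fill-level meter might poll this from either side of the queue;
    // a sketch (the value may already be stale by the time it is printed):
    //
    //     dbgln("queue at {}/{} entries", queue.weak_used(), queue.size());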

    ALWAYS_INLINE constexpr int fd() const { return m_queue->m_fd; }
    ALWAYS_INLINE constexpr bool is_valid() const { return !m_queue.is_null(); }

    ALWAYS_INLINE constexpr size_t weak_head() const { return m_queue->m_queue->m_head.load(AK::MemoryOrder::memory_order_relaxed); }
    ALWAYS_INLINE constexpr size_t weak_tail() const { return m_queue->m_queue->m_tail.load(AK::MemoryOrder::memory_order_relaxed); }

    ErrorOr<void, QueueStatus> enqueue(ValueType to_insert)
    {
        VERIFY(!m_queue.is_null());
        if (!can_enqueue())
            return QueueStatus::Full;
        auto our_tail = m_queue->m_queue->m_tail.load() % Size;
        m_queue->m_queue->m_data[our_tail] = to_insert;
        m_queue->m_queue->m_tail.fetch_add(1);
        return {};
    }

    ALWAYS_INLINE bool can_enqueue() const
    {
        return ((head() - 1) % Size) != (m_queue->m_queue->m_tail.load() % Size);
    }
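
    // A non-blocking producer can simply drop data on overrun; a sketch
    // (`produce_sample` and `dropped_sample_count` are hypothetical, not part of this file):
    //
    //     auto result = queue.enqueue(produce_sample());
    //     if (result.is_error() && result.error() == QueueStatus::Full)
    //         ++dropped_sample_count; // consumer fell behind; drop the sample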

    // Repeatedly tries to enqueue, using the wait_function to wait whenever that's not possible.
    ErrorOr<void> blocking_enqueue(ValueType to_insert, Function<void()> wait_function)
    {
        ErrorOr<void, QueueStatus> result;
        while (true) {
            result = enqueue(to_insert);
            if (!result.is_error())
                break;
            if (result.error() != QueueStatus::Full)
                return Error::from_string_literal("Unexpected error while enqueuing");
            wait_function();
        }
        return {};
    }
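
    // The wait function is supplied by the caller, so the queue itself stays policy-free;
    // a minimal sketch that just yields the CPU between retries (`item` is hypothetical):
    //
    //     TRY(queue.blocking_enqueue(move(item), [] { sched_yield(); }));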

    ErrorOr<ValueType, QueueStatus> dequeue()
    {
        VERIFY(!m_queue.is_null());
        while (true) {
            // This CAS only succeeds if nobody is currently dequeuing.
            auto size_max = NumericLimits<size_t>::max();
            if (m_queue->m_queue->m_head_protector.compare_exchange_strong(size_max, m_queue->m_queue->m_head.load())) {
                auto old_head = m_queue->m_queue->m_head.load();
                // This check looks like it's in a weird place (especially since we have to roll back the protector), but it's actually protecting against a race between multiple dequeuers.
                if (old_head >= m_queue->m_queue->m_tail.load()) {
                    m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
                    return QueueStatus::Empty;
                }
                auto data = move(m_queue->m_queue->m_data[old_head % Size]);
                m_queue->m_queue->m_head.fetch_add(1);
                m_queue->m_queue->m_head_protector.store(NumericLimits<size_t>::max(), AK::MemoryOrder::memory_order_release);
                return { move(data) };
            }
        }
    }
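
    // A consumer loop would handle the Empty status explicitly; a sketch
    // (`running` and `consume` are hypothetical, not part of this file):
    //
    //     while (running) {
    //         auto result = queue.dequeue();
    //         if (result.is_error()) {
    //             VERIFY(result.error() == QueueStatus::Empty);
    //             sched_yield(); // nothing to do yet
    //             continue;
    //         }
    //         consume(result.release_value());
    //     }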

    // The "real" head as seen by the outside world. Don't use m_head directly unless you know what you're doing.
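    // While a dequeue is in progress, m_head_protector holds the head index that the
    // dequeuer is still working on (it is SIZE_MAX otherwise). Taking the minimum of
    // m_head and m_head_protector therefore makes the producer's capacity check in
    // can_enqueue() treat that slot as still occupied, so it cannot be overwritten
    // before the dequeuer has moved the element out.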
    size_t head() const
    {
        return min(m_queue->m_queue->m_head.load(), m_queue->m_queue->m_head_protector.load());
    }

private:
    struct SharedMemorySPCQ {
        SharedMemorySPCQ() = default;
        SharedMemorySPCQ(SharedMemorySPCQ const&) = delete;
        SharedMemorySPCQ(SharedMemorySPCQ&&) = delete;
        ~SharedMemorySPCQ() = default;

        // Invariant: tail >= head
        // Invariant: head and tail are monotonically increasing
        // Invariant: tail always points to the next free location where an enqueue can happen.
        // Invariant: head always points to the element to be dequeued next.
        // Invariant: tail is only modified by enqueue functions.
        // Invariant: head is only modified by dequeue functions.
        // An empty queue is signalled with: tail = head
        // A full queue is signalled with: (head - 1) mod size = tail mod size (i.e. head and tail point to the same index in the data array)
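        // For example, with Size = 4, head = 1 and tail = 4: (1 - 1) mod 4 = 0 and
        // 4 mod 4 = 0, so the queue is full while holding tail - head = 3 elements.
        // One slot is always sacrificed to distinguish "full" from "empty".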
        // FIXME: These invariants aren't proven to be correct after each successful completion of each operation where they are relevant.
        //        The work could be put in, but for now I think the algorithmic correctness proofs of the functions are enough.
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_tail { 0 };
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head { 0 };
        AK_CACHE_ALIGNED Atomic<size_t, AK::MemoryOrder::memory_order_seq_cst> m_head_protector { NumericLimits<size_t>::max() };

        alignas(ValueType) Array<ValueType, Size> m_data;
    };

    class RefCountedSharedMemorySPCQ : public RefCounted<RefCountedSharedMemorySPCQ> {
        friend class SharedSingleProducerCircularQueue;

    public:
        SharedMemorySPCQ* m_queue;
        void* m_raw;
        int m_fd;

        ~RefCountedSharedMemorySPCQ()
        {
            MUST(System::close(m_fd));
            MUST(System::munmap(m_raw, sizeof(SharedMemorySPCQ)));
            dbgln_if(SHARED_QUEUE_DEBUG, "destructed SSPCQ at {:p}, shared mem: {:p}", this, this->m_raw);
        }

    private:
        RefCountedSharedMemorySPCQ(SharedMemorySPCQ* queue, int fd)
            : m_queue(queue)
            , m_raw(reinterpret_cast<void*>(queue))
            , m_fd(fd)
        {
        }
    };

    static ErrorOr<SharedSingleProducerCircularQueue<T, Size>> create_internal(int fd, bool is_new)
    {
        auto name = ByteString::formatted("SharedSingleProducerCircularQueue@{:x}", fd);
        auto* raw_mapping = TRY(System::mmap(nullptr, sizeof(SharedMemorySPCQ), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0, 0, name));
        dbgln_if(SHARED_QUEUE_DEBUG, "successfully mmapped {} at {:p}", name, raw_mapping);
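        // On first creation, placement-new runs the constructor so the atomics start
        // out in a well-defined state; later mappings of the same fd must not do this,
        // since it would wipe out the queue state of the already-running peer.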
        SharedMemorySPCQ* shared_queue = is_new ? new (raw_mapping) SharedMemorySPCQ() : reinterpret_cast<SharedMemorySPCQ*>(raw_mapping);
        if (!shared_queue)
            return Error::from_string_literal("Unexpected error when creating shared queue from raw memory");

        return SharedSingleProducerCircularQueue<T, Size> { move(name), adopt_ref(*new (nothrow) RefCountedSharedMemorySPCQ(shared_queue, fd)) };
    }

    SharedSingleProducerCircularQueue(ByteString name, RefPtr<RefCountedSharedMemorySPCQ> queue)
        : m_queue(move(queue))
        , m_name(move(name))
    {
    }

    RefPtr<RefCountedSharedMemorySPCQ> m_queue;
    ByteString m_name {};
};

}
  191. }