Stream.h 39 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117
  1. /*
  2. * Copyright (c) 2021, sin-ack <sin-ack@protonmail.com>
  3. * Copyright (c) 2022, the SerenityOS developers.
  4. *
  5. * SPDX-License-Identifier: BSD-2-Clause
  6. */
  7. #pragma once
  8. #include <AK/Badge.h>
  9. #include <AK/CircularBuffer.h>
  10. #include <AK/DeprecatedString.h>
  11. #include <AK/EnumBits.h>
  12. #include <AK/Function.h>
  13. #include <AK/IPv4Address.h>
  14. #include <AK/Noncopyable.h>
  15. #include <AK/Result.h>
  16. #include <AK/Span.h>
  17. #include <AK/Time.h>
  18. #include <AK/Variant.h>
  19. #include <LibCore/Notifier.h>
  20. #include <LibCore/SocketAddress.h>
  21. #include <LibIPC/Forward.h>
  22. #include <errno.h>
  23. #include <netdb.h>
  24. namespace Core::Stream {
  25. template<DerivedFrom<Core::Stream::Stream> T>
  26. class Handle {
  27. public:
  28. template<DerivedFrom<T> U>
  29. Handle(NonnullOwnPtr<U> handle)
  30. : m_handle(adopt_own<T>(*handle.leak_ptr()))
  31. {
  32. }
  33. // This is made `explicit` to not accidentally create a non-owning Handle,
  34. // which may not always be intended.
  35. explicit Handle(T& handle)
  36. : m_handle(&handle)
  37. {
  38. }
  39. T* ptr()
  40. {
  41. if (m_handle.template has<T*>())
  42. return m_handle.template get<T*>();
  43. else
  44. return m_handle.template get<NonnullOwnPtr<T>>();
  45. }
  46. T const* ptr() const
  47. {
  48. if (m_handle.template has<T*>())
  49. return m_handle.template get<T*>();
  50. else
  51. return m_handle.template get<NonnullOwnPtr<T>>();
  52. }
  53. T* operator->() { return ptr(); }
  54. T const* operator->() const { return ptr(); }
  55. T& operator*() { return *ptr(); }
  56. T const& operator*() const { return *ptr(); }
  57. private:
  58. Variant<NonnullOwnPtr<T>, T*> m_handle;
  59. };
  60. /// The base, abstract class for stream operations. This class defines the
  61. /// operations one can perform on every stream in LibCore.
  62. class Stream {
  63. public:
  64. /// Reads into a buffer, with the maximum size being the size of the buffer.
  65. /// The amount of bytes read can be smaller than the size of the buffer.
  66. /// Returns either the bytes that were read, or an errno in the case of
  67. /// failure.
  68. virtual ErrorOr<Bytes> read(Bytes) = 0;
  69. /// Tries to fill the entire buffer through reading. Returns whether the
  70. /// buffer was filled without an error.
  71. virtual ErrorOr<void> read_entire_buffer(Bytes);
  72. /// Reads the stream until EOF, storing the contents into a ByteBuffer which
  73. /// is returned once EOF is encountered. The block size determines the size
  74. /// of newly allocated chunks while reading.
  75. virtual ErrorOr<ByteBuffer> read_until_eof(size_t block_size = 4096);
  76. /// Discards the given number of bytes from the stream. As this is usually used
  77. /// as an efficient version of `read_entire_buffer`, it returns an error
  78. /// if reading failed or if not all bytes could be discarded.
  79. /// Unless specifically overwritten, this just uses read() to read into an
  80. /// internal stack-based buffer.
  81. virtual ErrorOr<void> discard(size_t discarded_bytes);
  82. /// Tries to write the entire contents of the buffer. It is possible for
  83. /// less than the full buffer to be written. Returns either the amount of
  84. /// bytes written into the stream, or an errno in the case of failure.
  85. virtual ErrorOr<size_t> write(ReadonlyBytes) = 0;
  86. /// Same as write, but does not return until either the entire buffer
  87. /// contents are written or an error occurs.
  88. virtual ErrorOr<void> write_entire_buffer(ReadonlyBytes);
  89. // This is a wrapper around `write_entire_buffer` that is compatible with
  90. // `write_or_error`. This is required by some templated code in LibProtocol
  91. // that needs to work with either type of stream.
  92. // TODO: Fully port or wrap `Request::stream_into_impl` into `Core::Stream` and remove this.
  93. bool write_or_error(ReadonlyBytes buffer)
  94. {
  95. return !write_entire_buffer(buffer).is_error();
  96. }
  97. template<typename T>
  98. requires(Traits<T>::is_trivially_serializable())
  99. ErrorOr<T> read_value()
  100. {
  101. alignas(T) u8 buffer[sizeof(T)] = {};
  102. TRY(read_entire_buffer({ &buffer, sizeof(buffer) }));
  103. return bit_cast<T>(buffer);
  104. }
  105. template<typename T>
  106. requires(Traits<T>::is_trivially_serializable())
  107. ErrorOr<void> write_value(T const& value)
  108. {
  109. return write_entire_buffer({ &value, sizeof(value) });
  110. }
  111. /// Returns whether the stream has reached the end of file. For sockets,
  112. /// this most likely means that the protocol has disconnected (in the case
  113. /// of TCP). For seekable streams, this means the end of the file. Note that
  114. /// is_eof will only return true _after_ a read with 0 length, so this
  115. /// method should be called after a read.
  116. virtual bool is_eof() const = 0;
  117. virtual bool is_open() const = 0;
  118. virtual void close() = 0;
  119. virtual ~Stream()
  120. {
  121. }
  122. protected:
  123. /// Provides a default implementation of read_until_eof that works for streams
  124. /// that behave like POSIX file descriptors. expected_file_size can be
  125. /// passed as a heuristic for what the Stream subclass expects the file
  126. /// content size to be in order to reduce allocations (does not affect
  127. /// actual reading).
  128. ErrorOr<ByteBuffer> read_until_eof_impl(size_t block_size, size_t expected_file_size = 0);
  129. };
  130. enum class SeekMode {
  131. SetPosition,
  132. FromCurrentPosition,
  133. FromEndPosition,
  134. };
  135. /// Adds seekability to Core::Stream. Classes inheriting from SeekableStream
  136. /// will be seekable to any point in the stream.
  137. class SeekableStream : public Stream {
  138. public:
  139. /// Seeks to the given position in the given mode. Returns either the
  140. /// current position of the file, or an errno in the case of an error.
  141. virtual ErrorOr<off_t> seek(i64 offset, SeekMode) = 0;
  142. /// Returns the current position of the file, or an errno in the case of
  143. /// an error.
  144. virtual ErrorOr<off_t> tell() const;
  145. /// Returns the total size of the stream, or an errno in the case of an
  146. /// error. May not preserve the original position on the stream on failure.
  147. virtual ErrorOr<off_t> size();
  148. /// Shrinks or extends the stream to the given size. Returns an errno in
  149. /// the case of an error.
  150. virtual ErrorOr<void> truncate(off_t length) = 0;
  151. /// Seeks until after the given amount of bytes to be discarded instead of
  152. /// reading and discarding everything manually;
  153. virtual ErrorOr<void> discard(size_t discarded_bytes) override;
  154. };
  155. enum class PreventSIGPIPE {
  156. No,
  157. Yes,
  158. };
  159. /// The Socket class is the base class for all concrete BSD-style socket
  160. /// classes. Sockets are non-seekable streams which can be read byte-wise.
  161. class Socket : public Stream {
  162. public:
  163. Socket(Socket&&) = default;
  164. Socket& operator=(Socket&&) = default;
  165. /// Checks how many bytes of data are currently available to read on the
  166. /// socket. For datagram-based socket, this is the size of the first
  167. /// datagram that can be read. Returns either the amount of bytes, or an
  168. /// errno in the case of failure.
  169. virtual ErrorOr<size_t> pending_bytes() const = 0;
  170. /// Returns whether there's any data that can be immediately read, or an
  171. /// errno on failure.
  172. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const = 0;
  173. // Sets the blocking mode of the socket. If blocking mode is disabled, reads
  174. // will fail with EAGAIN when there's no data available to read, and writes
  175. // will fail with EAGAIN when the data cannot be written without blocking
  176. // (due to the send buffer being full, for example).
  177. virtual ErrorOr<void> set_blocking(bool enabled) = 0;
  178. // Sets the close-on-exec mode of the socket. If close-on-exec mode is
  179. // enabled, then the socket will be automatically closed by the kernel when
  180. // an exec call happens.
  181. virtual ErrorOr<void> set_close_on_exec(bool enabled) = 0;
  182. /// Disables any listening mechanisms that this socket uses.
  183. /// Can be called with 'false' when `on_ready_to_read` notifications are no longer needed.
  184. /// Conversely, set_notifications_enabled(true) will re-enable notifications.
  185. virtual void set_notifications_enabled(bool) { }
  186. Function<void()> on_ready_to_read;
  187. protected:
  188. enum class SocketDomain {
  189. Local,
  190. Inet,
  191. };
  192. enum class SocketType {
  193. Stream,
  194. Datagram,
  195. };
  196. Socket(PreventSIGPIPE prevent_sigpipe = PreventSIGPIPE::No)
  197. : m_prevent_sigpipe(prevent_sigpipe == PreventSIGPIPE::Yes)
  198. {
  199. }
  200. static ErrorOr<int> create_fd(SocketDomain, SocketType);
  201. // FIXME: This will need to be updated when IPv6 socket arrives. Perhaps a
  202. // base class for all address types is appropriate.
  203. static ErrorOr<IPv4Address> resolve_host(DeprecatedString const&, SocketType);
  204. static ErrorOr<void> connect_local(int fd, DeprecatedString const& path);
  205. static ErrorOr<void> connect_inet(int fd, SocketAddress const&);
  206. int default_flags() const
  207. {
  208. int flags = 0;
  209. if (m_prevent_sigpipe)
  210. flags |= MSG_NOSIGNAL;
  211. return flags;
  212. }
  213. private:
  214. bool m_prevent_sigpipe { false };
  215. };
  216. /// A reusable socket maintains state about being connected in addition to
  217. /// normal Socket capabilities, and can be reconnected once disconnected.
  218. class ReusableSocket : public Socket {
  219. public:
  220. /// Returns whether the socket is currently connected.
  221. virtual bool is_connected() = 0;
  222. /// Reconnects the socket to the given host and port. Returns EALREADY if
  223. /// is_connected() is true.
  224. virtual ErrorOr<void> reconnect(DeprecatedString const& host, u16 port) = 0;
  225. /// Connects the socket to the given socket address (IP address + port).
  226. /// Returns EALREADY is_connected() is true.
  227. virtual ErrorOr<void> reconnect(SocketAddress const&) = 0;
  228. };
  229. // Concrete classes.
  230. enum class OpenMode : unsigned {
  231. NotOpen = 0,
  232. Read = 1,
  233. Write = 2,
  234. ReadWrite = 3,
  235. Append = 4,
  236. Truncate = 8,
  237. MustBeNew = 16,
  238. KeepOnExec = 32,
  239. Nonblocking = 64,
  240. };
  241. enum class ShouldCloseFileDescriptor {
  242. Yes,
  243. No,
  244. };
  245. AK_ENUM_BITWISE_OPERATORS(OpenMode)
  246. class File final : public SeekableStream {
  247. AK_MAKE_NONCOPYABLE(File);
  248. public:
  249. static ErrorOr<NonnullOwnPtr<File>> open(StringView filename, OpenMode, mode_t = 0644);
  250. static ErrorOr<NonnullOwnPtr<File>> adopt_fd(int fd, OpenMode, ShouldCloseFileDescriptor = ShouldCloseFileDescriptor::Yes);
  251. static ErrorOr<NonnullOwnPtr<File>> standard_input();
  252. static ErrorOr<NonnullOwnPtr<File>> standard_output();
  253. static ErrorOr<NonnullOwnPtr<File>> standard_error();
  254. static ErrorOr<NonnullOwnPtr<File>> open_file_or_standard_stream(StringView filename, OpenMode mode);
  255. File(File&& other) { operator=(move(other)); }
  256. File& operator=(File&& other)
  257. {
  258. if (&other == this)
  259. return *this;
  260. m_mode = exchange(other.m_mode, OpenMode::NotOpen);
  261. m_fd = exchange(other.m_fd, -1);
  262. m_last_read_was_eof = exchange(other.m_last_read_was_eof, false);
  263. return *this;
  264. }
  265. virtual ErrorOr<Bytes> read(Bytes) override;
  266. virtual ErrorOr<ByteBuffer> read_until_eof(size_t block_size = 4096) override;
  267. virtual ErrorOr<size_t> write(ReadonlyBytes) override;
  268. virtual bool is_eof() const override;
  269. virtual bool is_open() const override;
  270. virtual void close() override;
  271. virtual ErrorOr<off_t> seek(i64 offset, SeekMode) override;
  272. virtual ErrorOr<void> truncate(off_t length) override;
  273. int leak_fd(Badge<::IPC::File>)
  274. {
  275. m_should_close_file_descriptor = ShouldCloseFileDescriptor::No;
  276. return m_fd;
  277. }
  278. int fd() const
  279. {
  280. return m_fd;
  281. }
  282. virtual ~File() override
  283. {
  284. if (m_should_close_file_descriptor == ShouldCloseFileDescriptor::Yes)
  285. close();
  286. }
  287. static int open_mode_to_options(OpenMode mode);
  288. private:
  289. File(OpenMode mode, ShouldCloseFileDescriptor should_close = ShouldCloseFileDescriptor::Yes)
  290. : m_mode(mode)
  291. , m_should_close_file_descriptor(should_close)
  292. {
  293. }
  294. ErrorOr<void> open_path(StringView filename, mode_t);
  295. OpenMode m_mode { OpenMode::NotOpen };
  296. int m_fd { -1 };
  297. bool m_last_read_was_eof { false };
  298. ShouldCloseFileDescriptor m_should_close_file_descriptor { ShouldCloseFileDescriptor::Yes };
  299. };
  300. class PosixSocketHelper {
  301. AK_MAKE_NONCOPYABLE(PosixSocketHelper);
  302. public:
  303. template<typename T>
  304. PosixSocketHelper(Badge<T>)
  305. requires(IsBaseOf<Socket, T>)
  306. {
  307. }
  308. PosixSocketHelper(PosixSocketHelper&& other)
  309. {
  310. operator=(move(other));
  311. }
  312. PosixSocketHelper& operator=(PosixSocketHelper&& other)
  313. {
  314. m_fd = exchange(other.m_fd, -1);
  315. m_last_read_was_eof = exchange(other.m_last_read_was_eof, false);
  316. m_notifier = move(other.m_notifier);
  317. return *this;
  318. }
  319. int fd() const { return m_fd; }
  320. void set_fd(int fd) { m_fd = fd; }
  321. ErrorOr<Bytes> read(Bytes, int flags);
  322. ErrorOr<size_t> write(ReadonlyBytes, int flags);
  323. bool is_eof() const { return !is_open() || m_last_read_was_eof; }
  324. bool is_open() const { return m_fd != -1; }
  325. void close();
  326. ErrorOr<size_t> pending_bytes() const;
  327. ErrorOr<bool> can_read_without_blocking(int timeout) const;
  328. ErrorOr<void> set_blocking(bool enabled);
  329. ErrorOr<void> set_close_on_exec(bool enabled);
  330. ErrorOr<void> set_receive_timeout(Time timeout);
  331. void setup_notifier();
  332. RefPtr<Core::Notifier> notifier() { return m_notifier; }
  333. private:
  334. int m_fd { -1 };
  335. bool m_last_read_was_eof { false };
  336. RefPtr<Core::Notifier> m_notifier;
  337. };
  338. class TCPSocket final : public Socket {
  339. public:
  340. static ErrorOr<NonnullOwnPtr<TCPSocket>> connect(DeprecatedString const& host, u16 port);
  341. static ErrorOr<NonnullOwnPtr<TCPSocket>> connect(SocketAddress const& address);
  342. static ErrorOr<NonnullOwnPtr<TCPSocket>> adopt_fd(int fd);
  343. TCPSocket(TCPSocket&& other)
  344. : Socket(static_cast<Socket&&>(other))
  345. , m_helper(move(other.m_helper))
  346. {
  347. if (is_open())
  348. setup_notifier();
  349. }
  350. TCPSocket& operator=(TCPSocket&& other)
  351. {
  352. Socket::operator=(static_cast<Socket&&>(other));
  353. m_helper = move(other.m_helper);
  354. if (is_open())
  355. setup_notifier();
  356. return *this;
  357. }
  358. virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_helper.read(buffer, default_flags()); }
  359. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.write(buffer, default_flags()); }
  360. virtual bool is_eof() const override { return m_helper.is_eof(); }
  361. virtual bool is_open() const override { return m_helper.is_open(); };
  362. virtual void close() override { m_helper.close(); };
  363. virtual ErrorOr<size_t> pending_bytes() const override { return m_helper.pending_bytes(); }
  364. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const override { return m_helper.can_read_without_blocking(timeout); }
  365. virtual void set_notifications_enabled(bool enabled) override
  366. {
  367. if (auto notifier = m_helper.notifier())
  368. notifier->set_enabled(enabled);
  369. }
  370. ErrorOr<void> set_blocking(bool enabled) override { return m_helper.set_blocking(enabled); }
  371. ErrorOr<void> set_close_on_exec(bool enabled) override { return m_helper.set_close_on_exec(enabled); }
  372. virtual ~TCPSocket() override { close(); }
  373. private:
  374. TCPSocket(PreventSIGPIPE prevent_sigpipe = PreventSIGPIPE::No)
  375. : Socket(prevent_sigpipe)
  376. {
  377. }
  378. void setup_notifier()
  379. {
  380. VERIFY(is_open());
  381. m_helper.setup_notifier();
  382. m_helper.notifier()->on_ready_to_read = [this] {
  383. if (on_ready_to_read)
  384. on_ready_to_read();
  385. };
  386. }
  387. PosixSocketHelper m_helper { Badge<TCPSocket> {} };
  388. };
  389. class UDPSocket final : public Socket {
  390. public:
  391. static ErrorOr<NonnullOwnPtr<UDPSocket>> connect(DeprecatedString const& host, u16 port, Optional<Time> timeout = {});
  392. static ErrorOr<NonnullOwnPtr<UDPSocket>> connect(SocketAddress const& address, Optional<Time> timeout = {});
  393. UDPSocket(UDPSocket&& other)
  394. : Socket(static_cast<Socket&&>(other))
  395. , m_helper(move(other.m_helper))
  396. {
  397. if (is_open())
  398. setup_notifier();
  399. }
  400. UDPSocket& operator=(UDPSocket&& other)
  401. {
  402. Socket::operator=(static_cast<Socket&&>(other));
  403. m_helper = move(other.m_helper);
  404. if (is_open())
  405. setup_notifier();
  406. return *this;
  407. }
  408. virtual ErrorOr<Bytes> read(Bytes buffer) override
  409. {
  410. auto pending_bytes = TRY(this->pending_bytes());
  411. if (pending_bytes > buffer.size()) {
  412. // With UDP datagrams, reading a datagram into a buffer that's
  413. // smaller than the datagram's size will cause the rest of the
  414. // datagram to be discarded. That's not very nice, so let's bail
  415. // early, telling the caller that he should allocate a bigger
  416. // buffer.
  417. return Error::from_errno(EMSGSIZE);
  418. }
  419. return m_helper.read(buffer, default_flags());
  420. }
  421. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.write(buffer, default_flags()); }
  422. virtual bool is_eof() const override { return m_helper.is_eof(); }
  423. virtual bool is_open() const override { return m_helper.is_open(); }
  424. virtual void close() override { m_helper.close(); }
  425. virtual ErrorOr<size_t> pending_bytes() const override { return m_helper.pending_bytes(); }
  426. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const override { return m_helper.can_read_without_blocking(timeout); }
  427. virtual void set_notifications_enabled(bool enabled) override
  428. {
  429. if (auto notifier = m_helper.notifier())
  430. notifier->set_enabled(enabled);
  431. }
  432. ErrorOr<void> set_blocking(bool enabled) override { return m_helper.set_blocking(enabled); }
  433. ErrorOr<void> set_close_on_exec(bool enabled) override { return m_helper.set_close_on_exec(enabled); }
  434. virtual ~UDPSocket() override { close(); }
  435. private:
  436. UDPSocket(PreventSIGPIPE prevent_sigpipe = PreventSIGPIPE::No)
  437. : Socket(prevent_sigpipe)
  438. {
  439. }
  440. void setup_notifier()
  441. {
  442. VERIFY(is_open());
  443. m_helper.setup_notifier();
  444. m_helper.notifier()->on_ready_to_read = [this] {
  445. if (on_ready_to_read)
  446. on_ready_to_read();
  447. };
  448. }
  449. PosixSocketHelper m_helper { Badge<UDPSocket> {} };
  450. };
  451. class LocalSocket final : public Socket {
  452. public:
  453. static ErrorOr<NonnullOwnPtr<LocalSocket>> connect(DeprecatedString const& path, PreventSIGPIPE = PreventSIGPIPE::No);
  454. static ErrorOr<NonnullOwnPtr<LocalSocket>> adopt_fd(int fd, PreventSIGPIPE = PreventSIGPIPE::No);
  455. LocalSocket(LocalSocket&& other)
  456. : Socket(static_cast<Socket&&>(other))
  457. , m_helper(move(other.m_helper))
  458. {
  459. if (is_open())
  460. setup_notifier();
  461. }
  462. LocalSocket& operator=(LocalSocket&& other)
  463. {
  464. Socket::operator=(static_cast<Socket&&>(other));
  465. m_helper = move(other.m_helper);
  466. if (is_open())
  467. setup_notifier();
  468. return *this;
  469. }
  470. virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_helper.read(buffer, default_flags()); }
  471. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.write(buffer, default_flags()); }
  472. virtual bool is_eof() const override { return m_helper.is_eof(); }
  473. virtual bool is_open() const override { return m_helper.is_open(); }
  474. virtual void close() override { m_helper.close(); }
  475. virtual ErrorOr<size_t> pending_bytes() const override { return m_helper.pending_bytes(); }
  476. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const override { return m_helper.can_read_without_blocking(timeout); }
  477. virtual ErrorOr<void> set_blocking(bool enabled) override { return m_helper.set_blocking(enabled); }
  478. virtual ErrorOr<void> set_close_on_exec(bool enabled) override { return m_helper.set_close_on_exec(enabled); }
  479. virtual void set_notifications_enabled(bool enabled) override
  480. {
  481. if (auto notifier = m_helper.notifier())
  482. notifier->set_enabled(enabled);
  483. }
  484. ErrorOr<int> receive_fd(int flags);
  485. ErrorOr<void> send_fd(int fd);
  486. ErrorOr<pid_t> peer_pid() const;
  487. ErrorOr<Bytes> read_without_waiting(Bytes buffer);
  488. /// Release the fd associated with this LocalSocket. After the fd is
  489. /// released, the socket will be considered "closed" and all operations done
  490. /// on it will fail with ENOTCONN. Fails with ENOTCONN if the socket is
  491. /// already closed.
  492. ErrorOr<int> release_fd();
  493. Optional<int> fd() const;
  494. RefPtr<Core::Notifier> notifier() { return m_helper.notifier(); }
  495. virtual ~LocalSocket() { close(); }
  496. private:
  497. LocalSocket(PreventSIGPIPE prevent_sigpipe = PreventSIGPIPE::No)
  498. : Socket(prevent_sigpipe)
  499. {
  500. }
  501. void setup_notifier()
  502. {
  503. VERIFY(is_open());
  504. m_helper.setup_notifier();
  505. m_helper.notifier()->on_ready_to_read = [this] {
  506. if (on_ready_to_read)
  507. on_ready_to_read();
  508. };
  509. }
  510. PosixSocketHelper m_helper { Badge<LocalSocket> {} };
  511. };
  512. // Buffered stream wrappers
  513. template<typename T>
  514. concept StreamLike = IsBaseOf<Stream, T>;
  515. template<typename T>
  516. concept SeekableStreamLike = IsBaseOf<SeekableStream, T>;
  517. template<typename T>
  518. concept SocketLike = IsBaseOf<Socket, T>;
  519. template<typename T>
  520. class BufferedHelper {
  521. AK_MAKE_NONCOPYABLE(BufferedHelper);
  522. public:
  523. template<StreamLike U>
  524. BufferedHelper(Badge<U>, NonnullOwnPtr<T> stream, CircularBuffer buffer)
  525. : m_stream(move(stream))
  526. , m_buffer(move(buffer))
  527. {
  528. }
  529. BufferedHelper(BufferedHelper&& other)
  530. : m_stream(move(other.m_stream))
  531. , m_buffer(move(other.m_buffer))
  532. {
  533. }
  534. BufferedHelper& operator=(BufferedHelper&& other)
  535. {
  536. m_stream = move(other.m_stream);
  537. m_buffer = move(other.m_buffer);
  538. return *this;
  539. }
  540. template<template<typename> typename BufferedType>
  541. static ErrorOr<NonnullOwnPtr<BufferedType<T>>> create_buffered(NonnullOwnPtr<T> stream, size_t buffer_size)
  542. {
  543. if (!buffer_size)
  544. return Error::from_errno(EINVAL);
  545. if (!stream->is_open())
  546. return Error::from_errno(ENOTCONN);
  547. auto buffer = TRY(CircularBuffer::create_empty(buffer_size));
  548. return adopt_nonnull_own_or_enomem(new BufferedType<T>(move(stream), move(buffer)));
  549. }
  550. T& stream() { return *m_stream; }
  551. T const& stream() const { return *m_stream; }
  552. ErrorOr<Bytes> read(Bytes buffer)
  553. {
  554. if (!stream().is_open())
  555. return Error::from_errno(ENOTCONN);
  556. if (buffer.is_empty())
  557. return buffer;
  558. // Fill the internal buffer if it has run dry.
  559. if (m_buffer.used_space() == 0)
  560. TRY(populate_read_buffer());
  561. // Let's try to take all we can from the buffer first.
  562. return m_buffer.read(buffer);
  563. }
  564. // Reads into the buffer until \n is encountered.
  565. // The size of the Bytes object is the maximum amount of bytes that will be
  566. // read. Returns the bytes read as a StringView.
  567. ErrorOr<StringView> read_line(Bytes buffer)
  568. {
  569. return StringView { TRY(read_until(buffer, "\n"sv)) };
  570. }
  571. ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate)
  572. {
  573. return read_until_any_of(buffer, Array { candidate });
  574. }
  575. template<size_t N>
  576. ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates)
  577. {
  578. if (!stream().is_open())
  579. return Error::from_errno(ENOTCONN);
  580. if (buffer.is_empty())
  581. return buffer;
  582. auto const candidate = TRY(find_and_populate_until_any_of(candidates, buffer.size()));
  583. if (stream().is_eof()) {
  584. if (buffer.size() < m_buffer.used_space()) {
  585. // Normally, reading from an EOFed stream and receiving bytes
  586. // would mean that the stream is no longer EOF. However, it's
  587. // possible with a buffered stream that the user is able to read
  588. // the buffer contents even when the underlying stream is EOF.
  589. // We already violate this invariant once by giving the user the
  590. // chance to read the remaining buffer contents, but if the user
  591. // doesn't give us a big enough buffer, then we would be
  592. // violating the invariant twice the next time the user attempts
  593. // to read, which is No Good. So let's give a descriptive error
  594. // to the caller about why it can't read.
  595. return Error::from_errno(EMSGSIZE);
  596. }
  597. }
  598. if (candidate.has_value()) {
  599. auto const read_bytes = m_buffer.read(buffer.trim(candidate->offset));
  600. TRY(m_buffer.discard(candidate->size));
  601. return read_bytes;
  602. }
  603. // If we still haven't found anything, then it's most likely the case
  604. // that the delimiter ends beyond the length of the caller-passed
  605. // buffer. Let's just fill the caller's buffer up.
  606. return m_buffer.read(buffer);
  607. }
  608. struct Match {
  609. size_t offset {};
  610. size_t size {};
  611. };
  612. template<size_t N>
  613. ErrorOr<Optional<Match>> find_and_populate_until_any_of(Array<StringView, N> const& candidates, Optional<size_t> max_offset = {})
  614. {
  615. Optional<size_t> longest_candidate;
  616. for (auto& candidate : candidates) {
  617. if (candidate.length() >= longest_candidate.value_or(candidate.length()))
  618. longest_candidate = candidate.length();
  619. }
  620. // The intention here is to try to match all the possible
  621. // delimiter candidates and try to find the longest one we can
  622. // remove from the buffer after copying up to the delimiter to the
  623. // user buffer.
  624. auto const find_candidates = [this, &candidates, &longest_candidate](size_t min_offset, Optional<size_t> max_offset = {}) -> Optional<Match> {
  625. auto const corrected_minimum_offset = *longest_candidate > min_offset ? 0 : min_offset - *longest_candidate;
  626. max_offset = max_offset.value_or(m_buffer.used_space());
  627. Optional<size_t> longest_match;
  628. size_t match_size = 0;
  629. for (auto& candidate : candidates) {
  630. auto const result = m_buffer.offset_of(candidate, corrected_minimum_offset, *max_offset);
  631. if (result.has_value()) {
  632. auto previous_match = longest_match.value_or(*result);
  633. if ((previous_match < *result) || (previous_match == *result && match_size < candidate.length())) {
  634. longest_match = result;
  635. match_size = candidate.length();
  636. }
  637. }
  638. }
  639. if (longest_match.has_value())
  640. return Match { *longest_match, match_size };
  641. return {};
  642. };
  643. if (auto first_find = find_candidates(0, max_offset); first_find.has_value())
  644. return first_find;
  645. auto last_size = m_buffer.used_space();
  646. while (m_buffer.used_space() < max_offset.value_or(m_buffer.capacity())) {
  647. auto const read_bytes = TRY(populate_read_buffer());
  648. if (read_bytes == 0)
  649. break;
  650. if (auto first_find = find_candidates(last_size, max_offset); first_find.has_value())
  651. return first_find;
  652. last_size = m_buffer.used_space();
  653. }
  654. return Optional<Match> {};
  655. }
  656. // Returns whether a line can be read, populating the buffer in the process.
  657. ErrorOr<bool> can_read_line()
  658. {
  659. if (stream().is_eof())
  660. return m_buffer.used_space() > 0;
  661. return TRY(find_and_populate_until_any_of(Array<StringView, 1> { "\n"sv })).has_value();
  662. }
  663. bool is_eof() const
  664. {
  665. if (m_buffer.used_space() > 0) {
  666. return false;
  667. }
  668. return stream().is_eof();
  669. }
  670. size_t buffer_size() const
  671. {
  672. return m_buffer.capacity();
  673. }
  674. size_t buffered_data_size() const
  675. {
  676. return m_buffer.used_space();
  677. }
  678. void clear_buffer()
  679. {
  680. m_buffer.clear();
  681. }
  682. ErrorOr<void> discard_bytes(size_t count)
  683. {
  684. return m_buffer.discard(count);
  685. }
  686. private:
  687. ErrorOr<size_t> populate_read_buffer()
  688. {
  689. if (m_buffer.empty_space() == 0)
  690. return 0;
  691. // TODO: Figure out if we can do direct writes in a comfortable way.
  692. Array<u8, 1024> temporary_buffer;
  693. auto const fillable_slice = temporary_buffer.span().trim(min(temporary_buffer.size(), m_buffer.empty_space()));
  694. size_t nread = 0;
  695. do {
  696. auto result = stream().read(fillable_slice);
  697. if (result.is_error()) {
  698. if (!result.error().is_errno())
  699. return result.error();
  700. if (result.error().code() == EINTR)
  701. continue;
  702. if (result.error().code() == EAGAIN)
  703. break;
  704. return result.error();
  705. }
  706. auto const filled_slice = result.value();
  707. VERIFY(m_buffer.write(filled_slice) == filled_slice.size());
  708. nread += filled_slice.size();
  709. break;
  710. } while (true);
  711. return nread;
  712. }
  713. NonnullOwnPtr<T> m_stream;
  714. CircularBuffer m_buffer;
  715. };
  716. // NOTE: A Buffered which accepts any Stream could be added here, but it is not
  717. // needed at the moment.
  718. template<SeekableStreamLike T>
  719. class BufferedSeekable final : public SeekableStream {
  720. friend BufferedHelper<T>;
  721. public:
  722. static ErrorOr<NonnullOwnPtr<BufferedSeekable<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16384)
  723. {
  724. return BufferedHelper<T>::template create_buffered<BufferedSeekable>(move(stream), buffer_size);
  725. }
  726. BufferedSeekable(BufferedSeekable&& other) = default;
  727. BufferedSeekable& operator=(BufferedSeekable&& other) = default;
  728. virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_helper.read(move(buffer)); }
  729. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.stream().write(buffer); }
  730. virtual bool is_eof() const override { return m_helper.is_eof(); }
  731. virtual bool is_open() const override { return m_helper.stream().is_open(); }
  732. virtual void close() override { m_helper.stream().close(); }
  733. virtual ErrorOr<off_t> seek(i64 offset, SeekMode mode) override
  734. {
  735. if (mode == SeekMode::FromCurrentPosition) {
  736. // If possible, seek using the buffer alone.
  737. if (0 <= offset && static_cast<u64>(offset) <= m_helper.buffered_data_size()) {
  738. MUST(m_helper.discard_bytes(offset));
  739. return TRY(m_helper.stream().tell()) - m_helper.buffered_data_size();
  740. }
  741. offset = offset - m_helper.buffered_data_size();
  742. }
  743. auto result = TRY(m_helper.stream().seek(offset, mode));
  744. m_helper.clear_buffer();
  745. return result;
  746. }
  747. virtual ErrorOr<void> truncate(off_t length) override
  748. {
  749. return m_helper.stream().truncate(length);
  750. }
  751. ErrorOr<StringView> read_line(Bytes buffer) { return m_helper.read_line(move(buffer)); }
  752. ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) { return m_helper.read_until(move(buffer), move(candidate)); }
  753. template<size_t N>
  754. ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of(move(buffer), move(candidates)); }
  755. ErrorOr<bool> can_read_line() { return m_helper.can_read_line(); }
  756. size_t buffer_size() const { return m_helper.buffer_size(); }
  757. virtual ~BufferedSeekable() override = default;
  758. private:
  759. BufferedSeekable(NonnullOwnPtr<T> stream, CircularBuffer buffer)
  760. : m_helper(Badge<BufferedSeekable<T>> {}, move(stream), move(buffer))
  761. {
  762. }
  763. BufferedHelper<T> m_helper;
  764. };
  765. class BufferedSocketBase : public Socket {
  766. public:
  767. virtual ErrorOr<StringView> read_line(Bytes buffer) = 0;
  768. virtual ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) = 0;
  769. virtual ErrorOr<bool> can_read_line() = 0;
  770. virtual size_t buffer_size() const = 0;
  771. };
  772. template<SocketLike T>
  773. class BufferedSocket final : public BufferedSocketBase {
  774. friend BufferedHelper<T>;
  775. public:
  776. static ErrorOr<NonnullOwnPtr<BufferedSocket<T>>> create(NonnullOwnPtr<T> stream, size_t buffer_size = 16384)
  777. {
  778. return BufferedHelper<T>::template create_buffered<BufferedSocket>(move(stream), buffer_size);
  779. }
  780. BufferedSocket(BufferedSocket&& other)
  781. : BufferedSocketBase(static_cast<BufferedSocketBase&&>(other))
  782. , m_helper(move(other.m_helper))
  783. {
  784. setup_notifier();
  785. }
  786. BufferedSocket& operator=(BufferedSocket&& other)
  787. {
  788. Socket::operator=(static_cast<Socket&&>(other));
  789. m_helper = move(other.m_helper);
  790. setup_notifier();
  791. return *this;
  792. }
  793. virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_helper.read(move(buffer)); }
  794. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_helper.stream().write(buffer); }
  795. virtual bool is_eof() const override { return m_helper.is_eof(); }
  796. virtual bool is_open() const override { return m_helper.stream().is_open(); }
  797. virtual void close() override { m_helper.stream().close(); }
  798. virtual ErrorOr<size_t> pending_bytes() const override
  799. {
  800. return TRY(m_helper.stream().pending_bytes()) + m_helper.buffered_data_size();
  801. }
  802. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const override { return m_helper.buffered_data_size() > 0 || TRY(m_helper.stream().can_read_without_blocking(timeout)); }
  803. virtual ErrorOr<void> set_blocking(bool enabled) override { return m_helper.stream().set_blocking(enabled); }
  804. virtual ErrorOr<void> set_close_on_exec(bool enabled) override { return m_helper.stream().set_close_on_exec(enabled); }
  805. virtual void set_notifications_enabled(bool enabled) override { m_helper.stream().set_notifications_enabled(enabled); }
  806. virtual ErrorOr<StringView> read_line(Bytes buffer) override { return m_helper.read_line(move(buffer)); }
  807. virtual ErrorOr<Bytes> read_until(Bytes buffer, StringView candidate) override { return m_helper.read_until(move(buffer), move(candidate)); }
  808. template<size_t N>
  809. ErrorOr<Bytes> read_until_any_of(Bytes buffer, Array<StringView, N> candidates) { return m_helper.read_until_any_of(move(buffer), move(candidates)); }
  810. virtual ErrorOr<bool> can_read_line() override { return m_helper.can_read_line(); }
  811. virtual size_t buffer_size() const override { return m_helper.buffer_size(); }
  812. virtual ~BufferedSocket() override = default;
  813. private:
  814. BufferedSocket(NonnullOwnPtr<T> stream, CircularBuffer buffer)
  815. : m_helper(Badge<BufferedSocket<T>> {}, move(stream), move(buffer))
  816. {
  817. setup_notifier();
  818. }
  819. void setup_notifier()
  820. {
  821. m_helper.stream().on_ready_to_read = [this] {
  822. if (on_ready_to_read)
  823. on_ready_to_read();
  824. };
  825. }
  826. BufferedHelper<T> m_helper;
  827. };
  828. using BufferedFile = BufferedSeekable<File>;
  829. using BufferedTCPSocket = BufferedSocket<TCPSocket>;
  830. using BufferedUDPSocket = BufferedSocket<UDPSocket>;
  831. using BufferedLocalSocket = BufferedSocket<LocalSocket>;
  832. /// A BasicReusableSocket allows one to use one of the base Core::Stream classes
  833. /// as a ReusableSocket. It does not preserve any connection state or options,
  834. /// and instead just recreates the stream when reconnecting.
  835. template<SocketLike T>
  836. class BasicReusableSocket final : public ReusableSocket {
  837. public:
  838. static ErrorOr<NonnullOwnPtr<BasicReusableSocket<T>>> connect(DeprecatedString const& host, u16 port)
  839. {
  840. return make<BasicReusableSocket<T>>(TRY(T::connect(host, port)));
  841. }
  842. static ErrorOr<NonnullOwnPtr<BasicReusableSocket<T>>> connect(SocketAddress const& address)
  843. {
  844. return make<BasicReusableSocket<T>>(TRY(T::connect(address)));
  845. }
  846. virtual bool is_connected() override
  847. {
  848. return m_socket.is_open();
  849. }
  850. virtual ErrorOr<void> reconnect(DeprecatedString const& host, u16 port) override
  851. {
  852. if (is_connected())
  853. return Error::from_errno(EALREADY);
  854. m_socket = TRY(T::connect(host, port));
  855. return {};
  856. }
  857. virtual ErrorOr<void> reconnect(SocketAddress const& address) override
  858. {
  859. if (is_connected())
  860. return Error::from_errno(EALREADY);
  861. m_socket = TRY(T::connect(address));
  862. return {};
  863. }
  864. virtual ErrorOr<Bytes> read(Bytes buffer) override { return m_socket.read(move(buffer)); }
  865. virtual ErrorOr<size_t> write(ReadonlyBytes buffer) override { return m_socket.write(buffer); }
  866. virtual bool is_eof() const override { return m_socket.is_eof(); }
  867. virtual bool is_open() const override { return m_socket.is_open(); }
  868. virtual void close() override { m_socket.close(); }
  869. virtual ErrorOr<size_t> pending_bytes() const override { return m_socket.pending_bytes(); }
  870. virtual ErrorOr<bool> can_read_without_blocking(int timeout = 0) const override { return m_socket.can_read_without_blocking(timeout); }
  871. virtual ErrorOr<void> set_blocking(bool enabled) override { return m_socket.set_blocking(enabled); }
  872. virtual ErrorOr<void> set_close_on_exec(bool enabled) override { return m_socket.set_close_on_exec(enabled); }
  873. private:
  874. BasicReusableSocket(NonnullOwnPtr<T> socket)
  875. : m_socket(move(socket))
  876. {
  877. }
  878. NonnullOwnPtr<T> m_socket;
  879. };
  880. using ReusableTCPSocket = BasicReusableSocket<TCPSocket>;
  881. using ReusableUDPSocket = BasicReusableSocket<UDPSocket>;
  882. // Note: This is only a temporary hack, to break up the task of moving away from AK::Stream into smaller parts.
  883. class WrappedAKInputStream final : public Stream {
  884. public:
  885. WrappedAKInputStream(NonnullOwnPtr<InputStream> stream);
  886. virtual ErrorOr<Bytes> read(Bytes) override;
  887. virtual ErrorOr<void> discard(size_t discarded_bytes) override;
  888. virtual ErrorOr<size_t> write(ReadonlyBytes) override;
  889. virtual bool is_eof() const override;
  890. virtual bool is_open() const override;
  891. virtual void close() override;
  892. private:
  893. NonnullOwnPtr<InputStream> m_stream;
  894. };
  895. // Note: This is only a temporary hack, to break up the task of moving away from AK::Stream into smaller parts.
  896. class WrappedAKOutputStream final : public Stream {
  897. public:
  898. WrappedAKOutputStream(NonnullOwnPtr<OutputStream> stream);
  899. virtual ErrorOr<Bytes> read(Bytes) override;
  900. virtual ErrorOr<size_t> write(ReadonlyBytes) override;
  901. virtual bool is_eof() const override;
  902. virtual bool is_open() const override;
  903. virtual void close() override;
  904. private:
  905. NonnullOwnPtr<OutputStream> m_stream;
  906. };
  907. // Note: This is only a temporary hack, to break up the task of moving away from AK::Stream into smaller parts.
  908. class WrapInAKInputStream final : public InputStream {
  909. public:
  910. WrapInAKInputStream(Core::Stream::Stream& stream);
  911. virtual size_t read(Bytes) override;
  912. virtual bool unreliable_eof() const override;
  913. virtual bool read_or_error(Bytes) override;
  914. virtual bool discard_or_error(size_t count) override;
  915. private:
  916. Core::Stream::Stream& m_stream;
  917. };
  918. // Note: This is only a temporary hack, to break up the task of moving away from AK::Stream into smaller parts.
  919. class WrapInAKOutputStream final : public OutputStream {
  920. public:
  921. WrapInAKOutputStream(Core::Stream::Stream& stream);
  922. virtual size_t write(ReadonlyBytes) override;
  923. virtual bool write_or_error(ReadonlyBytes) override;
  924. private:
  925. Core::Stream::Stream& m_stream;
  926. };
  927. }