stream.h 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. // Copyright © 2018-2019 Ariadne Devos
  3. /* sHT -- TCP/IP streams associated with tasks */
  4. #ifndef _sHT_STREAM_H
  5. #define _sHT_STREAM_H
  6. #include <sHT/paper.h>
  7. #include <sHT/task.h>
  8. #include <sHT/test.h>
  9. #include <sys/epoll.h>
  10. #include <stdint.h>
  11. struct sHT_objcache; /* forward declaration: only used through pointers in the prototypes below */
  12. struct sHT_task; /* embedded by value in struct sHT_task_stream, so <sHT/task.h> presumably supplies the full definition */
  13. struct sHT_worker; /* forward declaration: only used through pointers in the prototypes below */
  14. #define sHT_STREAM_WRITE_EOF 1 /* our write end is known to be broken: no more writes */
  15. #define sHT_STREAM_READ_EOF 2 /* our read end is known to be broken: no more reads */
  16. #define sHT_STREAM_RESET_GRACEFUL 4 /* closed in an orderly way, per the protocol's mechanisms */
  17. #define sHT_STREAM_RESET_BLUNT 8 /* closed abruptly (timeout, buggy peer, network trouble) */
  18. #define sHT_STREAM_USER1 16 /* USER1..USER4: not described in this header; presumably free for the stream's user -- TODO confirm */
  19. #define sHT_STREAM_USER2 32
  20. #define sHT_STREAM_USER3 64
  21. #define sHT_STREAM_USER4 128
  22. /** An interface to a stream socket of the operating system.
  23. Conceptually, stream sockets come in symmetrical pairs. Assuming a perfect
  24. network link and no closes, data written to the first will be sent
  25. to the second for reading, and data read from the first has been sent
  26. by the second to the first. */
  27. struct sHT_stream
  28. {
  29. /** An identifier for the socket that can be passed to the kernel via
  30. syscalls to perform all kinds of operations upon the socket. It
  31. usually is not portable across processes. It is, however, preserved
  32. across a fork.
  33. See also socket(7). */
  34. int fd;
  35. /** @var{sHT_STREAM_WRITE_EOF}: we know our write end has been broken
  36. (so the socket cannot be written to anymore).
  37. @var{sHT_STREAM_READ_EOF}: we know our read end has been broken
  38. (so the socket cannot be read from anymore).
  39. @var{sHT_STREAM_RESET_GRACEFUL}: the socket has been closed in an orderly
  40. fashion, according to the protocol's mechanisms, and not due to some kind
  41. of error (e.g. a timeout). This does not imply that @var{fd} became
  42. unusable or dangling.
  43. @var{sHT_STREAM_RESET_BLUNT}: the socket has been brutally closed,
  44. without any respect to aesthetics or protocol. This is typically
  45. caused by a timeout, buggy software at the other end, or clogged
  46. or leaky pipes. (The sHT_STREAM_USER* bits are not described in this header.) */
  47. unsigned int flags;
  48. /** A circular buffer, of limited capacity, of octets to write to the
  49. socket.
  50. The typical use case would be appending as many bytes as possible
  51. during a task's time slice, and at the end sending all bytes to
  52. the operating system. */
  53. struct sHT_buffer to_write;
  54. /** A circular buffer, of limited capacity, of octets that have been
  55. read from the socket.
  56. The typical use case would be fetching as many bytes as possible
  57. from the kernel at the start of a task's time slice, then popping
  58. off bytes to process. */
  59. struct sHT_buffer has_read;
  60. };
  61. struct sHT_task_stream /* a task bundled with the stream it is associated with */
  62. {
  63. struct sHT_task task; /* first member; its epollflags field is tested for EPOLLIN / EPOLLOUT by sHT_socket_mayread / sHT_socket_maywrite */
  64. struct sHT_stream stream; /* the socket, its state flags and both of its buffers */
  65. };
  66. /** Try to initialise a stream to its start state, i.e. before any IO is
  67. performed.
  68. @var{papers}: the object cache to allocate papers from
  69. @var{stream}: the stream structure to initialise
  70. @var{errno} may be set. Neither argument may be NULL.
  71. @var{papers} and @var{stream} may not be concurrently accessed. This
  72. function doesn't know anything about POSIX asynchronous cancellation or
  73. signal handlers.
  74. No socket is created; there are no guarantees on the value or defined-ness
  75. of @code{stream->fd}.
  76. There may be memory leaks on speculative executions.
  77. Once the return value is despeculated, and it turns out to be 0,
  78. @var{stream} is initialised, even on subsequent speculative executions.
  79. Initialisation entails allocated and empty buffers and cleared flags.
  80. If it turns out to be 1, @var{stream} has not been initialised and there
  81. is no net change in memory allocation.
  82. While the return value is being speculated, there are no guarantees on
  83. the values of the fields of @var{stream}. The return value must be checked.
  84. */
  85. __attribute__((warn_unused_result))
  86. __attribute__((nonnull (1, 2)))
  87. _Bool
  88. sHT_init_stream(struct sHT_objcache *papers, struct sHT_stream *stream);
  89. /** Free a stream's resources, except for its socket.
  90. @var{papers}: the object cache papers were allocated from for @var{stream}'s
  91. initialisation (e.g. by @var{sHT_init_stream}).
  92. @var{stream}: the stream to free
  93. @var{errno} may be set. Neither argument may be NULL.
  94. @var{papers} and @var{stream} may not be concurrently accessed. This
  95. function doesn't know anything about POSIX asynchronous cancellation or
  96. signal handlers.
  97. @code{stream->to_write} and @code{stream->has_read} may be modified.
  98. @code{stream->fd} is not touched, so the socket remains the caller's responsibility.
  99. This cannot fail in any way. */
  100. __attribute__((nonnull (1, 2)))
  101. void
  102. sHT_free_stream(struct sHT_objcache *papers, struct sHT_stream *stream);
  103. __attribute__((pure))
  104. static inline _Bool
  105. sHT_socket_mayread(const struct sHT_task_stream *task)
  106. {
  107. /* Mirror image of @var{sHT_socket_maywrite}. Answers 1 unless the EPOLLIN test or the has_read length test against sHT_PAPER_SIZE says no (sHT_and_none / sHT_ge come from <sHT/test.h>; presumably "no common bits" and ">="). */
  108. unsigned int buffered = task->stream.has_read.length;
  109. uint32_t events = task->task.epollflags;
  110. if (sHT_and_none(events, EPOLLIN))
  111. return 0;
  112. if (sHT_ge(buffered, sHT_PAPER_SIZE))
  113. return 0;
  114. return 1;
  115. }
  116. __attribute__((pure))
  117. static inline _Bool
  118. sHT_socket_maywrite(const struct sHT_task_stream *task)
  119. {
  120. /* Mirror image of @var{sHT_socket_mayread}. Answers 1 unless the EPOLLOUT test or the to_write length test against sHT_PAPER_SIZE says no (sHT_and_none / sHT_ge come from <sHT/test.h>; presumably "no common bits" and ">="). */
  121. unsigned int queued = task->stream.to_write.length;
  122. uint32_t events = task->task.epollflags;
  123. if (sHT_and_none(events, EPOLLOUT))
  124. return 0;
  125. if (sHT_ge(queued, sHT_PAPER_SIZE))
  126. return 0;
  127. return 1;
  128. }
  129. /* Try to send pending bytes (presumably those queued in @code{task->stream.to_write}) to the kernel for processing.
  130. If the write end of the socket is saturated for the moment,
  131. unset EPOLLOUT.
  132. The flag sHT_TASK_SCHEDULE is set in case of a transient write error.
  133. If the connection is reset due to a timeout, unreachable host ..., set
  134. sHT_STREAM_RESET_BLUNT in the stream's flags.
  135. If the connection is 'just' reset, set sHT_STREAM_RESET_GRACEFUL.
  136. If the write-end has been closed, set sHT_STREAM_WRITE_EOF.
  137. It doesn't make much sense to call this without EPOLLOUT being set. Neither argument may be NULL. */
  138. __attribute__((nonnull (1, 2)))
  139. void
  140. sHT_socket_sendsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task);
  141. /* Try to receive pending bytes from the kernel (presumably into @code{task->stream.has_read}).
  142. If the read end of the socket is empty, unset EPOLLIN.
  143. The flag sHT_TASK_SCHEDULE is set in case of a transient read error.
  144. If the connection is reset due to a timeout, unreachable host ..., set
  145. sHT_STREAM_RESET_BLUNT in the stream's flags.
  146. If the connection is 'just' reset, set sHT_STREAM_RESET_GRACEFUL.
  147. If the read-end has been closed, set sHT_STREAM_READ_EOF.
  148. It doesn't make much sense to call this without EPOLLIN being set. Neither argument may be NULL. */
  149. __attribute__((nonnull (1, 2)))
  150. void
  151. sHT_socket_readsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task);
  152. /* Do reading, writing ... and possibly reschedule @var{task}. Neither @var{worker} nor @var{task} may be NULL. */
  153. __attribute__((nonnull (1, 2)))
  154. void
  155. sHT_socket_task(struct sHT_worker *worker, struct sHT_task_stream *task);
  156. #endif