123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- // Copyright © 2018-2019 Ariadne Devos
- /* sHT -- TCP/IP streams associated with tasks */
- #ifndef _sHT_STREAM_H
- #define _sHT_STREAM_H
- #include <sHT/paper.h>
- #include <sHT/task.h>
- #include <sHT/test.h>
- #include <sys/epoll.h>
- #include <stdint.h>
- struct sHT_objcache;
- struct sHT_task;
- struct sHT_worker;
- #define sHT_STREAM_WRITE_EOF 1
- #define sHT_STREAM_READ_EOF 2
- #define sHT_STREAM_RESET_GRACEFUL 4
- #define sHT_STREAM_RESET_BLUNT 8
- #define sHT_STREAM_USER1 16
- #define sHT_STREAM_USER2 32
- #define sHT_STREAM_USER3 64
- #define sHT_STREAM_USER4 128
- /** An interface to a stream socket of the operating system.
- Conceptually, stream sockets come in symmetrical pairs. Assuming a perfect
- network link and no closes, data written to the first will be send
- to the second for reading, and data read from the first has been sent
- by the second to the first. */
- struct sHT_stream
- {
- /** An identifier for the socket that can be passed to the kernel via
- syscalls to perform all kinds of operations upon the socket. It
- usually is not portable across processes. It is, however, preserved
- across a fork.
- See also socket(7). */
- int fd;
- /** @var{sHT_STREAM_WRITE_EOF}: we know our write end has been broken
- (so the socket cannot be written to anymore).
- @var{sHT_STREAM_READ_EOF}: we know our read end has been broken
- (so the socket cannot be read from anymore).
- @var{sHT_STREAM_RESET_GRACEFUL}: the socket has been orderly closed,
- according to the protocol's mechanisms, and not due to some kind of
- error (e.g. timeout). This does not imply that @var{fd} became
- unusable or dangling.
- @var{sHT_STREAM_RESET_BLUNT}: the socket has been brutely closed,
- without any respect to aesthetics or protocol. This is typically
- caused by a time out, buggy software at the other end, or clogged
- or leaky pipes. */
- unsigned int flags;
- /** A circular buffer, of limited capacity, of octets to write to the
- socket.
- The typical use case would be appending as many bytes as possible
- during a tasks's time slice, and at the end sending all bytes to
- the operating system. */
- struct sHT_buffer to_write;
- /** A circular buffer, of limited capacity, of octets that have been
- read from the socket.
- The typical use case would be fetching as many bytes as possible
- from the kernel at the start of a task's time slice, then popping
- of bytes to process. */
- struct sHT_buffer has_read;
- };
- struct sHT_task_stream
- {
- struct sHT_task task;
- struct sHT_stream stream;
- };
- /** Try to initialise a stream to its start state, i.e. before any IO is
- performed
- @var{papers}: the object cache to allocate papers from
- @var{stream}: the stream structure to initialise
- @var{errno} may be set.
- @var{papers} and @var{stream} may not be concurrently accessed. This
- function doesn't know anything about POSIX asynchronuous cancellation or
- signal handlers.
- No socket is created; there are no guarantees on the value or defined-ness
- of @code{stream->fd}.
- There may be memory leaks on speculative executions.
- Once the return value is despeculated, and it turns out to be 0,
- @var{stream} is initialised, even on subsequent speculative executions.
- Initialisation entails allocated and empty buffers and cleared flags.
- If it turns out to be 1, @var{stream} has not been initialised and there
- is no net change is memory allocation.
- While the return value is being speculated, there are no guarantees on
- the values of the fields of @var{stream}.
- */
- __attribute__((warn_unused_result))
- __attribute__((nonnull (1, 2)))
- _Bool
- sHT_init_stream(struct sHT_objcache *papers, struct sHT_stream *stream);
- /** Free a stream's resources, except for its socket.
- @var{papers}: the object cache papers were allocated from for @var{stream}'s
- initialisation (e.g. by @var{sHT_init_stream}).
- @var{stream}: the stream to free
- @var{errno} may be set.
- @var{papers} and @var{stream} may not be concurrently accessed. This
- function doesn't know anything about POSIX asynchronuous cancellation or
- signal handlers.
- @code{stream->to_write} and @code{stream->has_read} may be modified.
- @code{stream->fd} is not touched.
- This cannot fail in any way. */
- __attribute__((nonnull (1, 2)))
- void
- sHT_free_stream(struct sHT_objcache *papers, struct sHT_stream *stream);
- __attribute__((pure))
- static inline _Bool
- sHT_socket_mayread(const struct sHT_task_stream *task)
- {
- /* Observe the duality with @var{sHT_socket_maywrite}. */
- uint32_t epollflags = task->task.epollflags;
- unsigned int done = task->stream.has_read.length;
- if (sHT_and_none(epollflags, EPOLLIN))
- return 0;
- if (sHT_ge(done, sHT_PAPER_SIZE))
- return 0;
- return 1;
- }
- __attribute__((pure))
- static inline _Bool
- sHT_socket_maywrite(const struct sHT_task_stream *task)
- {
- /* Observe the duality with @var{sHT_socket_mayread}. */
- uint32_t epollflags = task->task.epollflags;
- unsigned int done = task->stream.to_write.length;
- if (sHT_and_none(epollflags, EPOLLOUT))
- return 0;
- if (sHT_ge(done, sHT_PAPER_SIZE))
- return 0;
- return 1;
- }
- /* Try to send pending bytes to the kernel for processing.
- If the write end of the socket is saturated for the moment,
- unset EPOLLOUT.
- The flag sHT_TASK_SCHEDULE is set in case of a transient write error.
- If the connection is reset due to a timeout, unreachable host ..., set
- sHT_STREAM_RESET_BLUNT.
- If the connection is 'just' reset, set sHT_STREAM_RESET_GRACEFUL.
- If the write-end has been closed, set sHT_STREAM_WRITE_EOF.
- It doesn't make much sense to call this without EPOLLOUT being set. */
- __attribute__((nonnull (1, 2)))
- void
- sHT_socket_sendsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task);
- /* Try to receive pending bytes from the kernel.
- If the read end of the socket is empty, unset EPOLLIN.
- The flag sHT_TASK_SCHEDULE is set in case of a transient read error.
- If the connection is reset due to a timeout, unreachable host ..., set
- sHT_STREAM_RESET_BLUNT.
- If the connection is 'just' reset, set sHT_STREAM_RESET_GRACEFUL.
- If the write-end has been closed, set sHT_STREAM_READ_EOF.
- It doesn't make much sense to call this without EPOLLIN being set. */
- __attribute__((nonnull (1, 2)))
- void
- sHT_socket_readsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task);
- /* Do reading, writing ... and possibly reschedule */
- __attribute__((nonnull (1, 2)))
- void
- sHT_socket_task(struct sHT_worker *worker, struct sHT_task_stream *task);
- #endif
|