123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- #include "fd.h"
- #include <sHT/bugs.h>
- #include <sHT/compiler.h>
- #include <sHT/nospec.h>
- #include <sHT/stream.h>
- #include <sHT/test.h>
- #include "worker.h"
- #include <errno.h>
- #include <stddef.h>
- #include <stdint.h>
- #include <sys/epoll.h>
- #include <sys/socket.h>
- enum sHT_send_err_type {
- sHT_SEND_BLOCKING,
- sHT_SEND_GRACEFUL_RESET,
- sHT_SEND_BLUNT_RESET,
- sHT_SEND_GRACEFUL_CLOSE,
-
- sHT_SEND_INTERRUPTED,
-
- sHT_SEND_TRANSIENT,
- sHT_SEND_KERNEL_OOM,
-
- sHT_SEND_OTHER,
- };
- __attribute__((const))
- static enum sHT_send_err_type
- sHT_classify_sentrecv_tcp(int err)
- {
- switch (err) {
- #if EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
- #endif
- case EAGAIN:
- return sHT_SEND_BLOCKING;
- case EINTR:
- return sHT_SEND_INTERRUPTED;
- case ECONNRESET:
- return sHT_SEND_GRACEFUL_RESET;
- case EPIPE:
- return sHT_SEND_GRACEFUL_CLOSE;
- case ETIMEDOUT:
- case EHOSTUNREACH:
- return sHT_SEND_BLUNT_RESET;
- case ENOBUFS:
-
- return sHT_SEND_TRANSIENT;
- case ENOMEM:
- return sHT_SEND_KERNEL_OOM;
- default:
- return sHT_SEND_OTHER;
- }
- }
- __attribute__((nonnull (1, 2)))
- static _Bool
- sHT_socket_sendrecv_errno(struct sHT_worker *worker, struct sHT_task_stream *task, int err, uint32_t epollflags)
- {
-
- switch (sHT_classify_sentrecv_tcp(errno)) {
- case sHT_SEND_BLOCKING:
- task->task.epollflags &= ~epollflags;
- return 0;
- case sHT_SEND_GRACEFUL_CLOSE:
- task->stream.flags |= sHT_STREAM_WRITE_EOF;
- task->task.epollflags &= ~epollflags;
- return 0;
- case sHT_SEND_GRACEFUL_RESET:
- task->stream.flags |= sHT_STREAM_WRITE_EOF | sHT_STREAM_READ_EOF | sHT_STREAM_RESET_GRACEFUL;
- task->task.epollflags &= ~(EPOLLIN | EPOLLOUT);
- return 0;
- case sHT_SEND_BLUNT_RESET:
- task->stream.flags |= sHT_STREAM_WRITE_EOF | sHT_STREAM_READ_EOF | sHT_STREAM_RESET_BLUNT;
- task->task.epollflags &= ~(EPOLLIN | EPOLLOUT);
- return 0;
- case sHT_SEND_INTERRUPTED:
- return 1;
- case sHT_SEND_TRANSIENT:
-
- task->task.flags |= sHT_TASK_SCHEDULE;
- return 0;
- case sHT_SEND_KERNEL_OOM:
-
- worker->flags |= sHT_WORKER_OOM;
- task->task.flags |= sHT_TASK_SCHEDULE;
- return 0;
- case sHT_SEND_OTHER:
- sHT_todo("didn't recognise TCP error");
- default:
- sHT_assert(0);
- }
- }
- void
- sHT_socket_sendsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task)
- {
- const unsigned char *buf = task->stream.to_write.first;
- size_t start = task->stream.to_write.offset;
- size_t end = (task->stream.to_write.offset + task->stream.to_write.length) % sHT_PAPER_SIZE;
-
- if (sHT_gt(start, end))
- end = sHT_PAPER_SIZE;
- end = sHT_index_nospec(end, sHT_PAPER_SIZE - start);
- do {
-
- ssize_t sent = send(task->stream.fd, buf + start, end - start, MSG_DONTWAIT | MSG_NOSIGNAL);
- if (sHT_lt0(sent))
- continue;
-
- sHT_assert(sent <= task->stream.to_write.length);
- task->stream.to_write.offset = (task->stream.to_write.offset + sent) % sHT_PAPER_SIZE;
- task->stream.to_write.length -= sent;
- return;
-
- } while (sHT_unlikely(sHT_socket_sendrecv_errno(worker, task, errno, EPOLLOUT)));
- }
- void
- sHT_socket_readsome_tcp(struct sHT_worker *worker, struct sHT_task_stream *task)
- {
- unsigned char *buf = task->stream.has_read.first;
- size_t start = (task->stream.has_read.offset + task->stream.has_read.length) % sHT_PAPER_SIZE;
- size_t end = task->stream.has_read.offset;
- if (sHT_gt(start, end))
- end = sHT_PAPER_SIZE;
-
- end %= sHT_PAPER_SIZE - start;
-
- do {
- ssize_t received;
- received = recv(task->stream.fd, buf + start, end - start, MSG_DONTWAIT);
- if (sHT_lt0(received))
- continue;
- sHT_assert(received <= sHT_PAPER_SIZE - task->stream.has_read.length);
- task->stream.has_read.length += received;
- return;
-
- } while (sHT_unlikely(sHT_socket_sendrecv_errno(worker, task, errno, EPOLLOUT)));
- }
|