12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211 |
- /*
- Copyright (c) 2013 Martin Sustrik All rights reserved.
- Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
- Copyright 2016 Garrett D'Amore <garrett@damore.org>
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
- */
- #include "../utils/alloc.h"
- #include "../utils/closefd.h"
- #include "../utils/cont.h"
- #include "../utils/fast.h"
- #include "../utils/err.h"
- #include "../utils/attr.h"
- #include <string.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <sys/uio.h>
/*  States of the usock state machine (values stored in nn_usock::state). */
#define NN_USOCK_STATE_IDLE 1
#define NN_USOCK_STATE_STARTING 2
#define NN_USOCK_STATE_BEING_ACCEPTED 3
#define NN_USOCK_STATE_ACCEPTED 4
#define NN_USOCK_STATE_CONNECTING 5
#define NN_USOCK_STATE_ACTIVE 6
#define NN_USOCK_STATE_REMOVING_FD 7
#define NN_USOCK_STATE_DONE 8
#define NN_USOCK_STATE_LISTENING 9
#define NN_USOCK_STATE_ACCEPTING 10
#define NN_USOCK_STATE_CANCELLING 11
#define NN_USOCK_STATE_STOPPING 12
#define NN_USOCK_STATE_STOPPING_ACCEPT 13
#define NN_USOCK_STATE_ACCEPTING_ERROR 14

/*  Actions: 'type' values fed into the state machine via nn_fsm_action(),
    i.e. delivered with the NN_FSM_ACTION source. */
#define NN_USOCK_ACTION_ACCEPT 1
#define NN_USOCK_ACTION_BEING_ACCEPTED 2
#define NN_USOCK_ACTION_CANCEL 3
#define NN_USOCK_ACTION_LISTEN 4
#define NN_USOCK_ACTION_CONNECT 5
#define NN_USOCK_ACTION_ACTIVATE 6
#define NN_USOCK_ACTION_DONE 7
#define NN_USOCK_ACTION_ERROR 8
#define NN_USOCK_ACTION_STARTED 9

/*  Event sources: 'src' values the worker fd and the worker tasks are
    registered with in nn_usock_init(). */
#define NN_USOCK_SRC_FD 1
#define NN_USOCK_SRC_TASK_CONNECTING 2
#define NN_USOCK_SRC_TASK_CONNECTED 3
#define NN_USOCK_SRC_TASK_ACCEPT 4
#define NN_USOCK_SRC_TASK_SEND 5
#define NN_USOCK_SRC_TASK_RECV 6
#define NN_USOCK_SRC_TASK_STOP 7
/*  Private functions. */

/*  Stores descriptor 's' in the usock and configures it (close-on-exec,
    no SIGPIPE where available, non-blocking mode). */
static void nn_usock_init_from_fd (struct nn_usock *self, int s);
/*  Low-level send/receive helpers. Callers in this file treat 0 as success
    and a negative errno value (-EAGAIN, -ECONNRESET) as failure.
    NOTE(review): definitions are outside this view; confirm exact contract. */
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
/*  Presumably fetches the pending socket error; the handler treats a zero
    result as "connected". TODO confirm against the definition. */
static int nn_usock_geterr (struct nn_usock *self);
/*  State machine callbacks handed to nn_fsm_init(): the regular event
    handler and the handler used while the usock is being stopped. */
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr);
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr);
- void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
- {
- /* Initalise the state machine. */
- nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
- src, self, owner);
- self->state = NN_USOCK_STATE_IDLE;
- /* Choose a worker thread to handle this socket. */
- self->worker = nn_fsm_choose_worker (&self->fsm);
- /* Actual file descriptor will be generated during 'start' step. */
- self->s = -1;
- self->errnum = 0;
- self->in.buf = NULL;
- self->in.len = 0;
- self->in.batch = NULL;
- self->in.batch_len = 0;
- self->in.batch_pos = 0;
- self->in.pfd = NULL;
- memset (&self->out.hdr, 0, sizeof (struct msghdr));
- /* Initialise tasks for the worker thread. */
- nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm);
- nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,
- &self->fsm);
- nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,
- &self->fsm);
- nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,
- &self->fsm);
- nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm);
- nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm);
- nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm);
- /* Intialise events raised by usock. */
- nn_fsm_event_init (&self->event_established);
- nn_fsm_event_init (&self->event_sent);
- nn_fsm_event_init (&self->event_received);
- nn_fsm_event_init (&self->event_error);
- /* accepting is not going on at the moment. */
- self->asock = NULL;
- }
- void nn_usock_term (struct nn_usock *self)
- {
- nn_assert_state (self, NN_USOCK_STATE_IDLE);
- if (self->in.batch)
- nn_free (self->in.batch);
- nn_fsm_event_term (&self->event_error);
- nn_fsm_event_term (&self->event_received);
- nn_fsm_event_term (&self->event_sent);
- nn_fsm_event_term (&self->event_established);
- nn_worker_cancel (self->worker, &self->task_recv);
- nn_worker_task_term (&self->task_stop);
- nn_worker_task_term (&self->task_recv);
- nn_worker_task_term (&self->task_send);
- nn_worker_task_term (&self->task_accept);
- nn_worker_task_term (&self->task_connected);
- nn_worker_task_term (&self->task_connecting);
- nn_worker_fd_term (&self->wfd);
- nn_fsm_term (&self->fsm);
- }
- int nn_usock_isidle (struct nn_usock *self)
- {
- return nn_fsm_isidle (&self->fsm);
- }
- int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
- {
- int s;
- /* If the operating system allows to directly open the socket with CLOEXEC
- flag, do so. That way there are no race conditions. */
- #ifdef SOCK_CLOEXEC
- type |= SOCK_CLOEXEC;
- #endif
- /* Open the underlying socket. */
- s = socket (domain, type, protocol);
- if (nn_slow (s < 0))
- return -errno;
- nn_usock_init_from_fd (self, s);
- /* Start the state machine. */
- nn_fsm_start (&self->fsm);
- return 0;
- }
- void nn_usock_start_fd (struct nn_usock *self, int fd)
- {
- nn_usock_init_from_fd (self, fd);
- nn_fsm_start (&self->fsm);
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
- }
- static void nn_usock_init_from_fd (struct nn_usock *self, int s)
- {
- int rc;
- int opt;
- nn_assert (self->state == NN_USOCK_STATE_IDLE ||
- NN_USOCK_STATE_BEING_ACCEPTED);
- /* Store the file descriptor. */
- nn_assert (self->s == -1);
- self->s = s;
- /* Setting FD_CLOEXEC option immediately after socket creation is the
- second best option after using SOCK_CLOEXEC. There is a race condition
- here (if process is forked between socket creation and setting
- the option) but the problem is pretty unlikely to happen. */
- #if defined FD_CLOEXEC
- rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
- #if defined NN_HAVE_OSX
- errno_assert (rc != -1 || errno == EINVAL);
- #else
- errno_assert (rc != -1);
- #endif
- #endif
- /* If applicable, prevent SIGPIPE signal when writing to the connection
- already closed by the peer. */
- #ifdef SO_NOSIGPIPE
- opt = 1;
- rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
- #if defined NN_HAVE_OSX
- errno_assert (rc == 0 || errno == EINVAL);
- #else
- errno_assert (rc == 0);
- #endif
- #endif
- /* Switch the socket to the non-blocking mode. All underlying sockets
- are always used in the callbackhronous mode. */
- opt = fcntl (self->s, F_GETFL, 0);
- if (opt == -1)
- opt = 0;
- if (!(opt & O_NONBLOCK)) {
- rc = fcntl (self->s, F_SETFL, opt | O_NONBLOCK);
- #if defined NN_HAVE_OSX
- errno_assert (rc != -1 || errno == EINVAL);
- #else
- errno_assert (rc != -1);
- #endif
- }
- }
- void nn_usock_stop (struct nn_usock *self)
- {
- nn_fsm_stop (&self->fsm);
- }
- void nn_usock_async_stop (struct nn_usock *self)
- {
- nn_worker_execute (self->worker, &self->task_stop);
- nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN);
- }
- void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
- {
- nn_fsm_swap_owner (&self->fsm, owner);
- }
- int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
- const void *optval, size_t optlen)
- {
- int rc;
- /* The socket can be modified only before it's active. */
- nn_assert (self->state == NN_USOCK_STATE_STARTING ||
- self->state == NN_USOCK_STATE_ACCEPTED);
- /* EINVAL errors are ignored on OSX platform. The reason for that is buggy
- OSX behaviour where setsockopt returns EINVAL if the peer have already
- disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
- the option value was invalid, but the peer have already closed the
- connection. This behaviour should be relatively harmless. */
- rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen);
- #if defined NN_HAVE_OSX
- if (nn_slow (rc != 0 && errno != EINVAL))
- return -errno;
- #else
- if (nn_slow (rc != 0))
- return -errno;
- #endif
- return 0;
- }
- int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
- size_t addrlen)
- {
- int rc;
- int opt;
- /* The socket can be bound only before it's connected. */
- nn_assert_state (self, NN_USOCK_STATE_STARTING);
- /* Allow re-using the address. */
- opt = 1;
- rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
- errno_assert (rc == 0);
- rc = bind (self->s, addr, (socklen_t) addrlen);
- if (nn_slow (rc != 0))
- return -errno;
- return 0;
- }
- int nn_usock_listen (struct nn_usock *self, int backlog)
- {
- int rc;
- /* You can start listening only before the socket is connected. */
- nn_assert_state (self, NN_USOCK_STATE_STARTING);
- /* Start listening for incoming connections. */
- rc = listen (self->s, backlog);
- if (nn_slow (rc != 0))
- return -errno;
- /* Notify the state machine. */
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);
- return 0;
- }
- void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
- {
- int s;
- /* Start the actual accepting. */
- if (nn_fsm_isidle(&self->fsm)) {
- nn_fsm_start (&self->fsm);
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);
- }
- nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
- /* Try to accept new connection in synchronous manner. */
- #if NN_HAVE_ACCEPT4
- s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC);
- #else
- s = accept (listener->s, NULL, NULL);
- #endif
- /* Immediate success. */
- if (nn_fast (s >= 0)) {
- /* Disassociate the listener socket from the accepted
- socket. Is useful if we restart accepting on ACCEPT_ERROR */
- listener->asock = NULL;
- self->asock = NULL;
- nn_usock_init_from_fd (self, s);
- nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
- return;
- }
- /* Detect a failure. Note that in ECONNABORTED case we simply ignore
- the error and wait for next connection in asynchronous manner. */
- errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == ECONNABORTED || errno == ENFILE || errno == EMFILE ||
- errno == ENOBUFS || errno == ENOMEM);
- /* Pair the two sockets. They are already paired in case
- previous attempt failed on ACCEPT_ERROR */
- nn_assert (!self->asock || self->asock == listener);
- self->asock = listener;
- nn_assert (!listener->asock || listener->asock == self);
- listener->asock = self;
- /* Some errors are just ok to ignore for now. We also stop repeating
- any errors until next IN_FD event so that we are not in a tight loop
- and allow processing other events in the meantime */
- if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK
- && errno != ECONNABORTED && errno != listener->errnum))
- {
- listener->errnum = errno;
- listener->state = NN_USOCK_STATE_ACCEPTING_ERROR;
- nn_fsm_raise (&listener->fsm,
- &listener->event_error, NN_USOCK_ACCEPT_ERROR);
- return;
- }
- /* Ask the worker thread to wait for the new connection. */
- nn_worker_execute (listener->worker, &listener->task_accept);
- }
- void nn_usock_activate (struct nn_usock *self)
- {
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
- }
- void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
- size_t addrlen)
- {
- int rc;
- /* Notify the state machine that we've started connecting. */
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);
- /* Do the connect itself. */
- rc = connect (self->s, addr, (socklen_t) addrlen);
- /* Immediate success. */
- if (nn_fast (rc == 0)) {
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
- return;
- }
- /* Immediate error. */
- if (nn_slow (errno != EINPROGRESS)) {
- self->errnum = errno;
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
- return;
- }
- /* Start asynchronous connect. */
- nn_worker_execute (self->worker, &self->task_connecting);
- }
- void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
- int iovcnt)
- {
- int rc;
- int i;
- int out;
- /* Make sure that the socket is actually alive. */
- if (self->state != NN_USOCK_STATE_ACTIVE) {
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
- return;
- }
- /* Copy the iovecs to the socket. */
- nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
- self->out.hdr.msg_iov = self->out.iov;
- out = 0;
- for (i = 0; i != iovcnt; ++i) {
- if (iov [i].iov_len == 0)
- continue;
- self->out.iov [out].iov_base = iov [i].iov_base;
- self->out.iov [out].iov_len = iov [i].iov_len;
- out++;
- }
- self->out.hdr.msg_iovlen = out;
- /* Try to send the data immediately. */
- rc = nn_usock_send_raw (self, &self->out.hdr);
- /* Success. */
- if (nn_fast (rc == 0)) {
- nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT);
- return;
- }
- /* Errors. */
- if (nn_slow (rc != -EAGAIN)) {
- errnum_assert (rc == -ECONNRESET, -rc);
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
- return;
- }
- /* Ask the worker thread to send the remaining data. */
- nn_worker_execute (self->worker, &self->task_send);
- }
- void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
- {
- int rc;
- size_t nbytes;
- /* Make sure that the socket is actually alive. */
- if (self->state != NN_USOCK_STATE_ACTIVE) {
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
- return;
- }
- /* Try to receive the data immediately. */
- nbytes = len;
- self->in.pfd = fd;
- rc = nn_usock_recv_raw (self, buf, &nbytes);
- if (nn_slow (rc < 0)) {
- errnum_assert (rc == -ECONNRESET, -rc);
- nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
- return;
- }
- /* Success. */
- if (nn_fast (nbytes == len)) {
- nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED);
- return;
- }
- /* There are still data to receive in the background. */
- self->in.buf = ((uint8_t*) buf) + nbytes;
- self->in.len = len - nbytes;
- /* Ask the worker thread to receive the remaining data. */
- nn_worker_execute (self->worker, &self->task_recv);
- }
- static int nn_internal_tasks (struct nn_usock *usock, int src, int type)
- {
- /******************************************************************************/
- /* Internal tasks sent from the user thread to the worker thread. */
- /******************************************************************************/
- switch (src) {
- case NN_USOCK_SRC_TASK_SEND:
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_set_out (usock->worker, &usock->wfd);
- return 1;
- case NN_USOCK_SRC_TASK_RECV:
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_set_in (usock->worker, &usock->wfd);
- return 1;
- case NN_USOCK_SRC_TASK_CONNECTED:
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- return 1;
- case NN_USOCK_SRC_TASK_CONNECTING:
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- nn_worker_set_out (usock->worker, &usock->wfd);
- return 1;
- case NN_USOCK_SRC_TASK_ACCEPT:
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- nn_worker_set_in (usock->worker, &usock->wfd);
- return 1;
- }
- return 0;
- }
- static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
- NN_UNUSED void *srcptr)
- {
- struct nn_usock *usock;
- usock = nn_cont (self, struct nn_usock, fsm);
- if (nn_internal_tasks (usock, src, type))
- return;
- if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
- /* Socket in ACCEPTING or CANCELLING state cannot be closed.
- Stop the socket being accepted first. */
- nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING &&
- usock->state != NN_USOCK_STATE_CANCELLING);
- usock->errnum = 0;
- /* Synchronous stop. */
- if (usock->state == NN_USOCK_STATE_IDLE)
- goto finish3;
- if (usock->state == NN_USOCK_STATE_DONE)
- goto finish2;
- if (usock->state == NN_USOCK_STATE_STARTING ||
- usock->state == NN_USOCK_STATE_ACCEPTED ||
- usock->state == NN_USOCK_STATE_ACCEPTING_ERROR ||
- usock->state == NN_USOCK_STATE_LISTENING)
- goto finish1;
- /* When socket that's being accepted is asked to stop, we have to
- ask the listener socket to stop accepting first. */
- if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
- nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
- usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
- return;
- }
- /* Asynchronous stop. */
- if (usock->state != NN_USOCK_STATE_REMOVING_FD)
- nn_usock_async_stop (usock);
- usock->state = NN_USOCK_STATE_STOPPING;
- return;
- }
- if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
- nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
- goto finish2;
- }
- if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {
- if (src != NN_USOCK_SRC_TASK_STOP)
- return;
- nn_assert (type == NN_WORKER_TASK_EXECUTE);
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- finish1:
- nn_closefd (usock->s);
- usock->s = -1;
- finish2:
- usock->state = NN_USOCK_STATE_IDLE;
- nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
- finish3:
- return;
- }
- nn_fsm_bad_state(usock->state, src, type);
- }
- static void nn_usock_handler (struct nn_fsm *self, int src, int type,
- NN_UNUSED void *srcptr)
- {
- int rc;
- struct nn_usock *usock;
- int s;
- size_t sz;
- int sockerr;
- usock = nn_cont (self, struct nn_usock, fsm);
- if(nn_internal_tasks(usock, src, type))
- return;
- switch (usock->state) {
- /******************************************************************************/
- /* IDLE state. */
- /* nn_usock object is initialised, but underlying OS socket is not yet */
- /* created. */
- /******************************************************************************/
- case NN_USOCK_STATE_IDLE:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_FSM_START:
- usock->state = NN_USOCK_STATE_STARTING;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* STARTING state. */
- /* Underlying OS socket is created, but it's not yet passed to the worker */
- /* thread. In this state we can set socket options, local and remote */
- /* address etc. */
- /******************************************************************************/
- case NN_USOCK_STATE_STARTING:
- /* Events from the owner of the usock. */
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_LISTEN:
- usock->state = NN_USOCK_STATE_LISTENING;
- return;
- case NN_USOCK_ACTION_CONNECT:
- usock->state = NN_USOCK_STATE_CONNECTING;
- return;
- case NN_USOCK_ACTION_BEING_ACCEPTED:
- usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
- return;
- case NN_USOCK_ACTION_STARTED:
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- usock->state = NN_USOCK_STATE_ACTIVE;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* BEING_ACCEPTED state. */
- /* accept() was called on the usock. Now the socket is waiting for a new */
- /* connection to arrive. */
- /******************************************************************************/
- case NN_USOCK_STATE_BEING_ACCEPTED:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_DONE:
- usock->state = NN_USOCK_STATE_ACCEPTED;
- nn_fsm_raise (&usock->fsm, &usock->event_established,
- NN_USOCK_ACCEPTED);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* ACCEPTED state. */
- /* Connection was accepted, now it can be tuned. Afterwards, it'll move to */
- /* the active state. */
- /******************************************************************************/
- case NN_USOCK_STATE_ACCEPTED:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_ACTIVATE:
- nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
- usock->state = NN_USOCK_STATE_ACTIVE;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* CONNECTING state. */
- /* Asynchronous connecting is going on. */
- /******************************************************************************/
- case NN_USOCK_STATE_CONNECTING:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_DONE:
- usock->state = NN_USOCK_STATE_ACTIVE;
- nn_worker_execute (usock->worker, &usock->task_connected);
- nn_fsm_raise (&usock->fsm, &usock->event_established,
- NN_USOCK_CONNECTED);
- return;
- case NN_USOCK_ACTION_ERROR:
- nn_closefd (usock->s);
- usock->s = -1;
- usock->state = NN_USOCK_STATE_DONE;
- nn_fsm_raise (&usock->fsm, &usock->event_error,
- NN_USOCK_ERROR);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- case NN_USOCK_SRC_FD:
- switch (type) {
- case NN_WORKER_FD_OUT:
- nn_worker_reset_out (usock->worker, &usock->wfd);
- usock->state = NN_USOCK_STATE_ACTIVE;
- sockerr = nn_usock_geterr(usock);
- if (sockerr == 0) {
- nn_fsm_raise (&usock->fsm, &usock->event_established,
- NN_USOCK_CONNECTED);
- } else {
- usock->errnum = sockerr;
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- rc = close (usock->s);
- errno_assert (rc == 0);
- usock->s = -1;
- usock->state = NN_USOCK_STATE_DONE;
- nn_fsm_raise (&usock->fsm,
- &usock->event_error, NN_USOCK_ERROR);
- }
- return;
- case NN_WORKER_FD_ERR:
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- nn_closefd (usock->s);
- usock->s = -1;
- usock->state = NN_USOCK_STATE_DONE;
- nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* ACTIVE state. */
- /* Socket is connected. It can be used for sending and receiving data. */
- /******************************************************************************/
- case NN_USOCK_STATE_ACTIVE:
- switch (src) {
- case NN_USOCK_SRC_FD:
- switch (type) {
- case NN_WORKER_FD_IN:
- sz = usock->in.len;
- rc = nn_usock_recv_raw (usock, usock->in.buf, &sz);
- if (nn_fast (rc == 0)) {
- usock->in.len -= sz;
- usock->in.buf += sz;
- if (!usock->in.len) {
- nn_worker_reset_in (usock->worker, &usock->wfd);
- nn_fsm_raise (&usock->fsm, &usock->event_received,
- NN_USOCK_RECEIVED);
- }
- return;
- }
- errnum_assert (rc == -ECONNRESET, -rc);
- goto error;
- case NN_WORKER_FD_OUT:
- rc = nn_usock_send_raw (usock, &usock->out.hdr);
- if (nn_fast (rc == 0)) {
- nn_worker_reset_out (usock->worker, &usock->wfd);
- nn_fsm_raise (&usock->fsm, &usock->event_sent,
- NN_USOCK_SENT);
- return;
- }
- if (nn_fast (rc == -EAGAIN))
- return;
- errnum_assert (rc == -ECONNRESET, -rc);
- goto error;
- case NN_WORKER_FD_ERR:
- error:
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- nn_closefd (usock->s);
- usock->s = -1;
- usock->state = NN_USOCK_STATE_DONE;
- nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_ERROR:
- usock->state = NN_USOCK_STATE_REMOVING_FD;
- nn_usock_async_stop (usock);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source(usock->state, src, type);
- }
- /******************************************************************************/
- /* REMOVING_FD state. */
- /******************************************************************************/
- case NN_USOCK_STATE_REMOVING_FD:
- switch (src) {
- case NN_USOCK_SRC_TASK_STOP:
- switch (type) {
- case NN_WORKER_TASK_EXECUTE:
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- nn_closefd (usock->s);
- usock->s = -1;
- usock->state = NN_USOCK_STATE_DONE;
- nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- /* Events from the file descriptor are ignored while it is being
- removed. */
- case NN_USOCK_SRC_FD:
- return;
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_ERROR:
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* DONE state. */
- /* Socket is closed. The only thing that can be done in this state is */
- /* stopping the usock. */
- /******************************************************************************/
- case NN_USOCK_STATE_DONE:
- return;
- /******************************************************************************/
- /* LISTENING state. */
- /* Socket is listening for new incoming connections, however, user is not */
- /* accepting a new connection. */
- /******************************************************************************/
- case NN_USOCK_STATE_LISTENING:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_ACCEPT:
- usock->state = NN_USOCK_STATE_ACCEPTING;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* ACCEPTING state. */
- /* User is waiting asynchronouslyfor a new inbound connection */
- /* to be accepted. */
- /******************************************************************************/
- case NN_USOCK_STATE_ACCEPTING:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_DONE:
- usock->state = NN_USOCK_STATE_LISTENING;
- return;
- case NN_USOCK_ACTION_CANCEL:
- usock->state = NN_USOCK_STATE_CANCELLING;
- nn_worker_execute (usock->worker, &usock->task_stop);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- case NN_USOCK_SRC_FD:
- switch (type) {
- case NN_WORKER_FD_IN:
- /* New connection arrived in asynchronous manner. */
- #if NN_HAVE_ACCEPT4
- s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
- #else
- s = accept (usock->s, NULL, NULL);
- #endif
- /* ECONNABORTED is an valid error. New connection was closed
- by the peer before we were able to accept it. If it happens
- do nothing and wait for next incoming connection. */
- if (nn_slow (s < 0 && errno == ECONNABORTED))
- return;
- /* Resource allocation errors. It's not clear from POSIX
- specification whether the new connection is closed in this
- case or whether it remains in the backlog. In the latter
- case it would be wise to wait here for a while to prevent
- busy looping. */
- if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE ||
- errno == ENOBUFS || errno == ENOMEM))) {
- usock->errnum = errno;
- usock->state = NN_USOCK_STATE_ACCEPTING_ERROR;
- /* Wait till the user starts accepting once again. */
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- nn_fsm_raise (&usock->fsm,
- &usock->event_error, NN_USOCK_ACCEPT_ERROR);
- return;
- }
- /* Any other error is unexpected. */
- errno_assert (s >= 0);
- /* Initialise the new usock object. */
- nn_usock_init_from_fd (usock->asock, s);
- usock->asock->state = NN_USOCK_STATE_ACCEPTED;
- /* Notify the user that connection was accepted. */
- nn_fsm_raise (&usock->asock->fsm,
- &usock->asock->event_established, NN_USOCK_ACCEPTED);
- /* Disassociate the listener socket from the accepted
- socket. */
- usock->asock->asock = NULL;
- usock->asock = NULL;
- /* Wait till the user starts accepting once again. */
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- usock->state = NN_USOCK_STATE_LISTENING;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* ACCEPTING_ERROR state. */
- /* Waiting for the socket to accept the error and restart. */
- /******************************************************************************/
- case NN_USOCK_STATE_ACCEPTING_ERROR:
- switch (src) {
- case NN_FSM_ACTION:
- switch (type) {
- case NN_USOCK_ACTION_ACCEPT:
- usock->state = NN_USOCK_STATE_ACCEPTING;
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* CANCELLING state. */
- /******************************************************************************/
- case NN_USOCK_STATE_CANCELLING:
- switch (src) {
- case NN_USOCK_SRC_TASK_STOP:
- switch (type) {
- case NN_WORKER_TASK_EXECUTE:
- nn_worker_rm_fd (usock->worker, &usock->wfd);
- usock->state = NN_USOCK_STATE_LISTENING;
- /* Notify the accepted socket that it was stopped. */
- nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- case NN_USOCK_SRC_FD:
- switch (type) {
- case NN_WORKER_FD_IN:
- return;
- default:
- nn_fsm_bad_action (usock->state, src, type);
- }
- default:
- nn_fsm_bad_source (usock->state, src, type);
- }
- /******************************************************************************/
- /* Invalid state */
- /******************************************************************************/
- default:
- nn_fsm_bad_state (usock->state, src, type);
- }
- }
- static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr)
- {
- ssize_t nbytes;
- /* Try to send the data. */
- #if defined MSG_NOSIGNAL
- nbytes = sendmsg (self->s, hdr, MSG_NOSIGNAL);
- #else
- nbytes = sendmsg (self->s, hdr, 0);
- #endif
- /* Handle errors. */
- if (nn_slow (nbytes < 0)) {
- if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
- nbytes = 0;
- else {
- /* If the connection fails, return ECONNRESET. */
- return -ECONNRESET;
- }
- }
- /* Some bytes were sent. Adjust the iovecs accordingly. */
- while (nbytes) {
- if (nbytes >= (ssize_t)hdr->msg_iov->iov_len) {
- --hdr->msg_iovlen;
- if (!hdr->msg_iovlen) {
- nn_assert (nbytes == (ssize_t)hdr->msg_iov->iov_len);
- return 0;
- }
- nbytes -= hdr->msg_iov->iov_len;
- ++hdr->msg_iov;
- }
- else {
- *((uint8_t**) &(hdr->msg_iov->iov_base)) += nbytes;
- hdr->msg_iov->iov_len -= nbytes;
- return -EAGAIN;
- }
- }
- if (hdr->msg_iovlen > 0)
- return -EAGAIN;
- return 0;
- }
- static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
- {
- size_t sz;
- size_t length;
- ssize_t nbytes;
- struct iovec iov;
- struct msghdr hdr;
- unsigned char ctrl [256];
- #if defined NN_HAVE_MSG_CONTROL
- struct cmsghdr *cmsg;
- #endif
- /* If batch buffer doesn't exist, allocate it. The point of delayed
- deallocation to allow non-receiving sockets, such as TCP listening
- sockets, to do without the batch buffer. */
- if (nn_slow (!self->in.batch)) {
- self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer");
- alloc_assert (self->in.batch);
- }
- /* Try to satisfy the recv request by data from the batch buffer. */
- length = *len;
- sz = self->in.batch_len - self->in.batch_pos;
- if (sz) {
- if (sz > length)
- sz = length;
- memcpy (buf, self->in.batch + self->in.batch_pos, sz);
- self->in.batch_pos += sz;
- buf = ((char*) buf) + sz;
- length -= sz;
- if (!length)
- return 0;
- }
- /* If recv request is greater than the batch buffer, get the data directly
- into the place. Otherwise, read data to the batch buffer. */
- if (length > NN_USOCK_BATCH_SIZE) {
- iov.iov_base = buf;
- iov.iov_len = length;
- }
- else {
- iov.iov_base = self->in.batch;
- iov.iov_len = NN_USOCK_BATCH_SIZE;
- }
- memset (&hdr, 0, sizeof (hdr));
- hdr.msg_iov = &iov;
- hdr.msg_iovlen = 1;
- #if defined NN_HAVE_MSG_CONTROL
- hdr.msg_control = ctrl;
- hdr.msg_controllen = sizeof (ctrl);
- #else
- *((int*) ctrl) = -1;
- hdr.msg_accrights = ctrl;
- hdr.msg_accrightslen = sizeof (int);
- #endif
- nbytes = recvmsg (self->s, &hdr, 0);
- /* Handle any possible errors. */
- if (nn_slow (nbytes <= 0)) {
- if (nn_slow (nbytes == 0))
- return -ECONNRESET;
- /* Zero bytes received. */
- if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
- nbytes = 0;
- else {
- /* If the peer closes the connection, return ECONNRESET. */
- return -ECONNRESET;
- }
- }
- /* Extract the associated file descriptor, if any. */
- if (nbytes > 0) {
- #if defined NN_HAVE_MSG_CONTROL
- cmsg = CMSG_FIRSTHDR (&hdr);
- while (cmsg) {
- if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
- if (self->in.pfd) {
- *self->in.pfd = *((int*) CMSG_DATA (cmsg));
- self->in.pfd = NULL;
- }
- else {
- nn_closefd (*((int*) CMSG_DATA (cmsg)));
- }
- break;
- }
- cmsg = CMSG_NXTHDR (&hdr, cmsg);
- }
- #else
- if (hdr.msg_accrightslen > 0) {
- nn_assert (hdr.msg_accrightslen == sizeof (int));
- if (self->in.pfd) {
- *self->in.pfd = *((int*) hdr.msg_accrights);
- self->in.pfd = NULL;
- }
- else {
- nn_closefd (*((int*) hdr.msg_accrights));
- }
- }
- #endif
- }
- /* If the data were received directly into the place we can return
- straight away. */
- if (length > NN_USOCK_BATCH_SIZE) {
- length -= nbytes;
- *len -= length;
- return 0;
- }
- /* New data were read to the batch buffer. Copy the requested amount of it
- to the user-supplied buffer. */
- self->in.batch_len = nbytes;
- self->in.batch_pos = 0;
- if (nbytes) {
- sz = nbytes > (ssize_t)length ? length : (size_t)nbytes;
- memcpy (buf, self->in.batch, sz);
- length -= sz;
- self->in.batch_pos += sz;
- }
- *len -= length;
- return 0;
- }
- static int nn_usock_geterr (struct nn_usock *self)
- {
- int rc;
- int opt;
- #if defined NN_HAVE_HPUX
- int optsz;
- #else
- socklen_t optsz;
- #endif
- opt = 0;
- optsz = sizeof (opt);
- rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz);
- /* The following should handle both Solaris and UNIXes derived from BSD. */
- if (rc == -1)
- return errno;
- errno_assert (rc == 0);
- nn_assert (optsz == sizeof (opt));
- return opt;
- }
|