usock_posix.inc 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211
  1. /*
  2. Copyright (c) 2013 Martin Sustrik All rights reserved.
  3. Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
  4. Copyright 2016 Garrett D'Amore <garrett@damore.org>
  5. Permission is hereby granted, free of charge, to any person obtaining a copy
  6. of this software and associated documentation files (the "Software"),
  7. to deal in the Software without restriction, including without limitation
  8. the rights to use, copy, modify, merge, publish, distribute, sublicense,
  9. and/or sell copies of the Software, and to permit persons to whom
  10. the Software is furnished to do so, subject to the following conditions:
  11. The above copyright notice and this permission notice shall be included
  12. in all copies or substantial portions of the Software.
  13. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  16. THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. IN THE SOFTWARE.
  20. */
  21. #include "../utils/alloc.h"
  22. #include "../utils/closefd.h"
  23. #include "../utils/cont.h"
  24. #include "../utils/fast.h"
  25. #include "../utils/err.h"
  26. #include "../utils/attr.h"
  27. #include <string.h>
  28. #include <unistd.h>
  29. #include <fcntl.h>
  30. #include <sys/uio.h>
  31. #define NN_USOCK_STATE_IDLE 1
  32. #define NN_USOCK_STATE_STARTING 2
  33. #define NN_USOCK_STATE_BEING_ACCEPTED 3
  34. #define NN_USOCK_STATE_ACCEPTED 4
  35. #define NN_USOCK_STATE_CONNECTING 5
  36. #define NN_USOCK_STATE_ACTIVE 6
  37. #define NN_USOCK_STATE_REMOVING_FD 7
  38. #define NN_USOCK_STATE_DONE 8
  39. #define NN_USOCK_STATE_LISTENING 9
  40. #define NN_USOCK_STATE_ACCEPTING 10
  41. #define NN_USOCK_STATE_CANCELLING 11
  42. #define NN_USOCK_STATE_STOPPING 12
  43. #define NN_USOCK_STATE_STOPPING_ACCEPT 13
  44. #define NN_USOCK_STATE_ACCEPTING_ERROR 14
  45. #define NN_USOCK_ACTION_ACCEPT 1
  46. #define NN_USOCK_ACTION_BEING_ACCEPTED 2
  47. #define NN_USOCK_ACTION_CANCEL 3
  48. #define NN_USOCK_ACTION_LISTEN 4
  49. #define NN_USOCK_ACTION_CONNECT 5
  50. #define NN_USOCK_ACTION_ACTIVATE 6
  51. #define NN_USOCK_ACTION_DONE 7
  52. #define NN_USOCK_ACTION_ERROR 8
  53. #define NN_USOCK_ACTION_STARTED 9
  54. #define NN_USOCK_SRC_FD 1
  55. #define NN_USOCK_SRC_TASK_CONNECTING 2
  56. #define NN_USOCK_SRC_TASK_CONNECTED 3
  57. #define NN_USOCK_SRC_TASK_ACCEPT 4
  58. #define NN_USOCK_SRC_TASK_SEND 5
  59. #define NN_USOCK_SRC_TASK_RECV 6
  60. #define NN_USOCK_SRC_TASK_STOP 7
  61. /* Private functions. */
  62. static void nn_usock_init_from_fd (struct nn_usock *self, int s);
  63. static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
  64. static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
  65. static int nn_usock_geterr (struct nn_usock *self);
  66. static void nn_usock_handler (struct nn_fsm *self, int src, int type,
  67. void *srcptr);
  68. static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
  69. void *srcptr);
  70. void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
  71. {
  72. /* Initalise the state machine. */
  73. nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
  74. src, self, owner);
  75. self->state = NN_USOCK_STATE_IDLE;
  76. /* Choose a worker thread to handle this socket. */
  77. self->worker = nn_fsm_choose_worker (&self->fsm);
  78. /* Actual file descriptor will be generated during 'start' step. */
  79. self->s = -1;
  80. self->errnum = 0;
  81. self->in.buf = NULL;
  82. self->in.len = 0;
  83. self->in.batch = NULL;
  84. self->in.batch_len = 0;
  85. self->in.batch_pos = 0;
  86. self->in.pfd = NULL;
  87. memset (&self->out.hdr, 0, sizeof (struct msghdr));
  88. /* Initialise tasks for the worker thread. */
  89. nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm);
  90. nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,
  91. &self->fsm);
  92. nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,
  93. &self->fsm);
  94. nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,
  95. &self->fsm);
  96. nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm);
  97. nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm);
  98. nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm);
  99. /* Intialise events raised by usock. */
  100. nn_fsm_event_init (&self->event_established);
  101. nn_fsm_event_init (&self->event_sent);
  102. nn_fsm_event_init (&self->event_received);
  103. nn_fsm_event_init (&self->event_error);
  104. /* accepting is not going on at the moment. */
  105. self->asock = NULL;
  106. }
  107. void nn_usock_term (struct nn_usock *self)
  108. {
  109. nn_assert_state (self, NN_USOCK_STATE_IDLE);
  110. if (self->in.batch)
  111. nn_free (self->in.batch);
  112. nn_fsm_event_term (&self->event_error);
  113. nn_fsm_event_term (&self->event_received);
  114. nn_fsm_event_term (&self->event_sent);
  115. nn_fsm_event_term (&self->event_established);
  116. nn_worker_cancel (self->worker, &self->task_recv);
  117. nn_worker_task_term (&self->task_stop);
  118. nn_worker_task_term (&self->task_recv);
  119. nn_worker_task_term (&self->task_send);
  120. nn_worker_task_term (&self->task_accept);
  121. nn_worker_task_term (&self->task_connected);
  122. nn_worker_task_term (&self->task_connecting);
  123. nn_worker_fd_term (&self->wfd);
  124. nn_fsm_term (&self->fsm);
  125. }
  126. int nn_usock_isidle (struct nn_usock *self)
  127. {
  128. return nn_fsm_isidle (&self->fsm);
  129. }
  130. int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
  131. {
  132. int s;
  133. /* If the operating system allows to directly open the socket with CLOEXEC
  134. flag, do so. That way there are no race conditions. */
  135. #ifdef SOCK_CLOEXEC
  136. type |= SOCK_CLOEXEC;
  137. #endif
  138. /* Open the underlying socket. */
  139. s = socket (domain, type, protocol);
  140. if (nn_slow (s < 0))
  141. return -errno;
  142. nn_usock_init_from_fd (self, s);
  143. /* Start the state machine. */
  144. nn_fsm_start (&self->fsm);
  145. return 0;
  146. }
  147. void nn_usock_start_fd (struct nn_usock *self, int fd)
  148. {
  149. nn_usock_init_from_fd (self, fd);
  150. nn_fsm_start (&self->fsm);
  151. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
  152. }
  153. static void nn_usock_init_from_fd (struct nn_usock *self, int s)
  154. {
  155. int rc;
  156. int opt;
  157. nn_assert (self->state == NN_USOCK_STATE_IDLE ||
  158. NN_USOCK_STATE_BEING_ACCEPTED);
  159. /* Store the file descriptor. */
  160. nn_assert (self->s == -1);
  161. self->s = s;
  162. /* Setting FD_CLOEXEC option immediately after socket creation is the
  163. second best option after using SOCK_CLOEXEC. There is a race condition
  164. here (if process is forked between socket creation and setting
  165. the option) but the problem is pretty unlikely to happen. */
  166. #if defined FD_CLOEXEC
  167. rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
  168. #if defined NN_HAVE_OSX
  169. errno_assert (rc != -1 || errno == EINVAL);
  170. #else
  171. errno_assert (rc != -1);
  172. #endif
  173. #endif
  174. /* If applicable, prevent SIGPIPE signal when writing to the connection
  175. already closed by the peer. */
  176. #ifdef SO_NOSIGPIPE
  177. opt = 1;
  178. rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
  179. #if defined NN_HAVE_OSX
  180. errno_assert (rc == 0 || errno == EINVAL);
  181. #else
  182. errno_assert (rc == 0);
  183. #endif
  184. #endif
  185. /* Switch the socket to the non-blocking mode. All underlying sockets
  186. are always used in the callbackhronous mode. */
  187. opt = fcntl (self->s, F_GETFL, 0);
  188. if (opt == -1)
  189. opt = 0;
  190. if (!(opt & O_NONBLOCK)) {
  191. rc = fcntl (self->s, F_SETFL, opt | O_NONBLOCK);
  192. #if defined NN_HAVE_OSX
  193. errno_assert (rc != -1 || errno == EINVAL);
  194. #else
  195. errno_assert (rc != -1);
  196. #endif
  197. }
  198. }
  199. void nn_usock_stop (struct nn_usock *self)
  200. {
  201. nn_fsm_stop (&self->fsm);
  202. }
  203. void nn_usock_async_stop (struct nn_usock *self)
  204. {
  205. nn_worker_execute (self->worker, &self->task_stop);
  206. nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN);
  207. }
  208. void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
  209. {
  210. nn_fsm_swap_owner (&self->fsm, owner);
  211. }
  212. int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
  213. const void *optval, size_t optlen)
  214. {
  215. int rc;
  216. /* The socket can be modified only before it's active. */
  217. nn_assert (self->state == NN_USOCK_STATE_STARTING ||
  218. self->state == NN_USOCK_STATE_ACCEPTED);
  219. /* EINVAL errors are ignored on OSX platform. The reason for that is buggy
  220. OSX behaviour where setsockopt returns EINVAL if the peer have already
  221. disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
  222. the option value was invalid, but the peer have already closed the
  223. connection. This behaviour should be relatively harmless. */
  224. rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen);
  225. #if defined NN_HAVE_OSX
  226. if (nn_slow (rc != 0 && errno != EINVAL))
  227. return -errno;
  228. #else
  229. if (nn_slow (rc != 0))
  230. return -errno;
  231. #endif
  232. return 0;
  233. }
  234. int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
  235. size_t addrlen)
  236. {
  237. int rc;
  238. int opt;
  239. /* The socket can be bound only before it's connected. */
  240. nn_assert_state (self, NN_USOCK_STATE_STARTING);
  241. /* Allow re-using the address. */
  242. opt = 1;
  243. rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
  244. errno_assert (rc == 0);
  245. rc = bind (self->s, addr, (socklen_t) addrlen);
  246. if (nn_slow (rc != 0))
  247. return -errno;
  248. return 0;
  249. }
  250. int nn_usock_listen (struct nn_usock *self, int backlog)
  251. {
  252. int rc;
  253. /* You can start listening only before the socket is connected. */
  254. nn_assert_state (self, NN_USOCK_STATE_STARTING);
  255. /* Start listening for incoming connections. */
  256. rc = listen (self->s, backlog);
  257. if (nn_slow (rc != 0))
  258. return -errno;
  259. /* Notify the state machine. */
  260. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);
  261. return 0;
  262. }
  263. void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
  264. {
  265. int s;
  266. /* Start the actual accepting. */
  267. if (nn_fsm_isidle(&self->fsm)) {
  268. nn_fsm_start (&self->fsm);
  269. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);
  270. }
  271. nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
  272. /* Try to accept new connection in synchronous manner. */
  273. #if NN_HAVE_ACCEPT4
  274. s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC);
  275. #else
  276. s = accept (listener->s, NULL, NULL);
  277. #endif
  278. /* Immediate success. */
  279. if (nn_fast (s >= 0)) {
  280. /* Disassociate the listener socket from the accepted
  281. socket. Is useful if we restart accepting on ACCEPT_ERROR */
  282. listener->asock = NULL;
  283. self->asock = NULL;
  284. nn_usock_init_from_fd (self, s);
  285. nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
  286. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
  287. return;
  288. }
  289. /* Detect a failure. Note that in ECONNABORTED case we simply ignore
  290. the error and wait for next connection in asynchronous manner. */
  291. errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
  292. errno == ECONNABORTED || errno == ENFILE || errno == EMFILE ||
  293. errno == ENOBUFS || errno == ENOMEM);
  294. /* Pair the two sockets. They are already paired in case
  295. previous attempt failed on ACCEPT_ERROR */
  296. nn_assert (!self->asock || self->asock == listener);
  297. self->asock = listener;
  298. nn_assert (!listener->asock || listener->asock == self);
  299. listener->asock = self;
  300. /* Some errors are just ok to ignore for now. We also stop repeating
  301. any errors until next IN_FD event so that we are not in a tight loop
  302. and allow processing other events in the meantime */
  303. if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK
  304. && errno != ECONNABORTED && errno != listener->errnum))
  305. {
  306. listener->errnum = errno;
  307. listener->state = NN_USOCK_STATE_ACCEPTING_ERROR;
  308. nn_fsm_raise (&listener->fsm,
  309. &listener->event_error, NN_USOCK_ACCEPT_ERROR);
  310. return;
  311. }
  312. /* Ask the worker thread to wait for the new connection. */
  313. nn_worker_execute (listener->worker, &listener->task_accept);
  314. }
  315. void nn_usock_activate (struct nn_usock *self)
  316. {
  317. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
  318. }
  319. void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
  320. size_t addrlen)
  321. {
  322. int rc;
  323. /* Notify the state machine that we've started connecting. */
  324. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);
  325. /* Do the connect itself. */
  326. rc = connect (self->s, addr, (socklen_t) addrlen);
  327. /* Immediate success. */
  328. if (nn_fast (rc == 0)) {
  329. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
  330. return;
  331. }
  332. /* Immediate error. */
  333. if (nn_slow (errno != EINPROGRESS)) {
  334. self->errnum = errno;
  335. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  336. return;
  337. }
  338. /* Start asynchronous connect. */
  339. nn_worker_execute (self->worker, &self->task_connecting);
  340. }
  341. void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
  342. int iovcnt)
  343. {
  344. int rc;
  345. int i;
  346. int out;
  347. /* Make sure that the socket is actually alive. */
  348. if (self->state != NN_USOCK_STATE_ACTIVE) {
  349. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  350. return;
  351. }
  352. /* Copy the iovecs to the socket. */
  353. nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
  354. self->out.hdr.msg_iov = self->out.iov;
  355. out = 0;
  356. for (i = 0; i != iovcnt; ++i) {
  357. if (iov [i].iov_len == 0)
  358. continue;
  359. self->out.iov [out].iov_base = iov [i].iov_base;
  360. self->out.iov [out].iov_len = iov [i].iov_len;
  361. out++;
  362. }
  363. self->out.hdr.msg_iovlen = out;
  364. /* Try to send the data immediately. */
  365. rc = nn_usock_send_raw (self, &self->out.hdr);
  366. /* Success. */
  367. if (nn_fast (rc == 0)) {
  368. nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT);
  369. return;
  370. }
  371. /* Errors. */
  372. if (nn_slow (rc != -EAGAIN)) {
  373. errnum_assert (rc == -ECONNRESET, -rc);
  374. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  375. return;
  376. }
  377. /* Ask the worker thread to send the remaining data. */
  378. nn_worker_execute (self->worker, &self->task_send);
  379. }
  380. void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
  381. {
  382. int rc;
  383. size_t nbytes;
  384. /* Make sure that the socket is actually alive. */
  385. if (self->state != NN_USOCK_STATE_ACTIVE) {
  386. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  387. return;
  388. }
  389. /* Try to receive the data immediately. */
  390. nbytes = len;
  391. self->in.pfd = fd;
  392. rc = nn_usock_recv_raw (self, buf, &nbytes);
  393. if (nn_slow (rc < 0)) {
  394. errnum_assert (rc == -ECONNRESET, -rc);
  395. nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
  396. return;
  397. }
  398. /* Success. */
  399. if (nn_fast (nbytes == len)) {
  400. nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED);
  401. return;
  402. }
  403. /* There are still data to receive in the background. */
  404. self->in.buf = ((uint8_t*) buf) + nbytes;
  405. self->in.len = len - nbytes;
  406. /* Ask the worker thread to receive the remaining data. */
  407. nn_worker_execute (self->worker, &self->task_recv);
  408. }
  409. static int nn_internal_tasks (struct nn_usock *usock, int src, int type)
  410. {
  411. /******************************************************************************/
  412. /* Internal tasks sent from the user thread to the worker thread. */
  413. /******************************************************************************/
  414. switch (src) {
  415. case NN_USOCK_SRC_TASK_SEND:
  416. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  417. nn_worker_set_out (usock->worker, &usock->wfd);
  418. return 1;
  419. case NN_USOCK_SRC_TASK_RECV:
  420. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  421. nn_worker_set_in (usock->worker, &usock->wfd);
  422. return 1;
  423. case NN_USOCK_SRC_TASK_CONNECTED:
  424. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  425. nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  426. return 1;
  427. case NN_USOCK_SRC_TASK_CONNECTING:
  428. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  429. nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  430. nn_worker_set_out (usock->worker, &usock->wfd);
  431. return 1;
  432. case NN_USOCK_SRC_TASK_ACCEPT:
  433. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  434. nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  435. nn_worker_set_in (usock->worker, &usock->wfd);
  436. return 1;
  437. }
  438. return 0;
  439. }
  440. static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
  441. NN_UNUSED void *srcptr)
  442. {
  443. struct nn_usock *usock;
  444. usock = nn_cont (self, struct nn_usock, fsm);
  445. if (nn_internal_tasks (usock, src, type))
  446. return;
  447. if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
  448. /* Socket in ACCEPTING or CANCELLING state cannot be closed.
  449. Stop the socket being accepted first. */
  450. nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING &&
  451. usock->state != NN_USOCK_STATE_CANCELLING);
  452. usock->errnum = 0;
  453. /* Synchronous stop. */
  454. if (usock->state == NN_USOCK_STATE_IDLE)
  455. goto finish3;
  456. if (usock->state == NN_USOCK_STATE_DONE)
  457. goto finish2;
  458. if (usock->state == NN_USOCK_STATE_STARTING ||
  459. usock->state == NN_USOCK_STATE_ACCEPTED ||
  460. usock->state == NN_USOCK_STATE_ACCEPTING_ERROR ||
  461. usock->state == NN_USOCK_STATE_LISTENING)
  462. goto finish1;
  463. /* When socket that's being accepted is asked to stop, we have to
  464. ask the listener socket to stop accepting first. */
  465. if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
  466. nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
  467. usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
  468. return;
  469. }
  470. /* Asynchronous stop. */
  471. if (usock->state != NN_USOCK_STATE_REMOVING_FD)
  472. nn_usock_async_stop (usock);
  473. usock->state = NN_USOCK_STATE_STOPPING;
  474. return;
  475. }
  476. if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
  477. nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
  478. goto finish2;
  479. }
  480. if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {
  481. if (src != NN_USOCK_SRC_TASK_STOP)
  482. return;
  483. nn_assert (type == NN_WORKER_TASK_EXECUTE);
  484. nn_worker_rm_fd (usock->worker, &usock->wfd);
  485. finish1:
  486. nn_closefd (usock->s);
  487. usock->s = -1;
  488. finish2:
  489. usock->state = NN_USOCK_STATE_IDLE;
  490. nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
  491. finish3:
  492. return;
  493. }
  494. nn_fsm_bad_state(usock->state, src, type);
  495. }
  496. static void nn_usock_handler (struct nn_fsm *self, int src, int type,
  497. NN_UNUSED void *srcptr)
  498. {
  499. int rc;
  500. struct nn_usock *usock;
  501. int s;
  502. size_t sz;
  503. int sockerr;
  504. usock = nn_cont (self, struct nn_usock, fsm);
  505. if(nn_internal_tasks(usock, src, type))
  506. return;
  507. switch (usock->state) {
  508. /******************************************************************************/
  509. /* IDLE state. */
  510. /* nn_usock object is initialised, but underlying OS socket is not yet */
  511. /* created. */
  512. /******************************************************************************/
  513. case NN_USOCK_STATE_IDLE:
  514. switch (src) {
  515. case NN_FSM_ACTION:
  516. switch (type) {
  517. case NN_FSM_START:
  518. usock->state = NN_USOCK_STATE_STARTING;
  519. return;
  520. default:
  521. nn_fsm_bad_action (usock->state, src, type);
  522. }
  523. default:
  524. nn_fsm_bad_source (usock->state, src, type);
  525. }
  526. /******************************************************************************/
  527. /* STARTING state. */
  528. /* Underlying OS socket is created, but it's not yet passed to the worker */
  529. /* thread. In this state we can set socket options, local and remote */
  530. /* address etc. */
  531. /******************************************************************************/
  532. case NN_USOCK_STATE_STARTING:
  533. /* Events from the owner of the usock. */
  534. switch (src) {
  535. case NN_FSM_ACTION:
  536. switch (type) {
  537. case NN_USOCK_ACTION_LISTEN:
  538. usock->state = NN_USOCK_STATE_LISTENING;
  539. return;
  540. case NN_USOCK_ACTION_CONNECT:
  541. usock->state = NN_USOCK_STATE_CONNECTING;
  542. return;
  543. case NN_USOCK_ACTION_BEING_ACCEPTED:
  544. usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
  545. return;
  546. case NN_USOCK_ACTION_STARTED:
  547. nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  548. usock->state = NN_USOCK_STATE_ACTIVE;
  549. return;
  550. default:
  551. nn_fsm_bad_action (usock->state, src, type);
  552. }
  553. default:
  554. nn_fsm_bad_source (usock->state, src, type);
  555. }
  556. /******************************************************************************/
  557. /* BEING_ACCEPTED state. */
  558. /* accept() was called on the usock. Now the socket is waiting for a new */
  559. /* connection to arrive. */
  560. /******************************************************************************/
  561. case NN_USOCK_STATE_BEING_ACCEPTED:
  562. switch (src) {
  563. case NN_FSM_ACTION:
  564. switch (type) {
  565. case NN_USOCK_ACTION_DONE:
  566. usock->state = NN_USOCK_STATE_ACCEPTED;
  567. nn_fsm_raise (&usock->fsm, &usock->event_established,
  568. NN_USOCK_ACCEPTED);
  569. return;
  570. default:
  571. nn_fsm_bad_action (usock->state, src, type);
  572. }
  573. default:
  574. nn_fsm_bad_source (usock->state, src, type);
  575. }
  576. /******************************************************************************/
  577. /* ACCEPTED state. */
  578. /* Connection was accepted, now it can be tuned. Afterwards, it'll move to */
  579. /* the active state. */
  580. /******************************************************************************/
  581. case NN_USOCK_STATE_ACCEPTED:
  582. switch (src) {
  583. case NN_FSM_ACTION:
  584. switch (type) {
  585. case NN_USOCK_ACTION_ACTIVATE:
  586. nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
  587. usock->state = NN_USOCK_STATE_ACTIVE;
  588. return;
  589. default:
  590. nn_fsm_bad_action (usock->state, src, type);
  591. }
  592. default:
  593. nn_fsm_bad_source (usock->state, src, type);
  594. }
  595. /******************************************************************************/
  596. /* CONNECTING state. */
  597. /* Asynchronous connecting is going on. */
  598. /******************************************************************************/
  599. case NN_USOCK_STATE_CONNECTING:
  600. switch (src) {
  601. case NN_FSM_ACTION:
  602. switch (type) {
  603. case NN_USOCK_ACTION_DONE:
  604. usock->state = NN_USOCK_STATE_ACTIVE;
  605. nn_worker_execute (usock->worker, &usock->task_connected);
  606. nn_fsm_raise (&usock->fsm, &usock->event_established,
  607. NN_USOCK_CONNECTED);
  608. return;
  609. case NN_USOCK_ACTION_ERROR:
  610. nn_closefd (usock->s);
  611. usock->s = -1;
  612. usock->state = NN_USOCK_STATE_DONE;
  613. nn_fsm_raise (&usock->fsm, &usock->event_error,
  614. NN_USOCK_ERROR);
  615. return;
  616. default:
  617. nn_fsm_bad_action (usock->state, src, type);
  618. }
  619. case NN_USOCK_SRC_FD:
  620. switch (type) {
  621. case NN_WORKER_FD_OUT:
  622. nn_worker_reset_out (usock->worker, &usock->wfd);
  623. usock->state = NN_USOCK_STATE_ACTIVE;
  624. sockerr = nn_usock_geterr(usock);
  625. if (sockerr == 0) {
  626. nn_fsm_raise (&usock->fsm, &usock->event_established,
  627. NN_USOCK_CONNECTED);
  628. } else {
  629. usock->errnum = sockerr;
  630. nn_worker_rm_fd (usock->worker, &usock->wfd);
  631. rc = close (usock->s);
  632. errno_assert (rc == 0);
  633. usock->s = -1;
  634. usock->state = NN_USOCK_STATE_DONE;
  635. nn_fsm_raise (&usock->fsm,
  636. &usock->event_error, NN_USOCK_ERROR);
  637. }
  638. return;
  639. case NN_WORKER_FD_ERR:
  640. nn_worker_rm_fd (usock->worker, &usock->wfd);
  641. nn_closefd (usock->s);
  642. usock->s = -1;
  643. usock->state = NN_USOCK_STATE_DONE;
  644. nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  645. return;
  646. default:
  647. nn_fsm_bad_action (usock->state, src, type);
  648. }
  649. default:
  650. nn_fsm_bad_source (usock->state, src, type);
  651. }
  652. /******************************************************************************/
  653. /* ACTIVE state. */
  654. /* Socket is connected. It can be used for sending and receiving data. */
  655. /******************************************************************************/
  656. case NN_USOCK_STATE_ACTIVE:
  657. switch (src) {
  658. case NN_USOCK_SRC_FD:
  659. switch (type) {
  660. case NN_WORKER_FD_IN:
  661. sz = usock->in.len;
  662. rc = nn_usock_recv_raw (usock, usock->in.buf, &sz);
  663. if (nn_fast (rc == 0)) {
  664. usock->in.len -= sz;
  665. usock->in.buf += sz;
  666. if (!usock->in.len) {
  667. nn_worker_reset_in (usock->worker, &usock->wfd);
  668. nn_fsm_raise (&usock->fsm, &usock->event_received,
  669. NN_USOCK_RECEIVED);
  670. }
  671. return;
  672. }
  673. errnum_assert (rc == -ECONNRESET, -rc);
  674. goto error;
  675. case NN_WORKER_FD_OUT:
  676. rc = nn_usock_send_raw (usock, &usock->out.hdr);
  677. if (nn_fast (rc == 0)) {
  678. nn_worker_reset_out (usock->worker, &usock->wfd);
  679. nn_fsm_raise (&usock->fsm, &usock->event_sent,
  680. NN_USOCK_SENT);
  681. return;
  682. }
  683. if (nn_fast (rc == -EAGAIN))
  684. return;
  685. errnum_assert (rc == -ECONNRESET, -rc);
  686. goto error;
  687. case NN_WORKER_FD_ERR:
  688. error:
  689. nn_worker_rm_fd (usock->worker, &usock->wfd);
  690. nn_closefd (usock->s);
  691. usock->s = -1;
  692. usock->state = NN_USOCK_STATE_DONE;
  693. nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  694. return;
  695. default:
  696. nn_fsm_bad_action (usock->state, src, type);
  697. }
  698. case NN_FSM_ACTION:
  699. switch (type) {
  700. case NN_USOCK_ACTION_ERROR:
  701. usock->state = NN_USOCK_STATE_REMOVING_FD;
  702. nn_usock_async_stop (usock);
  703. return;
  704. default:
  705. nn_fsm_bad_action (usock->state, src, type);
  706. }
  707. default:
  708. nn_fsm_bad_source(usock->state, src, type);
  709. }
  710. /******************************************************************************/
  711. /* REMOVING_FD state. */
  712. /******************************************************************************/
  713. case NN_USOCK_STATE_REMOVING_FD:
  714. switch (src) {
  715. case NN_USOCK_SRC_TASK_STOP:
  716. switch (type) {
  717. case NN_WORKER_TASK_EXECUTE:
  718. nn_worker_rm_fd (usock->worker, &usock->wfd);
  719. nn_closefd (usock->s);
  720. usock->s = -1;
  721. usock->state = NN_USOCK_STATE_DONE;
  722. nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
  723. return;
  724. default:
  725. nn_fsm_bad_action (usock->state, src, type);
  726. }
  727. /* Events from the file descriptor are ignored while it is being
  728. removed. */
  729. case NN_USOCK_SRC_FD:
  730. return;
  731. case NN_FSM_ACTION:
  732. switch (type) {
  733. case NN_USOCK_ACTION_ERROR:
  734. return;
  735. default:
  736. nn_fsm_bad_action (usock->state, src, type);
  737. }
  738. default:
  739. nn_fsm_bad_source (usock->state, src, type);
  740. }
  741. /******************************************************************************/
  742. /* DONE state. */
  743. /* Socket is closed. The only thing that can be done in this state is */
  744. /* stopping the usock. */
  745. /******************************************************************************/
  746. case NN_USOCK_STATE_DONE:
  747. return;
  748. /******************************************************************************/
  749. /* LISTENING state. */
  750. /* Socket is listening for new incoming connections, however, user is not */
  751. /* accepting a new connection. */
  752. /******************************************************************************/
  753. case NN_USOCK_STATE_LISTENING:
  754. switch (src) {
  755. case NN_FSM_ACTION:
  756. switch (type) {
  757. case NN_USOCK_ACTION_ACCEPT:
  758. usock->state = NN_USOCK_STATE_ACCEPTING;
  759. return;
  760. default:
  761. nn_fsm_bad_action (usock->state, src, type);
  762. }
  763. default:
  764. nn_fsm_bad_source (usock->state, src, type);
  765. }
  766. /******************************************************************************/
  767. /* ACCEPTING state. */
  768. /* User is waiting asynchronouslyfor a new inbound connection */
  769. /* to be accepted. */
  770. /******************************************************************************/
  771. case NN_USOCK_STATE_ACCEPTING:
  772. switch (src) {
  773. case NN_FSM_ACTION:
  774. switch (type) {
  775. case NN_USOCK_ACTION_DONE:
  776. usock->state = NN_USOCK_STATE_LISTENING;
  777. return;
  778. case NN_USOCK_ACTION_CANCEL:
  779. usock->state = NN_USOCK_STATE_CANCELLING;
  780. nn_worker_execute (usock->worker, &usock->task_stop);
  781. return;
  782. default:
  783. nn_fsm_bad_action (usock->state, src, type);
  784. }
  785. case NN_USOCK_SRC_FD:
  786. switch (type) {
  787. case NN_WORKER_FD_IN:
  788. /* New connection arrived in asynchronous manner. */
  789. #if NN_HAVE_ACCEPT4
  790. s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
  791. #else
  792. s = accept (usock->s, NULL, NULL);
  793. #endif
  794. /* ECONNABORTED is an valid error. New connection was closed
  795. by the peer before we were able to accept it. If it happens
  796. do nothing and wait for next incoming connection. */
  797. if (nn_slow (s < 0 && errno == ECONNABORTED))
  798. return;
  799. /* Resource allocation errors. It's not clear from POSIX
  800. specification whether the new connection is closed in this
  801. case or whether it remains in the backlog. In the latter
  802. case it would be wise to wait here for a while to prevent
  803. busy looping. */
  804. if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE ||
  805. errno == ENOBUFS || errno == ENOMEM))) {
  806. usock->errnum = errno;
  807. usock->state = NN_USOCK_STATE_ACCEPTING_ERROR;
  808. /* Wait till the user starts accepting once again. */
  809. nn_worker_rm_fd (usock->worker, &usock->wfd);
  810. nn_fsm_raise (&usock->fsm,
  811. &usock->event_error, NN_USOCK_ACCEPT_ERROR);
  812. return;
  813. }
  814. /* Any other error is unexpected. */
  815. errno_assert (s >= 0);
  816. /* Initialise the new usock object. */
  817. nn_usock_init_from_fd (usock->asock, s);
  818. usock->asock->state = NN_USOCK_STATE_ACCEPTED;
  819. /* Notify the user that connection was accepted. */
  820. nn_fsm_raise (&usock->asock->fsm,
  821. &usock->asock->event_established, NN_USOCK_ACCEPTED);
  822. /* Disassociate the listener socket from the accepted
  823. socket. */
  824. usock->asock->asock = NULL;
  825. usock->asock = NULL;
  826. /* Wait till the user starts accepting once again. */
  827. nn_worker_rm_fd (usock->worker, &usock->wfd);
  828. usock->state = NN_USOCK_STATE_LISTENING;
  829. return;
  830. default:
  831. nn_fsm_bad_action (usock->state, src, type);
  832. }
  833. default:
  834. nn_fsm_bad_source (usock->state, src, type);
  835. }
  836. /******************************************************************************/
  837. /* ACCEPTING_ERROR state. */
  838. /* Waiting the socket to accept the error and restart */
  839. /******************************************************************************/
  840. case NN_USOCK_STATE_ACCEPTING_ERROR:
  841. switch (src) {
  842. case NN_FSM_ACTION:
  843. switch (type) {
  844. case NN_USOCK_ACTION_ACCEPT:
  845. usock->state = NN_USOCK_STATE_ACCEPTING;
  846. return;
  847. default:
  848. nn_fsm_bad_action (usock->state, src, type);
  849. }
  850. default:
  851. nn_fsm_bad_source (usock->state, src, type);
  852. }
  853. /******************************************************************************/
  854. /* CANCELLING state. */
  855. /******************************************************************************/
  856. case NN_USOCK_STATE_CANCELLING:
  857. switch (src) {
  858. case NN_USOCK_SRC_TASK_STOP:
  859. switch (type) {
  860. case NN_WORKER_TASK_EXECUTE:
  861. nn_worker_rm_fd (usock->worker, &usock->wfd);
  862. usock->state = NN_USOCK_STATE_LISTENING;
  863. /* Notify the accepted socket that it was stopped. */
  864. nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
  865. return;
  866. default:
  867. nn_fsm_bad_action (usock->state, src, type);
  868. }
  869. case NN_USOCK_SRC_FD:
  870. switch (type) {
  871. case NN_WORKER_FD_IN:
  872. return;
  873. default:
  874. nn_fsm_bad_action (usock->state, src, type);
  875. }
  876. default:
  877. nn_fsm_bad_source (usock->state, src, type);
  878. }
  879. /******************************************************************************/
  880. /* Invalid state */
  881. /******************************************************************************/
  882. default:
  883. nn_fsm_bad_state (usock->state, src, type);
  884. }
  885. }
  886. static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr)
  887. {
  888. ssize_t nbytes;
  889. /* Try to send the data. */
  890. #if defined MSG_NOSIGNAL
  891. nbytes = sendmsg (self->s, hdr, MSG_NOSIGNAL);
  892. #else
  893. nbytes = sendmsg (self->s, hdr, 0);
  894. #endif
  895. /* Handle errors. */
  896. if (nn_slow (nbytes < 0)) {
  897. if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
  898. nbytes = 0;
  899. else {
  900. /* If the connection fails, return ECONNRESET. */
  901. return -ECONNRESET;
  902. }
  903. }
  904. /* Some bytes were sent. Adjust the iovecs accordingly. */
  905. while (nbytes) {
  906. if (nbytes >= (ssize_t)hdr->msg_iov->iov_len) {
  907. --hdr->msg_iovlen;
  908. if (!hdr->msg_iovlen) {
  909. nn_assert (nbytes == (ssize_t)hdr->msg_iov->iov_len);
  910. return 0;
  911. }
  912. nbytes -= hdr->msg_iov->iov_len;
  913. ++hdr->msg_iov;
  914. }
  915. else {
  916. *((uint8_t**) &(hdr->msg_iov->iov_base)) += nbytes;
  917. hdr->msg_iov->iov_len -= nbytes;
  918. return -EAGAIN;
  919. }
  920. }
  921. if (hdr->msg_iovlen > 0)
  922. return -EAGAIN;
  923. return 0;
  924. }
  925. static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
  926. {
  927. size_t sz;
  928. size_t length;
  929. ssize_t nbytes;
  930. struct iovec iov;
  931. struct msghdr hdr;
  932. unsigned char ctrl [256];
  933. #if defined NN_HAVE_MSG_CONTROL
  934. struct cmsghdr *cmsg;
  935. #endif
  936. /* If batch buffer doesn't exist, allocate it. The point of delayed
  937. deallocation to allow non-receiving sockets, such as TCP listening
  938. sockets, to do without the batch buffer. */
  939. if (nn_slow (!self->in.batch)) {
  940. self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer");
  941. alloc_assert (self->in.batch);
  942. }
  943. /* Try to satisfy the recv request by data from the batch buffer. */
  944. length = *len;
  945. sz = self->in.batch_len - self->in.batch_pos;
  946. if (sz) {
  947. if (sz > length)
  948. sz = length;
  949. memcpy (buf, self->in.batch + self->in.batch_pos, sz);
  950. self->in.batch_pos += sz;
  951. buf = ((char*) buf) + sz;
  952. length -= sz;
  953. if (!length)
  954. return 0;
  955. }
  956. /* If recv request is greater than the batch buffer, get the data directly
  957. into the place. Otherwise, read data to the batch buffer. */
  958. if (length > NN_USOCK_BATCH_SIZE) {
  959. iov.iov_base = buf;
  960. iov.iov_len = length;
  961. }
  962. else {
  963. iov.iov_base = self->in.batch;
  964. iov.iov_len = NN_USOCK_BATCH_SIZE;
  965. }
  966. memset (&hdr, 0, sizeof (hdr));
  967. hdr.msg_iov = &iov;
  968. hdr.msg_iovlen = 1;
  969. #if defined NN_HAVE_MSG_CONTROL
  970. hdr.msg_control = ctrl;
  971. hdr.msg_controllen = sizeof (ctrl);
  972. #else
  973. *((int*) ctrl) = -1;
  974. hdr.msg_accrights = ctrl;
  975. hdr.msg_accrightslen = sizeof (int);
  976. #endif
  977. nbytes = recvmsg (self->s, &hdr, 0);
  978. /* Handle any possible errors. */
  979. if (nn_slow (nbytes <= 0)) {
  980. if (nn_slow (nbytes == 0))
  981. return -ECONNRESET;
  982. /* Zero bytes received. */
  983. if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
  984. nbytes = 0;
  985. else {
  986. /* If the peer closes the connection, return ECONNRESET. */
  987. return -ECONNRESET;
  988. }
  989. }
  990. /* Extract the associated file descriptor, if any. */
  991. if (nbytes > 0) {
  992. #if defined NN_HAVE_MSG_CONTROL
  993. cmsg = CMSG_FIRSTHDR (&hdr);
  994. while (cmsg) {
  995. if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
  996. if (self->in.pfd) {
  997. *self->in.pfd = *((int*) CMSG_DATA (cmsg));
  998. self->in.pfd = NULL;
  999. }
  1000. else {
  1001. nn_closefd (*((int*) CMSG_DATA (cmsg)));
  1002. }
  1003. break;
  1004. }
  1005. cmsg = CMSG_NXTHDR (&hdr, cmsg);
  1006. }
  1007. #else
  1008. if (hdr.msg_accrightslen > 0) {
  1009. nn_assert (hdr.msg_accrightslen == sizeof (int));
  1010. if (self->in.pfd) {
  1011. *self->in.pfd = *((int*) hdr.msg_accrights);
  1012. self->in.pfd = NULL;
  1013. }
  1014. else {
  1015. nn_closefd (*((int*) hdr.msg_accrights));
  1016. }
  1017. }
  1018. #endif
  1019. }
  1020. /* If the data were received directly into the place we can return
  1021. straight away. */
  1022. if (length > NN_USOCK_BATCH_SIZE) {
  1023. length -= nbytes;
  1024. *len -= length;
  1025. return 0;
  1026. }
  1027. /* New data were read to the batch buffer. Copy the requested amount of it
  1028. to the user-supplied buffer. */
  1029. self->in.batch_len = nbytes;
  1030. self->in.batch_pos = 0;
  1031. if (nbytes) {
  1032. sz = nbytes > (ssize_t)length ? length : (size_t)nbytes;
  1033. memcpy (buf, self->in.batch, sz);
  1034. length -= sz;
  1035. self->in.batch_pos += sz;
  1036. }
  1037. *len -= length;
  1038. return 0;
  1039. }
  1040. static int nn_usock_geterr (struct nn_usock *self)
  1041. {
  1042. int rc;
  1043. int opt;
  1044. #if defined NN_HAVE_HPUX
  1045. int optsz;
  1046. #else
  1047. socklen_t optsz;
  1048. #endif
  1049. opt = 0;
  1050. optsz = sizeof (opt);
  1051. rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz);
  1052. /* The following should handle both Solaris and UNIXes derived from BSD. */
  1053. if (rc == -1)
  1054. return errno;
  1055. errno_assert (rc == 0);
  1056. nn_assert (optsz == sizeof (opt));
  1057. return opt;
  1058. }