asyncnet.nim 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2017 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements a high-level asynchronous sockets API based on the
  10. ## asynchronous dispatcher defined in the ``asyncdispatch`` module.
  11. ##
  12. ## Asynchronous IO in Nim
  13. ## ======================
  14. ##
  15. ## Async IO in Nim consists of multiple layers (from highest to lowest):
  16. ##
  17. ## * ``asyncnet`` module
  18. ##
  19. ## * Async await
  20. ##
  21. ## * ``asyncdispatch`` module (event loop)
  22. ##
  23. ## * ``selectors`` module
  24. ##
  25. ## Each builds on top of the layers below it. The selectors module is an
  26. ## abstraction for the various system ``select()`` mechanisms such as epoll or
  27. ## kqueue. If you wish you can use it directly, and some people have done so
  28. ## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
  29. ## But you must be aware that on Windows it only supports
  30. ## ``select()``.
  31. ##
  32. ## The async dispatcher implements the proactor pattern and also has an
  33. ## implementation of IOCP. It implements the proactor pattern for other
  34. ## OSes via the selectors module. Futures are also implemented here, and
  35. ## indeed all the procedures return a future.
  36. ##
  37. ## The final layer is the async await transformation. This allows you to
  38. ## write asynchronous code in a synchronous style and works similarly to
  39. ## C#'s await. The transformation works by converting any async procedures
  40. ## into an iterator.
  41. ##
  42. ## This is all single threaded, fully non-blocking and does give you a
  43. ## lot of control. In theory you should be able to work with any of these
  44. ## layers interchangeably (as long as you only care about non-Windows
  45. ## platforms).
  46. ##
  47. ## For most applications using ``asyncnet`` is the way to go as it builds
  48. ## over all the layers, providing some extra features such as buffering.
  49. ##
  50. ## SSL
  51. ## ===
  52. ##
  53. ## SSL can be enabled by compiling with the ``-d:ssl`` flag.
  54. ##
  55. ## You must create a new SSL context with the ``newContext`` function defined
  56. ## in the ``net`` module. You may then call ``wrapSocket`` on your socket using
  57. ## the newly created SSL context to get an SSL socket.
  58. ##
  59. ## Examples
  60. ## ========
  61. ##
  62. ## Chat server
  63. ## -----------
  64. ##
  65. ## The following example demonstrates a simple chat server.
  66. ##
  67. ## .. code-block::nim
  68. ##
  69. ##   import asyncnet, asyncdispatch
  70. ##
  71. ##   var clients {.threadvar.}: seq[AsyncSocket]
  72. ##
  73. ##   proc processClient(client: AsyncSocket) {.async.} =
  74. ##     while true:
  75. ##       let line = await client.recvLine()
  76. ##       if line.len == 0: break
  77. ##       for c in clients:
  78. ##         await c.send(line & "\c\L")
  79. ##
  80. ##   proc serve() {.async.} =
  81. ##     clients = @[]
  82. ##     var server = newAsyncSocket()
  83. ##     server.setSockOpt(OptReuseAddr, true)
  84. ##     server.bindAddr(Port(12345))
  85. ##     server.listen()
  86. ##
  87. ##     while true:
  88. ##       let client = await server.accept()
  89. ##       clients.add client
  90. ##
  91. ##       asyncCheck processClient(client)
  92. ##
  93. ##   asyncCheck serve()
  94. ##   runForever()
  95. ##
  96. include "system/inclrtl"
  97. import asyncdispatch
  98. import nativesockets
  99. import net
  100. import os
  101. export SOBool
  102. # TODO: Remove duplication introduced by PR #4683.
  103. const defineSsl = defined(ssl) or defined(nimdoc)
  104. when defineSsl:
  105. import openssl
  type
    # TODO: I would prefer to just do:
    # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
    AsyncSocketDesc = object
      ## State kept for one asynchronous socket.
      fd: SocketHandle ## the underlying OS socket handle
      closed: bool ## determines whether this socket has been closed
      isBuffered: bool ## determines whether this socket is buffered.
      buffer: array[0..BufferSize, char] ## read buffer, refilled by
                                         ## ``readIntoBuf``
      currPos: int # current index in buffer
      bufLen: int # current length of buffer
      isSsl: bool ## set to ``true`` once ``wrapSocket`` has been called
      when defineSsl:
        sslHandle: SslPtr ## handle created by ``SSL_new`` in ``wrapSocket``
        sslContext: SslContext ## context the socket was wrapped with
        bioIn: BIO ## memory BIO that received network bytes are written to
        bioOut: BIO ## memory BIO that outgoing bytes are read from
      domain: Domain
      sockType: SockType
      protocol: Protocol

    AsyncSocket* = ref AsyncSocketDesc
  proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
                       sockType: SockType = SOCK_STREAM,
                       protocol: Protocol = IPPROTO_TCP,
                       buffered = true): owned(AsyncSocket) =
    ## Wraps the already existing descriptor ``fd`` in a new ``AsyncSocket``
    ## configured with the supplied params.
    ##
    ## ``fd`` is switched to non-blocking mode as a side effect.
    ##
    ## **Note**: This procedure will **NOT** register ``fd`` with the global
    ## async dispatcher. You need to do this manually. If you have used
    ## ``newAsyncNativeSocket`` to create ``fd`` then it's already registered.
    assert fd != osInvalidSocket.AsyncFD
    let handle = fd.SocketHandle
    new(result)
    result.fd = handle
    handle.setBlocking(false)
    result.isBuffered = buffered
    result.domain = domain
    result.sockType = sockType
    result.protocol = protocol
    if buffered:
      # Start reading at the beginning of the (still empty) buffer.
      result.currPos = 0
  proc newAsyncSocket*(domain: Domain = AF_INET,
                       sockType: SockType = SOCK_STREAM,
                       protocol: Protocol = IPPROTO_TCP,
                       buffered = true): owned(AsyncSocket) =
    ## Creates a new asynchronous socket.
    ##
    ## A brand new file descriptor is created for the socket via
    ## ``createAsyncNativeSocket``; an ``OSError`` is raised if that fails.
    let nativeFd = createAsyncNativeSocket(domain, sockType, protocol)
    if nativeFd.SocketHandle == osInvalidSocket:
      raiseOSError(osLastError())
    result = newAsyncSocket(nativeFd, domain, sockType, protocol, buffered)
  proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
    ## Returns the local address and port number of ``socket``.
    ##
    ## This is a high-level interface for `getsockname`:idx:.
    result = getLocalAddr(socket.fd, socket.domain)
  proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
    ## Returns the address and port number of the peer ``socket`` is
    ## connected to.
    ##
    ## This is a high-level interface for `getpeername`:idx:.
    result = getPeerAddr(socket.fd, socket.domain)
  proc newAsyncSocket*(domain, sockType, protocol: cint,
                       buffered = true): owned(AsyncSocket) =
    ## Creates a new asynchronous socket from raw ``cint`` socket parameters.
    ##
    ## A brand new file descriptor is created for the socket via
    ## ``createAsyncNativeSocket``; an ``OSError`` is raised if that fails.
    let nativeFd = createAsyncNativeSocket(domain, sockType, protocol)
    if nativeFd.SocketHandle == osInvalidSocket:
      raiseOSError(osLastError())
    result = newAsyncSocket(
      nativeFd,
      Domain(domain),
      SockType(sockType),
      Protocol(protocol),
      buffered)
  when defineSsl:
    proc getSslError(handle: SslPtr, err: cint): cint =
      ## Classifies the failure ``err`` (which must be negative) reported by an
      ## SSL operation on ``handle``.
      ##
      ## The "want" codes are returned so the caller can satisfy them; every
      ## other code is turned into an SSL error.
      assert err < 0
      result = SSL_get_error(handle, err.cint)
      case result
      of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT,
         SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
        # Handed back to the caller unchanged.
        discard
      of SSL_ERROR_ZERO_RETURN:
        raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
      of SSL_ERROR_WANT_X509_LOOKUP:
        raiseSSLError("Function for x509 lookup has been called.")
      of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
        raiseSSLError()
      else: raiseSSLError("Unknown Error")

    proc sendPendingSslData(socket: AsyncSocket,
                            flags: set[SocketFlag]) {.async.} =
      ## Drains whatever is queued in the outgoing BIO of ``socket`` and sends
      ## it over the underlying descriptor. Does nothing when the BIO is empty.
      let pending = bioCtrlPending(socket.bioOut)
      if pending > 0:
        var outgoing = newString(pending)
        let drained = bioRead(socket.bioOut, addr outgoing[0], pending)
        assert drained != 0
        if drained < 0:
          raiseSSLError()
        outgoing.setLen(drained)
        await socket.fd.AsyncFD.send(outgoing, flags)

    proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
                    sslError: cint): owned(Future[bool]) {.async.} =
      ## Performs the network I/O requested by ``sslError``.
      ##
      ## Returns ``true`` if ``socket`` is still connected, otherwise ``false``.
      result = true
      case sslError
      of SSL_ERROR_WANT_WRITE:
        await sendPendingSslData(socket, flags)
      of SSL_ERROR_WANT_READ:
        var incoming = await recv(socket.fd.AsyncFD, BufferSize, flags)
        let received = len(incoming)
        if received > 0:
          # Hand the received bytes to the incoming BIO.
          if bioWrite(socket.bioIn, addr incoming[0], received.cint) < 0:
            raiseSSLError()
        elif received == 0:
          # connection not properly closed by remote side or connection dropped
          SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
          result = false
      else:
        raiseSSLError("Cannot appease SSL.")

    template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
                     op: untyped) =
      ## Repeats ``op`` until it stops reporting a negative result, doing the
      ## network I/O it asks for in between. The final result is injected into
      ## the calling scope as ``opResult``.
      ##
      ## Contains ``yield`` statements, so it is only usable inside an
      ## ``{.async.}`` procedure.
      var opResult {.inject.} = -1.cint
      while opResult < 0:
        # Call the desired operation.
        opResult = op

        # Bit hackish here.
        # TODO: Introduce an async template transformation pragma?

        # Send any remaining pending SSL data.
        yield sendPendingSslData(socket, flags)

        # If the operation failed, try to see if SSL has some data to read
        # or write.
        if opResult < 0:
          let wanted = getSslError(socket.sslHandle, opResult.cint)
          let appeased = appeaseSsl(socket, flags, wanted.cint)
          yield appeased
          if not appeased.read():
            # Socket disconnected.
            if SocketFlag.SafeDisconn notin flags:
              raiseSSLError("Socket has been disconnected")
            else:
              opResult = 0.cint
              break
  proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
             buffered = true): owned(Future[AsyncSocket]) {.async.} =
    ## Establishes connection to the specified ``address``:``port`` pair via the
    ## specified protocol. The procedure iterates through possible
    ## resolutions of the ``address`` until it succeeds, meaning that it
    ## seamlessly works with both IPv4 and IPv6.
    ##
    ## Returns an ``AsyncSocket`` ready to send or receive data.
    let connectedFd = await asyncdispatch.dial(address, port, protocol)
    let
      kind = protocol.toSockType()
      family = getSockDomain(connectedFd.SocketHandle)
    result = newAsyncSocket(connectedFd, family, kind, protocol, buffered)
  proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
    ## Connects ``socket`` to server at ``address:port``.
    ##
    ## Returns a ``Future`` which will complete when the connection succeeds
    ## or an error occurs. For SSL sockets the future completes only after
    ## the handshake loop has finished as well.
    await connect(socket.fd.AsyncFD, address, port, socket.domain)
    if socket.isSsl:
      when defineSsl:
        let isHostName = not isIpAddress(address)
        if isHostName:
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, address)

        let handshakeFlags = {SocketFlag.SafeDisconn}
        sslSetConnectState(socket.sslHandle)
        sslLoop(socket, handshakeFlags, sslDoHandshake(socket.sslHandle))
  template readInto(buf: pointer, size: int, socket: AsyncSocket,
                    flags: set[SocketFlag]): int =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf`` and
    ## evaluates to the number of bytes read.
    ##
    ## Note that this is a template and not a proc: it contains ``yield``
    ## statements and must be expanded inside an ``{.async.}`` procedure.
    assert(not socket.closed, "Cannot `recv` on a closed socket")
    var bytesRead = 0
    if not socket.isSsl:
      # Not in SSL mode: read straight from the descriptor.
      var pendingRead = asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size,
                                               flags)
      yield pendingRead
      bytesRead = pendingRead.read()
    else:
      when defineSsl:
        # SSL mode.
        sslLoop(socket, flags,
          sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
        bytesRead = opResult
    bytesRead
  template readIntoBuf(socket: AsyncSocket,
                       flags: set[SocketFlag]): int =
    ## Refills the internal buffer of ``socket`` with up to ``BufferSize``
    ## bytes, resets the read cursor and evaluates to the number of bytes
    ## that were read.
    var filled = readInto(addr socket.buffer[0], BufferSize, socket, flags)
    socket.currPos = 0
    socket.bufLen = filled
    filled
  proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``.
    ##
    ## For buffered sockets this function will attempt to read all the requested
    ## data. It will read this data in ``BufferSize`` chunks.
    ##
    ## For unbuffered sockets this function makes no effort to read
    ## all the data requested. It will return as much data as the operating system
    ## gives it.
    ##
    ## If socket is disconnected during the
    ## recv operation then the future may complete with only a part of the
    ## requested data.
    ##
    ## If socket is disconnected and no data is available
    ## to be read then the future will complete with a value of ``0``.
    if not socket.isBuffered:
      result = readInto(buf, size, socket, flags)
    else:
      let savedPos = socket.currPos

      if socket.bufLen == 0:
        let primed = socket.readIntoBuf(flags - {SocketFlag.Peek})
        if primed == 0:
          return 0

      var copied = 0
      var dest = cast[cstring](buf)
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if SocketFlag.Peek in flags:
            # We don't want to get another buffer if we're peeking.
            break
          let refilled = socket.readIntoBuf(flags - {SocketFlag.Peek})
          if refilled == 0:
            break

        # Copy as much as is both buffered and still wanted.
        let chunk = min(socket.bufLen-socket.currPos, size-copied)
        copyMem(addr(dest[copied]), addr(socket.buffer[socket.currPos]), chunk)
        copied.inc(chunk)
        socket.currPos.inc(chunk)

      if SocketFlag.Peek in flags:
        # Restore old buffer cursor position.
        socket.currPos = savedPos
      result = copied
  proc recv*(socket: AsyncSocket, size: int,
             flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
    ## Reads **up to** ``size`` bytes from ``socket``.
    ##
    ## For buffered sockets this function will attempt to read all the requested
    ## data. It will read this data in ``BufferSize`` chunks.
    ##
    ## For unbuffered sockets this function makes no effort to read
    ## all the data requested. It will return as much data as the operating system
    ## gives it.
    ##
    ## If socket is disconnected during the
    ## recv operation then the future may complete with only a part of the
    ## requested data.
    ##
    ## If socket is disconnected and no data is available
    ## to be read then the future will complete with a value of ``""``.
    if not socket.isBuffered:
      result = newString(size)
      let received = readInto(addr result[0], size, socket, flags)
      result.setLen(received)
    else:
      result = newString(size)
      shallow(result)
      let savedPos = socket.currPos

      if socket.bufLen == 0:
        let primed = socket.readIntoBuf(flags - {SocketFlag.Peek})
        if primed == 0:
          result.setLen(0)
          return

      var copied = 0
      while copied < size:
        if socket.currPos >= socket.bufLen:
          if SocketFlag.Peek in flags:
            # We don't want to get another buffer if we're peeking.
            break
          let refilled = socket.readIntoBuf(flags - {SocketFlag.Peek})
          if refilled == 0:
            break

        # Copy as much as is both buffered and still wanted.
        let chunk = min(socket.bufLen-socket.currPos, size-copied)
        copyMem(addr(result[copied]), addr(socket.buffer[socket.currPos]), chunk)
        copied.inc(chunk)
        socket.currPos.inc(chunk)

      if SocketFlag.Peek in flags:
        # Restore old buffer cursor position.
        socket.currPos = savedPos
      # Trim the preallocated string to what was actually read.
      result.setLen(copied)
  proc send*(socket: AsyncSocket, buf: pointer, size: int,
             flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
    ## will complete once all data has been sent.
    assert socket != nil
    assert(not socket.closed, "Cannot `send` on a closed socket")
    if not socket.isSsl:
      await send(socket.fd.AsyncFD, buf, size, flags)
    else:
      when defineSsl:
        sslLoop(socket, flags,
          sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
        # Flush what the SSL layer queued for the wire.
        await sendPendingSslData(socket, flags)
  proc send*(socket: AsyncSocket, data: string,
             flags = {SocketFlag.SafeDisconn}) {.async.} =
    ## Sends ``data`` to ``socket``. The returned future will complete once all
    ## data has been sent.
    ##
    ## On SSL sockets an empty ``data`` writes nothing through the SSL layer;
    ## only data that is already pending in the outgoing BIO is flushed.
    assert socket != nil
    if socket.isSsl:
      when defineSsl:
        if data.len > 0:
          # ``addr copy[0]`` is only valid for a non-empty string, and a
          # zero-length ``SSL_write`` must be avoided, so empty payloads skip
          # the write altogether.
          var copy = data
          sslLoop(socket, flags,
            sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
        await sendPendingSslData(socket, flags)
    else:
      await send(socket.fd.AsyncFD, data, flags)
  proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}):
        owned(Future[tuple[address: string, client: AsyncSocket]]) =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection and the remote address of the client.
    ## The future will complete when the connection is successfully accepted.
    ##
    ## The client socket inherits the domain, socket type, protocol and
    ## buffering mode of ``socket``.
    var retFuture = newFuture[tuple[address: string, client: AsyncSocket]](
      "asyncnet.acceptAddr")
    var nativeAccept = acceptAddr(socket.fd.AsyncFD, flags)
    nativeAccept.callback =
      proc (future: Future[tuple[address: string, client: AsyncFD]]) =
        assert future.finished
        if not future.failed:
          # Wrap the raw descriptor in an ``AsyncSocket``.
          let clientSocket = newAsyncSocket(future.read.client, socket.domain,
            socket.sockType, socket.protocol, socket.isBuffered)
          let resultTup = (future.read.address, clientSocket)
          retFuture.complete(resultTup)
        else:
          retFuture.fail(future.readError)
    return retFuture
  proc accept*(socket: AsyncSocket,
               flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
    ## Accepts a new connection. Returns a future containing the client socket
    ## corresponding to that connection.
    ## The future will complete when the connection is successfully accepted.
    var retFut = newFuture[AsyncSocket]("asyncnet.accept")
    var withAddr = acceptAddr(socket, flags)
    withAddr.callback =
      proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
        assert future.finished
        if not future.failed:
          # Only the socket is of interest here; the address is dropped.
          retFut.complete(future.read.client)
        else:
          retFut.fail(future.readError)
    return retFut
  proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
      flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
    ## Reads a line of data from ``socket`` into ``resString``.
    ##
    ## If a full line is read ``\r\L`` is not
    ## added to ``resString``, however if solely ``\r\L`` is read then
    ## ``resString`` will be set to it.
    ##
    ## If the socket is disconnected, ``resString`` will be set to ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before ``\r\L``
    ## is read) then ``resString`` will be set to ``""``.
    ## The partial line **will be lost**.
    ##
    ## The ``maxLength`` parameter limits how much is read: reading stops as
    ## soon as ``resString`` has become longer than ``maxLength`` characters.
    ##
    ## On buffered sockets a lone ``\r`` that is not followed by ``\L`` ends
    ## the line; the character after it is left in the buffer and starts the
    ## next read.
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
    ## protocol uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:
    result = newFuture[void]("asyncnet.recvLineInto")

    # TODO: Make the async transformation check for FutureVar params and complete
    # them when the result future is completed.
    # Can we replace the result future with the FutureVar?

    template addNLIfEmpty(): untyped =
      # An otherwise empty line is reported as "\c\L" so that it can be told
      # apart from the "" that signals a disconnect.
      if resString.mget.len == 0:
        resString.mget.add("\c\L")

    if socket.isBuffered:
      if socket.bufLen == 0:
        let res = socket.readIntoBuf(flags)
        if res == 0:
          resString.complete()
          return

      var lastR = false
      while true:
        if socket.currPos >= socket.bufLen:
          let res = socket.readIntoBuf(flags)
          if res == 0:
            # Disconnected before the line ended: drop the partial line.
            resString.mget.setLen(0)
            resString.complete()
            return

        case socket.buffer[socket.currPos]
        of '\r':
          lastR = true
          addNLIfEmpty()
        of '\L':
          addNLIfEmpty()
          socket.currPos.inc()
          resString.complete()
          return
        else:
          if lastR:
            # A lone '\r' terminated the line. The current character belongs
            # to the next line, so it must stay in the buffer; consuming it
            # here would silently discard it.
            resString.complete()
            return
          else:
            resString.mget.add socket.buffer[socket.currPos]
        socket.currPos.inc()

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    else:
      var c = ""
      while true:
        c = await recv(socket, 1, flags)
        if c.len == 0:
          # Disconnected before the line ended: drop the partial line.
          resString.mget.setLen(0)
          resString.complete()
          return
        if c == "\r":
          c = await recv(socket, 1, flags) # Skip \L
          assert c == "\L"
          addNLIfEmpty()
          resString.complete()
          return
        elif c == "\L":
          addNLIfEmpty()
          resString.complete()
          return
        resString.mget.add c

        # Verify that this isn't a DOS attack: #3847.
        if resString.mget.len > maxLength: break
    resString.complete()
  proc recvLine*(socket: AsyncSocket,
                 flags = {SocketFlag.SafeDisconn},
                 maxLength = MaxLineLength): owned(Future[string]) {.async.} =
    ## Reads a line of data from ``socket``. Returned future will complete once
    ## a full line is read or an error occurs.
    ##
    ## If a full line is read ``\r\L`` is not
    ## added to the result, however if solely ``\r\L`` is read then the result
    ## will be set to it.
    ##
    ## If the socket is disconnected, the result will be ``""``.
    ##
    ## If the socket is disconnected in the middle of a line (before ``\r\L``
    ## is read) then the result will be ``""``.
    ## The partial line **will be lost**.
    ##
    ## The ``maxLength`` parameter determines the maximum amount of characters
    ## that can be read. The result is truncated after that.
    ##
    ## **Warning**: The ``Peek`` flag is not yet implemented.
    ##
    ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
    ## uses ``\r\L`` to delimit a new line.
    assert SocketFlag.Peek notin flags ## TODO:

    # TODO: Optimise this
    var lineVar = newFutureVar[string]("asyncnet.recvLine")
    lineVar.mget() = ""
    # All the work is delegated to ``recvLineInto``.
    await socket.recvLineInto(lineVar, flags, maxLength)
    result = lineVar.mget()
  proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
      ReadIOEffect].} =
    ## Marks ``socket`` as accepting connections.
    ## ``backlog`` specifies the maximum length of the
    ## queue of pending connections.
    ##
    ## Raises an OSError error upon failure.
    let status = listen(socket.fd, backlog)
    if status < 0'i32:
      raiseOSError(osLastError())
  proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
      tags: [ReadIOEffect].} =
    ## Binds ``address``:``port`` to the socket.
    ##
    ## If ``address`` is "" then ADDR_ANY will be bound.
    ##
    ## Raises a ``ValueError`` if ``address`` is "" and the socket's domain is
    ## neither ``AF_INET`` nor ``AF_INET6``, and an ``OSError`` if binding
    ## fails.
    var realaddr = address
    if realaddr == "":
      case socket.domain
      of AF_INET6: realaddr = "::"
      of AF_INET: realaddr = "0.0.0.0"
      else:
        raise newException(ValueError,
          "Unknown socket address family and no address specified to bindAddr")

    var aiList = getAddrInfo(realaddr, port, socket.domain)
    if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
      # Capture the error code first: freeing the address list is another
      # system-level call and may overwrite the last OS error.
      let bindError = osLastError()
      freeaddrinfo(aiList)
      raiseOSError(bindError)
    freeaddrinfo(aiList)
  when defined(posix):

    proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
      ## Connects Unix socket to `path`.
      ## This only works on Unix-style systems: Mac OS X, BSD and Linux
      when not defined(nimdoc):
        let retFuture = newFuture[void]("connectUnix")
        result = retFuture

        proc cb(fd: AsyncFD): bool =
          # Invoked once ``fd`` is writable; returning ``false`` keeps the
          # callback registered.
          let sockErr = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET),
                                                       cint(SO_ERROR))
          if sockErr == EINTR:
            return false
          if sockErr == 0:
            retFuture.complete()
          else:
            retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(sockErr))))
          return true

        var socketAddr = makeUnixAddr(path)
        let addrLen = (sizeof(socketAddr.sun_family) + path.len).SockLen
        let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
                                    addrLen)
        if ret == 0:
          # Request to connect completed immediately.
          retFuture.complete()
        else:
          let lastError = osLastError()
          if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
            # Connection still in progress: finish in ``cb``.
            addWrite(AsyncFD(socket.fd), cb)
          else:
            retFuture.fail(newException(OSError, osErrorMsg(lastError)))

    proc bindUnix*(socket: AsyncSocket, path: string) {.
        tags: [ReadIOEffect].} =
      ## Binds Unix socket to `path`.
      ## This only works on Unix-style systems: Mac OS X, BSD and Linux
      when not defined(nimdoc):
        var socketAddr = makeUnixAddr(path)
        let addrLen = (sizeof(socketAddr.sun_family) + path.len).SockLen
        if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
                              addrLen) != 0'i32:
          raiseOSError(osLastError())

  elif defined(nimdoc):

    proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
      ## Connects Unix socket to `path`.
      ## This only works on Unix-style systems: Mac OS X, BSD and Linux
      discard

    proc bindUnix*(socket: AsyncSocket, path: string) =
      ## Binds Unix socket to `path`.
      ## This only works on Unix-style systems: Mac OS X, BSD and Linux
      discard
  proc close*(socket: AsyncSocket) =
    ## Closes the socket.
    ##
    ## Calling ``close`` on a socket that is already closed does nothing.
    ##
    ## For SSL sockets an SSL error is raised when ``SSL_shutdown`` reports a
    ## failure; the descriptor is closed and the socket is marked as closed
    ## even in that case.
    if socket.closed:
      # Closing twice would free the SSL handle a second time.
      return

    defer:
      socket.fd.AsyncFD.closeSocket()
      # Set here rather than at the end of the body so that the flag is also
      # updated when the SSL shutdown below raises.
      socket.closed = true # TODO: Add extra debugging checks for this.

    when defineSsl:
      if socket.isSsl:
        let res = SSL_shutdown(socket.sslHandle)
        SSL_free(socket.sslHandle)
        # 0 and 1 are the two non-error results.
        if res != 0 and res != 1:
          raiseSSLError()
  when defineSsl:
    proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
      ## Wraps a socket in an SSL context. This function effectively turns
      ## ``socket`` into an SSL socket.
      ##
      ## Raises an SSL error if no SSL handle could be created.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      socket.isSsl = true
      socket.sslContext = ctx
      socket.sslHandle = SSL_new(socket.sslContext.context)
      if socket.sslHandle == nil:
        raiseSSLError()

      # Two memory BIOs carry the data between the SSL layer and the socket.
      let memIn = bioNew(bioSMem())
      socket.bioIn = memIn
      let memOut = bioNew(bioSMem())
      socket.bioOut = memOut
      sslSetBio(socket.sslHandle, memIn, memOut)

    proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
                              handshake: SslHandshakeType,
                              hostname: string = "") =
      ## Wraps a connected socket in an SSL context. This function effectively
      ## turns ``socket`` into an SSL socket.
      ## ``hostname`` should be specified so that the client knows which hostname
      ## the server certificate should be validated against.
      ##
      ## This should be called on a connected socket, and will perform
      ## an SSL handshake immediately.
      ##
      ## **Disclaimer**: This code is not well tested, may be very unsafe and
      ## prone to security vulnerabilities.
      wrapSocket(ctx, socket)

      case handshake
      of handshakeAsServer:
        sslSetAcceptState(socket.sslHandle)
      of handshakeAsClient:
        let wantsSni = hostname.len > 0 and not isIpAddress(hostname)
        if wantsSni:
          # Set the SNI address for this connection. This call can fail if
          # we're not using TLSv1+.
          discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
        sslSetConnectState(socket.sslHandle)

    proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
      ## Returns the certificate chain received by the peer we are connected to
      ## through the given socket.
      ## The handshake must have been completed and the certificate chain must
      ## have been verified successfully or else an empty sequence is returned.
      ## The chain is ordered from leaf certificate to root certificate.
      ##
      ## Sockets that are not SSL sockets always yield an empty sequence.
      if socket.isSsl:
        result = getPeerCertificates(socket.sslHandle)
      else:
        result = newSeq[Certificate]()
  proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
      tags: [ReadIOEffect].} =
    ## Retrieves option ``opt`` as a boolean value: ``true`` when the option's
    ## integer value is non-zero.
    let raw = getSockOptInt(socket.fd, cint(level), toCInt(opt))
    raw != 0
  proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
                   level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
    ## Sets option ``opt`` to a boolean value specified by ``value``.
    # The option is passed on as an integer: 1 for ``true``, 0 for ``false``.
    let asInt = cint(ord(value))
    setSockOptInt(socket.fd, cint(level), toCInt(opt), asInt)
  proc isSsl*(socket: AsyncSocket): bool =
    ## Determines whether ``socket`` is an SSL socket.
    result = socket.isSsl
  proc getFd*(socket: AsyncSocket): SocketHandle =
    ## Returns the socket's file descriptor.
    result = socket.fd
  proc isClosed*(socket: AsyncSocket): bool =
    ## Determines whether the socket has been closed.
    result = socket.closed
  when not defined(testing) and isMainModule:
    # Manual smoke tests; ``test`` selects which one gets compiled in.
    type
      TestCases = enum
        HighClient, LowClient, LowServer

    const test = HighClient

    when test == HighClient:
      # Client written with async/await.
      proc main() {.async.} =
        var sock = newAsyncSocket()
        await sock.connect("irc.freenode.net", Port(6667))
        while true:
          let line = await sock.recvLine()
          if line == "":
            echo("Disconnected")
            break
          echo("Got line: ", line)
      asyncCheck main()
    elif test == LowClient:
      # Client written with explicit future callbacks.
      var sock = newAsyncSocket()
      var connecting = connect(sock, "irc.freenode.net", Port(6667))
      connecting.callback =
        proc (future: Future[void]) =
          echo("Connected in future!")
          for i in 0 .. 50:
            var receiving = recv(sock, 10)
            receiving.callback =
              proc (future: Future[string]) =
                echo("Read ", future.read.len, ": ", future.read.repr)
    elif test == LowServer:
      # Server written with explicit future callbacks.
      var sock = newAsyncSocket()
      sock.bindAddr(Port(6667))
      sock.listen()
      proc onAccept(future: Future[AsyncSocket]) =
        let client = future.read
        echo "Accepted ", client.fd.cint
        var sending = send(client, "test\c\L")
        sending.callback =
          proc (future: Future[void]) =
            echo("Send")
            client.close()

        # Wait for the next client.
        var nextAccept = accept(sock)
        nextAccept.callback = onAccept

      var firstAccept = accept(sock)
      firstAccept.callback = onAccept
    runForever()