asyncdispatch.nim 76 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055
  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2015 Dominik Picheta
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. ## This module implements asynchronous IO. This includes a dispatcher,
  10. ## a `Future` type implementation, and an `async` macro which allows
  11. ## asynchronous code to be written in a synchronous style with the `await`
  12. ## keyword.
  13. ##
  14. ## The dispatcher acts as a kind of event loop. You must call `poll` on it
  15. ## (or a function which does so for you such as `waitFor` or `runForever`)
  16. ## in order to poll for any outstanding events. The underlying implementation
  17. ## is based on epoll on Linux, IO Completion Ports on Windows and select on
  18. ## other operating systems.
  19. ##
  20. ## The `poll` function will not, on its own, return any events. Instead
  21. ## an appropriate `Future` object will be completed. A `Future` is a
  22. ## type which holds a value which is not yet available, but which *may* be
  23. ## available in the future. You can check whether a future is finished
  24. ## by using the `finished` function. When a future is finished it means that
  25. ## either the value that it holds is now available or it holds an error instead.
  26. ## The latter situation occurs when the operation to complete a future fails
  27. ## with an exception. You can distinguish between the two situations with the
  28. ## `failed` function.
  29. ##
  30. ## Future objects can also store a callback procedure which will be called
  31. ## automatically once the future completes.
  32. ##
  33. ## Futures therefore can be thought of as an implementation of the proactor
  34. ## pattern. In this
  35. ## pattern you make a request for an action, and once that action is fulfilled
  36. ## a future is completed with the result of that action. Requests can be
  37. ## made by calling the appropriate functions. For example: calling the `recv`
  38. ## function will create a request for some data to be read from a socket. The
  39. ## future which the `recv` function returns will then complete once the
  40. ## requested amount of data is read **or** an exception occurs.
  41. ##
  42. ## Code to read some data from a socket may look something like this:
  43. ## ```Nim
  44. ## var future = socket.recv(100)
  45. ## future.addCallback(
  46. ## proc () =
  47. ## echo(future.read)
  48. ## )
  49. ## ```
  50. ##
  51. ## All asynchronous functions returning a `Future` will not block. They
  52. ## will not however return immediately. An asynchronous function will have
  53. ## code which will be executed before an asynchronous request is made, in most
  54. ## cases this code sets up the request.
  55. ##
  56. ## In the above example, the `recv` function will return a brand new
  57. ## `Future` instance once the request for data to be read from the socket
  58. ## is made. This `Future` instance will complete once the requested amount
  59. ## of data is read, in this case it is 100 bytes. The second line sets a
  60. ## callback on this future which will be called once the future completes.
  61. ## All the callback does is write the data stored in the future to `stdout`.
  62. ## The `read` function is used for this and it checks whether the future
  63. ## completes with an error for you (if it did, it will simply raise the
  64. ## error), if there is no error, however, it returns the value of the future.
  65. ##
  66. ## Asynchronous procedures
  67. ## =======================
  68. ##
  69. ## Asynchronous procedures remove the pain of working with callbacks. They do
  70. ## this by allowing you to write asynchronous code the same way as you would
  71. ## write synchronous code.
  72. ##
  73. ## An asynchronous procedure is marked using the `{.async.}` pragma.
  74. ## When marking a procedure with the `{.async.}` pragma it must have a
  75. ## `Future[T]` return type or no return type at all. If you do not specify
  76. ## a return type then `Future[void]` is assumed.
  77. ##
  78. ## Inside asynchronous procedures `await` can be used to call any
  79. ## procedures which return a
  80. ## `Future`; this includes asynchronous procedures. When a procedure is
  81. ## "awaited", the asynchronous procedure it is awaited in will
  82. ## suspend its execution
  83. ## until the awaited procedure's Future completes. At which point the
  84. ## asynchronous procedure will resume its execution. During the period
  85. ## when an asynchronous procedure is suspended other asynchronous procedures
  86. ## will be run by the dispatcher.
  87. ##
  88. ## The `await` call may be used in many contexts. It can be used on the right
  89. ## hand side of a variable declaration: `var data = await socket.recv(100)`,
  90. ## in which case the variable will be set to the value of the future
  91. ## automatically. It can be used to await a `Future` object, and it can
  92. ## be used to await a procedure returning a `Future[void]`:
  93. ## `await socket.send("foobar")`.
  94. ##
  95. ## If an awaited future completes with an error, then `await` will re-raise
  96. ## this error. To avoid this, you can use the `yield` keyword instead of
  97. ## `await`. The following section shows different ways that you can handle
  98. ## exceptions in async procs.
  99. ##
  100. ## .. caution::
  101. ## Procedures marked {.async.} do not support mutable parameters such
  102. ## as `var int`. References such as `ref int` should be used instead.
  103. ##
  104. ## Handling Exceptions
  105. ## -------------------
  106. ##
  107. ## You can handle exceptions in the same way as in ordinary Nim code;
  108. ## by using the try statement:
  109. ##
  110. ## ```Nim
  111. ## try:
  112. ## let data = await sock.recv(100)
  113. ## echo("Received ", data)
  114. ## except:
  115. ## # Handle exception
  116. ## ```
  117. ##
  118. ## An alternative approach to handling exceptions is to use `yield` on a future
  119. ## then check the future's `failed` property. For example:
  120. ##
  121. ## ```Nim
  122. ## var future = sock.recv(100)
  123. ## yield future
  124. ## if future.failed:
  125. ## # Handle exception
  126. ## ```
  127. ##
  128. ##
  129. ## Discarding futures
  130. ## ==================
  131. ##
  132. ## Futures should **never** be discarded directly because they may contain
  133. ## errors. If you do not care for the result of a Future then you should use
  134. ## the `asyncCheck` procedure instead of the `discard` keyword. Note that this
  135. ## does not wait for completion, and you should use `waitFor` or `await` for that purpose.
  136. ##
  137. ## .. note:: `await` also checks if the future fails, so you can safely discard
  138. ## its result.
  139. ##
  140. ## Handling futures
  141. ## ================
  142. ##
  143. ## There are many different operations that apply to a future.
  144. ## The three primary high-level operations are `asyncCheck`,
  145. ## `waitFor`, and `await`.
  146. ##
  147. ## * `asyncCheck`: Raises an exception if the future fails. It neither waits
  148. ## for the future to finish nor returns the result of the future.
  149. ## * `waitFor`: Polls the event loop and blocks the current thread until the
  150. ## future finishes. This is often used to call an async procedure from a
  151. ## synchronous context and should never be used in an `async` proc.
  152. ## * `await`: Pauses execution in the current async procedure until the future
  153. ## finishes. While the current procedure is paused, other async procedures will
  154. ## continue running. Should be used instead of `waitFor` in an async
  155. ## procedure.
  156. ##
  157. ## Here is a handy quick reference chart showing their high-level differences:
  158. ## ============== ===================== =======================
  159. ## Procedure Context Blocking
  160. ## ============== ===================== =======================
  161. ## `asyncCheck` non-async and async non-blocking
  162. ## `waitFor` non-async blocks current thread
  163. ## `await` async suspends current proc
  164. ## ============== ===================== =======================
  165. ##
  166. ## Examples
  167. ## ========
  168. ##
  169. ## For examples take a look at the documentation for the modules implementing
  170. ## asynchronous IO. A good place to start is the
  171. ## `asyncnet module <asyncnet.html>`_.
  172. ##
  173. ## Investigating pending futures
  174. ## =============================
  175. ##
  176. ## It's possible to get into a situation where an async proc, or more accurately
  177. ## a `Future[T]` gets stuck and
  178. ## never completes. This can happen for various reasons and can cause serious
  179. ## memory leaks. When this occurs it's hard to identify the procedure that is
  180. ## stuck.
  181. ##
  182. ## Thankfully there is a mechanism which tracks the count of each pending future.
  183. ## All you need to do to enable it is compile with `-d:futureLogging` and
  184. ## use the `getFuturesInProgress` procedure to get the list of pending futures
  185. ## together with the stack traces to the moment of their creation.
  186. ##
  187. ## You may also find it useful to use this
  188. ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
  189. ## the pending futures into prometheus, allowing you to analyse them via a nice
  190. ## graph.
  191. ##
  192. ##
  193. ##
  194. ## Limitations/Bugs
  195. ## ================
  196. ##
  197. ## * The effect system (`raises: []`) does not work with async procedures.
  198. ## * Mutable parameters are not supported by async procedures.
  199. ##
  200. ##
  201. ## Multiple async backend support
  202. ## ==============================
  203. ##
  204. ## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
  205. ## implemented in libraries with only minimal support from the language - as
  206. ## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
  207. ## ``chronos``, and more may come to be developed in the future.
  208. ##
  209. ## Libraries built on top of async/await may wish to support multiple async
  210. ## backends - the best way to do so is to create separate modules for each backend
  211. ## that may be imported side-by-side.
  212. ##
  213. ## An alternative way is to select backend using a global compile flag - this
  214. ## method makes it difficult to compose applications that use both backends as may
  215. ## happen with transitive dependencies, but may be appropriate in some cases -
  216. ## libraries choosing this path should call the flag `asyncBackend`, allowing
  217. ## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
  218. ##
  219. ## Known `async` backends include:
  220. ##
  221. ## * `-d:asyncBackend=none`: disable `async` support completely
  222. ## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html
  223. ## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/
  224. ##
  225. ## ``none`` can be used when a library supports both a synchronous and
  226. ## asynchronous API, to disable the latter.
  227. import os, tables, strutils, times, heapqueue, options, asyncstreams
  228. import options, math, std/monotimes
  229. import asyncfutures except callSoon
  230. import nativesockets, net, deques
  231. when defined(nimPreviewSlimSystem):
  232. import std/[assertions, syncio]
  233. export Port, SocketFlag
  234. export asyncfutures except callSoon
  235. export asyncstreams
  236. # TODO: Check if yielded future is nil and throw a more meaningful exception
  237. type
  238. PDispatcherBase = ref object of RootRef
  239. timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
  240. callbacks*: Deque[proc () {.gcsafe.}]
  241. proc processTimers(
  242. p: PDispatcherBase, didSomeWork: var bool
  243. ): Option[int] {.inline.} =
  244. # Pop the timers in the order in which they will expire (smaller `finishAt`).
  245. var count = p.timers.len
  246. let t = getMonoTime()
  247. while count > 0 and t >= p.timers[0].finishAt:
  248. p.timers.pop().fut.complete()
  249. dec count
  250. didSomeWork = true
  251. # Return the number of milliseconds in which the next timer will expire.
  252. if p.timers.len == 0: return
  253. let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
  254. return some(millisecs.int + 1)
  255. proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
  256. while p.callbacks.len > 0:
  257. var cb = p.callbacks.popFirst()
  258. cb()
  259. didSomeWork = true
  260. proc adjustTimeout(
  261. p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
  262. ): int {.inline.} =
  263. if p.callbacks.len != 0:
  264. return 0
  265. if nextTimer.isNone() or pollTimeout == -1:
  266. return pollTimeout
  267. result = max(nextTimer.get(), 0)
  268. result = min(pollTimeout, result)
  269. proc runOnce(timeout: int): bool {.gcsafe.}
  270. proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
  271. ## Schedule `cbproc` to be called as soon as possible.
  272. ## The callback is called when control returns to the event loop.
  273. proc initCallSoonProc =
  274. if asyncfutures.getCallSoonProc().isNil:
  275. asyncfutures.setCallSoonProc(callSoon)
  276. template implementSetInheritable() {.dirty.} =
  277. when declared(setInheritable):
  278. proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
  279. ## Control whether a file handle can be inherited by child processes.
  280. ## Returns `true` on success.
  281. ##
  282. ## This procedure is not guaranteed to be available for all platforms.
  283. ## Test for availability with `declared() <system.html#declared,untyped>`_.
  284. fd.FileHandle.setInheritable(inheritable)
  285. when defined(windows) or defined(nimdoc):
  286. import winlean, sets, hashes
  287. type
  288. CompletionKey = ULONG_PTR
  289. CompletionData* = object
  290. fd*: AsyncFD # TODO: Rename this.
  291. cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
  292. errcode: OSErrorCode) {.closure, gcsafe.})
  293. cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
  294. # when using RegisterWaitForSingleObject, because
  295. # waiting is done in different thread.
  296. PDispatcher* = ref object of PDispatcherBase
  297. ioPort: Handle
  298. handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.
  299. CustomObj = object of OVERLAPPED
  300. data*: CompletionData
  301. CustomRef* = ref CustomObj
  302. AsyncFD* = distinct int
  303. PostCallbackData = object
  304. ioPort: Handle
  305. handleFd: AsyncFD
  306. waitFd: Handle
  307. ovl: owned CustomRef
  308. PostCallbackDataPtr = ptr PostCallbackData
  309. AsyncEventImpl = object
  310. hEvent: Handle
  311. hWaiter: Handle
  312. pcd: PostCallbackDataPtr
  313. AsyncEvent* = ptr AsyncEventImpl
  314. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  315. proc hash(x: AsyncFD): Hash {.borrow.}
  316. proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
  317. proc newDispatcher*(): owned PDispatcher =
  318. ## Creates a new Dispatcher instance.
  319. new result
  320. result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
  321. result.handles = initHashSet[AsyncFD]()
  322. result.timers.clear()
  323. result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
  324. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  325. proc setGlobalDispatcher*(disp: sink PDispatcher) =
  326. if not gDisp.isNil:
  327. assert gDisp.callbacks.len == 0
  328. gDisp = disp
  329. initCallSoonProc()
  330. proc getGlobalDispatcher*(): PDispatcher =
  331. if gDisp.isNil:
  332. setGlobalDispatcher(newDispatcher())
  333. result = gDisp
  334. proc getIoHandler*(disp: PDispatcher): Handle =
  335. ## Returns the underlying IO Completion Port handle (Windows) or selector
  336. ## (Unix) for the specified dispatcher.
  337. return disp.ioPort
  338. proc register*(fd: AsyncFD) =
  339. ## Registers `fd` with the dispatcher.
  340. let p = getGlobalDispatcher()
  341. if createIoCompletionPort(fd.Handle, p.ioPort,
  342. cast[CompletionKey](fd), 1) == 0:
  343. raiseOSError(osLastError())
  344. p.handles.incl(fd)
  345. proc verifyPresence(fd: AsyncFD) =
  346. ## Ensures that file descriptor has been registered with the dispatcher.
  347. ## Raises ValueError if `fd` has not been registered.
  348. let p = getGlobalDispatcher()
  349. if fd notin p.handles:
  350. raise newException(ValueError,
  351. "Operation performed on a socket which has not been registered with" &
  352. " the dispatcher yet.")
  353. proc hasPendingOperations*(): bool =
  354. ## Returns `true` if the global dispatcher has pending operations.
  355. let p = getGlobalDispatcher()
  356. p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
  357. proc runOnce(timeout: int): bool =
  358. let p = getGlobalDispatcher()
  359. if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
  360. raise newException(ValueError,
  361. "No handles or timers registered in dispatcher.")
  362. result = false
  363. let nextTimer = processTimers(p, result)
  364. let at = adjustTimeout(p, timeout, nextTimer)
  365. var llTimeout =
  366. if at == -1: winlean.INFINITE
  367. else: at.int32
  368. var lpNumberOfBytesTransferred: DWORD
  369. var lpCompletionKey: ULONG_PTR
  370. var customOverlapped: CustomRef
  371. let res = getQueuedCompletionStatus(p.ioPort,
  372. addr lpNumberOfBytesTransferred, addr lpCompletionKey,
  373. cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
  374. result = true
  375. # For 'gcDestructors' the destructor of 'customOverlapped' will
  376. # be called at the end and we are the only owner here. This means
  377. # We do not have to 'GC_unref(customOverlapped)' because the destructor
  378. # does that for us.
  379. # http://stackoverflow.com/a/12277264/492186
  380. # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
  381. if res:
  382. # This is useful for ensuring the reliability of the overlapped struct.
  383. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  384. customOverlapped.data.cb(customOverlapped.data.fd,
  385. lpNumberOfBytesTransferred, OSErrorCode(-1))
  386. # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
  387. # so we need to dispose our `cb` environment, because it is not needed
  388. # anymore.
  389. if customOverlapped.data.cell.data != nil:
  390. system.dispose(customOverlapped.data.cell)
  391. when not defined(gcDestructors):
  392. GC_unref(customOverlapped)
  393. else:
  394. let errCode = osLastError()
  395. if customOverlapped != nil:
  396. assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
  397. customOverlapped.data.cb(customOverlapped.data.fd,
  398. lpNumberOfBytesTransferred, errCode)
  399. if customOverlapped.data.cell.data != nil:
  400. system.dispose(customOverlapped.data.cell)
  401. when not defined(gcDestructors):
  402. GC_unref(customOverlapped)
  403. else:
  404. if errCode.int32 == WAIT_TIMEOUT:
  405. # Timed out
  406. result = false
  407. else: raiseOSError(errCode)
  408. # Timer processing.
  409. discard processTimers(p, result)
  410. # Callback queue processing
  411. processPendingCallbacks(p, result)
  412. var acceptEx: WSAPROC_ACCEPTEX
  413. var connectEx: WSAPROC_CONNECTEX
  414. var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
  415. proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
  416. # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
  417. var bytesRet: DWORD
  418. fun = nil
  419. result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
  420. sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
  421. addr bytesRet, nil, nil) == 0
  422. proc initAll() =
  423. let dummySock = createNativeSocket()
  424. if dummySock == INVALID_SOCKET:
  425. raiseOSError(osLastError())
  426. var fun: pointer = nil
  427. if not initPointer(dummySock, fun, WSAID_CONNECTEX):
  428. raiseOSError(osLastError())
  429. connectEx = cast[WSAPROC_CONNECTEX](fun)
  430. if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
  431. raiseOSError(osLastError())
  432. acceptEx = cast[WSAPROC_ACCEPTEX](fun)
  433. if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
  434. raiseOSError(osLastError())
  435. getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
  436. close(dummySock)
  437. proc newCustom*(): CustomRef =
  438. result = CustomRef() # 0
  439. GC_ref(result) # 1 prevent destructor from doing a premature free.
  440. # destructor of newCustom's caller --> 0. This means
  441. # Windows holds a ref for us with RC == 0 (single owner).
  442. # This is passed back to us in the IO completion port.
  443. proc recv*(socket: AsyncFD, size: int,
  444. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  445. ## Reads **up to** `size` bytes from `socket`. Returned future will
  446. ## complete once all the data requested is read, a part of the data has been
  447. ## read, or the socket has disconnected in which case the future will
  448. ## complete with a value of `""`.
  449. ##
  450. ## .. warning:: The `Peek` socket flag is not supported on Windows.
  451. # Things to note:
  452. # * When WSARecv completes immediately then `bytesReceived` is very
  453. # unreliable.
  454. # * Still need to implement message-oriented socket disconnection,
  455. # '\0' in the message currently signifies a socket disconnect. Who
  456. # knows what will happen when someone sends that to our socket.
  457. verifyPresence(socket)
  458. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  459. var retFuture = newFuture[string]("recv")
  460. var dataBuf: TWSABuf
  461. dataBuf.buf = cast[cstring](alloc0(size))
  462. dataBuf.len = size.ULONG
  463. var bytesReceived: DWORD
  464. var flagsio = flags.toOSFlags().DWORD
  465. var ol = newCustom()
  466. ol.data = CompletionData(fd: socket, cb:
  467. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  468. if not retFuture.finished:
  469. if errcode == OSErrorCode(-1):
  470. if bytesCount == 0 and dataBuf.buf[0] == '\0':
  471. retFuture.complete("")
  472. else:
  473. var data = newString(bytesCount)
  474. assert bytesCount <= size
  475. copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
  476. retFuture.complete($data)
  477. else:
  478. if flags.isDisconnectionError(errcode):
  479. retFuture.complete("")
  480. else:
  481. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  482. if dataBuf.buf != nil:
  483. dealloc dataBuf.buf
  484. dataBuf.buf = nil
  485. )
  486. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  487. addr flagsio, cast[POVERLAPPED](ol), nil)
  488. if ret == -1:
  489. let err = osLastError()
  490. if err.int32 != ERROR_IO_PENDING:
  491. if dataBuf.buf != nil:
  492. dealloc dataBuf.buf
  493. dataBuf.buf = nil
  494. GC_unref(ol)
  495. if flags.isDisconnectionError(err):
  496. retFuture.complete("")
  497. else:
  498. retFuture.fail(newException(OSError, osErrorMsg(err)))
  499. elif ret == 0:
  500. # Request completed immediately.
  501. if bytesReceived != 0:
  502. var data = newString(bytesReceived)
  503. assert bytesReceived <= size
  504. copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
  505. retFuture.complete($data)
  506. else:
  507. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  508. retFuture.complete("")
  509. return retFuture
  510. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  511. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  512. ## Reads **up to** `size` bytes from `socket` into `buf`, which must
  513. ## at least be of that size. Returned future will complete once all the
  514. ## data requested is read, a part of the data has been read, or the socket
  515. ## has disconnected in which case the future will complete with a value of
  516. ## `0`.
  517. ##
  518. ## .. warning:: The `Peek` socket flag is not supported on Windows.
  519. # Things to note:
  520. # * When WSARecv completes immediately then `bytesReceived` is very
  521. # unreliable.
  522. # * Still need to implement message-oriented socket disconnection,
  523. # '\0' in the message currently signifies a socket disconnect. Who
  524. # knows what will happen when someone sends that to our socket.
  525. verifyPresence(socket)
  526. assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
  527. var retFuture = newFuture[int]("recvInto")
  528. #buf[] = '\0'
  529. var dataBuf: TWSABuf
  530. dataBuf.buf = cast[cstring](buf)
  531. dataBuf.len = size.ULONG
  532. var bytesReceived: DWORD
  533. var flagsio = flags.toOSFlags().DWORD
  534. var ol = newCustom()
  535. ol.data = CompletionData(fd: socket, cb:
  536. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  537. if not retFuture.finished:
  538. if errcode == OSErrorCode(-1):
  539. retFuture.complete(bytesCount)
  540. else:
  541. if flags.isDisconnectionError(errcode):
  542. retFuture.complete(0)
  543. else:
  544. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  545. if dataBuf.buf != nil:
  546. dataBuf.buf = nil
  547. )
  548. let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  549. addr flagsio, cast[POVERLAPPED](ol), nil)
  550. if ret == -1:
  551. let err = osLastError()
  552. if err.int32 != ERROR_IO_PENDING:
  553. if dataBuf.buf != nil:
  554. dataBuf.buf = nil
  555. GC_unref(ol)
  556. if flags.isDisconnectionError(err):
  557. retFuture.complete(0)
  558. else:
  559. retFuture.fail(newException(OSError, osErrorMsg(err)))
  560. elif ret == 0:
  561. # Request completed immediately.
  562. if bytesReceived != 0:
  563. assert bytesReceived <= size
  564. retFuture.complete(bytesReceived)
  565. else:
  566. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  567. retFuture.complete(bytesReceived)
  568. return retFuture
  569. proc send*(socket: AsyncFD, buf: pointer, size: int,
  570. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  571. ## Sends `size` bytes from `buf` to `socket`. The returned future
  572. ## will complete once all data has been sent.
  573. ##
  574. ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
  575. ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
  576. verifyPresence(socket)
  577. var retFuture = newFuture[void]("send")
  578. var dataBuf: TWSABuf
  579. dataBuf.buf = cast[cstring](buf)
  580. dataBuf.len = size.ULONG
  581. var bytesReceived, lowFlags: DWORD
  582. var ol = newCustom()
  583. ol.data = CompletionData(fd: socket, cb:
  584. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  585. if not retFuture.finished:
  586. if errcode == OSErrorCode(-1):
  587. retFuture.complete()
  588. else:
  589. if flags.isDisconnectionError(errcode):
  590. retFuture.complete()
  591. else:
  592. retFuture.fail(newOSError(errcode))
  593. )
  594. let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
  595. lowFlags, cast[POVERLAPPED](ol), nil)
  596. if ret == -1:
  597. let err = osLastError()
  598. if err.int32 != ERROR_IO_PENDING:
  599. GC_unref(ol)
  600. if flags.isDisconnectionError(err):
  601. retFuture.complete()
  602. else:
  603. retFuture.fail(newException(OSError, osErrorMsg(err)))
  604. else:
  605. retFuture.complete()
  606. # We don't deallocate `ol` here because even though this completed
  607. # immediately poll will still be notified about its completion and it will
  608. # free `ol`.
  609. return retFuture
  610. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  611. saddrLen: SockLen,
  612. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  613. ## Sends `data` to specified destination `saddr`, using
  614. ## socket `socket`. The returned future will complete once all data
  615. ## has been sent.
  616. verifyPresence(socket)
  617. var retFuture = newFuture[void]("sendTo")
  618. var dataBuf: TWSABuf
  619. dataBuf.buf = cast[cstring](data)
  620. dataBuf.len = size.ULONG
  621. var bytesSent = 0.DWORD
  622. var lowFlags = 0.DWORD
  623. # we will preserve address in our stack
  624. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  625. var stalen: cint = cint(saddrLen)
  626. zeroMem(addr(staddr[0]), 128)
  627. copyMem(addr(staddr[0]), saddr, saddrLen)
  628. var ol = newCustom()
  629. ol.data = CompletionData(fd: socket, cb:
  630. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  631. if not retFuture.finished:
  632. if errcode == OSErrorCode(-1):
  633. retFuture.complete()
  634. else:
  635. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  636. )
  637. let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
  638. lowFlags, cast[ptr SockAddr](addr(staddr[0])),
  639. stalen, cast[POVERLAPPED](ol), nil)
  640. if ret == -1:
  641. let err = osLastError()
  642. if err.int32 != ERROR_IO_PENDING:
  643. GC_unref(ol)
  644. retFuture.fail(newException(OSError, osErrorMsg(err)))
  645. else:
  646. retFuture.complete()
  647. # We don't deallocate `ol` here because even though this completed
  648. # immediately poll will still be notified about its completion and it will
  649. # free `ol`.
  650. return retFuture
  651. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  652. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  653. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  654. ## Receives a datagram data from `socket` into `buf`, which must
  655. ## be at least of size `size`, address of datagram's sender will be
  656. ## stored into `saddr` and `saddrLen`. Returned future will complete
  657. ## once one datagram has been received, and will return size of packet
  658. ## received.
  659. verifyPresence(socket)
  660. var retFuture = newFuture[int]("recvFromInto")
  661. var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
  662. var bytesReceived = 0.DWORD
  663. var lowFlags = 0.DWORD
  664. var ol = newCustom()
  665. ol.data = CompletionData(fd: socket, cb:
  666. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  667. if not retFuture.finished:
  668. if errcode == OSErrorCode(-1):
  669. assert bytesCount <= size
  670. retFuture.complete(bytesCount)
  671. else:
  672. # datagram sockets don't have disconnection,
  673. # so we can just raise an exception
  674. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  675. )
  676. let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
  677. addr bytesReceived, addr lowFlags,
  678. saddr, cast[ptr cint](saddrLen),
  679. cast[POVERLAPPED](ol), nil)
  680. if res == -1:
  681. let err = osLastError()
  682. if err.int32 != ERROR_IO_PENDING:
  683. GC_unref(ol)
  684. retFuture.fail(newException(OSError, osErrorMsg(err)))
  685. else:
  686. # Request completed immediately.
  687. if bytesReceived != 0:
  688. assert bytesReceived <= size
  689. retFuture.complete(bytesReceived)
  690. else:
  691. if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
  692. retFuture.complete(bytesReceived)
  693. return retFuture
  694. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
  695. inheritable = defined(nimInheritHandles)):
  696. owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} =
  697. ## Accepts a new connection. Returns a future containing the client socket
  698. ## corresponding to that connection and the remote address of the client.
  699. ## The future will complete when the connection is successfully accepted.
  700. ##
  701. ## The resulting client socket is automatically registered to the
  702. ## dispatcher.
  703. ##
  704. ## If `inheritable` is false (the default), the resulting client socket will
  705. ## not be inheritable by child processes.
  706. ##
  707. ## The `accept` call may result in an error if the connecting socket
  708. ## disconnects during the duration of the `accept`. If the `SafeDisconn`
  709. ## flag is specified then this error will not be raised and instead
  710. ## accept will be called again.
  711. verifyPresence(socket)
  712. var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
  713. var clientSock = createNativeSocket(inheritable = inheritable)
  714. if clientSock == osInvalidSocket: raiseOSError(osLastError())
  715. const lpOutputLen = 1024
  716. var lpOutputBuf = newString(lpOutputLen)
  717. var dwBytesReceived: DWORD
  718. let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
  719. let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  720. let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
  721. template failAccept(errcode) =
  722. if flags.isDisconnectionError(errcode):
  723. var newAcceptFut = acceptAddr(socket, flags)
  724. newAcceptFut.callback =
  725. proc () =
  726. if newAcceptFut.failed:
  727. retFuture.fail(newAcceptFut.readError)
  728. else:
  729. retFuture.complete(newAcceptFut.read)
  730. else:
  731. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  732. template completeAccept() {.dirty.} =
  733. var listenSock = socket
  734. let setoptRet = setsockopt(clientSock, SOL_SOCKET,
  735. SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
  736. sizeof(listenSock).SockLen)
  737. if setoptRet != 0:
  738. let errcode = osLastError()
  739. discard clientSock.closesocket()
  740. failAccept(errcode)
  741. else:
  742. var localSockaddr, remoteSockaddr: ptr SockAddr
  743. var localLen, remoteLen: int32
  744. getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
  745. dwLocalAddressLength, dwRemoteAddressLength,
  746. addr localSockaddr, addr localLen,
  747. addr remoteSockaddr, addr remoteLen)
  748. try:
  749. let address = getAddrString(remoteSockaddr)
  750. register(clientSock.AsyncFD)
  751. retFuture.complete((address: address, client: clientSock.AsyncFD))
  752. except:
  753. # getAddrString may raise
  754. clientSock.close()
  755. retFuture.fail(getCurrentException())
  756. var ol = newCustom()
  757. ol.data = CompletionData(fd: socket, cb:
  758. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
  759. if not retFuture.finished:
  760. if errcode == OSErrorCode(-1):
  761. completeAccept()
  762. else:
  763. failAccept(errcode)
  764. )
  765. # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
  766. let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
  767. dwReceiveDataLength,
  768. dwLocalAddressLength,
  769. dwRemoteAddressLength,
  770. addr dwBytesReceived, cast[POVERLAPPED](ol))
  771. if not ret:
  772. let err = osLastError()
  773. if err.int32 != ERROR_IO_PENDING:
  774. failAccept(err)
  775. GC_unref(ol)
  776. else:
  777. completeAccept()
  778. # We don't deallocate `ol` here because even though this completed
  779. # immediately poll will still be notified about its completion and it will
  780. # free `ol`.
  781. return retFuture
  782. implementSetInheritable()
  783. proc closeSocket*(socket: AsyncFD) =
  784. ## Closes a socket and ensures that it is unregistered.
  785. socket.SocketHandle.close()
  786. getGlobalDispatcher().handles.excl(socket)
  787. proc unregister*(fd: AsyncFD) =
  788. ## Unregisters `fd`.
  789. getGlobalDispatcher().handles.excl(fd)
  790. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  791. return fd in disp.handles
  792. {.push stackTrace: off.}
  793. proc waitableCallback(param: pointer,
  794. timerOrWaitFired: WINBOOL) {.stdcall.} =
  795. var p = cast[PostCallbackDataPtr](param)
  796. discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
  797. ULONG_PTR(p.handleFd),
  798. cast[pointer](p.ovl))
  799. {.pop.}
  800. proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
  801. let p = getGlobalDispatcher()
  802. var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
  803. var hEvent = wsaCreateEvent()
  804. if hEvent == 0:
  805. raiseOSError(osLastError())
  806. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  807. pcd.ioPort = p.ioPort
  808. pcd.handleFd = fd
  809. var ol = newCustom()
  810. ol.data = CompletionData(fd: fd, cb:
  811. proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
  812. # we excluding our `fd` because cb(fd) can register own handler
  813. # for this `fd`
  814. p.handles.excl(fd)
  815. # unregisterWait() is called before callback, because appropriate
  816. # winsockets function can re-enable event.
  817. # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
  818. if unregisterWait(pcd.waitFd) == 0:
  819. let err = osLastError()
  820. if err.int32 != ERROR_IO_PENDING:
  821. deallocShared(cast[pointer](pcd))
  822. discard wsaCloseEvent(hEvent)
  823. raiseOSError(err)
  824. if cb(fd):
  825. # callback returned `true`, so we free all allocated resources
  826. deallocShared(cast[pointer](pcd))
  827. if not wsaCloseEvent(hEvent):
  828. raiseOSError(osLastError())
  829. # pcd.ovl will be unrefed in poll().
  830. else:
  831. # callback returned `false` we need to continue
  832. if p.handles.contains(fd):
  833. # new callback was already registered with `fd`, so we free all
  834. # allocated resources. This happens because in callback `cb`
  835. # addRead/addWrite was called with same `fd`.
  836. deallocShared(cast[pointer](pcd))
  837. if not wsaCloseEvent(hEvent):
  838. raiseOSError(osLastError())
  839. else:
  840. # we need to include `fd` again
  841. p.handles.incl(fd)
  842. # and register WaitForSingleObject again
  843. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  844. cast[WAITORTIMERCALLBACK](waitableCallback),
  845. cast[pointer](pcd), INFINITE, flags):
  846. # pcd.ovl will be unrefed in poll()
  847. let err = osLastError()
  848. deallocShared(cast[pointer](pcd))
  849. discard wsaCloseEvent(hEvent)
  850. raiseOSError(err)
  851. else:
  852. # we incref `pcd.ovl` and `protect` callback one more time,
  853. # because it will be unrefed and disposed in `poll()` after
  854. # callback finishes.
  855. GC_ref(pcd.ovl)
  856. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  857. )
  858. # We need to protect our callback environment value, so GC will not free it
  859. # accidentally.
  860. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  861. # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
  862. # will be signaled when appropriate `mask` events will be triggered.
  863. if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
  864. let err = osLastError()
  865. GC_unref(ol)
  866. deallocShared(cast[pointer](pcd))
  867. discard wsaCloseEvent(hEvent)
  868. raiseOSError(err)
  869. pcd.ovl = ol
  870. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  871. cast[WAITORTIMERCALLBACK](waitableCallback),
  872. cast[pointer](pcd), INFINITE, flags):
  873. let err = osLastError()
  874. GC_unref(ol)
  875. deallocShared(cast[pointer](pcd))
  876. discard wsaCloseEvent(hEvent)
  877. raiseOSError(err)
  878. p.handles.incl(fd)
  879. proc addRead*(fd: AsyncFD, cb: Callback) =
  880. ## Start watching the file descriptor for read availability and then call
  881. ## the callback `cb`.
  882. ##
  883. ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
  884. ## so if you can avoid it, please do it. Use `addRead` only if really
  885. ## need it (main usecase is adaptation of unix-like libraries to be
  886. ## asynchronous on Windows).
  887. ##
  888. ## If you use this function, you don't need to use asyncdispatch.recv()
  889. ## or asyncdispatch.accept(), because they are using IOCP, please use
  890. ## nativesockets.recv() and nativesockets.accept() instead.
  891. ##
  892. ## Be sure your callback `cb` returns `true`, if you want to remove
  893. ## watch of `read` notifications, and `false`, if you want to continue
  894. ## receiving notifications.
  895. registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
  896. proc addWrite*(fd: AsyncFD, cb: Callback) =
  897. ## Start watching the file descriptor for write availability and then call
  898. ## the callback `cb`.
  899. ##
  900. ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
  901. ## so if you can avoid it, please do it. Use `addWrite` only if really
  902. ## need it (main usecase is adaptation of unix-like libraries to be
  903. ## asynchronous on Windows).
  904. ##
  905. ## If you use this function, you don't need to use asyncdispatch.send()
  906. ## or asyncdispatch.connect(), because they are using IOCP, please use
  907. ## nativesockets.send() and nativesockets.connect() instead.
  908. ##
  909. ## Be sure your callback `cb` returns `true`, if you want to remove
  910. ## watch of `write` notifications, and `false`, if you want to continue
  911. ## receiving notifications.
  912. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
  913. template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
  914. handleCallback) =
  915. let handleFD = AsyncFD(hEvent)
  916. pcd.ioPort = p.ioPort
  917. pcd.handleFd = handleFD
  918. var ol = newCustom()
  919. ol.data.fd = handleFD
  920. ol.data.cb = handleCallback
  921. # We need to protect our callback environment value, so GC will not free it
  922. # accidentally.
  923. ol.data.cell = system.protect(rawEnv(ol.data.cb))
  924. pcd.ovl = ol
  925. if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
  926. cast[WAITORTIMERCALLBACK](waitableCallback),
  927. cast[pointer](pcd), timeout.DWORD, flags):
  928. let err = osLastError()
  929. GC_unref(ol)
  930. deallocShared(cast[pointer](pcd))
  931. discard closeHandle(hEvent)
  932. raiseOSError(err)
  933. p.handles.incl(handleFD)
  934. template closeWaitable(handle: untyped) =
  935. let waitFd = pcd.waitFd
  936. deallocShared(cast[pointer](pcd))
  937. p.handles.excl(fd)
  938. if unregisterWait(waitFd) == 0:
  939. let err = osLastError()
  940. if err.int32 != ERROR_IO_PENDING:
  941. discard closeHandle(handle)
  942. raiseOSError(err)
  943. if closeHandle(handle) == 0:
  944. raiseOSError(osLastError())
  945. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  946. ## Registers callback `cb` to be called when timer expired.
  947. ##
  948. ## Parameters:
  949. ##
  950. ## * `timeout` - timeout value in milliseconds.
  951. ## * `oneshot`
  952. ## * `true` - generate only one timeout event
  953. ## * `false` - generate timeout events periodically
  954. doAssert(timeout > 0)
  955. let p = getGlobalDispatcher()
  956. var hEvent = createEvent(nil, 1, 0, nil)
  957. if hEvent == INVALID_HANDLE_VALUE:
  958. raiseOSError(osLastError())
  959. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  960. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  961. if oneshot: flags = flags or WT_EXECUTEONLYONCE
  962. proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  963. let res = cb(fd)
  964. if res or oneshot:
  965. closeWaitable(hEvent)
  966. else:
  967. # if callback returned `false`, then it wants to be called again, so
  968. # we need to ref and protect `pcd.ovl` again, because it will be
  969. # unrefed and disposed in `poll()`.
  970. GC_ref(pcd.ovl)
  971. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  972. registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
  973. proc addProcess*(pid: int, cb: Callback) =
  974. ## Registers callback `cb` to be called when process with process ID
  975. ## `pid` exited.
  976. const NULL = Handle(0)
  977. let p = getGlobalDispatcher()
  978. let procFlags = SYNCHRONIZE
  979. var hProcess = openProcess(procFlags, 0, pid.DWORD)
  980. if hProcess == NULL:
  981. raiseOSError(osLastError())
  982. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  983. var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD
  984. proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  985. closeWaitable(hProcess)
  986. discard cb(fd)
  987. registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
  988. proc newAsyncEvent*(): AsyncEvent =
  989. ## Creates a new thread-safe `AsyncEvent` object.
  990. ##
  991. ## New `AsyncEvent` object is not automatically registered with
  992. ## dispatcher like `AsyncSocket`.
  993. var sa = SECURITY_ATTRIBUTES(
  994. nLength: sizeof(SECURITY_ATTRIBUTES).cint,
  995. bInheritHandle: 1
  996. )
  997. var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
  998. if event == INVALID_HANDLE_VALUE:
  999. raiseOSError(osLastError())
  1000. result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
  1001. result.hEvent = event
  1002. proc trigger*(ev: AsyncEvent) =
  1003. ## Set event `ev` to signaled state.
  1004. if setEvent(ev.hEvent) == 0:
  1005. raiseOSError(osLastError())
  1006. proc unregister*(ev: AsyncEvent) =
  1007. ## Unregisters event `ev`.
  1008. doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
  1009. let p = getGlobalDispatcher()
  1010. p.handles.excl(AsyncFD(ev.hEvent))
  1011. if unregisterWait(ev.hWaiter) == 0:
  1012. let err = osLastError()
  1013. if err.int32 != ERROR_IO_PENDING:
  1014. raiseOSError(err)
  1015. ev.hWaiter = 0
  1016. proc close*(ev: AsyncEvent) =
  1017. ## Closes event `ev`.
  1018. let res = closeHandle(ev.hEvent)
  1019. deallocShared(cast[pointer](ev))
  1020. if res == 0:
  1021. raiseOSError(osLastError())
  1022. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  1023. ## Registers callback `cb` to be called when `ev` will be signaled
  1024. doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
  1025. let p = getGlobalDispatcher()
  1026. let hEvent = ev.hEvent
  1027. var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
  1028. var flags = WT_EXECUTEINWAITTHREAD.DWORD
  1029. proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  1030. if ev.hWaiter != 0:
  1031. if cb(fd):
  1032. # we need this check to avoid exception, if `unregister(event)` was
  1033. # called in callback.
  1034. deallocShared(cast[pointer](pcd))
  1035. if ev.hWaiter != 0:
  1036. unregister(ev)
  1037. else:
  1038. # if callback returned `false`, then it wants to be called again, so
  1039. # we need to ref and protect `pcd.ovl` again, because it will be
  1040. # unrefed and disposed in `poll()`.
  1041. GC_ref(pcd.ovl)
  1042. pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
  1043. else:
  1044. # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
  1045. deallocShared(cast[pointer](pcd))
  1046. registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
  1047. ev.hWaiter = pcd.waitFd
  1048. initAll()
  1049. else:
  1050. import selectors
  1051. from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
  1052. MSG_NOSIGNAL
  1053. when declared(posix.accept4):
  1054. from posix import accept4, SOCK_CLOEXEC
  1055. when defined(genode):
  1056. import genode/env # get the implicit Genode env
  1057. import genode/signals
  1058. const
  1059. InitCallbackListSize = 4 # initial size of callbacks sequence,
  1060. # associated with file/socket descriptor.
  1061. InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
  1062. # queue.
  1063. type
  1064. AsyncFD* = distinct cint
  1065. Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
  1066. AsyncData = object
  1067. readList: seq[Callback]
  1068. writeList: seq[Callback]
  1069. AsyncEvent* = distinct SelectEvent
  1070. PDispatcher* = ref object of PDispatcherBase
  1071. selector: Selector[AsyncData]
  1072. when defined(genode):
  1073. signalHandler: SignalHandler
  1074. proc `==`*(x, y: AsyncFD): bool {.borrow.}
  1075. proc `==`*(x, y: AsyncEvent): bool {.borrow.}
  1076. template newAsyncData(): AsyncData =
  1077. AsyncData(
  1078. readList: newSeqOfCap[Callback](InitCallbackListSize),
  1079. writeList: newSeqOfCap[Callback](InitCallbackListSize)
  1080. )
  1081. proc newDispatcher*(): owned(PDispatcher) =
  1082. new result
  1083. result.selector = newSelector[AsyncData]()
  1084. result.timers.clear()
  1085. result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
  1086. when defined(genode):
  1087. let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
  1088. result.signalHandler = newSignalHandler(entrypoint):
  1089. discard runOnce(0)
  1090. var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
  1091. proc setGlobalDispatcher*(disp: owned PDispatcher) =
  1092. if not gDisp.isNil:
  1093. assert gDisp.callbacks.len == 0
  1094. gDisp = disp
  1095. initCallSoonProc()
  1096. proc getGlobalDispatcher*(): PDispatcher =
  1097. if gDisp.isNil:
  1098. setGlobalDispatcher(newDispatcher())
  1099. result = gDisp
  1100. proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
  1101. return disp.selector
  1102. proc register*(fd: AsyncFD) =
  1103. let p = getGlobalDispatcher()
  1104. var data = newAsyncData()
  1105. p.selector.registerHandle(fd.SocketHandle, {}, data)
  1106. proc unregister*(fd: AsyncFD) =
  1107. getGlobalDispatcher().selector.unregister(fd.SocketHandle)
  1108. proc unregister*(ev: AsyncEvent) =
  1109. getGlobalDispatcher().selector.unregister(SelectEvent(ev))
  1110. proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
  1111. return fd.SocketHandle in disp.selector
  1112. proc addRead*(fd: AsyncFD, cb: Callback) =
  1113. let p = getGlobalDispatcher()
  1114. var newEvents = {Event.Read}
  1115. withData(p.selector, fd.SocketHandle, adata) do:
  1116. adata.readList.add(cb)
  1117. newEvents.incl(Event.Read)
  1118. if len(adata.writeList) != 0: newEvents.incl(Event.Write)
  1119. do:
  1120. raise newException(ValueError, "File descriptor not registered.")
  1121. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1122. proc addWrite*(fd: AsyncFD, cb: Callback) =
  1123. let p = getGlobalDispatcher()
  1124. var newEvents = {Event.Write}
  1125. withData(p.selector, fd.SocketHandle, adata) do:
  1126. adata.writeList.add(cb)
  1127. newEvents.incl(Event.Write)
  1128. if len(adata.readList) != 0: newEvents.incl(Event.Read)
  1129. do:
  1130. raise newException(ValueError, "File descriptor not registered.")
  1131. p.selector.updateHandle(fd.SocketHandle, newEvents)
  1132. proc hasPendingOperations*(): bool =
  1133. let p = getGlobalDispatcher()
  1134. not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
  1135. proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
  1136. var old = move dest
  1137. dest = src
  1138. for i in 0..high(old):
  1139. dest.add(move old[i])
  1140. proc processBasicCallbacks(
  1141. fd: AsyncFD, event: Event
  1142. ): tuple[readCbListCount, writeCbListCount: int] =
  1143. # Process pending descriptor and AsyncEvent callbacks.
  1144. #
  1145. # Invoke every callback stored in `rwlist`, until one
  1146. # returns `false` (which means callback wants to stay
  1147. # alive). In such case all remaining callbacks will be added
  1148. # to `rwlist` again, in the order they have been inserted.
  1149. #
  1150. # `rwlist` associated with file descriptor MUST BE emptied before
  1151. # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
  1152. # or it can be possible to fall into endless cycle.
  1153. var curList: seq[Callback]
  1154. let selector = getGlobalDispatcher().selector
  1155. withData(selector, fd.int, fdData):
  1156. case event
  1157. of Event.Read:
  1158. #shallowCopy(curList, fdData.readList)
  1159. curList = move fdData.readList
  1160. fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1161. of Event.Write:
  1162. #shallowCopy(curList, fdData.writeList)
  1163. curList = move fdData.writeList
  1164. fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
  1165. else:
  1166. assert false, "Cannot process callbacks for " & $event
  1167. let newLength = max(len(curList), InitCallbackListSize)
  1168. var newList = newSeqOfCap[Callback](newLength)
  1169. var eventsExtinguished = false
  1170. for cb in curList:
  1171. if eventsExtinguished:
  1172. newList.add(cb)
  1173. elif not cb(fd):
  1174. # Callback wants to be called again.
  1175. newList.add(cb)
  1176. # This callback has returned with EAGAIN, so we don't need to
  1177. # call any other callbacks as they are all waiting for the same event
  1178. # on the same fd.
  1179. # We do need to ensure they are called again though.
  1180. eventsExtinguished = true
  1181. withData(selector, fd.int, fdData) do:
  1182. # Descriptor is still present in the queue.
  1183. case event
  1184. of Event.Read: prependSeq(fdData.readList, newList)
  1185. of Event.Write: prependSeq(fdData.writeList, newList)
  1186. else:
  1187. assert false, "Cannot process callbacks for " & $event
  1188. result.readCbListCount = len(fdData.readList)
  1189. result.writeCbListCount = len(fdData.writeList)
  1190. do:
  1191. # Descriptor was unregistered in callback via `unregister()`.
  1192. result.readCbListCount = -1
  1193. result.writeCbListCount = -1
  1194. proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
  1195. # Process pending custom event callbacks. Custom events are
  1196. # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
  1197. # There can be only one callback registered with one descriptor,
  1198. # so there is no need to iterate over list.
  1199. var curList: seq[Callback]
  1200. withData(p.selector, fd.int, adata) do:
  1201. curList = move adata.readList
  1202. adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
  1203. let newLength = len(curList)
  1204. var newList = newSeqOfCap[Callback](newLength)
  1205. var cb = curList[0]
  1206. if not cb(fd):
  1207. newList.add(cb)
  1208. withData(p.selector, fd.int, adata) do:
  1209. # descriptor still present in queue.
  1210. adata.readList = newList & adata.readList
  1211. if len(adata.readList) == 0:
  1212. # if no callbacks registered with descriptor, unregister it.
  1213. p.selector.unregister(fd.int)
  1214. do:
  1215. # descriptor was unregistered in callback via `unregister()`.
  1216. discard
  1217. implementSetInheritable()
  1218. proc closeSocket*(sock: AsyncFD) =
  1219. let selector = getGlobalDispatcher().selector
  1220. if sock.SocketHandle notin selector:
  1221. raise newException(ValueError, "File descriptor not registered.")
  1222. let data = selector.getData(sock.SocketHandle)
  1223. sock.unregister()
  1224. sock.SocketHandle.close()
  1225. # We need to unblock the read and write callbacks which could still be
  1226. # waiting for the socket to become readable and/or writeable.
  1227. for cb in data.readList & data.writeList:
  1228. if not cb(sock):
  1229. raise newException(
  1230. ValueError, "Expecting async operations to stop when fd has closed."
  1231. )
  1232. proc runOnce(timeout: int): bool =
  1233. let p = getGlobalDispatcher()
  1234. if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
  1235. when defined(genode):
  1236. if timeout == 0: return
  1237. raise newException(ValueError,
  1238. "No handles or timers registered in dispatcher.")
  1239. result = false
  1240. var keys: array[64, ReadyKey]
  1241. let nextTimer = processTimers(p, result)
  1242. var count =
  1243. p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
  1244. for i in 0..<count:
  1245. let fd = keys[i].fd.AsyncFD
  1246. let events = keys[i].events
  1247. var (readCbListCount, writeCbListCount) = (0, 0)
  1248. if Event.Read in events or events == {Event.Error}:
  1249. (readCbListCount, writeCbListCount) =
  1250. processBasicCallbacks(fd, Event.Read)
  1251. result = true
  1252. if Event.Write in events or events == {Event.Error}:
  1253. (readCbListCount, writeCbListCount) =
  1254. processBasicCallbacks(fd, Event.Write)
  1255. result = true
  1256. var isCustomEvent = false
  1257. if Event.User in events:
  1258. (readCbListCount, writeCbListCount) =
  1259. processBasicCallbacks(fd, Event.Read)
  1260. isCustomEvent = true
  1261. if readCbListCount == 0:
  1262. p.selector.unregister(fd.int)
  1263. result = true
  1264. when ioselSupportedPlatform:
  1265. const customSet = {Event.Timer, Event.Signal, Event.Process,
  1266. Event.Vnode}
  1267. if (customSet * events) != {}:
  1268. isCustomEvent = true
  1269. processCustomCallbacks(p, fd)
  1270. result = true
  1271. # because state `data` can be modified in callback we need to update
  1272. # descriptor events with currently registered callbacks.
  1273. if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
  1274. var newEvents: set[Event] = {}
  1275. if readCbListCount > 0: incl(newEvents, Event.Read)
  1276. if writeCbListCount > 0: incl(newEvents, Event.Write)
  1277. p.selector.updateHandle(SocketHandle(fd), newEvents)
  1278. # Timer processing.
  1279. discard processTimers(p, result)
  1280. # Callback queue processing
  1281. processPendingCallbacks(p, result)
  1282. proc recv*(socket: AsyncFD, size: int,
  1283. flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
  1284. var retFuture = newFuture[string]("recv")
  1285. var readBuffer = newString(size)
  1286. proc cb(sock: AsyncFD): bool =
  1287. result = true
  1288. let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
  1289. flags.toOSFlags())
  1290. if res < 0:
  1291. let lastError = osLastError()
  1292. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1293. lastError.int32 != EAGAIN:
  1294. if flags.isDisconnectionError(lastError):
  1295. retFuture.complete("")
  1296. else:
  1297. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1298. else:
  1299. result = false # We still want this callback to be called.
  1300. elif res == 0:
  1301. # Disconnected
  1302. retFuture.complete("")
  1303. else:
  1304. readBuffer.setLen(res)
  1305. retFuture.complete(readBuffer)
  1306. # TODO: The following causes a massive slowdown.
  1307. #if not cb(socket):
  1308. addRead(socket, cb)
  1309. return retFuture
  1310. proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
  1311. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1312. var retFuture = newFuture[int]("recvInto")
  1313. proc cb(sock: AsyncFD): bool =
  1314. result = true
  1315. let res = recv(sock.SocketHandle, buf, size.cint,
  1316. flags.toOSFlags())
  1317. if res < 0:
  1318. let lastError = osLastError()
  1319. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1320. lastError.int32 != EAGAIN:
  1321. if flags.isDisconnectionError(lastError):
  1322. retFuture.complete(0)
  1323. else:
  1324. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1325. else:
  1326. result = false # We still want this callback to be called.
  1327. else:
  1328. retFuture.complete(res)
  1329. # TODO: The following causes a massive slowdown.
  1330. #if not cb(socket):
  1331. addRead(socket, cb)
  1332. return retFuture
  1333. proc send*(socket: AsyncFD, buf: pointer, size: int,
  1334. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1335. var retFuture = newFuture[void]("send")
  1336. var written = 0
  1337. proc cb(sock: AsyncFD): bool =
  1338. result = true
  1339. let netSize = size-written
  1340. var d = cast[cstring](buf)
  1341. let res = send(sock.SocketHandle, addr d[written], netSize.cint,
  1342. MSG_NOSIGNAL)
  1343. if res < 0:
  1344. let lastError = osLastError()
  1345. if lastError.int32 != EINTR and
  1346. lastError.int32 != EWOULDBLOCK and
  1347. lastError.int32 != EAGAIN:
  1348. if flags.isDisconnectionError(lastError):
  1349. retFuture.complete()
  1350. else:
  1351. retFuture.fail(newOSError(lastError))
  1352. else:
  1353. result = false # We still want this callback to be called.
  1354. else:
  1355. written.inc(res)
  1356. if res != netSize:
  1357. result = false # We still have data to send.
  1358. else:
  1359. retFuture.complete()
  1360. # TODO: The following causes crashes.
  1361. #if not cb(socket):
  1362. addWrite(socket, cb)
  1363. return retFuture
  1364. proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
  1365. saddrLen: SockLen,
  1366. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1367. ## Sends `data` of size `size` in bytes to specified destination
  1368. ## (`saddr` of size `saddrLen` in bytes, using socket `socket`.
  1369. ## The returned future will complete once all data has been sent.
  1370. var retFuture = newFuture[void]("sendTo")
  1371. # we will preserve address in our stack
  1372. var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
  1373. var stalen = saddrLen
  1374. zeroMem(addr(staddr[0]), 128)
  1375. copyMem(addr(staddr[0]), saddr, saddrLen)
  1376. proc cb(sock: AsyncFD): bool =
  1377. result = true
  1378. let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
  1379. cast[ptr SockAddr](addr(staddr[0])), stalen)
  1380. if res < 0:
  1381. let lastError = osLastError()
  1382. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1383. lastError.int32 != EAGAIN:
  1384. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1385. else:
  1386. result = false # We still want this callback to be called.
  1387. else:
  1388. retFuture.complete()
  1389. addWrite(socket, cb)
  1390. return retFuture
  1391. proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
  1392. saddr: ptr SockAddr, saddrLen: ptr SockLen,
  1393. flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
  1394. ## Receives a datagram data from `socket` into `data`, which must
  1395. ## be at least of size `size` in bytes, address of datagram's sender
  1396. ## will be stored into `saddr` and `saddrLen`. Returned future will
  1397. ## complete once one datagram has been received, and will return size
  1398. ## of packet received.
  1399. var retFuture = newFuture[int]("recvFromInto")
  1400. proc cb(sock: AsyncFD): bool =
  1401. result = true
  1402. let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
  1403. saddr, saddrLen)
  1404. if res < 0:
  1405. let lastError = osLastError()
  1406. if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
  1407. lastError.int32 != EAGAIN:
  1408. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1409. else:
  1410. result = false
  1411. else:
  1412. retFuture.complete(res)
  1413. addRead(socket, cb)
  1414. return retFuture
  1415. proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
  1416. inheritable = defined(nimInheritHandles)):
  1417. owned(Future[tuple[address: string, client: AsyncFD]]) =
  1418. var retFuture = newFuture[tuple[address: string,
  1419. client: AsyncFD]]("acceptAddr")
  1420. proc cb(sock: AsyncFD): bool =
  1421. result = true
  1422. var sockAddress: Sockaddr_storage
  1423. var addrLen = sizeof(sockAddress).SockLen
  1424. var client =
  1425. when declared(accept4):
  1426. accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
  1427. addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
  1428. else:
  1429. accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
  1430. addr(addrLen))
  1431. when declared(setInheritable) and not declared(accept4):
  1432. if client != osInvalidSocket and not setInheritable(client, inheritable):
  1433. # Set failure first because close() itself can fail,
  1434. # altering osLastError().
  1435. retFuture.fail(newOSError(osLastError()))
  1436. close client
  1437. return false
  1438. if client == osInvalidSocket:
  1439. let lastError = osLastError()
  1440. assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
  1441. if lastError.int32 == EINTR:
  1442. return false
  1443. else:
  1444. if flags.isDisconnectionError(lastError):
  1445. return false
  1446. else:
  1447. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1448. else:
  1449. try:
  1450. let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
  1451. register(client.AsyncFD)
  1452. retFuture.complete((address, client.AsyncFD))
  1453. except:
  1454. # getAddrString may raise
  1455. client.close()
  1456. retFuture.fail(getCurrentException())
  1457. addRead(socket, cb)
  1458. return retFuture
  1459. when ioselSupportedPlatform:
  1460. proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
  1461. ## Start watching for timeout expiration, and then call the
  1462. ## callback `cb`.
  1463. ## `timeout` - time in milliseconds,
  1464. ## `oneshot` - if `true` only one event will be dispatched,
  1465. ## if `false` continuous events every `timeout` milliseconds.
  1466. let p = getGlobalDispatcher()
  1467. var data = newAsyncData()
  1468. data.readList.add(cb)
  1469. p.selector.registerTimer(timeout, oneshot, data)
  1470. proc addSignal*(signal: int, cb: Callback) =
  1471. ## Start watching signal `signal`, and when signal appears, call the
  1472. ## callback `cb`.
  1473. let p = getGlobalDispatcher()
  1474. var data = newAsyncData()
  1475. data.readList.add(cb)
  1476. p.selector.registerSignal(signal, data)
  1477. proc addProcess*(pid: int, cb: Callback) =
  1478. ## Start watching for process exit with pid `pid`, and then call
  1479. ## the callback `cb`.
  1480. let p = getGlobalDispatcher()
  1481. var data = newAsyncData()
  1482. data.readList.add(cb)
  1483. p.selector.registerProcess(pid, data)
  1484. proc newAsyncEvent*(): AsyncEvent =
  1485. ## Creates new `AsyncEvent`.
  1486. result = AsyncEvent(newSelectEvent())
  1487. proc trigger*(ev: AsyncEvent) =
  1488. ## Sets new `AsyncEvent` to signaled state.
  1489. trigger(SelectEvent(ev))
  1490. proc close*(ev: AsyncEvent) =
  1491. ## Closes `AsyncEvent`
  1492. close(SelectEvent(ev))
  1493. proc addEvent*(ev: AsyncEvent, cb: Callback) =
  1494. ## Start watching for event `ev`, and call callback `cb`, when
  1495. ## ev will be set to signaled state.
  1496. let p = getGlobalDispatcher()
  1497. var data = newAsyncData()
  1498. data.readList.add(cb)
  1499. p.selector.registerEvent(SelectEvent(ev), data)
  1500. proc drain*(timeout = 500) =
  1501. ## Waits for completion of **all** events and processes them. Raises `ValueError`
  1502. ## if there are no pending operations. In contrast to `poll` this
  1503. ## processes as many events as are available until the timeout has elapsed.
  1504. var curTimeout = timeout
  1505. let start = now()
  1506. while hasPendingOperations():
  1507. discard runOnce(curTimeout)
  1508. curTimeout -= (now() - start).inMilliseconds.int
  1509. if curTimeout < 0:
  1510. break
  1511. proc poll*(timeout = 500) =
  1512. ## Waits for completion events and processes them. Raises `ValueError`
  1513. ## if there are no pending operations. This runs the underlying OS
  1514. ## `epoll`:idx: or `kqueue`:idx: primitive only once.
  1515. discard runOnce(timeout)
  1516. template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
  1517. inheritable = defined(nimInheritHandles)) =
  1518. let handle = createNativeSocket(domain, sockType, protocol, inheritable)
  1519. if handle == osInvalidSocket:
  1520. return osInvalidSocket.AsyncFD
  1521. handle.setBlocking(false)
  1522. when defined(macosx) and not defined(nimdoc):
  1523. handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
  1524. result = handle.AsyncFD
  1525. register(result)
  1526. proc createAsyncNativeSocket*(domain: cint, sockType: cint,
  1527. protocol: cint,
  1528. inheritable = defined(nimInheritHandles)): AsyncFD =
  1529. createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  1530. proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
  1531. sockType: SockType = SOCK_STREAM,
  1532. protocol: Protocol = IPPROTO_TCP,
  1533. inheritable = defined(nimInheritHandles)): AsyncFD =
  1534. createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
  1535. when defined(windows) or defined(nimdoc):
  1536. proc bindToDomain(handle: SocketHandle, domain: Domain) =
  1537. # Extracted into a separate proc, because connect() on Windows requires
  1538. # the socket to be initially bound.
  1539. template doBind(saddr) =
  1540. if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
  1541. sizeof(saddr).SockLen) < 0'i32:
  1542. raiseOSError(osLastError())
  1543. if domain == Domain.AF_INET6:
  1544. var saddr: Sockaddr_in6
  1545. saddr.sin6_family = uint16(toInt(domain))
  1546. doBind(saddr)
  1547. else:
  1548. var saddr: Sockaddr_in
  1549. saddr.sin_family = uint16(toInt(domain))
  1550. doBind(saddr)
  1551. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1552. let retFuture = newFuture[void]("doConnect")
  1553. result = retFuture
  1554. var ol = newCustom()
  1555. ol.data = CompletionData(fd: socket, cb:
  1556. proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
  1557. if not retFuture.finished:
  1558. if errcode == OSErrorCode(-1):
  1559. const SO_UPDATE_CONNECT_CONTEXT = 0x7010
  1560. socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
  1561. retFuture.complete()
  1562. else:
  1563. retFuture.fail(newException(OSError, osErrorMsg(errcode)))
  1564. )
  1565. let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
  1566. cint(addrInfo.ai_addrlen), nil, 0, nil,
  1567. cast[POVERLAPPED](ol))
  1568. if ret:
  1569. # Request to connect completed immediately.
  1570. retFuture.complete()
  1571. # We don't deallocate `ol` here because even though this completed
  1572. # immediately poll will still be notified about its completion and it
  1573. # will free `ol`.
  1574. else:
  1575. let lastError = osLastError()
  1576. if lastError.int32 != ERROR_IO_PENDING:
  1577. # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
  1578. # and the future will be completed/failed there, too.
  1579. GC_unref(ol)
  1580. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1581. else:
  1582. proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
  1583. let retFuture = newFuture[void]("doConnect")
  1584. result = retFuture
  1585. proc cb(fd: AsyncFD): bool =
  1586. let ret = SocketHandle(fd).getSockOptInt(
  1587. cint(SOL_SOCKET), cint(SO_ERROR))
  1588. if ret == 0:
  1589. # We have connected.
  1590. retFuture.complete()
  1591. return true
  1592. elif ret == EINTR:
  1593. # interrupted, keep waiting
  1594. return false
  1595. else:
  1596. retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
  1597. return true
  1598. let ret = connect(socket.SocketHandle,
  1599. addrInfo.ai_addr,
  1600. addrInfo.ai_addrlen.SockLen)
  1601. if ret == 0:
  1602. # Request to connect completed immediately.
  1603. retFuture.complete()
  1604. else:
  1605. let lastError = osLastError()
  1606. if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
  1607. addWrite(socket, cb)
  1608. else:
  1609. retFuture.fail(newException(OSError, osErrorMsg(lastError)))
  1610. template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
  1611. protocol: Protocol = IPPROTO_RAW) =
  1612. ## Iterates through the AddrInfo linked list asynchronously
  1613. ## until the connection can be established.
  1614. const shouldCreateFd = not declared(fd)
  1615. when shouldCreateFd:
  1616. let sockType = protocol.toSockType()
  1617. var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
  1618. for i in low(fdPerDomain)..high(fdPerDomain):
  1619. fdPerDomain[i] = osInvalidSocket.AsyncFD
  1620. template closeUnusedFds(domainToKeep = -1) {.dirty.} =
  1621. for i, fd in fdPerDomain:
  1622. if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
  1623. fd.closeSocket()
  1624. var lastException: ref Exception
  1625. var curAddrInfo = addrInfo
  1626. var domain: Domain
  1627. when shouldCreateFd:
  1628. var curFd: AsyncFD
  1629. else:
  1630. var curFd = fd
  1631. proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
  1632. if fut == nil or fut.failed:
  1633. if fut != nil:
  1634. lastException = fut.readError()
  1635. while curAddrInfo != nil:
  1636. let domainOpt = curAddrInfo.ai_family.toKnownDomain()
  1637. if domainOpt.isSome:
  1638. domain = domainOpt.unsafeGet()
  1639. break
  1640. curAddrInfo = curAddrInfo.ai_next
  1641. if curAddrInfo == nil:
  1642. freeAddrInfo(addrInfo)
  1643. when shouldCreateFd:
  1644. closeUnusedFds()
  1645. if lastException != nil:
  1646. retFuture.fail(lastException)
  1647. else:
  1648. retFuture.fail(newException(
  1649. IOError, "Couldn't resolve address: " & address))
  1650. return
  1651. when shouldCreateFd:
  1652. curFd = fdPerDomain[ord(domain)]
  1653. if curFd == osInvalidSocket.AsyncFD:
  1654. try:
  1655. curFd = createAsyncNativeSocket(domain, sockType, protocol)
  1656. except:
  1657. freeAddrInfo(addrInfo)
  1658. closeUnusedFds()
  1659. raise getCurrentException()
  1660. when defined(windows):
  1661. curFd.SocketHandle.bindToDomain(domain)
  1662. fdPerDomain[ord(domain)] = curFd
  1663. doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
  1664. curAddrInfo = curAddrInfo.ai_next
  1665. else:
  1666. freeAddrInfo(addrInfo)
  1667. when shouldCreateFd:
  1668. closeUnusedFds(ord(domain))
  1669. retFuture.complete(curFd)
  1670. else:
  1671. retFuture.complete()
  1672. tryNextAddrInfo(nil)
  1673. proc dial*(address: string, port: Port,
  1674. protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
  1675. ## Establishes connection to the specified `address`:`port` pair via the
  1676. ## specified protocol. The procedure iterates through possible
  1677. ## resolutions of the `address` until it succeeds, meaning that it
  1678. ## seamlessly works with both IPv4 and IPv6.
  1679. ## Returns the async file descriptor, registered in the dispatcher of
  1680. ## the current thread, ready to send or receive data.
  1681. let retFuture = newFuture[AsyncFD]("dial")
  1682. result = retFuture
  1683. let sockType = protocol.toSockType()
  1684. let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
  1685. asyncAddrInfoLoop(aiList, noFD, protocol)
  1686. proc connect*(socket: AsyncFD, address: string, port: Port,
  1687. domain = Domain.AF_INET): owned(Future[void]) =
  1688. let retFuture = newFuture[void]("connect")
  1689. result = retFuture
  1690. when defined(windows):
  1691. verifyPresence(socket)
  1692. else:
  1693. assert getSockDomain(socket.SocketHandle) == domain
  1694. let aiList = getAddrInfo(address, port, domain)
  1695. when defined(windows):
  1696. socket.SocketHandle.bindToDomain(domain)
  1697. asyncAddrInfoLoop(aiList, socket)
  1698. proc sleepAsync*(ms: int | float): owned(Future[void]) =
  1699. ## Suspends the execution of the current async procedure for the next
  1700. ## `ms` milliseconds.
  1701. var retFuture = newFuture[void]("sleepAsync")
  1702. let p = getGlobalDispatcher()
  1703. when ms is int:
  1704. p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
  1705. elif ms is float:
  1706. let ns = (ms * 1_000_000).int64
  1707. p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
  1708. return retFuture
  1709. proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
  1710. ## Returns a future which will complete once `fut` completes or after
  1711. ## `timeout` milliseconds has elapsed.
  1712. ##
  1713. ## If `fut` completes first the returned future will hold true,
  1714. ## otherwise, if `timeout` milliseconds has elapsed first, the returned
  1715. ## future will hold false.
  1716. var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
  1717. var timeoutFuture = sleepAsync(timeout)
  1718. fut.callback =
  1719. proc () =
  1720. if not retFuture.finished:
  1721. if fut.failed:
  1722. retFuture.fail(fut.error)
  1723. else:
  1724. retFuture.complete(true)
  1725. timeoutFuture.callback =
  1726. proc () =
  1727. if not retFuture.finished: retFuture.complete(false)
  1728. return retFuture
  1729. proc accept*(socket: AsyncFD,
  1730. flags = {SocketFlag.SafeDisconn},
  1731. inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
  1732. ## Accepts a new connection. Returns a future containing the client socket
  1733. ## corresponding to that connection.
  1734. ##
  1735. ## If `inheritable` is false (the default), the resulting client socket
  1736. ## will not be inheritable by child processes.
  1737. ##
  1738. ## The future will complete when the connection is successfully accepted.
  1739. var retFut = newFuture[AsyncFD]("accept")
  1740. var fut = acceptAddr(socket, flags, inheritable)
  1741. fut.callback =
  1742. proc (future: Future[tuple[address: string, client: AsyncFD]]) =
  1743. assert future.finished
  1744. if future.failed:
  1745. retFut.fail(future.error)
  1746. else:
  1747. retFut.complete(future.read.client)
  1748. return retFut
  1749. proc keepAlive(x: string) =
  1750. discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
  1751. proc send*(socket: AsyncFD, data: string,
  1752. flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
  1753. ## Sends `data` to `socket`. The returned future will complete once all
  1754. ## data has been sent.
  1755. var retFuture = newFuture[void]("send")
  1756. if data.len > 0:
  1757. let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
  1758. sendFut.callback =
  1759. proc () =
  1760. keepAlive(data)
  1761. if sendFut.failed:
  1762. retFuture.fail(sendFut.error)
  1763. else:
  1764. retFuture.complete()
  1765. else:
  1766. retFuture.complete()
  1767. return retFuture
  1768. # -- Await Macro
  1769. import asyncmacro
  1770. export asyncmacro
  1771. proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
  1772. ## Returns a future that will complete when all the string data from the
  1773. ## specified future stream is retrieved.
  1774. result = ""
  1775. while true:
  1776. let (hasValue, value) = await future.read()
  1777. if hasValue:
  1778. result.add(value)
  1779. else:
  1780. break
  1781. proc callSoon(cbproc: proc () {.gcsafe.}) =
  1782. getGlobalDispatcher().callbacks.addLast(cbproc)
  1783. proc runForever*() =
  1784. ## Begins a never ending global dispatcher poll loop.
  1785. while true:
  1786. poll()
  1787. proc waitFor*[T](fut: Future[T]): T =
  1788. ## **Blocks** the current thread until the specified future completes.
  1789. while not fut.finished:
  1790. poll()
  1791. fut.read
  1792. proc activeDescriptors*(): int {.inline.} =
  1793. ## Returns the current number of active file descriptors for the current
  1794. ## event loop. This is a cheap operation that does not involve a system call.
  1795. when defined(windows):
  1796. result = getGlobalDispatcher().handles.len
  1797. elif not defined(nimdoc):
  1798. result = getGlobalDispatcher().selector.count
  1799. when defined(posix):
  1800. import posix
  1801. when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
  1802. defined(solaris) or defined(zephyr) or defined(freertos):
  1803. proc maxDescriptors*(): int {.raises: OSError.} =
  1804. ## Returns the maximum number of active file descriptors for the current
  1805. ## process. This involves a system call. For now `maxDescriptors` is
  1806. ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris.
  1807. when defined(windows):
  1808. result = 16_700_000
  1809. elif defined(zephyr) or defined(freertos):
  1810. result = FD_MAX
  1811. else:
  1812. var fdLim: RLimit
  1813. if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
  1814. raiseOSError(osLastError())
  1815. result = int(fdLim.rlim_cur) - 1
  1816. when defined(genode):
  1817. proc scheduleCallbacks*(): bool {.discardable.} =
  1818. ## *Genode only.*
  1819. ## Schedule callback processing and return immediately.
  1820. ## Returns `false` if there is nothing to schedule.
  1821. ## RPC servers should call this to dispatch `callSoon`
  1822. ## bodies after retiring an RPC to its client.
  1823. ## This is effectively a non-blocking `poll(…)` and is
  1824. ## equivalent to scheduling a momentary no-op timeout
  1825. ## but faster and with less overhead.
  1826. let dis = getGlobalDispatcher()
  1827. result = dis.callbacks.len > 0
  1828. if result: submit(dis.signalHandler.cap)