123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055 |
- #
- #
- # Nim's Runtime Library
- # (c) Copyright 2015 Dominik Picheta
- #
- # See the file "copying.txt", included in this
- # distribution, for details about the copyright.
- #
- ## This module implements asynchronous IO. This includes a dispatcher,
- ## a `Future` type implementation, and an `async` macro which allows
- ## asynchronous code to be written in a synchronous style with the `await`
- ## keyword.
- ##
- ## The dispatcher acts as a kind of event loop. You must call `poll` on it
- ## (or a function which does so for you such as `waitFor` or `runForever`)
- ## in order to poll for any outstanding events. The underlying implementation
- ## is based on epoll on Linux, IO Completion Ports on Windows and select on
- ## other operating systems.
- ##
- ## The `poll` function will not, on its own, return any events. Instead
- ## an appropriate `Future` object will be completed. A `Future` is a
- ## type which holds a value which is not yet available, but which *may* be
- ## available in the future. You can check whether a future is finished
- ## by using the `finished` function. When a future is finished it means that
- ## either the value that it holds is now available or it holds an error instead.
- ## The latter situation occurs when the operation to complete a future fails
- ## with an exception. You can distinguish between the two situations with the
- ## `failed` function.
- ##
- ## Future objects can also store a callback procedure which will be called
- ## automatically once the future completes.
- ##
- ## Futures therefore can be thought of as an implementation of the proactor
- ## pattern. In this
- ## pattern you make a request for an action, and once that action is fulfilled
- ## a future is completed with the result of that action. Requests can be
- ## made by calling the appropriate functions. For example: calling the `recv`
- ## function will create a request for some data to be read from a socket. The
- ## future which the `recv` function returns will then complete once the
- ## requested amount of data is read **or** an exception occurs.
- ##
- ## Code to read some data from a socket may look something like this:
- ## ```Nim
- ## var future = socket.recv(100)
- ## future.addCallback(
- ## proc () =
- ## echo(future.read)
- ## )
- ## ```
- ##
- ## All asynchronous functions returning a `Future` will not block. They
- ## will not however return immediately. An asynchronous function will have
- ## code which will be executed before an asynchronous request is made, in most
- ## cases this code sets up the request.
- ##
- ## In the above example, the `recv` function will return a brand new
- ## `Future` instance once the request for data to be read from the socket
- ## is made. This `Future` instance will complete once the requested amount
- ## of data is read, in this case it is 100 bytes. The second line sets a
- ## callback on this future which will be called once the future completes.
- ## All the callback does is write the data stored in the future to `stdout`.
- ## The `read` function is used for this and it checks whether the future
- ## completes with an error for you (if it did, it will simply raise the
- ## error), if there is no error, however, it returns the value of the future.
- ##
- ## Asynchronous procedures
- ## =======================
- ##
- ## Asynchronous procedures remove the pain of working with callbacks. They do
- ## this by allowing you to write asynchronous code the same way as you would
- ## write synchronous code.
- ##
- ## An asynchronous procedure is marked using the `{.async.}` pragma.
- ## When marking a procedure with the `{.async.}` pragma it must have a
- ## `Future[T]` return type or no return type at all. If you do not specify
- ## a return type then `Future[void]` is assumed.
- ##
- ## Inside asynchronous procedures `await` can be used to call any
- ## procedures which return a
- ## `Future`; this includes asynchronous procedures. When a procedure is
- ## "awaited", the asynchronous procedure it is awaited in will
- ## suspend its execution
- ## until the awaited procedure's Future completes. At which point the
- ## asynchronous procedure will resume its execution. During the period
- ## when an asynchronous procedure is suspended other asynchronous procedures
- ## will be run by the dispatcher.
- ##
- ## The `await` call may be used in many contexts. It can be used on the right
- ## hand side of a variable declaration: `var data = await socket.recv(100)`,
- ## in which case the variable will be set to the value of the future
- ## automatically. It can be used to await a `Future` object, and it can
- ## be used to await a procedure returning a `Future[void]`:
- ## `await socket.send("foobar")`.
- ##
- ## If an awaited future completes with an error, then `await` will re-raise
- ## this error. To avoid this, you can use the `yield` keyword instead of
- ## `await`. The following section shows different ways that you can handle
- ## exceptions in async procs.
- ##
- ## .. caution::
- ## Procedures marked {.async.} do not support mutable parameters such
- ## as `var int`. References such as `ref int` should be used instead.
- ##
- ## Handling Exceptions
- ## -------------------
- ##
- ## You can handle exceptions in the same way as in ordinary Nim code;
- ## by using the try statement:
- ##
- ## ```Nim
- ## try:
- ## let data = await sock.recv(100)
- ## echo("Received ", data)
- ## except:
- ## # Handle exception
- ## ```
- ##
- ## An alternative approach to handling exceptions is to use `yield` on a future
- ## then check the future's `failed` property. For example:
- ##
- ## ```Nim
- ## var future = sock.recv(100)
- ## yield future
- ## if future.failed:
- ## # Handle exception
- ## ```
- ##
- ##
- ## Discarding futures
- ## ==================
- ##
- ## Futures should **never** be discarded directly because they may contain
- ## errors. If you do not care for the result of a Future then you should use
- ## the `asyncCheck` procedure instead of the `discard` keyword. Note that this
- ## does not wait for completion, and you should use `waitFor` or `await` for that purpose.
- ##
- ## .. note:: `await` also checks if the future fails, so you can safely discard
- ## its result.
- ##
- ## Handling futures
- ## ================
- ##
- ## There are many different operations that apply to a future.
- ## The three primary high-level operations are `asyncCheck`,
- ## `waitFor`, and `await`.
- ##
- ## * `asyncCheck`: Raises an exception if the future fails. It neither waits
- ## for the future to finish nor returns the result of the future.
- ## * `waitFor`: Polls the event loop and blocks the current thread until the
- ## future finishes. This is often used to call an async procedure from a
- ## synchronous context and should never be used in an `async` proc.
- ## * `await`: Pauses execution in the current async procedure until the future
- ## finishes. While the current procedure is paused, other async procedures will
- ## continue running. Should be used instead of `waitFor` in an async
- ## procedure.
- ##
- ## Here is a handy quick reference chart showing their high-level differences:
- ## ============== ===================== =======================
- ## Procedure Context Blocking
- ## ============== ===================== =======================
- ## `asyncCheck` non-async and async non-blocking
- ## `waitFor` non-async blocks current thread
- ## `await` async suspends current proc
- ## ============== ===================== =======================
- ##
- ## Examples
- ## ========
- ##
- ## For examples take a look at the documentation for the modules implementing
- ## asynchronous IO. A good place to start is the
- ## `asyncnet module <asyncnet.html>`_.
- ##
- ## Investigating pending futures
- ## =============================
- ##
- ## It's possible to get into a situation where an async proc, or more accurately
- ## a `Future[T]` gets stuck and
- ## never completes. This can happen for various reasons and can cause serious
- ## memory leaks. When this occurs it's hard to identify the procedure that is
- ## stuck.
- ##
- ## Thankfully there is a mechanism which tracks the count of each pending future.
- ## All you need to do to enable it is compile with `-d:futureLogging` and
- ## use the `getFuturesInProgress` procedure to get the list of pending futures
- ## together with the stack traces to the moment of their creation.
- ##
- ## You may also find it useful to use this
- ## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
- ## the pending futures into prometheus, allowing you to analyse them via a nice
- ## graph.
- ##
- ##
- ##
- ## Limitations/Bugs
- ## ================
- ##
- ## * The effect system (`raises: []`) does not work with async procedures.
- ## * Mutable parameters are not supported by async procedures.
- ##
- ##
- ## Multiple async backend support
- ## ==============================
- ##
- ## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
- ## implemented in libraries with only minimal support from the language - as
- ## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
- ## ``chronos``, and more may come to be developed in the future.
- ##
- ## Libraries built on top of async/await may wish to support multiple async
- ## backends - the best way to do so is to create separate modules for each backend
- ## that may be imported side-by-side.
- ##
- ## An alternative way is to select backend using a global compile flag - this
- ## method makes it difficult to compose applications that use both backends as may
- ## happen with transitive dependencies, but may be appropriate in some cases -
- ## libraries choosing this path should call the flag `asyncBackend`, allowing
- ## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
- ##
- ## Known `async` backends include:
- ##
- ## * `-d:asyncBackend=none`: disable `async` support completely
- ## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html
- ## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/
- ##
- ## ``none`` can be used when a library supports both a synchronous and
- ## asynchronous API, to disable the latter.
- import os, tables, strutils, times, heapqueue, options, asyncstreams
- import options, math, std/monotimes
- import asyncfutures except callSoon
- import nativesockets, net, deques
- when defined(nimPreviewSlimSystem):
- import std/[assertions, syncio]
- export Port, SocketFlag
- export asyncfutures except callSoon
- export asyncstreams
- # TODO: Check if yielded future is nil and throw a more meaningful exception
- type
- PDispatcherBase = ref object of RootRef
- timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
- callbacks*: Deque[proc () {.gcsafe.}]
- proc processTimers(
- p: PDispatcherBase, didSomeWork: var bool
- ): Option[int] {.inline.} =
- # Pop the timers in the order in which they will expire (smaller `finishAt`).
- var count = p.timers.len
- let t = getMonoTime()
- while count > 0 and t >= p.timers[0].finishAt:
- p.timers.pop().fut.complete()
- dec count
- didSomeWork = true
- # Return the number of milliseconds in which the next timer will expire.
- if p.timers.len == 0: return
- let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
- return some(millisecs.int + 1)
- proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
- while p.callbacks.len > 0:
- var cb = p.callbacks.popFirst()
- cb()
- didSomeWork = true
- proc adjustTimeout(
- p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
- ): int {.inline.} =
- if p.callbacks.len != 0:
- return 0
- if nextTimer.isNone() or pollTimeout == -1:
- return pollTimeout
- result = max(nextTimer.get(), 0)
- result = min(pollTimeout, result)
- proc runOnce(timeout: int): bool {.gcsafe.}
- proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
- ## Schedule `cbproc` to be called as soon as possible.
- ## The callback is called when control returns to the event loop.
- proc initCallSoonProc =
- if asyncfutures.getCallSoonProc().isNil:
- asyncfutures.setCallSoonProc(callSoon)
- template implementSetInheritable() {.dirty.} =
- when declared(setInheritable):
- proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
- ## Control whether a file handle can be inherited by child processes.
- ## Returns `true` on success.
- ##
- ## This procedure is not guaranteed to be available for all platforms.
- ## Test for availability with `declared() <system.html#declared,untyped>`_.
- fd.FileHandle.setInheritable(inheritable)
- when defined(windows) or defined(nimdoc):
- import winlean, sets, hashes
- type
- CompletionKey = ULONG_PTR
- CompletionData* = object
- fd*: AsyncFD # TODO: Rename this.
- cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
- errcode: OSErrorCode) {.closure, gcsafe.})
- cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
- # when using RegisterWaitForSingleObject, because
- # waiting is done in different thread.
- PDispatcher* = ref object of PDispatcherBase
- ioPort: Handle
- handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.
- CustomObj = object of OVERLAPPED
- data*: CompletionData
- CustomRef* = ref CustomObj
- AsyncFD* = distinct int
- PostCallbackData = object
- ioPort: Handle
- handleFd: AsyncFD
- waitFd: Handle
- ovl: owned CustomRef
- PostCallbackDataPtr = ptr PostCallbackData
- AsyncEventImpl = object
- hEvent: Handle
- hWaiter: Handle
- pcd: PostCallbackDataPtr
- AsyncEvent* = ptr AsyncEventImpl
- Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
- proc hash(x: AsyncFD): Hash {.borrow.}
- proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
- proc newDispatcher*(): owned PDispatcher =
- ## Creates a new Dispatcher instance.
- new result
- result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
- result.handles = initHashSet[AsyncFD]()
- result.timers.clear()
- result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
- var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
- proc setGlobalDispatcher*(disp: sink PDispatcher) =
- if not gDisp.isNil:
- assert gDisp.callbacks.len == 0
- gDisp = disp
- initCallSoonProc()
- proc getGlobalDispatcher*(): PDispatcher =
- if gDisp.isNil:
- setGlobalDispatcher(newDispatcher())
- result = gDisp
- proc getIoHandler*(disp: PDispatcher): Handle =
- ## Returns the underlying IO Completion Port handle (Windows) or selector
- ## (Unix) for the specified dispatcher.
- return disp.ioPort
- proc register*(fd: AsyncFD) =
- ## Registers `fd` with the dispatcher.
- let p = getGlobalDispatcher()
- if createIoCompletionPort(fd.Handle, p.ioPort,
- cast[CompletionKey](fd), 1) == 0:
- raiseOSError(osLastError())
- p.handles.incl(fd)
- proc verifyPresence(fd: AsyncFD) =
- ## Ensures that file descriptor has been registered with the dispatcher.
- ## Raises ValueError if `fd` has not been registered.
- let p = getGlobalDispatcher()
- if fd notin p.handles:
- raise newException(ValueError,
- "Operation performed on a socket which has not been registered with" &
- " the dispatcher yet.")
- proc hasPendingOperations*(): bool =
- ## Returns `true` if the global dispatcher has pending operations.
- let p = getGlobalDispatcher()
- p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
- proc runOnce(timeout: int): bool =
- let p = getGlobalDispatcher()
- if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
- raise newException(ValueError,
- "No handles or timers registered in dispatcher.")
- result = false
- let nextTimer = processTimers(p, result)
- let at = adjustTimeout(p, timeout, nextTimer)
- var llTimeout =
- if at == -1: winlean.INFINITE
- else: at.int32
- var lpNumberOfBytesTransferred: DWORD
- var lpCompletionKey: ULONG_PTR
- var customOverlapped: CustomRef
- let res = getQueuedCompletionStatus(p.ioPort,
- addr lpNumberOfBytesTransferred, addr lpCompletionKey,
- cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
- result = true
- # For 'gcDestructors' the destructor of 'customOverlapped' will
- # be called at the end and we are the only owner here. This means
- # We do not have to 'GC_unref(customOverlapped)' because the destructor
- # does that for us.
- # http://stackoverflow.com/a/12277264/492186
- # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
- if res:
- # This is useful for ensuring the reliability of the overlapped struct.
- assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
- customOverlapped.data.cb(customOverlapped.data.fd,
- lpNumberOfBytesTransferred, OSErrorCode(-1))
- # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
- # so we need to dispose our `cb` environment, because it is not needed
- # anymore.
- if customOverlapped.data.cell.data != nil:
- system.dispose(customOverlapped.data.cell)
- when not defined(gcDestructors):
- GC_unref(customOverlapped)
- else:
- let errCode = osLastError()
- if customOverlapped != nil:
- assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
- customOverlapped.data.cb(customOverlapped.data.fd,
- lpNumberOfBytesTransferred, errCode)
- if customOverlapped.data.cell.data != nil:
- system.dispose(customOverlapped.data.cell)
- when not defined(gcDestructors):
- GC_unref(customOverlapped)
- else:
- if errCode.int32 == WAIT_TIMEOUT:
- # Timed out
- result = false
- else: raiseOSError(errCode)
- # Timer processing.
- discard processTimers(p, result)
- # Callback queue processing
- processPendingCallbacks(p, result)
- var acceptEx: WSAPROC_ACCEPTEX
- var connectEx: WSAPROC_CONNECTEX
- var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
- proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
- # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
- var bytesRet: DWORD
- fun = nil
- result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
- sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
- addr bytesRet, nil, nil) == 0
- proc initAll() =
- let dummySock = createNativeSocket()
- if dummySock == INVALID_SOCKET:
- raiseOSError(osLastError())
- var fun: pointer = nil
- if not initPointer(dummySock, fun, WSAID_CONNECTEX):
- raiseOSError(osLastError())
- connectEx = cast[WSAPROC_CONNECTEX](fun)
- if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
- raiseOSError(osLastError())
- acceptEx = cast[WSAPROC_ACCEPTEX](fun)
- if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
- raiseOSError(osLastError())
- getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
- close(dummySock)
- proc newCustom*(): CustomRef =
- result = CustomRef() # 0
- GC_ref(result) # 1 prevent destructor from doing a premature free.
- # destructor of newCustom's caller --> 0. This means
- # Windows holds a ref for us with RC == 0 (single owner).
- # This is passed back to us in the IO completion port.
- proc recv*(socket: AsyncFD, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
- ## Reads **up to** `size` bytes from `socket`. Returned future will
- ## complete once all the data requested is read, a part of the data has been
- ## read, or the socket has disconnected in which case the future will
- ## complete with a value of `""`.
- ##
- ## .. warning:: The `Peek` socket flag is not supported on Windows.
- # Things to note:
- # * When WSARecv completes immediately then `bytesReceived` is very
- # unreliable.
- # * Still need to implement message-oriented socket disconnection,
- # '\0' in the message currently signifies a socket disconnect. Who
- # knows what will happen when someone sends that to our socket.
- verifyPresence(socket)
- assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
- var retFuture = newFuture[string]("recv")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](alloc0(size))
- dataBuf.len = size.ULONG
- var bytesReceived: DWORD
- var flagsio = flags.toOSFlags().DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- if bytesCount == 0 and dataBuf.buf[0] == '\0':
- retFuture.complete("")
- else:
- var data = newString(bytesCount)
- assert bytesCount <= size
- copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
- retFuture.complete($data)
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- if dataBuf.buf != nil:
- dealloc dataBuf.buf
- dataBuf.buf = nil
- )
- let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- addr flagsio, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- if dataBuf.buf != nil:
- dealloc dataBuf.buf
- dataBuf.buf = nil
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- elif ret == 0:
- # Request completed immediately.
- if bytesReceived != 0:
- var data = newString(bytesReceived)
- assert bytesReceived <= size
- copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
- retFuture.complete($data)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete("")
- return retFuture
- proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Reads **up to** `size` bytes from `socket` into `buf`, which must
- ## at least be of that size. Returned future will complete once all the
- ## data requested is read, a part of the data has been read, or the socket
- ## has disconnected in which case the future will complete with a value of
- ## `0`.
- ##
- ## .. warning:: The `Peek` socket flag is not supported on Windows.
- # Things to note:
- # * When WSARecv completes immediately then `bytesReceived` is very
- # unreliable.
- # * Still need to implement message-oriented socket disconnection,
- # '\0' in the message currently signifies a socket disconnect. Who
- # knows what will happen when someone sends that to our socket.
- verifyPresence(socket)
- assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
- var retFuture = newFuture[int]("recvInto")
- #buf[] = '\0'
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](buf)
- dataBuf.len = size.ULONG
- var bytesReceived: DWORD
- var flagsio = flags.toOSFlags().DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete(bytesCount)
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- if dataBuf.buf != nil:
- dataBuf.buf = nil
- )
- let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- addr flagsio, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- if dataBuf.buf != nil:
- dataBuf.buf = nil
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- elif ret == 0:
- # Request completed immediately.
- if bytesReceived != 0:
- assert bytesReceived <= size
- retFuture.complete(bytesReceived)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete(bytesReceived)
- return retFuture
- proc send*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `size` bytes from `buf` to `socket`. The returned future
- ## will complete once all data has been sent.
- ##
- ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
- ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
- verifyPresence(socket)
- var retFuture = newFuture[void]("send")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](buf)
- dataBuf.len = size.ULONG
- var bytesReceived, lowFlags: DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete()
- else:
- if flags.isDisconnectionError(errcode):
- retFuture.complete()
- else:
- retFuture.fail(newOSError(errcode))
- )
- let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
- lowFlags, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- if flags.isDisconnectionError(err):
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
- saddrLen: SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` to specified destination `saddr`, using
- ## socket `socket`. The returned future will complete once all data
- ## has been sent.
- verifyPresence(socket)
- var retFuture = newFuture[void]("sendTo")
- var dataBuf: TWSABuf
- dataBuf.buf = cast[cstring](data)
- dataBuf.len = size.ULONG
- var bytesSent = 0.DWORD
- var lowFlags = 0.DWORD
- # we will preserve address in our stack
- var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
- var stalen: cint = cint(saddrLen)
- zeroMem(addr(staddr[0]), 128)
- copyMem(addr(staddr[0]), saddr, saddrLen)
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
- lowFlags, cast[ptr SockAddr](addr(staddr[0])),
- stalen, cast[POVERLAPPED](ol), nil)
- if ret == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
- saddr: ptr SockAddr, saddrLen: ptr SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Receives a datagram data from `socket` into `buf`, which must
- ## be at least of size `size`, address of datagram's sender will be
- ## stored into `saddr` and `saddrLen`. Returned future will complete
- ## once one datagram has been received, and will return size of packet
- ## received.
- verifyPresence(socket)
- var retFuture = newFuture[int]("recvFromInto")
- var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
- var bytesReceived = 0.DWORD
- var lowFlags = 0.DWORD
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- assert bytesCount <= size
- retFuture.complete(bytesCount)
- else:
- # datagram sockets don't have disconnection,
- # so we can just raise an exception
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
- addr bytesReceived, addr lowFlags,
- saddr, cast[ptr cint](saddrLen),
- cast[POVERLAPPED](ol), nil)
- if res == -1:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(err)))
- else:
- # Request completed immediately.
- if bytesReceived != 0:
- assert bytesReceived <= size
- retFuture.complete(bytesReceived)
- else:
- if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
- retFuture.complete(bytesReceived)
- return retFuture
- proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)):
- owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection and the remote address of the client.
- ## The future will complete when the connection is successfully accepted.
- ##
- ## The resulting client socket is automatically registered to the
- ## dispatcher.
- ##
- ## If `inheritable` is false (the default), the resulting client socket will
- ## not be inheritable by child processes.
- ##
- ## The `accept` call may result in an error if the connecting socket
- ## disconnects during the duration of the `accept`. If the `SafeDisconn`
- ## flag is specified then this error will not be raised and instead
- ## accept will be called again.
- verifyPresence(socket)
- var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
- var clientSock = createNativeSocket(inheritable = inheritable)
- if clientSock == osInvalidSocket: raiseOSError(osLastError())
- const lpOutputLen = 1024
- var lpOutputBuf = newString(lpOutputLen)
- var dwBytesReceived: DWORD
- let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
- let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
- let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
- template failAccept(errcode) =
- if flags.isDisconnectionError(errcode):
- var newAcceptFut = acceptAddr(socket, flags)
- newAcceptFut.callback =
- proc () =
- if newAcceptFut.failed:
- retFuture.fail(newAcceptFut.readError)
- else:
- retFuture.complete(newAcceptFut.read)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- template completeAccept() {.dirty.} =
- var listenSock = socket
- let setoptRet = setsockopt(clientSock, SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
- sizeof(listenSock).SockLen)
- if setoptRet != 0:
- let errcode = osLastError()
- discard clientSock.closesocket()
- failAccept(errcode)
- else:
- var localSockaddr, remoteSockaddr: ptr SockAddr
- var localLen, remoteLen: int32
- getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
- dwLocalAddressLength, dwRemoteAddressLength,
- addr localSockaddr, addr localLen,
- addr remoteSockaddr, addr remoteLen)
- try:
- let address = getAddrString(remoteSockaddr)
- register(clientSock.AsyncFD)
- retFuture.complete((address: address, client: clientSock.AsyncFD))
- except:
- # getAddrString may raise
- clientSock.close()
- retFuture.fail(getCurrentException())
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- completeAccept()
- else:
- failAccept(errcode)
- )
- # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
- let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
- dwReceiveDataLength,
- dwLocalAddressLength,
- dwRemoteAddressLength,
- addr dwBytesReceived, cast[POVERLAPPED](ol))
- if not ret:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- failAccept(err)
- GC_unref(ol)
- else:
- completeAccept()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it will
- # free `ol`.
- return retFuture
- implementSetInheritable()
- proc closeSocket*(socket: AsyncFD) =
- ## Closes a socket and ensures that it is unregistered.
- socket.SocketHandle.close()
- getGlobalDispatcher().handles.excl(socket)
- proc unregister*(fd: AsyncFD) =
- ## Unregisters `fd`.
- getGlobalDispatcher().handles.excl(fd)
- proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
- return fd in disp.handles
- {.push stackTrace: off.}
- proc waitableCallback(param: pointer,
- timerOrWaitFired: WINBOOL) {.stdcall.} =
- var p = cast[PostCallbackDataPtr](param)
- discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
- ULONG_PTR(p.handleFd),
- cast[pointer](p.ovl))
- {.pop.}
- proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
- let p = getGlobalDispatcher()
- var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
- var hEvent = wsaCreateEvent()
- if hEvent == 0:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- pcd.ioPort = p.ioPort
- pcd.handleFd = fd
- var ol = newCustom()
- ol.data = CompletionData(fd: fd, cb:
- proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
- # we excluding our `fd` because cb(fd) can register own handler
- # for this `fd`
- p.handles.excl(fd)
- # unregisterWait() is called before callback, because appropriate
- # winsockets function can re-enable event.
- # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
- if unregisterWait(pcd.waitFd) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- if cb(fd):
- # callback returned `true`, so we free all allocated resources
- deallocShared(cast[pointer](pcd))
- if not wsaCloseEvent(hEvent):
- raiseOSError(osLastError())
- # pcd.ovl will be unrefed in poll().
- else:
- # callback returned `false` we need to continue
- if p.handles.contains(fd):
- # new callback was already registered with `fd`, so we free all
- # allocated resources. This happens because in callback `cb`
- # addRead/addWrite was called with same `fd`.
- deallocShared(cast[pointer](pcd))
- if not wsaCloseEvent(hEvent):
- raiseOSError(osLastError())
- else:
- # we need to include `fd` again
- p.handles.incl(fd)
- # and register WaitForSingleObject again
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), INFINITE, flags):
- # pcd.ovl will be unrefed in poll()
- let err = osLastError()
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- else:
- # we incref `pcd.ovl` and `protect` callback one more time,
- # because it will be unrefed and disposed in `poll()` after
- # callback finishes.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- )
- # We need to protect our callback environment value, so GC will not free it
- # accidentally.
- ol.data.cell = system.protect(rawEnv(ol.data.cb))
- # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
- # will be signaled when appropriate `mask` events will be triggered.
- if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- pcd.ovl = ol
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), INFINITE, flags):
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard wsaCloseEvent(hEvent)
- raiseOSError(err)
- p.handles.incl(fd)
- proc addRead*(fd: AsyncFD, cb: Callback) =
- ## Start watching the file descriptor for read availability and then call
- ## the callback `cb`.
- ##
- ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
- ## so if you can avoid it, please do it. Use `addRead` only if really
- ## need it (main usecase is adaptation of unix-like libraries to be
- ## asynchronous on Windows).
- ##
- ## If you use this function, you don't need to use asyncdispatch.recv()
- ## or asyncdispatch.accept(), because they are using IOCP, please use
- ## nativesockets.recv() and nativesockets.accept() instead.
- ##
- ## Be sure your callback `cb` returns `true`, if you want to remove
- ## watch of `read` notifications, and `false`, if you want to continue
- ## receiving notifications.
- registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
- proc addWrite*(fd: AsyncFD, cb: Callback) =
- ## Start watching the file descriptor for write availability and then call
- ## the callback `cb`.
- ##
- ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
- ## so if you can avoid it, please do it. Use `addWrite` only if really
- ## need it (main usecase is adaptation of unix-like libraries to be
- ## asynchronous on Windows).
- ##
- ## If you use this function, you don't need to use asyncdispatch.send()
- ## or asyncdispatch.connect(), because they are using IOCP, please use
- ## nativesockets.send() and nativesockets.connect() instead.
- ##
- ## Be sure your callback `cb` returns `true`, if you want to remove
- ## watch of `write` notifications, and `false`, if you want to continue
- ## receiving notifications.
- registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
- template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
- handleCallback) =
- let handleFD = AsyncFD(hEvent)
- pcd.ioPort = p.ioPort
- pcd.handleFd = handleFD
- var ol = newCustom()
- ol.data.fd = handleFD
- ol.data.cb = handleCallback
- # We need to protect our callback environment value, so GC will not free it
- # accidentally.
- ol.data.cell = system.protect(rawEnv(ol.data.cb))
- pcd.ovl = ol
- if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
- cast[WAITORTIMERCALLBACK](waitableCallback),
- cast[pointer](pcd), timeout.DWORD, flags):
- let err = osLastError()
- GC_unref(ol)
- deallocShared(cast[pointer](pcd))
- discard closeHandle(hEvent)
- raiseOSError(err)
- p.handles.incl(handleFD)
- template closeWaitable(handle: untyped) =
- let waitFd = pcd.waitFd
- deallocShared(cast[pointer](pcd))
- p.handles.excl(fd)
- if unregisterWait(waitFd) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- discard closeHandle(handle)
- raiseOSError(err)
- if closeHandle(handle) == 0:
- raiseOSError(osLastError())
- proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
- ## Registers callback `cb` to be called when timer expired.
- ##
- ## Parameters:
- ##
- ## * `timeout` - timeout value in milliseconds.
- ## * `oneshot`
- ## * `true` - generate only one timeout event
- ## * `false` - generate timeout events periodically
- doAssert(timeout > 0)
- let p = getGlobalDispatcher()
- var hEvent = createEvent(nil, 1, 0, nil)
- if hEvent == INVALID_HANDLE_VALUE:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD
- if oneshot: flags = flags or WT_EXECUTEONLYONCE
- proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- let res = cb(fd)
- if res or oneshot:
- closeWaitable(hEvent)
- else:
- # if callback returned `false`, then it wants to be called again, so
- # we need to ref and protect `pcd.ovl` again, because it will be
- # unrefed and disposed in `poll()`.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
- proc addProcess*(pid: int, cb: Callback) =
- ## Registers callback `cb` to be called when process with process ID
- ## `pid` exited.
- const NULL = Handle(0)
- let p = getGlobalDispatcher()
- let procFlags = SYNCHRONIZE
- var hProcess = openProcess(procFlags, 0, pid.DWORD)
- if hProcess == NULL:
- raiseOSError(osLastError())
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD
- proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- closeWaitable(hProcess)
- discard cb(fd)
- registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
- proc newAsyncEvent*(): AsyncEvent =
- ## Creates a new thread-safe `AsyncEvent` object.
- ##
- ## New `AsyncEvent` object is not automatically registered with
- ## dispatcher like `AsyncSocket`.
- var sa = SECURITY_ATTRIBUTES(
- nLength: sizeof(SECURITY_ATTRIBUTES).cint,
- bInheritHandle: 1
- )
- var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
- if event == INVALID_HANDLE_VALUE:
- raiseOSError(osLastError())
- result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
- result.hEvent = event
- proc trigger*(ev: AsyncEvent) =
- ## Set event `ev` to signaled state.
- if setEvent(ev.hEvent) == 0:
- raiseOSError(osLastError())
- proc unregister*(ev: AsyncEvent) =
- ## Unregisters event `ev`.
- doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
- let p = getGlobalDispatcher()
- p.handles.excl(AsyncFD(ev.hEvent))
- if unregisterWait(ev.hWaiter) == 0:
- let err = osLastError()
- if err.int32 != ERROR_IO_PENDING:
- raiseOSError(err)
- ev.hWaiter = 0
- proc close*(ev: AsyncEvent) =
- ## Closes event `ev`.
- let res = closeHandle(ev.hEvent)
- deallocShared(cast[pointer](ev))
- if res == 0:
- raiseOSError(osLastError())
- proc addEvent*(ev: AsyncEvent, cb: Callback) =
- ## Registers callback `cb` to be called when `ev` will be signaled
- doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
- let p = getGlobalDispatcher()
- let hEvent = ev.hEvent
- var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
- var flags = WT_EXECUTEINWAITTHREAD.DWORD
- proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if ev.hWaiter != 0:
- if cb(fd):
- # we need this check to avoid exception, if `unregister(event)` was
- # called in callback.
- deallocShared(cast[pointer](pcd))
- if ev.hWaiter != 0:
- unregister(ev)
- else:
- # if callback returned `false`, then it wants to be called again, so
- # we need to ref and protect `pcd.ovl` again, because it will be
- # unrefed and disposed in `poll()`.
- GC_ref(pcd.ovl)
- pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
- else:
- # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
- deallocShared(cast[pointer](pcd))
- registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
- ev.hWaiter = pcd.waitFd
- initAll()
- else:
- import selectors
- from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
- MSG_NOSIGNAL
- when declared(posix.accept4):
- from posix import accept4, SOCK_CLOEXEC
- when defined(genode):
- import genode/env # get the implicit Genode env
- import genode/signals
- const
- InitCallbackListSize = 4 # initial size of callbacks sequence,
- # associated with file/socket descriptor.
- InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
- # queue.
- type
- AsyncFD* = distinct cint
- Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
- AsyncData = object
- readList: seq[Callback]
- writeList: seq[Callback]
- AsyncEvent* = distinct SelectEvent
- PDispatcher* = ref object of PDispatcherBase
- selector: Selector[AsyncData]
- when defined(genode):
- signalHandler: SignalHandler
- proc `==`*(x, y: AsyncFD): bool {.borrow.}
- proc `==`*(x, y: AsyncEvent): bool {.borrow.}
- template newAsyncData(): AsyncData =
- AsyncData(
- readList: newSeqOfCap[Callback](InitCallbackListSize),
- writeList: newSeqOfCap[Callback](InitCallbackListSize)
- )
- proc newDispatcher*(): owned(PDispatcher) =
- new result
- result.selector = newSelector[AsyncData]()
- result.timers.clear()
- result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
- when defined(genode):
- let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
- result.signalHandler = newSignalHandler(entrypoint):
- discard runOnce(0)
- var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
- proc setGlobalDispatcher*(disp: owned PDispatcher) =
- if not gDisp.isNil:
- assert gDisp.callbacks.len == 0
- gDisp = disp
- initCallSoonProc()
- proc getGlobalDispatcher*(): PDispatcher =
- if gDisp.isNil:
- setGlobalDispatcher(newDispatcher())
- result = gDisp
- proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
- return disp.selector
- proc register*(fd: AsyncFD) =
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- p.selector.registerHandle(fd.SocketHandle, {}, data)
- proc unregister*(fd: AsyncFD) =
- getGlobalDispatcher().selector.unregister(fd.SocketHandle)
- proc unregister*(ev: AsyncEvent) =
- getGlobalDispatcher().selector.unregister(SelectEvent(ev))
- proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
- return fd.SocketHandle in disp.selector
- proc addRead*(fd: AsyncFD, cb: Callback) =
- let p = getGlobalDispatcher()
- var newEvents = {Event.Read}
- withData(p.selector, fd.SocketHandle, adata) do:
- adata.readList.add(cb)
- newEvents.incl(Event.Read)
- if len(adata.writeList) != 0: newEvents.incl(Event.Write)
- do:
- raise newException(ValueError, "File descriptor not registered.")
- p.selector.updateHandle(fd.SocketHandle, newEvents)
- proc addWrite*(fd: AsyncFD, cb: Callback) =
- let p = getGlobalDispatcher()
- var newEvents = {Event.Write}
- withData(p.selector, fd.SocketHandle, adata) do:
- adata.writeList.add(cb)
- newEvents.incl(Event.Write)
- if len(adata.readList) != 0: newEvents.incl(Event.Read)
- do:
- raise newException(ValueError, "File descriptor not registered.")
- p.selector.updateHandle(fd.SocketHandle, newEvents)
- proc hasPendingOperations*(): bool =
- let p = getGlobalDispatcher()
- not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
- proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
- var old = move dest
- dest = src
- for i in 0..high(old):
- dest.add(move old[i])
- proc processBasicCallbacks(
- fd: AsyncFD, event: Event
- ): tuple[readCbListCount, writeCbListCount: int] =
- # Process pending descriptor and AsyncEvent callbacks.
- #
- # Invoke every callback stored in `rwlist`, until one
- # returns `false` (which means callback wants to stay
- # alive). In such case all remaining callbacks will be added
- # to `rwlist` again, in the order they have been inserted.
- #
- # `rwlist` associated with file descriptor MUST BE emptied before
- # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
- # or it can be possible to fall into endless cycle.
- var curList: seq[Callback]
- let selector = getGlobalDispatcher().selector
- withData(selector, fd.int, fdData):
- case event
- of Event.Read:
- #shallowCopy(curList, fdData.readList)
- curList = move fdData.readList
- fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
- of Event.Write:
- #shallowCopy(curList, fdData.writeList)
- curList = move fdData.writeList
- fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
- else:
- assert false, "Cannot process callbacks for " & $event
- let newLength = max(len(curList), InitCallbackListSize)
- var newList = newSeqOfCap[Callback](newLength)
- var eventsExtinguished = false
- for cb in curList:
- if eventsExtinguished:
- newList.add(cb)
- elif not cb(fd):
- # Callback wants to be called again.
- newList.add(cb)
- # This callback has returned with EAGAIN, so we don't need to
- # call any other callbacks as they are all waiting for the same event
- # on the same fd.
- # We do need to ensure they are called again though.
- eventsExtinguished = true
- withData(selector, fd.int, fdData) do:
- # Descriptor is still present in the queue.
- case event
- of Event.Read: prependSeq(fdData.readList, newList)
- of Event.Write: prependSeq(fdData.writeList, newList)
- else:
- assert false, "Cannot process callbacks for " & $event
- result.readCbListCount = len(fdData.readList)
- result.writeCbListCount = len(fdData.writeList)
- do:
- # Descriptor was unregistered in callback via `unregister()`.
- result.readCbListCount = -1
- result.writeCbListCount = -1
- proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
- # Process pending custom event callbacks. Custom events are
- # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
- # There can be only one callback registered with one descriptor,
- # so there is no need to iterate over list.
- var curList: seq[Callback]
- withData(p.selector, fd.int, adata) do:
- curList = move adata.readList
- adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
- let newLength = len(curList)
- var newList = newSeqOfCap[Callback](newLength)
- var cb = curList[0]
- if not cb(fd):
- newList.add(cb)
- withData(p.selector, fd.int, adata) do:
- # descriptor still present in queue.
- adata.readList = newList & adata.readList
- if len(adata.readList) == 0:
- # if no callbacks registered with descriptor, unregister it.
- p.selector.unregister(fd.int)
- do:
- # descriptor was unregistered in callback via `unregister()`.
- discard
- implementSetInheritable()
- proc closeSocket*(sock: AsyncFD) =
- let selector = getGlobalDispatcher().selector
- if sock.SocketHandle notin selector:
- raise newException(ValueError, "File descriptor not registered.")
- let data = selector.getData(sock.SocketHandle)
- sock.unregister()
- sock.SocketHandle.close()
- # We need to unblock the read and write callbacks which could still be
- # waiting for the socket to become readable and/or writeable.
- for cb in data.readList & data.writeList:
- if not cb(sock):
- raise newException(
- ValueError, "Expecting async operations to stop when fd has closed."
- )
- proc runOnce(timeout: int): bool =
- let p = getGlobalDispatcher()
- if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
- when defined(genode):
- if timeout == 0: return
- raise newException(ValueError,
- "No handles or timers registered in dispatcher.")
- result = false
- var keys: array[64, ReadyKey]
- let nextTimer = processTimers(p, result)
- var count =
- p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
- for i in 0..<count:
- let fd = keys[i].fd.AsyncFD
- let events = keys[i].events
- var (readCbListCount, writeCbListCount) = (0, 0)
- if Event.Read in events or events == {Event.Error}:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Read)
- result = true
- if Event.Write in events or events == {Event.Error}:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Write)
- result = true
- var isCustomEvent = false
- if Event.User in events:
- (readCbListCount, writeCbListCount) =
- processBasicCallbacks(fd, Event.Read)
- isCustomEvent = true
- if readCbListCount == 0:
- p.selector.unregister(fd.int)
- result = true
- when ioselSupportedPlatform:
- const customSet = {Event.Timer, Event.Signal, Event.Process,
- Event.Vnode}
- if (customSet * events) != {}:
- isCustomEvent = true
- processCustomCallbacks(p, fd)
- result = true
- # because state `data` can be modified in callback we need to update
- # descriptor events with currently registered callbacks.
- if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
- var newEvents: set[Event] = {}
- if readCbListCount > 0: incl(newEvents, Event.Read)
- if writeCbListCount > 0: incl(newEvents, Event.Write)
- p.selector.updateHandle(SocketHandle(fd), newEvents)
- # Timer processing.
- discard processTimers(p, result)
- # Callback queue processing
- processPendingCallbacks(p, result)
- proc recv*(socket: AsyncFD, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
- var retFuture = newFuture[string]("recv")
- var readBuffer = newString(size)
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete("")
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- elif res == 0:
- # Disconnected
- retFuture.complete("")
- else:
- readBuffer.setLen(res)
- retFuture.complete(readBuffer)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- var retFuture = newFuture[int]("recvInto")
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recv(sock.SocketHandle, buf, size.cint,
- flags.toOSFlags())
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete(0)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- else:
- retFuture.complete(res)
- # TODO: The following causes a massive slowdown.
- #if not cb(socket):
- addRead(socket, cb)
- return retFuture
- proc send*(socket: AsyncFD, buf: pointer, size: int,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- var retFuture = newFuture[void]("send")
- var written = 0
- proc cb(sock: AsyncFD): bool =
- result = true
- let netSize = size-written
- var d = cast[cstring](buf)
- let res = send(sock.SocketHandle, addr d[written], netSize.cint,
- MSG_NOSIGNAL)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and
- lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- if flags.isDisconnectionError(lastError):
- retFuture.complete()
- else:
- retFuture.fail(newOSError(lastError))
- else:
- result = false # We still want this callback to be called.
- else:
- written.inc(res)
- if res != netSize:
- result = false # We still have data to send.
- else:
- retFuture.complete()
- # TODO: The following causes crashes.
- #if not cb(socket):
- addWrite(socket, cb)
- return retFuture
- proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
- saddrLen: SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` of size `size` in bytes to specified destination
- ## (`saddr` of size `saddrLen` in bytes, using socket `socket`.
- ## The returned future will complete once all data has been sent.
- var retFuture = newFuture[void]("sendTo")
- # we will preserve address in our stack
- var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
- var stalen = saddrLen
- zeroMem(addr(staddr[0]), 128)
- copyMem(addr(staddr[0]), saddr, saddrLen)
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
- cast[ptr SockAddr](addr(staddr[0])), stalen)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false # We still want this callback to be called.
- else:
- retFuture.complete()
- addWrite(socket, cb)
- return retFuture
- proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
- saddr: ptr SockAddr, saddrLen: ptr SockLen,
- flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
- ## Receives a datagram data from `socket` into `data`, which must
- ## be at least of size `size` in bytes, address of datagram's sender
- ## will be stored into `saddr` and `saddrLen`. Returned future will
- ## complete once one datagram has been received, and will return size
- ## of packet received.
- var retFuture = newFuture[int]("recvFromInto")
- proc cb(sock: AsyncFD): bool =
- result = true
- let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
- saddr, saddrLen)
- if res < 0:
- let lastError = osLastError()
- if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
- lastError.int32 != EAGAIN:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- result = false
- else:
- retFuture.complete(res)
- addRead(socket, cb)
- return retFuture
- proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)):
- owned(Future[tuple[address: string, client: AsyncFD]]) =
- var retFuture = newFuture[tuple[address: string,
- client: AsyncFD]]("acceptAddr")
- proc cb(sock: AsyncFD): bool =
- result = true
- var sockAddress: Sockaddr_storage
- var addrLen = sizeof(sockAddress).SockLen
- var client =
- when declared(accept4):
- accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
- else:
- accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen))
- when declared(setInheritable) and not declared(accept4):
- if client != osInvalidSocket and not setInheritable(client, inheritable):
- # Set failure first because close() itself can fail,
- # altering osLastError().
- retFuture.fail(newOSError(osLastError()))
- close client
- return false
- if client == osInvalidSocket:
- let lastError = osLastError()
- assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
- if lastError.int32 == EINTR:
- return false
- else:
- if flags.isDisconnectionError(lastError):
- return false
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- try:
- let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
- register(client.AsyncFD)
- retFuture.complete((address, client.AsyncFD))
- except:
- # getAddrString may raise
- client.close()
- retFuture.fail(getCurrentException())
- addRead(socket, cb)
- return retFuture
- when ioselSupportedPlatform:
- proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
- ## Start watching for timeout expiration, and then call the
- ## callback `cb`.
- ## `timeout` - time in milliseconds,
- ## `oneshot` - if `true` only one event will be dispatched,
- ## if `false` continuous events every `timeout` milliseconds.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerTimer(timeout, oneshot, data)
- proc addSignal*(signal: int, cb: Callback) =
- ## Start watching signal `signal`, and when signal appears, call the
- ## callback `cb`.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerSignal(signal, data)
- proc addProcess*(pid: int, cb: Callback) =
- ## Start watching for process exit with pid `pid`, and then call
- ## the callback `cb`.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerProcess(pid, data)
- proc newAsyncEvent*(): AsyncEvent =
- ## Creates new `AsyncEvent`.
- result = AsyncEvent(newSelectEvent())
- proc trigger*(ev: AsyncEvent) =
- ## Sets new `AsyncEvent` to signaled state.
- trigger(SelectEvent(ev))
- proc close*(ev: AsyncEvent) =
- ## Closes `AsyncEvent`
- close(SelectEvent(ev))
- proc addEvent*(ev: AsyncEvent, cb: Callback) =
- ## Start watching for event `ev`, and call callback `cb`, when
- ## ev will be set to signaled state.
- let p = getGlobalDispatcher()
- var data = newAsyncData()
- data.readList.add(cb)
- p.selector.registerEvent(SelectEvent(ev), data)
- proc drain*(timeout = 500) =
- ## Waits for completion of **all** events and processes them. Raises `ValueError`
- ## if there are no pending operations. In contrast to `poll` this
- ## processes as many events as are available until the timeout has elapsed.
- var curTimeout = timeout
- let start = now()
- while hasPendingOperations():
- discard runOnce(curTimeout)
- curTimeout -= (now() - start).inMilliseconds.int
- if curTimeout < 0:
- break
- proc poll*(timeout = 500) =
- ## Waits for completion events and processes them. Raises `ValueError`
- ## if there are no pending operations. This runs the underlying OS
- ## `epoll`:idx: or `kqueue`:idx: primitive only once.
- discard runOnce(timeout)
- template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
- inheritable = defined(nimInheritHandles)) =
- let handle = createNativeSocket(domain, sockType, protocol, inheritable)
- if handle == osInvalidSocket:
- return osInvalidSocket.AsyncFD
- handle.setBlocking(false)
- when defined(macosx) and not defined(nimdoc):
- handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
- result = handle.AsyncFD
- register(result)
- proc createAsyncNativeSocket*(domain: cint, sockType: cint,
- protocol: cint,
- inheritable = defined(nimInheritHandles)): AsyncFD =
- createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
- proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
- sockType: SockType = SOCK_STREAM,
- protocol: Protocol = IPPROTO_TCP,
- inheritable = defined(nimInheritHandles)): AsyncFD =
- createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
- when defined(windows) or defined(nimdoc):
- proc bindToDomain(handle: SocketHandle, domain: Domain) =
- # Extracted into a separate proc, because connect() on Windows requires
- # the socket to be initially bound.
- template doBind(saddr) =
- if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
- sizeof(saddr).SockLen) < 0'i32:
- raiseOSError(osLastError())
- if domain == Domain.AF_INET6:
- var saddr: Sockaddr_in6
- saddr.sin6_family = uint16(toInt(domain))
- doBind(saddr)
- else:
- var saddr: Sockaddr_in
- saddr.sin_family = uint16(toInt(domain))
- doBind(saddr)
- proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
- let retFuture = newFuture[void]("doConnect")
- result = retFuture
- var ol = newCustom()
- ol.data = CompletionData(fd: socket, cb:
- proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
- if not retFuture.finished:
- if errcode == OSErrorCode(-1):
- const SO_UPDATE_CONNECT_CONTEXT = 0x7010
- socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
- retFuture.complete()
- else:
- retFuture.fail(newException(OSError, osErrorMsg(errcode)))
- )
- let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
- cint(addrInfo.ai_addrlen), nil, 0, nil,
- cast[POVERLAPPED](ol))
- if ret:
- # Request to connect completed immediately.
- retFuture.complete()
- # We don't deallocate `ol` here because even though this completed
- # immediately poll will still be notified about its completion and it
- # will free `ol`.
- else:
- let lastError = osLastError()
- if lastError.int32 != ERROR_IO_PENDING:
- # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
- # and the future will be completed/failed there, too.
- GC_unref(ol)
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- else:
- proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
- let retFuture = newFuture[void]("doConnect")
- result = retFuture
- proc cb(fd: AsyncFD): bool =
- let ret = SocketHandle(fd).getSockOptInt(
- cint(SOL_SOCKET), cint(SO_ERROR))
- if ret == 0:
- # We have connected.
- retFuture.complete()
- return true
- elif ret == EINTR:
- # interrupted, keep waiting
- return false
- else:
- retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
- return true
- let ret = connect(socket.SocketHandle,
- addrInfo.ai_addr,
- addrInfo.ai_addrlen.SockLen)
- if ret == 0:
- # Request to connect completed immediately.
- retFuture.complete()
- else:
- let lastError = osLastError()
- if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
- addWrite(socket, cb)
- else:
- retFuture.fail(newException(OSError, osErrorMsg(lastError)))
- template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
- protocol: Protocol = IPPROTO_RAW) =
- ## Iterates through the AddrInfo linked list asynchronously
- ## until the connection can be established.
- const shouldCreateFd = not declared(fd)
- when shouldCreateFd:
- let sockType = protocol.toSockType()
- var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
- for i in low(fdPerDomain)..high(fdPerDomain):
- fdPerDomain[i] = osInvalidSocket.AsyncFD
- template closeUnusedFds(domainToKeep = -1) {.dirty.} =
- for i, fd in fdPerDomain:
- if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
- fd.closeSocket()
- var lastException: ref Exception
- var curAddrInfo = addrInfo
- var domain: Domain
- when shouldCreateFd:
- var curFd: AsyncFD
- else:
- var curFd = fd
- proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
- if fut == nil or fut.failed:
- if fut != nil:
- lastException = fut.readError()
- while curAddrInfo != nil:
- let domainOpt = curAddrInfo.ai_family.toKnownDomain()
- if domainOpt.isSome:
- domain = domainOpt.unsafeGet()
- break
- curAddrInfo = curAddrInfo.ai_next
- if curAddrInfo == nil:
- freeAddrInfo(addrInfo)
- when shouldCreateFd:
- closeUnusedFds()
- if lastException != nil:
- retFuture.fail(lastException)
- else:
- retFuture.fail(newException(
- IOError, "Couldn't resolve address: " & address))
- return
- when shouldCreateFd:
- curFd = fdPerDomain[ord(domain)]
- if curFd == osInvalidSocket.AsyncFD:
- try:
- curFd = createAsyncNativeSocket(domain, sockType, protocol)
- except:
- freeAddrInfo(addrInfo)
- closeUnusedFds()
- raise getCurrentException()
- when defined(windows):
- curFd.SocketHandle.bindToDomain(domain)
- fdPerDomain[ord(domain)] = curFd
- doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
- curAddrInfo = curAddrInfo.ai_next
- else:
- freeAddrInfo(addrInfo)
- when shouldCreateFd:
- closeUnusedFds(ord(domain))
- retFuture.complete(curFd)
- else:
- retFuture.complete()
- tryNextAddrInfo(nil)
- proc dial*(address: string, port: Port,
- protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
- ## Establishes connection to the specified `address`:`port` pair via the
- ## specified protocol. The procedure iterates through possible
- ## resolutions of the `address` until it succeeds, meaning that it
- ## seamlessly works with both IPv4 and IPv6.
- ## Returns the async file descriptor, registered in the dispatcher of
- ## the current thread, ready to send or receive data.
- let retFuture = newFuture[AsyncFD]("dial")
- result = retFuture
- let sockType = protocol.toSockType()
- let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
- asyncAddrInfoLoop(aiList, noFD, protocol)
- proc connect*(socket: AsyncFD, address: string, port: Port,
- domain = Domain.AF_INET): owned(Future[void]) =
- let retFuture = newFuture[void]("connect")
- result = retFuture
- when defined(windows):
- verifyPresence(socket)
- else:
- assert getSockDomain(socket.SocketHandle) == domain
- let aiList = getAddrInfo(address, port, domain)
- when defined(windows):
- socket.SocketHandle.bindToDomain(domain)
- asyncAddrInfoLoop(aiList, socket)
- proc sleepAsync*(ms: int | float): owned(Future[void]) =
- ## Suspends the execution of the current async procedure for the next
- ## `ms` milliseconds.
- var retFuture = newFuture[void]("sleepAsync")
- let p = getGlobalDispatcher()
- when ms is int:
- p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
- elif ms is float:
- let ns = (ms * 1_000_000).int64
- p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
- return retFuture
- proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
- ## Returns a future which will complete once `fut` completes or after
- ## `timeout` milliseconds has elapsed.
- ##
- ## If `fut` completes first the returned future will hold true,
- ## otherwise, if `timeout` milliseconds has elapsed first, the returned
- ## future will hold false.
- var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
- var timeoutFuture = sleepAsync(timeout)
- fut.callback =
- proc () =
- if not retFuture.finished:
- if fut.failed:
- retFuture.fail(fut.error)
- else:
- retFuture.complete(true)
- timeoutFuture.callback =
- proc () =
- if not retFuture.finished: retFuture.complete(false)
- return retFuture
- proc accept*(socket: AsyncFD,
- flags = {SocketFlag.SafeDisconn},
- inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
- ## Accepts a new connection. Returns a future containing the client socket
- ## corresponding to that connection.
- ##
- ## If `inheritable` is false (the default), the resulting client socket
- ## will not be inheritable by child processes.
- ##
- ## The future will complete when the connection is successfully accepted.
- var retFut = newFuture[AsyncFD]("accept")
- var fut = acceptAddr(socket, flags, inheritable)
- fut.callback =
- proc (future: Future[tuple[address: string, client: AsyncFD]]) =
- assert future.finished
- if future.failed:
- retFut.fail(future.error)
- else:
- retFut.complete(future.read.client)
- return retFut
- proc keepAlive(x: string) =
- discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
- proc send*(socket: AsyncFD, data: string,
- flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
- ## Sends `data` to `socket`. The returned future will complete once all
- ## data has been sent.
- var retFuture = newFuture[void]("send")
- if data.len > 0:
- let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
- sendFut.callback =
- proc () =
- keepAlive(data)
- if sendFut.failed:
- retFuture.fail(sendFut.error)
- else:
- retFuture.complete()
- else:
- retFuture.complete()
- return retFuture
- # -- Await Macro
- import asyncmacro
- export asyncmacro
- proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
- ## Returns a future that will complete when all the string data from the
- ## specified future stream is retrieved.
- result = ""
- while true:
- let (hasValue, value) = await future.read()
- if hasValue:
- result.add(value)
- else:
- break
- proc callSoon(cbproc: proc () {.gcsafe.}) =
- getGlobalDispatcher().callbacks.addLast(cbproc)
- proc runForever*() =
- ## Begins a never ending global dispatcher poll loop.
- while true:
- poll()
- proc waitFor*[T](fut: Future[T]): T =
- ## **Blocks** the current thread until the specified future completes.
- while not fut.finished:
- poll()
- fut.read
- proc activeDescriptors*(): int {.inline.} =
- ## Returns the current number of active file descriptors for the current
- ## event loop. This is a cheap operation that does not involve a system call.
- when defined(windows):
- result = getGlobalDispatcher().handles.len
- elif not defined(nimdoc):
- result = getGlobalDispatcher().selector.count
- when defined(posix):
- import posix
- when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
- defined(solaris) or defined(zephyr) or defined(freertos):
- proc maxDescriptors*(): int {.raises: OSError.} =
- ## Returns the maximum number of active file descriptors for the current
- ## process. This involves a system call. For now `maxDescriptors` is
- ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris.
- when defined(windows):
- result = 16_700_000
- elif defined(zephyr) or defined(freertos):
- result = FD_MAX
- else:
- var fdLim: RLimit
- if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
- raiseOSError(osLastError())
- result = int(fdLim.rlim_cur) - 1
- when defined(genode):
- proc scheduleCallbacks*(): bool {.discardable.} =
- ## *Genode only.*
- ## Schedule callback processing and return immediately.
- ## Returns `false` if there is nothing to schedule.
- ## RPC servers should call this to dispatch `callSoon`
- ## bodies after retiring an RPC to its client.
- ## This is effectively a non-blocking `poll(…)` and is
- ## equivalent to scheduling a momentary no-op timeout
- ## but faster and with less overhead.
- let dis = getGlobalDispatcher()
- result = dis.callbacks.len > 0
- if result: submit(dis.signalHandler.cap)
|