ioselectors_select.nim 13 KB


  1. #
  2. #
  3. # Nim's Runtime Library
  4. # (c) Copyright 2016 Eugene Kabanov
  5. #
  6. # See the file "copying.txt", included in this
  7. # distribution, for details about the copyright.
  8. #
  9. # This module implements Posix and Windows select().
  10. import times, nativesockets
  11. when defined(windows):
  12. import winlean
  13. when defined(gcc):
  14. {.passl: "-lws2_32".}
  15. elif defined(vcc):
  16. {.passl: "ws2_32.lib".}
  17. const platformHeaders = """#include <winsock2.h>
  18. #include <windows.h>"""
  19. const EAGAIN = WSAEWOULDBLOCK
  20. else:
  21. const platformHeaders = """#include <sys/select.h>
  22. #include <sys/time.h>
  23. #include <sys/types.h>
  24. #include <unistd.h>"""
  25. type
  26. Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object
  27. var
  28. FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint
  29. proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset)
  30. {.cdecl, importc: "FD_SET", header: platformHeaders, inline.}
  31. proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset)
  32. {.cdecl, importc: "FD_CLR", header: platformHeaders, inline.}
  33. proc IOFD_ZERO(fdset: ptr Fdset)
  34. {.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.}
  35. when defined(windows):
  36. proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
  37. {.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.}
  38. proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
  39. timeout: ptr Timeval): cint
  40. {.stdcall, importc: "select", header: platformHeaders.}
  41. else:
  42. proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
  43. {.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.}
  44. proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
  45. timeout: ptr Timeval): cint
  46. {.cdecl, importc: "select", header: platformHeaders.}
  47. when hasThreadSupport:
  48. type
  49. SelectorImpl[T] = object
  50. rSet: FdSet
  51. wSet: FdSet
  52. eSet: FdSet
  53. maxFD: int
  54. fds: ptr SharedArray[SelectorKey[T]]
  55. count: int
  56. lock: Lock
  57. Selector*[T] = ptr SelectorImpl[T]
  58. else:
  59. type
  60. SelectorImpl[T] = object
  61. rSet: FdSet
  62. wSet: FdSet
  63. eSet: FdSet
  64. maxFD: int
  65. fds: seq[SelectorKey[T]]
  66. count: int
  67. Selector*[T] = ref SelectorImpl[T]
  68. type
  69. SelectEventImpl = object
  70. rsock: SocketHandle
  71. wsock: SocketHandle
  72. SelectEvent* = ptr SelectEventImpl
  73. when hasThreadSupport:
  74. template withSelectLock[T](s: Selector[T], body: untyped) =
  75. acquire(s.lock)
  76. {.locks: [s.lock].}:
  77. try:
  78. body
  79. finally:
  80. release(s.lock)
  81. else:
  82. template withSelectLock[T](s: Selector[T], body: untyped) =
  83. body
  84. proc newSelector*[T](): Selector[T] =
  85. when hasThreadSupport:
  86. result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
  87. result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE)
  88. initLock result.lock
  89. else:
  90. result = Selector[T]()
  91. result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)
  92. for i in 0 ..< FD_SETSIZE:
  93. result.fds[i].ident = InvalidIdent
  94. IOFD_ZERO(addr result.rSet)
  95. IOFD_ZERO(addr result.wSet)
  96. IOFD_ZERO(addr result.eSet)
  97. proc close*[T](s: Selector[T]) =
  98. when hasThreadSupport:
  99. deallocSharedArray(s.fds)
  100. deallocShared(cast[pointer](s))
  101. when defined(windows):
  102. proc newSelectEvent*(): SelectEvent =
  103. var ssock = newNativeSocket()
  104. var wsock = newNativeSocket()
  105. var rsock: SocketHandle = INVALID_SOCKET
  106. var saddr = Sockaddr_in()
  107. saddr.sin_family = winlean.AF_INET
  108. saddr.sin_port = 0
  109. saddr.sin_addr.s_addr = INADDR_ANY
  110. if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)),
  111. sizeof(saddr).SockLen) < 0'i32:
  112. raiseIOSelectorsError(osLastError())
  113. if winlean.listen(ssock, 1) != 0:
  114. raiseIOSelectorsError(osLastError())
  115. var namelen = sizeof(saddr).SockLen
  116. if getsockname(ssock, cast[ptr SockAddr](addr(saddr)),
  117. addr(namelen)) != 0'i32:
  118. raiseIOSelectorsError(osLastError())
  119. saddr.sin_addr.s_addr = 0x0100007F
  120. if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)),
  121. sizeof(saddr).SockLen) != 0:
  122. raiseIOSelectorsError(osLastError())
  123. namelen = sizeof(saddr).SockLen
  124. rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)),
  125. cast[ptr SockLen](addr(namelen)))
  126. if rsock == SocketHandle(-1):
  127. raiseIOSelectorsError(osLastError())
  128. if winlean.closesocket(ssock) != 0:
  129. raiseIOSelectorsError(osLastError())
  130. var mode = clong(1)
  131. if ioctlsocket(rsock, FIONBIO, addr(mode)) != 0:
  132. raiseIOSelectorsError(osLastError())
  133. mode = clong(1)
  134. if ioctlsocket(wsock, FIONBIO, addr(mode)) != 0:
  135. raiseIOSelectorsError(osLastError())
  136. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  137. result.rsock = rsock
  138. result.wsock = wsock
  139. proc trigger*(ev: SelectEvent) =
  140. var data: uint64 = 1
  141. if winlean.send(ev.wsock, cast[pointer](addr data),
  142. cint(sizeof(uint64)), 0) != sizeof(uint64):
  143. raiseIOSelectorsError(osLastError())
  144. proc close*(ev: SelectEvent) =
  145. let res1 = winlean.closesocket(ev.rsock)
  146. let res2 = winlean.closesocket(ev.wsock)
  147. deallocShared(cast[pointer](ev))
  148. if res1 != 0 or res2 != 0:
  149. raiseIOSelectorsError(osLastError())
  150. else:
  151. proc newSelectEvent*(): SelectEvent =
  152. var fds: array[2, cint]
  153. if posix.pipe(fds) != 0:
  154. raiseIOSelectorsError(osLastError())
  155. setNonBlocking(fds[0])
  156. setNonBlocking(fds[1])
  157. result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
  158. result.rsock = SocketHandle(fds[0])
  159. result.wsock = SocketHandle(fds[1])
  160. proc trigger*(ev: SelectEvent) =
  161. var data: uint64 = 1
  162. if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
  163. raiseIOSelectorsError(osLastError())
  164. proc close*(ev: SelectEvent) =
  165. let res1 = posix.close(cint(ev.rsock))
  166. let res2 = posix.close(cint(ev.wsock))
  167. deallocShared(cast[pointer](ev))
  168. if res1 != 0 or res2 != 0:
  169. raiseIOSelectorsError(osLastError())
  170. proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
  171. data: T) =
  172. var i = 0
  173. let fdi = int(fd)
  174. while i < FD_SETSIZE:
  175. if s.fds[i].ident == InvalidIdent:
  176. var pkey = addr(s.fds[i])
  177. pkey.ident = fdi
  178. pkey.events = events
  179. pkey.data = data
  180. break
  181. inc(i)
  182. if i >= FD_SETSIZE:
  183. raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
  184. proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
  185. var i = 0
  186. let fdi = int(fd)
  187. while i < FD_SETSIZE:
  188. if s.fds[i].ident == fdi:
  189. result = addr(s.fds[i])
  190. break
  191. inc(i)
  192. doAssert(i < FD_SETSIZE,
  193. "Descriptor [" & $int(fd) & "] is not registered in the queue!")
  194. proc delKey[T](s: Selector[T], fd: SocketHandle) =
  195. var empty: T
  196. var i = 0
  197. while i < FD_SETSIZE:
  198. if s.fds[i].ident == fd.int:
  199. s.fds[i].ident = InvalidIdent
  200. s.fds[i].events = {}
  201. s.fds[i].data = empty
  202. break
  203. inc(i)
  204. doAssert(i < FD_SETSIZE,
  205. "Descriptor [" & $int(fd) & "] is not registered in the queue!")
  206. proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
  207. events: set[Event], data: T) =
  208. when not defined(windows):
  209. let fdi = int(fd)
  210. s.withSelectLock():
  211. s.setSelectKey(fd, events, data)
  212. when not defined(windows):
  213. if fdi > s.maxFD: s.maxFD = fdi
  214. if Event.Read in events:
  215. IOFD_SET(fd, addr s.rSet)
  216. inc(s.count)
  217. if Event.Write in events:
  218. IOFD_SET(fd, addr s.wSet)
  219. IOFD_SET(fd, addr s.eSet)
  220. inc(s.count)
  221. proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
  222. when not defined(windows):
  223. let fdi = int(ev.rsock)
  224. s.withSelectLock():
  225. s.setSelectKey(ev.rsock, {Event.User}, data)
  226. when not defined(windows):
  227. if fdi > s.maxFD: s.maxFD = fdi
  228. IOFD_SET(ev.rsock, addr s.rSet)
  229. inc(s.count)
  230. proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
  231. events: set[Event]) =
  232. let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
  233. Event.User, Event.Oneshot, Event.Error}
  234. s.withSelectLock():
  235. var pkey = s.getKey(fd)
  236. doAssert(pkey.events * maskEvents == {})
  237. if pkey.events != events:
  238. if (Event.Read in pkey.events) and (Event.Read notin events):
  239. IOFD_CLR(fd, addr s.rSet)
  240. dec(s.count)
  241. if (Event.Write in pkey.events) and (Event.Write notin events):
  242. IOFD_CLR(fd, addr s.wSet)
  243. IOFD_CLR(fd, addr s.eSet)
  244. dec(s.count)
  245. if (Event.Read notin pkey.events) and (Event.Read in events):
  246. IOFD_SET(fd, addr s.rSet)
  247. inc(s.count)
  248. if (Event.Write notin pkey.events) and (Event.Write in events):
  249. IOFD_SET(fd, addr s.wSet)
  250. IOFD_SET(fd, addr s.eSet)
  251. inc(s.count)
  252. pkey.events = events
  253. proc unregister*[T](s: Selector[T], fd: SocketHandle|int) =
  254. s.withSelectLock():
  255. let fd = fd.SocketHandle
  256. var pkey = s.getKey(fd)
  257. if Event.Read in pkey.events:
  258. IOFD_CLR(fd, addr s.rSet)
  259. dec(s.count)
  260. if Event.Write in pkey.events:
  261. IOFD_CLR(fd, addr s.wSet)
  262. IOFD_CLR(fd, addr s.eSet)
  263. dec(s.count)
  264. s.delKey(fd)
  265. proc unregister*[T](s: Selector[T], ev: SelectEvent) =
  266. let fd = ev.rsock
  267. s.withSelectLock():
  268. var pkey = s.getKey(fd)
  269. IOFD_CLR(fd, addr s.rSet)
  270. dec(s.count)
  271. s.delKey(fd)
  272. proc selectInto*[T](s: Selector[T], timeout: int,
  273. results: var openarray[ReadyKey]): int =
  274. var tv = Timeval()
  275. var ptv = addr tv
  276. var rset, wset, eset: FdSet
  277. verifySelectParams(timeout)
  278. if timeout != -1:
  279. when defined(genode):
  280. tv.tv_sec = Time(timeout div 1_000)
  281. else:
  282. tv.tv_sec = timeout.int32 div 1_000
  283. tv.tv_usec = (timeout.int32 %% 1_000) * 1_000
  284. else:
  285. ptv = nil
  286. s.withSelectLock():
  287. rset = s.rSet
  288. wset = s.wSet
  289. eset = s.eSet
  290. var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset),
  291. addr(eset), ptv)
  292. if count < 0:
  293. result = 0
  294. when defined(windows):
  295. raiseIOSelectorsError(osLastError())
  296. else:
  297. let err = osLastError()
  298. if cint(err) != EINTR:
  299. raiseIOSelectorsError(err)
  300. elif count == 0:
  301. result = 0
  302. else:
  303. var rindex = 0
  304. var i = 0
  305. var k = 0
  306. while (i < FD_SETSIZE) and (k < count):
  307. if s.fds[i].ident != InvalidIdent:
  308. var flag = false
  309. var pkey = addr(s.fds[i])
  310. var rkey = ReadyKey(fd: int(pkey.ident), events: {})
  311. let fd = SocketHandle(pkey.ident)
  312. if IOFD_ISSET(fd, addr rset) != 0:
  313. if Event.User in pkey.events:
  314. var data: uint64 = 0
  315. if recv(fd, cast[pointer](addr(data)),
  316. sizeof(uint64).cint, 0) != sizeof(uint64):
  317. let err = osLastError()
  318. if cint(err) != EAGAIN:
  319. raiseIOSelectorsError(err)
  320. else:
  321. inc(i)
  322. inc(k)
  323. continue
  324. else:
  325. flag = true
  326. rkey.events = {Event.User}
  327. else:
  328. flag = true
  329. rkey.events = {Event.Read}
  330. if IOFD_ISSET(fd, addr wset) != 0:
  331. rkey.events.incl(Event.Write)
  332. if IOFD_ISSET(fd, addr eset) != 0:
  333. rkey.events.incl(Event.Error)
  334. flag = true
  335. if flag:
  336. results[rindex] = rkey
  337. inc(rindex)
  338. inc(k)
  339. inc(i)
  340. result = rindex
  341. proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
  342. result = newSeq[ReadyKey](FD_SETSIZE)
  343. var count = selectInto(s, timeout, result)
  344. result.setLen(count)
  345. proc flush*[T](s: Selector[T]) = discard
  346. template isEmpty*[T](s: Selector[T]): bool =
  347. (s.count == 0)
  348. proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
  349. s.withSelectLock():
  350. result = false
  351. let fdi = int(fd)
  352. for i in 0..<FD_SETSIZE:
  353. if s.fds[i].ident == fdi:
  354. return true
  355. when hasThreadSupport:
  356. template withSelectLock[T](s: Selector[T], body: untyped) =
  357. acquire(s.lock)
  358. {.locks: [s.lock].}:
  359. try:
  360. body
  361. finally:
  362. release(s.lock)
  363. else:
  364. template withSelectLock[T](s: Selector[T], body: untyped) =
  365. body
  366. proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
  367. s.withSelectLock():
  368. let fdi = int(fd)
  369. for i in 0..<FD_SETSIZE:
  370. if s.fds[i].ident == fdi:
  371. return s.fds[i].data
  372. proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
  373. s.withSelectLock():
  374. let fdi = int(fd)
  375. var i = 0
  376. while i < FD_SETSIZE:
  377. if s.fds[i].ident == fdi:
  378. var pkey = addr(s.fds[i])
  379. pkey.data = data
  380. result = true
  381. break
  382. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  383. body: untyped) =
  384. mixin withSelectLock
  385. s.withSelectLock():
  386. var value: ptr T
  387. let fdi = int(fd)
  388. var i = 0
  389. while i < FD_SETSIZE:
  390. if s.fds[i].ident == fdi:
  391. value = addr(s.fds[i].data)
  392. break
  393. inc(i)
  394. if i != FD_SETSIZE:
  395. body
  396. template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
  397. body1, body2: untyped) =
  398. mixin withSelectLock
  399. s.withSelectLock():
  400. block:
  401. var value: ptr T
  402. let fdi = int(fd)
  403. var i = 0
  404. while i < FD_SETSIZE:
  405. if s.fds[i].ident == fdi:
  406. value = addr(s.fds[i].data)
  407. break
  408. inc(i)
  409. if i != FD_SETSIZE:
  410. body1
  411. else:
  412. body2
  413. proc getFd*[T](s: Selector[T]): int =
  414. return -1