123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- discard """
- output: "All tests passed!"
- """
- import selectors
- const hasThreadSupport = compileOption("threads")
- template processTest(t, x: untyped) =
- #stdout.write(t)
- #stdout.flushFile()
- if not x: echo(t & " FAILED\r\n")
- when not defined(windows):
- import os, posix, nativesockets
- when ioselSupportedPlatform:
- import osproc
- proc socket_notification_test(): bool =
- proc create_test_socket(): SocketHandle =
- var sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
- posix.IPPROTO_TCP)
- var x: int = fcntl(sock, F_GETFL, 0)
- if x == -1: raiseOSError(osLastError())
- else:
- var mode = x or O_NONBLOCK
- if fcntl(sock, F_SETFL, mode) == -1:
- raiseOSError(osLastError())
- result = sock
- var client_message = "SERVER HELLO =>"
- var server_message = "CLIENT HELLO"
- var buffer : array[128, char]
- var selector = newSelector[int]()
- var client_socket = create_test_socket()
- var server_socket = create_test_socket()
- var option : int32 = 1
- if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
- addr(option), sizeof(option).SockLen) < 0:
- raiseOSError(osLastError())
- var aiList = getAddrInfo("0.0.0.0", Port(13337))
- if bindAddr(server_socket, aiList.ai_addr,
- aiList.ai_addrlen.Socklen) < 0'i32:
- freeAddrInfo(aiList)
- raiseOSError(osLastError())
- if server_socket.listen() == -1:
- raiseOSError(osLastError())
- freeAddrInfo(aiList)
- aiList = getAddrInfo("127.0.0.1", Port(13337))
- discard posix.connect(client_socket, aiList.ai_addr,
- aiList.ai_addrlen.Socklen)
- registerHandle(selector, server_socket, {Event.Read}, 0)
- registerHandle(selector, client_socket, {Event.Write}, 0)
- freeAddrInfo(aiList)
- # make sure both sockets are selected
- var nevs = 0
- while nevs < 2:
- nevs += selector.select(100).len
- var sockAddress: SockAddr
- var addrLen = sizeof(sockAddress).Socklen
- var server2_socket = accept(server_socket,
- cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen))
- assert(server2_socket != osInvalidSocket)
- selector.registerHandle(server2_socket, {Event.Read}, 0)
- if posix.send(client_socket, addr(client_message[0]),
- len(client_message), 0) == -1:
- raiseOSError(osLastError())
- selector.updateHandle(client_socket, {Event.Read})
- var rc2 = selector.select(100)
- assert(len(rc2) == 1)
- var read_count = posix.recv(server2_socket, addr buffer[0], 128, 0)
- if read_count == -1:
- raiseOSError(osLastError())
- assert(read_count == len(client_message))
- var test1 = true
- for i in 0..<read_count:
- if client_message[i] != buffer[i]:
- test1 = false
- break
- assert(test1)
- selector.updateHandle(server2_socket, {Event.Write})
- var rc3 = selector.select(0)
- assert(len(rc3) == 1)
- if posix.send(server2_socket, addr(server_message[0]),
- len(server_message), 0) == -1:
- raiseOSError(osLastError())
- selector.updateHandle(server2_socket, {Event.Read})
- var rc4 = selector.select(100)
- assert(len(rc4) == 1)
- read_count = posix.recv(client_socket, addr(buffer[0]), 128, 0)
- if read_count == -1:
- raiseOSError(osLastError())
- assert(read_count == len(server_message))
- var test2 = true
- for i in 0..<read_count:
- if server_message[i] != buffer[i]:
- test2 = false
- break
- assert(test2)
- selector.unregister(server_socket)
- selector.unregister(server2_socket)
- selector.unregister(client_socket)
- discard posix.close(server_socket)
- discard posix.close(server2_socket)
- discard posix.close(client_socket)
- assert(selector.isEmpty())
- close(selector)
- result = true
- proc event_notification_test(): bool =
- var selector = newSelector[int]()
- var event = newSelectEvent()
- selector.registerEvent(event, 1)
- var rc0 = selector.select(0)
- event.trigger()
- var rc1 = selector.select(0)
- event.trigger()
- var rc2 = selector.select(0)
- var rc3 = selector.select(0)
- assert(len(rc0) == 0 and len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
- var ev1 = selector.getData(rc1[0].fd)
- var ev2 = selector.getData(rc2[0].fd)
- assert(ev1 == 1 and ev2 == 1)
- selector.unregister(event)
- event.close()
- assert(selector.isEmpty())
- selector.close()
- result = true
- when ioselSupportedPlatform:
- proc timer_notification_test(): bool =
- var selector = newSelector[int]()
- var timer = selector.registerTimer(100, false, 0)
- var rc1 = selector.select(10000)
- var rc2 = selector.select(10000)
- # if this flakes, see tests/m14634.nim
- assert len(rc1) == 1 and len(rc2) == 1, $(len(rc1), len(rc2))
- selector.unregister(timer)
- discard selector.select(0)
- selector.registerTimer(100, true, 0)
- var rc4 = selector.select(10000)
- var rc5 = selector.select(1000) # this will be an actual wait, keep it small
- assert len(rc4) == 1 and len(rc5) == 0, $(len(rc4), len(rc5))
- assert(selector.isEmpty())
- selector.close()
- result = true
- proc process_notification_test(): bool =
- var selector = newSelector[int]()
- var process2 = startProcess("sleep", "", ["2"], nil,
- {poStdErrToStdOut, poUsePath})
- discard startProcess("sleep", "", ["1"], nil,
- {poStdErrToStdOut, poUsePath})
- selector.registerProcess(process2.processID, 0)
- var rc1 = selector.select(1500)
- var rc2 = selector.select(1500)
- var r = len(rc1) + len(rc2)
- assert(r == 1)
- result = true
- proc signal_notification_test(): bool =
- var sigset1n, sigset1o, sigset2n, sigset2o: Sigset
- var pid = posix.getpid()
- discard sigemptyset(sigset1n)
- discard sigemptyset(sigset1o)
- discard sigemptyset(sigset2n)
- discard sigemptyset(sigset2o)
- when hasThreadSupport:
- if pthread_sigmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
- raiseOSError(osLastError())
- else:
- if sigprocmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
- raiseOSError(osLastError())
- var selector = newSelector[int]()
- var s1 = selector.registerSignal(SIGUSR1, 1)
- var s2 = selector.registerSignal(SIGUSR2, 2)
- var s3 = selector.registerSignal(SIGTERM, 3)
- discard selector.select(0)
- discard posix.kill(pid, SIGUSR1)
- discard posix.kill(pid, SIGUSR2)
- discard posix.kill(pid, SIGTERM)
- var rc = selector.select(0)
- var cd0 = selector.getData(rc[0].fd)
- var cd1 = selector.getData(rc[1].fd)
- var cd2 = selector.getData(rc[2].fd)
- selector.unregister(s1)
- selector.unregister(s2)
- selector.unregister(s3)
- when hasThreadSupport:
- if pthread_sigmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
- raiseOSError(osLastError())
- else:
- if sigprocmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
- raiseOSError(osLastError())
- assert(len(rc) == 3)
- assert(cd0 + cd1 + cd2 == 6, $(cd0 + cd1 + cd2)) # 1 + 2 + 3
- assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset)))
- assert(selector.isEmpty())
- result = true
- when defined(macosx) or defined(freebsd) or defined(openbsd) or
- defined(netbsd):
- proc rename(frompath: cstring, topath: cstring): cint
- {.importc: "rename", header: "<stdio.h>".}
- proc createFile(name: string): cint =
- result = posix.open(cstring(name), posix.O_CREAT or posix.O_RDWR)
- if result == -1:
- raiseOsError(osLastError())
- proc writeFile(name: string, data: string) =
- let fd = posix.open(cstring(name), posix.O_APPEND or posix.O_RDWR)
- if fd == -1:
- raiseOsError(osLastError())
- let length = len(data).cint
- if posix.write(fd, cast[pointer](addr data[0]),
- len(data).cint) != length:
- raiseOsError(osLastError())
- if posix.close(fd) == -1:
- raiseOsError(osLastError())
- proc closeFile(fd: cint) =
- if posix.close(fd) == -1:
- raiseOsError(osLastError())
- proc removeFile(name: string) =
- let err = posix.unlink(cstring(name))
- if err == -1:
- raiseOsError(osLastError())
- proc createDir(name: string) =
- let err = posix.mkdir(cstring(name), 0x1FF)
- if err == -1:
- raiseOsError(osLastError())
- proc removeDir(name: string) =
- let err = posix.rmdir(cstring(name))
- if err == -1:
- raiseOsError(osLastError())
- proc chmodPath(name: string, mode: cint) =
- let err = posix.chmod(cstring(name), Mode(mode))
- if err == -1:
- raiseOsError(osLastError())
- proc renameFile(names: string, named: string) =
- let err = rename(cstring(names), cstring(named))
- if err == -1:
- raiseOsError(osLastError())
- proc symlink(names: string, named: string) =
- let err = posix.symlink(cstring(names), cstring(named))
- if err == -1:
- raiseOsError(osLastError())
- proc openWatch(name: string): cint =
- result = posix.open(cstring(name), posix.O_RDONLY)
- if result == -1:
- raiseOsError(osLastError())
- const
- testDirectory = "/tmp/kqtest"
- type
- valType = object
- fd: cint
- events: set[Event]
- proc vnode_test(): bool =
- proc validate(test: openArray[ReadyKey],
- check: openArray[valType]): bool =
- result = false
- if len(test) == len(check):
- for checkItem in check:
- result = false
- for testItem in test:
- if testItem.fd == checkItem.fd and
- checkItem.events <= testItem.events:
- result = true
- break
- if not result:
- break
- var res: seq[ReadyKey]
- var selector = newSelector[int]()
- var events = {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
- Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
- Event.VnodeRevoke}
- result = true
- discard posix.unlink(testDirectory)
- createDir(testDirectory)
- var dirfd = posix.open(cstring(testDirectory), posix.O_RDONLY)
- if dirfd == -1:
- raiseOsError(osLastError())
- selector.registerVnode(dirfd, events, 1)
- discard selector.select(0)
- # chmod testDirectory to 0777
- chmodPath(testDirectory, 0x1FF)
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeAttrib} <= res[0].events)
- # create subdirectory
- createDir(testDirectory & "/test")
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeWrite,
- Event.VnodeLink} <= res[0].events)
- # open test directory for watching
- var testfd = openWatch(testDirectory & "/test")
- selector.registerVnode(testfd, events, 2)
- doAssert(len(selector.select(0)) == 0)
- # rename test directory
- renameFile(testDirectory & "/test", testDirectory & "/renamed")
- res = selector.select(0)
- doAssert(len(res) == 2)
- doAssert(len(selector.select(0)) == 0)
- doAssert(validate(res,
- [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite}),
- valType(fd: testfd,
- events: {Event.Vnode, Event.VnodeRename})])
- )
- # remove test directory
- removeDir(testDirectory & "/renamed")
- res = selector.select(0)
- doAssert(len(res) == 2)
- doAssert(len(selector.select(0)) == 0)
- doAssert(validate(res,
- [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite,
- Event.VnodeLink}),
- valType(fd: testfd,
- events: {Event.Vnode, Event.VnodeDelete})])
- )
- # create file new test file
- testfd = createFile(testDirectory & "/testfile")
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeWrite} <= res[0].events)
- # close new test file
- closeFile(testfd)
- doAssert(len(selector.select(0)) == 0)
- doAssert(len(selector.select(0)) == 0)
- # chmod test file with 0666
- chmodPath(testDirectory & "/testfile", 0x1B6)
- doAssert(len(selector.select(0)) == 0)
- testfd = openWatch(testDirectory & "/testfile")
- selector.registerVnode(testfd, events, 1)
- discard selector.select(0)
- # write data to test file
- writeFile(testDirectory & "/testfile", "TESTDATA")
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == testfd and
- {Event.Vnode, Event.VnodeWrite,
- Event.VnodeExtend} <= res[0].events)
- # symlink test file
- symlink(testDirectory & "/testfile", testDirectory & "/testlink")
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeWrite} <= res[0].events)
- # remove test file
- removeFile(testDirectory & "/testfile")
- res = selector.select(0)
- doAssert(len(res) == 2)
- doAssert(len(selector.select(0)) == 0)
- doAssert(validate(res,
- [valType(fd: testfd, events: {Event.Vnode, Event.VnodeDelete}),
- valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite})])
- )
- # remove symlink
- removeFile(testDirectory & "/testlink")
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeWrite} <= res[0].events)
- # remove testDirectory
- removeDir(testDirectory)
- res = selector.select(0)
- doAssert(len(res) == 1)
- doAssert(len(selector.select(0)) == 0)
- doAssert(res[0].fd == dirfd and
- {Event.Vnode, Event.VnodeDelete} <= res[0].events)
- proc pipe_test(): bool =
- # closing the read end of a pipe will result in it automatically
- # being removed from the kqueue; make sure no exception is raised
- var s = newSelector[int]()
- var fds: array[2, cint]
- discard pipe(fds)
- s.registerHandle(fds[1], {Write}, 0)
- discard close(fds[0])
- let res = s.select(-1)
- doAssert(res.len == 1)
- s.unregister(fds[1])
- discard close(fds[1])
- return true
- when hasThreadSupport:
- var counter = 0
- proc event_wait_thread(event: SelectEvent) {.thread.} =
- var selector = newSelector[int]()
- selector.registerEvent(event, 1)
- var rc = selector.select(1000)
- if len(rc) == 1:
- inc(counter)
- selector.unregister(event)
- assert(selector.isEmpty())
- proc mt_event_test(): bool =
- var
- thr: array[0..7, Thread[SelectEvent]]
- var selector = newSelector[int]()
- var sock = createNativeSocket()
- var event = newSelectEvent()
- for i in 0..high(thr):
- createThread(thr[i], event_wait_thread, event)
- selector.registerHandle(sock, {Event.Read}, 1)
- discard selector.select(500)
- selector.unregister(sock)
- event.trigger()
- joinThreads(thr)
- assert(counter == 1)
- result = true
- processTest("Socket notification test...", socket_notification_test())
- processTest("User event notification test...", event_notification_test())
- when hasThreadSupport:
- processTest("Multithreaded user event notification test...",
- mt_event_test())
- when ioselSupportedPlatform:
- processTest("Timer notification test...", timer_notification_test())
- processTest("Process notification test...", process_notification_test())
- processTest("Signal notification test...", signal_notification_test())
- when defined(macosx) or defined(freebsd) or defined(openbsd) or
- defined(netbsd):
- processTest("File notification test...", vnode_test())
- processTest("Pipe test...", pipe_test())
- echo("All tests passed!")
- else:
- import nativesockets, winlean, os, osproc
- proc socket_notification_test(): bool =
- proc create_test_socket(): SocketHandle =
- var sock = createNativeSocket()
- setBlocking(sock, false)
- result = sock
- var client_message = "SERVER HELLO =>"
- var server_message = "CLIENT HELLO"
- var buffer : array[128, char]
- var selector = newSelector[int]()
- var client_socket = create_test_socket()
- var server_socket = create_test_socket()
- selector.registerHandle(server_socket, {Event.Read}, 0)
- selector.registerHandle(client_socket, {Event.Write}, 0)
- var option : int32 = 1
- if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
- addr(option), sizeof(option).SockLen) < 0:
- raiseOSError(osLastError())
- var aiList = getAddrInfo("0.0.0.0", Port(13337))
- if bindAddr(server_socket, aiList.ai_addr,
- aiList.ai_addrlen.Socklen) < 0'i32:
- freeAddrInfo(aiList)
- raiseOSError(osLastError())
- discard server_socket.listen()
- freeAddrInfo(aiList)
- aiList = getAddrInfo("127.0.0.1", Port(13337))
- discard connect(client_socket, aiList.ai_addr,
- aiList.ai_addrlen.Socklen)
- freeAddrInfo(aiList)
- # for some reason Windows select doesn't return both
- # descriptors from first call, so we need to make 2 calls
- var n = 0
- var rcm = selector.select(1000)
- while n < 10 and len(rcm) < 2:
- sleep(1000)
- rcm = selector.select(1000)
- inc(n)
- assert(len(rcm) == 2)
- var sockAddress = SockAddr()
- var addrLen = sizeof(sockAddress).Socklen
- var server2_socket = accept(server_socket,
- cast[ptr SockAddr](addr(sockAddress)),
- addr(addrLen))
- assert(server2_socket != osInvalidSocket)
- selector.registerHandle(server2_socket, {Event.Read}, 0)
- if send(client_socket, cast[pointer](addr(client_message[0])),
- cint(len(client_message)), 0) == -1:
- raiseOSError(osLastError())
- selector.updateHandle(client_socket, {Event.Read})
- var rc2 = selector.select(1000)
- assert(len(rc2) == 1)
- var read_count = recv(server2_socket, addr buffer[0], 128, 0)
- if read_count == -1:
- raiseOSError(osLastError())
- assert(read_count == len(client_message))
- var test1 = true
- for i in 0..<read_count:
- if client_message[i] != buffer[i]:
- test1 = false
- break
- assert(test1)
- if send(server2_socket, cast[pointer](addr(server_message[0])),
- cint(len(server_message)), 0) == -1:
- raiseOSError(osLastError())
- var rc3 = selector.select(0)
- assert(len(rc3) == 1)
- read_count = recv(client_socket, addr(buffer[0]), 128, 0)
- if read_count == -1:
- raiseOSError(osLastError())
- assert(read_count == len(server_message))
- var test2 = true
- for i in 0..<read_count:
- if server_message[i] != buffer[i]:
- test2 = false
- break
- assert(test2)
- selector.unregister(server_socket)
- selector.unregister(server2_socket)
- selector.unregister(client_socket)
- close(server_socket)
- close(server2_socket)
- close(client_socket)
- assert(selector.isEmpty())
- close(selector)
- result = true
- proc event_notification_test(): bool =
- var selector = newSelector[int]()
- var event = newSelectEvent()
- selector.registerEvent(event, 1)
- discard selector.select(0)
- event.trigger()
- var rc1 = selector.select(0)
- event.trigger()
- var rc2 = selector.select(0)
- var rc3 = selector.select(0)
- assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
- var ev1 = selector.getData(rc1[0].fd)
- var ev2 = selector.getData(rc2[0].fd)
- assert(ev1 == 1 and ev2 == 1)
- selector.unregister(event)
- event.close()
- assert(selector.isEmpty())
- selector.close()
- result = true
- when hasThreadSupport:
- var counter = 0
- proc event_wait_thread(event: SelectEvent) {.thread.} =
- var selector = newSelector[int]()
- selector.registerEvent(event, 1)
- var rc = selector.select(1500)
- if len(rc) == 1:
- inc(counter)
- selector.unregister(event)
- assert(selector.isEmpty())
- proc mt_event_test(): bool =
- var thr: array[0..7, Thread[SelectEvent]]
- var event = newSelectEvent()
- for i in 0..high(thr):
- createThread(thr[i], event_wait_thread, event)
- event.trigger()
- joinThreads(thr)
- assert(counter == 1)
- result = true
- processTest("Socket notification test...", socket_notification_test())
- processTest("User event notification test...", event_notification_test())
- when hasThreadSupport:
- processTest("Multithreaded user event notification test...",
- mt_event_test())
- echo("All tests passed!")
|