// event.go (1.3 KB)

  1. package rx
  2. import "runtime"
  // MinimumEventLoopBufferSize is the smallest capacity used for an event
  // loop's channels. SpawnEventLoop uses it as the default, and
  // SpawnEventLoopWithBufferSize raises any smaller request up to it.
  const MinimumEventLoopBufferSize = 32768
  4. type event struct {
  5. kind event_kind
  6. payload Object
  7. observer *observer
  8. }
  // event_kind identifies which observer callback an event triggers.
  type event_kind int

  // The event kinds understood by process_event. Values are assigned with
  // iota, so ev_next is 0, ev_error is 1 and ev_complete is 2.
  const (
  	// ev_next delivers the payload through observer.next.
  	ev_next event_kind = iota
  	// ev_error delivers the payload through observer.error.
  	ev_error
  	// ev_complete invokes observer.complete; no payload is passed.
  	ev_complete
  )
  // task is a parameterless callback that an EventLoop runs on its own
  // goroutine after it has been submitted through commit.
  type task func()
  16. type EventLoop struct {
  17. event_channel chan event
  18. task_channel chan task
  19. }
  20. func SpawnEventLoop() *EventLoop {
  21. return SpawnEventLoopWithBufferSize(MinimumEventLoopBufferSize)
  22. }
  23. func SpawnEventLoopWithBufferSize(buf_size uint) *EventLoop {
  24. if buf_size < MinimumEventLoopBufferSize {
  25. buf_size = MinimumEventLoopBufferSize
  26. }
  27. var events = make(chan event, buf_size)
  28. var tasks = make(chan task, buf_size)
  29. go (func() {
  30. runtime.LockOSThread()
  31. for {
  32. select {
  33. case ev := <- events:
  34. process_event(ev)
  35. default:
  36. select {
  37. case t := <- tasks:
  38. t()
  39. case ev := <- events:
  40. process_event(ev)
  41. }
  42. }
  43. }
  44. })()
  45. return &EventLoop {
  46. event_channel: events,
  47. task_channel: tasks,
  48. }
  49. }
  50. func (el *EventLoop) dispatch(ev event) {
  51. el.event_channel <- ev
  52. }
  53. func (el *EventLoop) commit(t task) {
  54. el.task_channel <- t
  55. }
  56. func process_event(ev event) {
  57. switch ev.kind {
  58. case ev_next:
  59. ev.observer.next(ev.payload)
  60. case ev_error:
  61. ev.observer.error(ev.payload)
  62. case ev_complete:
  63. ev.observer.complete()
  64. }
  65. }