worker.go 985 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package rx
  2. import (
  3. "sync"
  4. "runtime"
  5. )
  // Worker runs submitted functions one at a time on a background goroutine,
  // in the order they were submitted.
  //
  // Create instances with CreateWorker: the zero value has a nil mutex and a
  // nil notify channel and is not usable.
  type Worker struct {
  	// mutex guards pending and disposed, and serializes sends on and the
  	// closing of notify.
  	mutex *sync.Mutex
  	// pending holds submitted functions that the background goroutine has
  	// not picked up yet.
  	pending []func()
  	// notify wakes the background goroutine. It is created with a buffer of
  	// one, so a single queued wake-up stands for any number of pending
  	// functions.
  	notify chan struct{}
  	// disposed is set once Dispose has closed notify; Do ignores new work
  	// from then on.
  	disposed bool
  }
  12. func CreateWorker() *Worker {
  13. var mutex sync.Mutex
  14. var w = &Worker {
  15. mutex: &mutex,
  16. pending: make([]func(), 0),
  17. notify: make(chan struct{}, 1),
  18. }
  19. go (func() {
  20. for range w.notify {
  21. w.mutex.Lock()
  22. if len(w.pending) > 0 {
  23. var current_works = w.pending
  24. w.pending = make([] func(), 0)
  25. w.mutex.Unlock()
  26. for _, work := range current_works {
  27. work()
  28. }
  29. } else {
  30. w.mutex.Unlock()
  31. }
  32. }
  33. })()
  34. runtime.SetFinalizer(w, func(w *Worker) {
  35. w.Dispose()
  36. })
  37. return w
  38. }
  39. func (w *Worker) Do(work func()) {
  40. w.mutex.Lock()
  41. if !(w.disposed) {
  42. w.pending = append(w.pending, work)
  43. select {
  44. case w.notify <- struct{} {}:
  45. default:
  46. }
  47. }
  48. w.mutex.Unlock()
  49. }
  50. func (w *Worker) Dispose() {
  51. w.mutex.Lock()
  52. if !(w.disposed) {
  53. w.disposed = true
  54. close(w.notify)
  55. }
  56. w.mutex.Unlock()
  57. }