mutex.go 743 B

123456789101112131415161718192021222324252627282930313233343536
  1. package rx
  2. type Mutex struct {
  3. resource Object
  4. sched *QueueScheduler
  5. }
  6. func CreateMutex(res Object, sched Scheduler) *Mutex {
  7. return &Mutex {
  8. resource: res,
  9. sched: QueueSchedulerFrom(sched, 1),
  10. }
  11. }
  12. func NewMutex(res Object) Observable {
  13. return Observable { func(sched Scheduler, ob *observer) {
  14. ob.next(CreateMutex(res, sched))
  15. ob.complete()
  16. } }
  17. }
  18. func (mu *Mutex) Lock(mutation func(Object)(Observable)) Observable {
  19. return Observable { func(sched Scheduler, ob *observer) {
  20. mu.sched.run(mutation(mu.resource), &observer {
  21. context: Background(), // atomic mutation is NOT cancellable
  22. next: ob.next,
  23. error: func(_ Object) {
  24. panic("unexpected exception")
  25. },
  26. complete: ob.complete,
  27. })
  28. } }
  29. }