123456789101112131415161718192021222324252627282930313233343536 |
- package rx
- type Mutex struct {
- resource Object
- sched *QueueScheduler
- }
- func CreateMutex(res Object, sched Scheduler) *Mutex {
- return &Mutex {
- resource: res,
- sched: QueueSchedulerFrom(sched, 1),
- }
- }
- func NewMutex(res Object) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- ob.next(CreateMutex(res, sched))
- ob.complete()
- } }
- }
- func (mu *Mutex) Lock(mutation func(Object)(Observable)) Observable {
- return Observable { func(sched Scheduler, ob *observer) {
- mu.sched.run(mutation(mu.resource), &observer {
- context: Background(), // atomic mutation is NOT cancellable
- next: ob.next,
- error: func(_ Object) {
- panic("unexpected exception")
- },
- complete: ob.complete,
- })
- } }
- }
|