net.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package rx
import (
	"net"
	"sync"
	"time"
)
  6. type WrappedConnection struct {
  7. conn net.Conn
  8. timeout TimeoutPair
  9. sched Scheduler
  10. ob *observer
  11. worker *Worker
  12. context *Context
  13. dispose disposeFunc
  14. closed chan struct{}
  15. result Promise
  16. }
  17. type TimeoutPair struct {
  18. ReadTimeout time.Duration
  19. WriteTimeout time.Duration
  20. }
  21. func (w *WrappedConnection) Context() *Context {
  22. return w.context
  23. }
  24. func (w *WrappedConnection) Scheduler() Scheduler {
  25. return w.sched
  26. }
  27. func (w *WrappedConnection) Worker() *Worker {
  28. return w.worker
  29. }
  30. func (w *WrappedConnection) closeProperly(err error) {
  31. select {
  32. case <- w.closed:
  33. return
  34. default:
  35. }
  36. close(w.closed)
  37. _ = w.conn.Close()
  38. w.worker.Dispose()
  39. w.sched.commit(func() {
  40. if err != nil {
  41. w.ob.error(err)
  42. } else {
  43. w.ob.next(nil)
  44. w.ob.complete()
  45. }
  46. w.dispose(behaviour_cancel)
  47. })
  48. if err != nil {
  49. w.result.Reject(err, w.sched)
  50. } else {
  51. w.result.Resolve(nil, w.sched)
  52. }
  53. }
  54. func (w *WrappedConnection) Read(buf ([] byte)) (int, error) {
  55. var timeout = w.timeout.ReadTimeout
  56. if timeout != 0 {
  57. err := w.conn.SetReadDeadline(time.Now().Add(timeout))
  58. if err != nil { return 0, err }
  59. }
  60. n, err := w.conn.Read(buf)
  61. if err != nil {
  62. w.closeProperly(err)
  63. }
  64. return n, err
  65. }
  66. func (w *WrappedConnection) Write(buf ([] byte)) (int, error) {
  67. var timeout = w.timeout.WriteTimeout
  68. if timeout != 0 {
  69. err := w.conn.SetWriteDeadline(time.Now().Add(timeout))
  70. if err != nil { return 0, err }
  71. }
  72. n, err := w.conn.Write(buf)
  73. if err != nil {
  74. w.closeProperly(err)
  75. }
  76. return n, err
  77. }
  78. func (w *WrappedConnection) Fatal(err error) {
  79. w.closeProperly(err)
  80. }
  81. func (w *WrappedConnection) Close() error {
  82. w.closeProperly(nil)
  83. return nil
  84. }
  85. func (w *WrappedConnection) OnClose() Observable {
  86. return w.result.Outcome()
  87. }
  88. func NewConnectionHandler(conn net.Conn, timeout TimeoutPair, logic (func(*WrappedConnection))) Observable {
  89. return Observable { func(sched Scheduler, ob *observer) {
  90. var ctx, dispose = ob.context.create_disposable_child()
  91. var wrapped = &WrappedConnection {
  92. conn: conn,
  93. timeout: timeout,
  94. sched: sched,
  95. ob: ob,
  96. worker: CreateWorker(),
  97. context: ctx,
  98. dispose: dispose,
  99. closed: make(chan struct{}),
  100. result: CreatePromise(),
  101. }
  102. go logic(wrapped)
  103. go ob.context.WaitDispose(func() {
  104. _ = wrapped.Close()
  105. })
  106. } }
  107. }