gfcp_readloop_linux.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  4. // Copyright © 2021 Jeffrey H. Johnson <jeff@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. // +build linux
  11. package gfcp // import "go.gridfinity.dev/gfcp"
  12. import (
  13. "net"
  14. "sync/atomic"
  15. "golang.org/x/net/ipv4"
  16. "golang.org/x/net/ipv6"
  17. )
  18. const (
  19. batchSize = 16
  20. )
  21. func (
  22. s *UDPSession,
  23. ) readLoop() {
  24. addr, _ := net.ResolveUDPAddr(
  25. "udp",
  26. s.conn.LocalAddr().String(),
  27. )
  28. if addr.IP.To4() != nil {
  29. s.readLoopIPv4()
  30. } else {
  31. s.readLoopIPv6()
  32. }
  33. }
  34. func (
  35. s *UDPSession,
  36. ) readLoopIPv6() {
  37. var src string
  38. msgs := make(
  39. []ipv6.Message,
  40. batchSize,
  41. )
  42. for k := range msgs {
  43. msgs[k].Buffers = [][]byte{
  44. make(
  45. []byte,
  46. GFcpMtuLimit,
  47. ),
  48. }
  49. }
  50. conn := ipv6.NewPacketConn(
  51. s.conn,
  52. )
  53. for {
  54. if count, err := conn.ReadBatch(
  55. msgs,
  56. 0,
  57. ); err == nil {
  58. for i := 0; i < count; i++ {
  59. msg := &msgs[i]
  60. if src == "" {
  61. src = msg.Addr.String()
  62. } else if msg.Addr.String() != src {
  63. atomic.AddUint64(
  64. &DefaultSnsi.GFcpPreInputErrors,
  65. 1,
  66. )
  67. continue
  68. }
  69. if msg.N < s.headerSize+GfcpOverhead {
  70. atomic.AddUint64(
  71. &DefaultSnsi.GFcpInputErrors,
  72. 1,
  73. )
  74. continue
  75. }
  76. s.packetInput(
  77. msg.Buffers[0][:msg.N],
  78. )
  79. }
  80. } else {
  81. s.chReadError <- err
  82. return
  83. }
  84. }
  85. }
  86. func (
  87. s *UDPSession,
  88. ) readLoopIPv4() {
  89. var src string
  90. msgs := make(
  91. []ipv4.Message,
  92. batchSize,
  93. )
  94. for k := range msgs {
  95. msgs[k].Buffers = [][]byte{make(
  96. []byte,
  97. GFcpMtuLimit,
  98. )}
  99. }
  100. conn := ipv4.NewPacketConn(
  101. s.conn,
  102. )
  103. for {
  104. if count, err := conn.ReadBatch(
  105. msgs,
  106. 0,
  107. ); err == nil {
  108. for i := 0; i < count; i++ {
  109. msg := &msgs[i]
  110. if src == "" {
  111. src = msg.Addr.String()
  112. } else if msg.Addr.String() != src {
  113. atomic.AddUint64(
  114. &DefaultSnsi.GFcpInputErrors,
  115. 1,
  116. )
  117. continue
  118. }
  119. if msg.N < s.headerSize+GfcpOverhead {
  120. atomic.AddUint64(
  121. &DefaultSnsi.GFcpInputErrors,
  122. 1,
  123. )
  124. continue
  125. }
  126. s.packetInput(
  127. msg.Buffers[0][:msg.N],
  128. )
  129. }
  130. } else {
  131. s.chReadError <- err
  132. return
  133. }
  134. }
  135. }
  136. func (
  137. l *Listener,
  138. ) monitor() {
  139. addr, _ := net.ResolveUDPAddr(
  140. "udp",
  141. l.conn.LocalAddr().String(),
  142. )
  143. if addr.IP.To4() != nil {
  144. l.monitorIPv4()
  145. } else {
  146. l.monitorIPv6()
  147. }
  148. }
  149. func (
  150. l *Listener,
  151. ) monitorIPv4() {
  152. msgs := make(
  153. []ipv4.Message,
  154. batchSize,
  155. )
  156. for k := range msgs {
  157. msgs[k].Buffers = [][]byte{make(
  158. []byte,
  159. GFcpMtuLimit,
  160. )}
  161. }
  162. conn := ipv4.NewPacketConn(
  163. l.conn,
  164. )
  165. for {
  166. if count, err := conn.ReadBatch(
  167. msgs,
  168. 0,
  169. ); err == nil {
  170. for i := 0; i < count; i++ {
  171. msg := &msgs[i]
  172. if msg.N >= l.headerSize+GfcpOverhead {
  173. l.packetInput(
  174. msg.Buffers[0][:msg.N],
  175. msg.Addr,
  176. )
  177. } else {
  178. atomic.AddUint64(
  179. &DefaultSnsi.GFcpInputErrors,
  180. 1,
  181. )
  182. }
  183. }
  184. } else {
  185. return
  186. }
  187. }
  188. }
  189. func (
  190. l *Listener,
  191. ) monitorIPv6() {
  192. msgs := make(
  193. []ipv6.Message,
  194. batchSize,
  195. )
  196. for k := range msgs {
  197. msgs[k].Buffers = [][]byte{make(
  198. []byte,
  199. GFcpMtuLimit,
  200. )}
  201. }
  202. conn := ipv4.NewPacketConn(
  203. l.conn,
  204. )
  205. for {
  206. if count, err := conn.ReadBatch(
  207. msgs,
  208. 0,
  209. ); err == nil {
  210. for i := 0; i < count; i++ {
  211. msg := &msgs[i]
  212. if msg.N >= l.headerSize+GfcpOverhead {
  213. l.packetInput(
  214. msg.Buffers[0][:msg.N],
  215. msg.Addr,
  216. )
  217. } else {
  218. atomic.AddUint64(
  219. &DefaultSnsi.GFcpInputErrors,
  220. 1,
  221. )
  222. }
  223. }
  224. } else {
  225. return
  226. }
  227. }
  228. }