gfcp_readloop_linux.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
  2. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  3. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  4. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  5. //
  6. // All rights reserved.
  7. //
  8. // All use of this code is governed by the MIT license.
  9. // The complete license is available in the LICENSE file.
  10. //go:build linux
  11. // +build linux
  12. package gfcp
  13. import (
  14. "net"
  15. "sync/atomic"
  16. "golang.org/x/net/ipv4"
  17. "golang.org/x/net/ipv6"
  18. )
  19. const (
  20. batchSize = 16
  21. )
  22. func (
  23. s *UDPSession,
  24. ) readLoop() {
  25. addr, _ := net.ResolveUDPAddr(
  26. "udp",
  27. s.conn.LocalAddr().String(),
  28. )
  29. if addr.IP.To4() != nil {
  30. s.readLoopIPv4()
  31. } else {
  32. s.readLoopIPv6()
  33. }
  34. }
  35. func (
  36. s *UDPSession,
  37. ) readLoopIPv6() {
  38. var src string
  39. msgs := make(
  40. []ipv6.Message,
  41. batchSize,
  42. )
  43. for k := range msgs {
  44. msgs[k].Buffers = [][]byte{
  45. make(
  46. []byte,
  47. GFcpMtuLimit,
  48. ),
  49. }
  50. }
  51. conn := ipv6.NewPacketConn(
  52. s.conn,
  53. )
  54. for {
  55. if count, err := conn.ReadBatch(
  56. msgs,
  57. 0,
  58. ); err == nil {
  59. for i := 0; i < count; i++ {
  60. msg := &msgs[i]
  61. if src == "" {
  62. src = msg.Addr.String()
  63. } else if msg.Addr.String() != src {
  64. atomic.AddUint64(
  65. &DefaultSnsi.GFcpPreInputErrors,
  66. 1,
  67. )
  68. continue
  69. }
  70. if msg.N < s.headerSize+GfcpOverhead {
  71. atomic.AddUint64(
  72. &DefaultSnsi.GFcpInputErrors,
  73. 1,
  74. )
  75. continue
  76. }
  77. s.packetInput(
  78. msg.Buffers[0][:msg.N],
  79. )
  80. }
  81. } else {
  82. s.chReadError <- err
  83. return
  84. }
  85. }
  86. }
  87. func (
  88. s *UDPSession,
  89. ) readLoopIPv4() {
  90. var src string
  91. msgs := make(
  92. []ipv4.Message,
  93. batchSize,
  94. )
  95. for k := range msgs {
  96. msgs[k].Buffers = [][]byte{make(
  97. []byte,
  98. GFcpMtuLimit,
  99. )}
  100. }
  101. conn := ipv4.NewPacketConn(
  102. s.conn,
  103. )
  104. for {
  105. if count, err := conn.ReadBatch(
  106. msgs,
  107. 0,
  108. ); err == nil {
  109. for i := 0; i < count; i++ {
  110. msg := &msgs[i]
  111. if src == "" {
  112. src = msg.Addr.String()
  113. } else if msg.Addr.String() != src {
  114. atomic.AddUint64(
  115. &DefaultSnsi.GFcpInputErrors,
  116. 1,
  117. )
  118. continue
  119. }
  120. if msg.N < s.headerSize+GfcpOverhead {
  121. atomic.AddUint64(
  122. &DefaultSnsi.GFcpInputErrors,
  123. 1,
  124. )
  125. continue
  126. }
  127. s.packetInput(
  128. msg.Buffers[0][:msg.N],
  129. )
  130. }
  131. } else {
  132. s.chReadError <- err
  133. return
  134. }
  135. }
  136. }
  137. func (
  138. l *Listener,
  139. ) monitor() {
  140. addr, _ := net.ResolveUDPAddr(
  141. "udp",
  142. l.conn.LocalAddr().String(),
  143. )
  144. if addr.IP.To4() != nil {
  145. l.monitorIPv4()
  146. } else {
  147. l.monitorIPv6()
  148. }
  149. }
  150. func (
  151. l *Listener,
  152. ) monitorIPv4() {
  153. msgs := make(
  154. []ipv4.Message,
  155. batchSize,
  156. )
  157. for k := range msgs {
  158. msgs[k].Buffers = [][]byte{make(
  159. []byte,
  160. GFcpMtuLimit,
  161. )}
  162. }
  163. conn := ipv4.NewPacketConn(
  164. l.conn,
  165. )
  166. for {
  167. if count, err := conn.ReadBatch(
  168. msgs,
  169. 0,
  170. ); err == nil {
  171. for i := 0; i < count; i++ {
  172. msg := &msgs[i]
  173. if msg.N >= l.headerSize+GfcpOverhead {
  174. l.packetInput(
  175. msg.Buffers[0][:msg.N],
  176. msg.Addr,
  177. )
  178. } else {
  179. atomic.AddUint64(
  180. &DefaultSnsi.GFcpInputErrors,
  181. 1,
  182. )
  183. }
  184. }
  185. } else {
  186. return
  187. }
  188. }
  189. }
  190. func (
  191. l *Listener,
  192. ) monitorIPv6() {
  193. msgs := make(
  194. []ipv6.Message,
  195. batchSize,
  196. )
  197. for k := range msgs {
  198. msgs[k].Buffers = [][]byte{make(
  199. []byte,
  200. GFcpMtuLimit,
  201. )}
  202. }
  203. conn := ipv4.NewPacketConn(
  204. l.conn,
  205. )
  206. for {
  207. if count, err := conn.ReadBatch(
  208. msgs,
  209. 0,
  210. ); err == nil {
  211. for i := 0; i < count; i++ {
  212. msg := &msgs[i]
  213. if msg.N >= l.headerSize+GfcpOverhead {
  214. l.packetInput(
  215. msg.Buffers[0][:msg.N],
  216. msg.Addr,
  217. )
  218. } else {
  219. atomic.AddUint64(
  220. &DefaultSnsi.GFcpInputErrors,
  221. 1,
  222. )
  223. }
  224. }
  225. } else {
  226. return
  227. }
  228. }
  229. }