main.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "math/rand"
  7. "net"
  8. "net/http"
  9. _ "net/http/pprof"
  10. "os"
  11. "sync"
  12. "time"
  13. kcp "github.com/johnsonjh/gfcp"
  14. "github.com/johnsonjh/gfcptun/generic"
  15. smux "github.com/johnsonjh/gfsmux"
  16. "github.com/urfave/cli"
  17. "github.com/xtaci/tcpraw"
  18. )
  19. const (
  20. // maximum supported smux version
  21. maxSmuxVer = 2
  22. )
  23. // VERSION is injected by buildflags
  24. var VERSION = "SELFBUILD"
  25. // handle multiplex-ed connection
  26. func handleMux(conn net.Conn, config *Config) {
  27. // check if target is unix domain socket
  28. var isUnix bool
  29. if _, _, err := net.SplitHostPort(config.Target); err != nil {
  30. isUnix = true
  31. }
  32. log.Println("smux version:", config.SmuxVer, "on connection:", conn.LocalAddr(), "->", conn.RemoteAddr())
  33. // stream multiplex
  34. smuxConfig := smux.DefaultConfig()
  35. smuxConfig.Version = config.SmuxVer
  36. smuxConfig.MaxReceiveBuffer = config.SmuxBuf
  37. smuxConfig.MaxStreamBuffer = config.StreamBuf
  38. smuxConfig.KeepAliveInterval = time.Duration(config.KeepAlive) * time.Second
  39. mux, err := smux.Server(conn, smuxConfig)
  40. if err != nil {
  41. log.Println(err)
  42. return
  43. }
  44. defer mux.Close()
  45. for {
  46. stream, err := mux.AcceptStream()
  47. if err != nil {
  48. log.Println(err)
  49. return
  50. }
  51. go func(p1 *smux.Stream) {
  52. var p2 net.Conn
  53. var err error
  54. if !isUnix {
  55. p2, err = net.Dial("tcp", config.Target)
  56. } else {
  57. p2, err = net.Dial("unix", config.Target)
  58. }
  59. if err != nil {
  60. log.Println(err)
  61. p1.Close()
  62. return
  63. }
  64. handleClient(p1, p2, config.Quiet)
  65. }(stream)
  66. }
  67. }
  68. func handleClient(p1 *smux.Stream, p2 net.Conn, quiet bool) {
  69. logln := func(v ...interface{}) {
  70. if !quiet {
  71. log.Println(v...)
  72. }
  73. }
  74. defer p1.Close()
  75. defer p2.Close()
  76. logln("stream opened", "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
  77. defer logln("stream closed", "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
  78. // start tunnel & wait for tunnel termination
  79. streamCopy := func(dst io.Writer, src io.ReadCloser) {
  80. if _, err := generic.Copy(dst, src); err != nil {
  81. if err == smux.ErrInvalidProtocol {
  82. log.Println("smux", err, "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
  83. }
  84. }
  85. p1.Close()
  86. p2.Close()
  87. }
  88. go streamCopy(p2, p1)
  89. streamCopy(p1, p2)
  90. }
  91. func checkError(err error) {
  92. if err != nil {
  93. log.Printf("%+v\n", err)
  94. os.Exit(-1)
  95. }
  96. }
  97. func main() {
  98. rand.Seed(int64(time.Now().Nanosecond()))
  99. if VERSION == "SELFBUILD" {
  100. // add more log flags for debugging
  101. log.SetFlags(log.LstdFlags | log.Lshortfile)
  102. }
  103. myApp := cli.NewApp()
  104. myApp.Name = "gfcptun"
  105. myApp.Usage = "server(with SMUX)"
  106. myApp.Version = VERSION
  107. myApp.Flags = []cli.Flag{
  108. cli.StringFlag{
  109. Name: "listen,l",
  110. Value: ":29900",
  111. Usage: "gfcp server listen address",
  112. },
  113. cli.StringFlag{
  114. Name: "target, t",
  115. Value: "127.0.0.1:12948",
  116. Usage: "target server address, or path/to/unix_socket",
  117. },
  118. cli.StringFlag{
  119. Name: "key",
  120. Value: "it's a secrect",
  121. Usage: "pre-shared secret between client and server",
  122. EnvVar: "GFCPTUN_KEY",
  123. },
  124. cli.StringFlag{
  125. Name: "mode",
  126. Value: "fast",
  127. Usage: "profiles: fast3, fast2, fast, normal, manual",
  128. },
  129. cli.IntFlag{
  130. Name: "mtu",
  131. Value: 1350,
  132. Usage: "set maximum transmission unit for UDP packets",
  133. },
  134. cli.IntFlag{
  135. Name: "sndwnd",
  136. Value: 1024,
  137. Usage: "set send window size(num of packets)",
  138. },
  139. cli.IntFlag{
  140. Name: "rcvwnd",
  141. Value: 1024,
  142. Usage: "set receive window size(num of packets)",
  143. },
  144. cli.IntFlag{
  145. Name: "datashard,ds",
  146. Value: 10,
  147. Usage: "set reed-solomon erasure coding - datashard",
  148. },
  149. cli.IntFlag{
  150. Name: "parityshard,ps",
  151. Value: 3,
  152. Usage: "set reed-solomon erasure coding - parityshard",
  153. },
  154. cli.IntFlag{
  155. Name: "dscp",
  156. Value: 0,
  157. Usage: "set DSCP(6bit)",
  158. },
  159. cli.BoolFlag{
  160. Name: "nocomp",
  161. Usage: "disable compression",
  162. },
  163. cli.BoolFlag{
  164. Name: "acknodelay",
  165. Usage: "flush ack immediately when a packet is received",
  166. Hidden: true,
  167. },
  168. cli.IntFlag{
  169. Name: "nodelay",
  170. Value: 0,
  171. Hidden: true,
  172. },
  173. cli.IntFlag{
  174. Name: "interval",
  175. Value: 50,
  176. Hidden: true,
  177. },
  178. cli.IntFlag{
  179. Name: "resend",
  180. Value: 0,
  181. Hidden: true,
  182. },
  183. cli.IntFlag{
  184. Name: "nc",
  185. Value: 0,
  186. Hidden: true,
  187. },
  188. cli.IntFlag{
  189. Name: "sockbuf",
  190. Value: 4194304, // socket buffer size in bytes
  191. Usage: "per-socket buffer in bytes",
  192. },
  193. cli.IntFlag{
  194. Name: "smuxver",
  195. Value: 1,
  196. Usage: "specify smux version, available 1,2",
  197. },
  198. cli.IntFlag{
  199. Name: "smuxbuf",
  200. Value: 4194304,
  201. Usage: "the overall de-mux buffer in bytes",
  202. },
  203. cli.IntFlag{
  204. Name: "streambuf",
  205. Value: 2097152,
  206. Usage: "per stream receive buffer in bytes, smux v2+",
  207. },
  208. cli.IntFlag{
  209. Name: "keepalive",
  210. Value: 10, // nat keepalive interval in seconds
  211. Usage: "seconds between heartbeats",
  212. },
  213. cli.StringFlag{
  214. Name: "snsilog",
  215. Value: "",
  216. Usage: "collect snsi to file, aware of timeformat in golang, like: ./snsi-20060102.log",
  217. },
  218. cli.IntFlag{
  219. Name: "snsiperiod",
  220. Value: 60,
  221. Usage: "snsi collect period, in seconds",
  222. },
  223. cli.BoolFlag{
  224. Name: "pprof",
  225. Usage: "start profiling server on :6060",
  226. },
  227. cli.StringFlag{
  228. Name: "log",
  229. Value: "",
  230. Usage: "specify a log file to output, default goes to stderr",
  231. },
  232. cli.BoolFlag{
  233. Name: "quiet",
  234. Usage: "to suppress the 'stream open/close' messages",
  235. },
  236. cli.BoolFlag{
  237. Name: "tcp",
  238. Usage: "to emulate a TCP connection(linux)",
  239. },
  240. cli.StringFlag{
  241. Name: "c",
  242. Value: "", // when the value is not empty, the config path must exists
  243. Usage: "config from json file, which will override the command from shell",
  244. },
  245. }
  246. myApp.Action = func(c *cli.Context) error {
  247. config := Config{}
  248. config.Listen = c.String("listen")
  249. config.Target = c.String("target")
  250. config.Key = c.String("key")
  251. config.Mode = c.String("mode")
  252. config.MTU = c.Int("mtu")
  253. config.SndWnd = c.Int("sndwnd")
  254. config.RcvWnd = c.Int("rcvwnd")
  255. config.DataShard = c.Int("datashard")
  256. config.ParityShard = c.Int("parityshard")
  257. config.DSCP = c.Int("dscp")
  258. config.NoComp = c.Bool("nocomp")
  259. config.AckNodelay = c.Bool("acknodelay")
  260. config.NoDelay = c.Int("nodelay")
  261. config.Interval = c.Int("interval")
  262. config.Resend = c.Int("resend")
  263. config.NoCongestion = c.Int("nc")
  264. config.SockBuf = c.Int("sockbuf")
  265. config.SmuxBuf = c.Int("smuxbuf")
  266. config.StreamBuf = c.Int("streambuf")
  267. config.SmuxVer = c.Int("smuxver")
  268. config.KeepAlive = c.Int("keepalive")
  269. config.Log = c.String("log")
  270. config.SnsiLog = c.String("snsilog")
  271. config.SnsiPeriod = c.Int("snsiperiod")
  272. config.Pprof = c.Bool("pprof")
  273. config.Quiet = c.Bool("quiet")
  274. config.TCP = c.Bool("tcp")
  275. if c.String("c") != "" {
  276. // Now only support json config file
  277. err := parseJSONConfig(&config, c.String("c"))
  278. checkError(err)
  279. }
  280. // log redirect
  281. if config.Log != "" {
  282. f, err := os.OpenFile(config.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o666)
  283. checkError(err)
  284. defer f.Close()
  285. log.SetOutput(f)
  286. }
  287. switch config.Mode {
  288. case "normal":
  289. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 40, 2, 1
  290. case "fast":
  291. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 30, 2, 1
  292. case "fast2":
  293. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 20, 2, 1
  294. case "fast3":
  295. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 10, 2, 1
  296. }
  297. log.Println("version:", VERSION)
  298. log.Println("smux version:", config.SmuxVer)
  299. log.Println("listening on:", config.Listen)
  300. log.Println("target:", config.Target)
  301. log.Println("nodelay parameters:", config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
  302. log.Println("sndwnd:", config.SndWnd, "rcvwnd:", config.RcvWnd)
  303. log.Println("compression:", !config.NoComp)
  304. log.Println("mtu:", config.MTU)
  305. log.Println("datashard:", config.DataShard, "parityshard:", config.ParityShard)
  306. log.Println("acknodelay:", config.AckNodelay)
  307. log.Println("dscp:", config.DSCP)
  308. log.Println("sockbuf:", config.SockBuf)
  309. log.Println("smuxbuf:", config.SmuxBuf)
  310. log.Println("streambuf:", config.StreamBuf)
  311. log.Println("keepalive:", config.KeepAlive)
  312. log.Println("snsilog:", config.SnsiLog)
  313. log.Println("snsiperiod:", config.SnsiPeriod)
  314. log.Println("pprof:", config.Pprof)
  315. log.Println("quiet:", config.Quiet)
  316. log.Println("tcp:", config.TCP)
  317. // parameters check
  318. if config.SmuxVer > maxSmuxVer {
  319. log.Fatal("unsupported smux version:", config.SmuxVer)
  320. }
  321. go generic.SnsiLogger(config.SnsiLog, config.SnsiPeriod)
  322. if config.Pprof {
  323. go http.ListenAndServe(":6060", nil)
  324. }
  325. // main loop
  326. var wg sync.WaitGroup
  327. loop := func(lis *kcp.Listener) {
  328. defer wg.Done()
  329. if err := lis.SetDSCP(config.DSCP); err != nil {
  330. log.Println("SetDSCP:", err)
  331. }
  332. if err := lis.SetReadBuffer(config.SockBuf); err != nil {
  333. log.Println("SetReadBuffer:", err)
  334. }
  335. if err := lis.SetWriteBuffer(config.SockBuf); err != nil {
  336. log.Println("SetWriteBuffer:", err)
  337. }
  338. for {
  339. if conn, err := lis.AcceptGFCP(); err == nil {
  340. log.Println("remote address:", conn.RemoteAddr())
  341. conn.SetStreamMode(true)
  342. conn.SetWriteDelay(false)
  343. conn.SetNoDelay(config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
  344. conn.SetMtu(config.MTU)
  345. conn.SetWindowSize(config.SndWnd, config.RcvWnd)
  346. conn.SetACKNoDelay(config.AckNodelay)
  347. if config.NoComp {
  348. go handleMux(conn, &config)
  349. } else {
  350. go handleMux(generic.NewCompStream(conn), &config)
  351. }
  352. } else {
  353. log.Printf("%+v", err)
  354. }
  355. }
  356. }
  357. if config.TCP { // tcp dual stack
  358. if conn, err := tcpraw.Listen("tcp", config.Listen); err == nil {
  359. lis, err := kcp.ServeConn(config.DataShard, config.ParityShard, conn)
  360. checkError(err)
  361. wg.Add(1)
  362. go loop(lis)
  363. } else {
  364. log.Println(err)
  365. }
  366. }
  367. // udp stack
  368. lis, err := kcp.ListenWithOptions(config.Listen, config.DataShard, config.ParityShard)
  369. checkError(err)
  370. wg.Add(1)
  371. go loop(lis)
  372. wg.Wait()
  373. return nil
  374. }
  375. myApp.Run(os.Args)
  376. }