main.go 12 KB


  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "math/rand"
  7. "net"
  8. "os"
  9. "time"
  10. "github.com/johnsonjh/gfcptun/generic"
  11. smux "github.com/johnsonjh/gfsmux"
  12. "github.com/pkg/errors"
  13. "github.com/urfave/cli"
  14. )
  15. const (
  16. // maximum supported smux version
  17. maxSmuxVer = 2
  18. )
  19. // VERSION is injected by buildflags
  20. var VERSION = "SELFBUILD"
  21. // handleClient aggregates connection p1 on mux with 'writeLock'
  22. func handleClient(session *smux.Session, p1 net.Conn, quiet bool) {
  23. logln := func(v ...interface{}) {
  24. if !quiet {
  25. log.Println(v...)
  26. }
  27. }
  28. defer p1.Close()
  29. p2, err := session.OpenStream()
  30. if err != nil {
  31. logln(err)
  32. return
  33. }
  34. defer p2.Close()
  35. logln("stream opened", "in:", p1.RemoteAddr(), "out:", fmt.Sprint(p2.RemoteAddr(), "(", p2.ID(), ")"))
  36. defer logln("stream closed", "in:", p1.RemoteAddr(), "out:", fmt.Sprint(p2.RemoteAddr(), "(", p2.ID(), ")"))
  37. // start tunnel & wait for tunnel termination
  38. streamCopy := func(dst io.Writer, src io.ReadCloser) {
  39. if _, err := generic.Copy(dst, src); err != nil {
  40. // report protocol error
  41. if err == smux.ErrInvalidProtocol {
  42. log.Println("smux", err, "in:", p1.RemoteAddr(), "out:", fmt.Sprint(p2.RemoteAddr(), "(", p2.ID(), ")"))
  43. }
  44. }
  45. p1.Close()
  46. p2.Close()
  47. }
  48. go streamCopy(p1, p2)
  49. streamCopy(p2, p1)
  50. }
  51. func checkError(err error) {
  52. if err != nil {
  53. log.Printf("%+v\n", err)
  54. os.Exit(-1)
  55. }
  56. }
// timedSession pairs a smux session with a deadline.
//
// main stores the moment after which the session is replaced by a new one
// (only enforced when AutoExpire > 0); scavenger stores that moment plus
// ScavengeTTL, after which it closes the session.
//
// NOTE: scavenger builds this struct with a positional composite literal, so
// the field order must not change.
type timedSession struct {
	session    *smux.Session // the multiplexed session being tracked
	expiryDate time.Time     // deadline associated with session (see above)
}
  61. func main() {
  62. rand.Seed(int64(time.Now().Nanosecond()))
  63. if VERSION == "SELFBUILD" {
  64. // add more log flags for debugging
  65. log.SetFlags(log.LstdFlags | log.Lshortfile)
  66. }
  67. myApp := cli.NewApp()
  68. myApp.Name = "gfcptun"
  69. myApp.Usage = "client(with SMUX)"
  70. myApp.Version = VERSION
  71. myApp.Flags = []cli.Flag{
  72. cli.StringFlag{
  73. Name: "localaddr,l",
  74. Value: ":12948",
  75. Usage: "local listen address",
  76. },
  77. cli.StringFlag{
  78. Name: "remoteaddr, r",
  79. Value: "vps:29900",
  80. Usage: "gfcp server address",
  81. },
  82. cli.StringFlag{
  83. Name: "key",
  84. Value: "it's a secrect",
  85. Usage: "pre-shared secret between client and server",
  86. EnvVar: "GFCPTUN_KEY",
  87. },
  88. cli.StringFlag{
  89. Name: "mode",
  90. Value: "fast",
  91. Usage: "profiles: fast3, fast2, fast, normal, manual",
  92. },
  93. cli.IntFlag{
  94. Name: "conn",
  95. Value: 1,
  96. Usage: "set num of UDP connections to server",
  97. },
  98. cli.IntFlag{
  99. Name: "autoexpire",
  100. Value: 0,
  101. Usage: "set auto expiration time(in seconds) for a single UDP connection, 0 to disable",
  102. },
  103. cli.IntFlag{
  104. Name: "scavengettl",
  105. Value: 600,
  106. Usage: "set how long an expired connection can live (in seconds)",
  107. },
  108. cli.IntFlag{
  109. Name: "mtu",
  110. Value: 1350,
  111. Usage: "set maximum transmission unit for UDP packets",
  112. },
  113. cli.IntFlag{
  114. Name: "sndwnd",
  115. Value: 128,
  116. Usage: "set send window size(num of packets)",
  117. },
  118. cli.IntFlag{
  119. Name: "rcvwnd",
  120. Value: 512,
  121. Usage: "set receive window size(num of packets)",
  122. },
  123. cli.IntFlag{
  124. Name: "datashard,ds",
  125. Value: 10,
  126. Usage: "set reed-solomon erasure coding - datashard",
  127. },
  128. cli.IntFlag{
  129. Name: "parityshard,ps",
  130. Value: 3,
  131. Usage: "set reed-solomon erasure coding - parityshard",
  132. },
  133. cli.IntFlag{
  134. Name: "dscp",
  135. Value: 0,
  136. Usage: "set DSCP(6bit)",
  137. },
  138. cli.BoolFlag{
  139. Name: "nocomp",
  140. Usage: "disable compression",
  141. },
  142. cli.BoolFlag{
  143. Name: "acknodelay",
  144. Usage: "flush ack immediately when a packet is received",
  145. Hidden: true,
  146. },
  147. cli.IntFlag{
  148. Name: "nodelay",
  149. Value: 0,
  150. Hidden: true,
  151. },
  152. cli.IntFlag{
  153. Name: "interval",
  154. Value: 50,
  155. Hidden: true,
  156. },
  157. cli.IntFlag{
  158. Name: "resend",
  159. Value: 0,
  160. Hidden: true,
  161. },
  162. cli.IntFlag{
  163. Name: "nc",
  164. Value: 0,
  165. Hidden: true,
  166. },
  167. cli.IntFlag{
  168. Name: "sockbuf",
  169. Value: 4194304, // socket buffer size in bytes
  170. Usage: "per-socket buffer in bytes",
  171. },
  172. cli.IntFlag{
  173. Name: "smuxver",
  174. Value: 1,
  175. Usage: "specify smux version, available 1,2",
  176. },
  177. cli.IntFlag{
  178. Name: "smuxbuf",
  179. Value: 4194304,
  180. Usage: "the overall de-mux buffer in bytes",
  181. },
  182. cli.IntFlag{
  183. Name: "streambuf",
  184. Value: 2097152,
  185. Usage: "per stream receive buffer in bytes, smux v2+",
  186. },
  187. cli.IntFlag{
  188. Name: "keepalive",
  189. Value: 10, // nat keepalive interval in seconds
  190. Usage: "seconds between heartbeats",
  191. },
  192. cli.StringFlag{
  193. Name: "snsilog",
  194. Value: "",
  195. Usage: "collect snsi to file, aware of timeformat in golang, like: ./snsi-20060102.log",
  196. },
  197. cli.IntFlag{
  198. Name: "snsiperiod",
  199. Value: 60,
  200. Usage: "snsi collect period, in seconds",
  201. },
  202. cli.StringFlag{
  203. Name: "log",
  204. Value: "",
  205. Usage: "specify a log file to output, default goes to stderr",
  206. },
  207. cli.BoolFlag{
  208. Name: "quiet",
  209. Usage: "to suppress the 'stream open/close' messages",
  210. },
  211. cli.BoolFlag{
  212. Name: "tcp",
  213. Usage: "to emulate a TCP connection(linux)",
  214. },
  215. cli.StringFlag{
  216. Name: "c",
  217. Value: "", // when the value is not empty, the config path must exists
  218. Usage: "config from json file, which will override the command from shell",
  219. },
  220. }
  221. myApp.Action = func(c *cli.Context) error {
  222. config := Config{}
  223. config.LocalAddr = c.String("localaddr")
  224. config.RemoteAddr = c.String("remoteaddr")
  225. config.Key = c.String("key")
  226. config.Mode = c.String("mode")
  227. config.Conn = c.Int("conn")
  228. config.AutoExpire = c.Int("autoexpire")
  229. config.ScavengeTTL = c.Int("scavengettl")
  230. config.MTU = c.Int("mtu")
  231. config.SndWnd = c.Int("sndwnd")
  232. config.RcvWnd = c.Int("rcvwnd")
  233. config.DataShard = c.Int("datashard")
  234. config.ParityShard = c.Int("parityshard")
  235. config.DSCP = c.Int("dscp")
  236. config.NoComp = c.Bool("nocomp")
  237. config.AckNodelay = c.Bool("acknodelay")
  238. config.NoDelay = c.Int("nodelay")
  239. config.Interval = c.Int("interval")
  240. config.Resend = c.Int("resend")
  241. config.NoCongestion = c.Int("nc")
  242. config.SockBuf = c.Int("sockbuf")
  243. config.SmuxBuf = c.Int("smuxbuf")
  244. config.StreamBuf = c.Int("streambuf")
  245. config.SmuxVer = c.Int("smuxver")
  246. config.KeepAlive = c.Int("keepalive")
  247. config.Log = c.String("log")
  248. config.SnsiLog = c.String("snsilog")
  249. config.SnsiPeriod = c.Int("snsiperiod")
  250. config.Quiet = c.Bool("quiet")
  251. config.TCP = c.Bool("tcp")
  252. if c.String("c") != "" {
  253. err := parseJSONConfig(&config, c.String("c"))
  254. checkError(err)
  255. }
  256. // log redirect
  257. if config.Log != "" {
  258. f, err := os.OpenFile(config.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o666)
  259. checkError(err)
  260. defer f.Close()
  261. log.SetOutput(f)
  262. }
  263. switch config.Mode {
  264. case "normal":
  265. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 40, 2, 1
  266. case "fast":
  267. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 30, 2, 1
  268. case "fast2":
  269. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 20, 2, 1
  270. case "fast3":
  271. config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 10, 2, 1
  272. }
  273. log.Println("version:", VERSION)
  274. addr, err := net.ResolveTCPAddr("tcp", config.LocalAddr)
  275. checkError(err)
  276. listener, err := net.ListenTCP("tcp", addr)
  277. checkError(err)
  278. log.Println("smux version:", config.SmuxVer)
  279. log.Println("listening on:", listener.Addr())
  280. log.Println("nodelay parameters:", config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
  281. log.Println("remote address:", config.RemoteAddr)
  282. log.Println("sndwnd:", config.SndWnd, "rcvwnd:", config.RcvWnd)
  283. log.Println("compression:", !config.NoComp)
  284. log.Println("mtu:", config.MTU)
  285. log.Println("datashard:", config.DataShard, "parityshard:", config.ParityShard)
  286. log.Println("acknodelay:", config.AckNodelay)
  287. log.Println("dscp:", config.DSCP)
  288. log.Println("sockbuf:", config.SockBuf)
  289. log.Println("smuxbuf:", config.SmuxBuf)
  290. log.Println("streambuf:", config.StreamBuf)
  291. log.Println("keepalive:", config.KeepAlive)
  292. log.Println("conn:", config.Conn)
  293. log.Println("autoexpire:", config.AutoExpire)
  294. log.Println("scavengettl:", config.ScavengeTTL)
  295. log.Println("snsilog:", config.SnsiLog)
  296. log.Println("snsiperiod:", config.SnsiPeriod)
  297. log.Println("quiet:", config.Quiet)
  298. log.Println("tcp:", config.TCP)
  299. // parameters check
  300. if config.SmuxVer > maxSmuxVer {
  301. log.Fatal("unsupported smux version:", config.SmuxVer)
  302. }
  303. createConn := func() (*smux.Session, error) {
  304. kcpconn, err := dial(&config)
  305. if err != nil {
  306. return nil, errors.Wrap(err, "dial()")
  307. }
  308. kcpconn.SetStreamMode(true)
  309. kcpconn.SetWriteDelay(false)
  310. kcpconn.SetNoDelay(config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
  311. kcpconn.SetWindowSize(config.SndWnd, config.RcvWnd)
  312. kcpconn.SetMtu(config.MTU)
  313. kcpconn.SetACKNoDelay(config.AckNodelay)
  314. if err := kcpconn.SetDSCP(config.DSCP); err != nil {
  315. log.Println("SetDSCP:", err)
  316. }
  317. if err := kcpconn.SetReadBuffer(config.SockBuf); err != nil {
  318. log.Println("SetReadBuffer:", err)
  319. }
  320. if err := kcpconn.SetWriteBuffer(config.SockBuf); err != nil {
  321. log.Println("SetWriteBuffer:", err)
  322. }
  323. log.Println("smux version:", config.SmuxVer, "on connection:", kcpconn.LocalAddr(), "->", kcpconn.RemoteAddr())
  324. smuxConfig := smux.DefaultConfig()
  325. smuxConfig.Version = config.SmuxVer
  326. smuxConfig.MaxReceiveBuffer = config.SmuxBuf
  327. smuxConfig.MaxStreamBuffer = config.StreamBuf
  328. smuxConfig.KeepAliveInterval = time.Duration(config.KeepAlive) * time.Second
  329. if err := smux.VerifyConfig(smuxConfig); err != nil {
  330. log.Fatalf("%+v", err)
  331. }
  332. // stream multiplex
  333. var session *smux.Session
  334. if config.NoComp {
  335. session, err = smux.Client(kcpconn, smuxConfig)
  336. } else {
  337. session, err = smux.Client(generic.NewCompStream(kcpconn), smuxConfig)
  338. }
  339. if err != nil {
  340. return nil, errors.Wrap(err, "createConn()")
  341. }
  342. return session, nil
  343. }
  344. // wait until a connection is ready
  345. waitConn := func() *smux.Session {
  346. for {
  347. if session, err := createConn(); err == nil {
  348. return session
  349. }
  350. log.Println("re-connecting:", err)
  351. time.Sleep(time.Second)
  352. }
  353. }
  354. // start snsi logger
  355. go generic.SnsiLogger(config.SnsiLog, config.SnsiPeriod)
  356. // start scavenger
  357. chScavenger := make(chan timedSession, 128)
  358. go scavenger(chScavenger, &config)
  359. // start listener
  360. numconn := uint16(config.Conn)
  361. muxes := make([]timedSession, numconn)
  362. rr := uint16(0)
  363. for {
  364. p1, err := listener.AcceptTCP()
  365. if err != nil {
  366. log.Fatalf("%+v", err)
  367. }
  368. idx := rr % numconn
  369. // do auto expiration && reconnection
  370. if muxes[idx].session == nil || muxes[idx].session.IsClosed() ||
  371. (config.AutoExpire > 0 && time.Now().After(muxes[idx].expiryDate)) {
  372. muxes[idx].session = waitConn()
  373. muxes[idx].expiryDate = time.Now().Add(time.Duration(config.AutoExpire) * time.Second)
  374. if config.AutoExpire > 0 { // only when autoexpire set
  375. chScavenger <- muxes[idx]
  376. }
  377. }
  378. go handleClient(muxes[idx].session, p1, config.Quiet)
  379. rr++
  380. }
  381. }
  382. myApp.Run(os.Args)
  383. }
  384. func scavenger(ch chan timedSession, config *Config) {
  385. // When AutoExpire is set to 0 (default), sessionList will keep empty.
  386. // Then this routine won't need to do anything; thus just terminate it.
  387. if config.AutoExpire <= 0 {
  388. return
  389. }
  390. ticker := time.NewTicker(time.Second)
  391. defer ticker.Stop()
  392. var sessionList []timedSession
  393. for {
  394. select {
  395. case item := <-ch:
  396. sessionList = append(sessionList, timedSession{
  397. item.session,
  398. item.expiryDate.Add(time.Duration(config.ScavengeTTL) * time.Second),
  399. })
  400. case <-ticker.C:
  401. if len(sessionList) == 0 {
  402. continue
  403. }
  404. var newList []timedSession
  405. for k := range sessionList {
  406. s := sessionList[k]
  407. if s.session.IsClosed() {
  408. log.Println("scavenger: session normally closed:", s.session.LocalAddr())
  409. } else if time.Now().After(s.expiryDate) {
  410. s.session.Close()
  411. log.Println("scavenger: session closed due to ttl:", s.session.LocalAddr())
  412. } else {
  413. newList = append(newList, sessionList[k])
  414. }
  415. }
  416. sessionList = newList
  417. }
  418. }
  419. }