123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- package main
- import (
- "fmt"
- "io"
- "log"
- "math/rand"
- "net"
- "net/http"
- _ "net/http/pprof"
- "os"
- "sync"
- "time"
- kcp "github.com/johnsonjh/gfcp"
- "github.com/johnsonjh/gfcptun/generic"
- smux "github.com/johnsonjh/gfsmux"
- "github.com/urfave/cli"
- "github.com/xtaci/tcpraw"
- )
// maxSmuxVer is the highest smux protocol version this server accepts;
// main refuses to start when the configured version exceeds it.
const maxSmuxVer = 2

// VERSION identifies the build. It defaults to "SELFBUILD" and is meant to
// be overridden by build flags; main enables extra log flags while it still
// holds the default value.
var VERSION = "SELFBUILD"
- // handle multiplex-ed connection
- func handleMux(conn net.Conn, config *Config) {
- // check if target is unix domain socket
- var isUnix bool
- if _, _, err := net.SplitHostPort(config.Target); err != nil {
- isUnix = true
- }
- log.Println("smux version:", config.SmuxVer, "on connection:", conn.LocalAddr(), "->", conn.RemoteAddr())
- // stream multiplex
- smuxConfig := smux.DefaultConfig()
- smuxConfig.Version = config.SmuxVer
- smuxConfig.MaxReceiveBuffer = config.SmuxBuf
- smuxConfig.MaxStreamBuffer = config.StreamBuf
- smuxConfig.KeepAliveInterval = time.Duration(config.KeepAlive) * time.Second
- mux, err := smux.Server(conn, smuxConfig)
- if err != nil {
- log.Println(err)
- return
- }
- defer mux.Close()
- for {
- stream, err := mux.AcceptStream()
- if err != nil {
- log.Println(err)
- return
- }
- go func(p1 *smux.Stream) {
- var p2 net.Conn
- var err error
- if !isUnix {
- p2, err = net.Dial("tcp", config.Target)
- } else {
- p2, err = net.Dial("unix", config.Target)
- }
- if err != nil {
- log.Println(err)
- p1.Close()
- return
- }
- handleClient(p1, p2, config.Quiet)
- }(stream)
- }
- }
- func handleClient(p1 *smux.Stream, p2 net.Conn, quiet bool) {
- logln := func(v ...interface{}) {
- if !quiet {
- log.Println(v...)
- }
- }
- defer p1.Close()
- defer p2.Close()
- logln("stream opened", "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
- defer logln("stream closed", "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
- // start tunnel & wait for tunnel termination
- streamCopy := func(dst io.Writer, src io.ReadCloser) {
- if _, err := generic.Copy(dst, src); err != nil {
- if err == smux.ErrInvalidProtocol {
- log.Println("smux", err, "in:", fmt.Sprint(p1.RemoteAddr(), "(", p1.ID(), ")"), "out:", p2.RemoteAddr())
- }
- }
- p1.Close()
- p2.Close()
- }
- go streamCopy(p2, p1)
- streamCopy(p1, p2)
- }
// checkError is the fatal-error helper used during startup: a nil err is a
// no-op, anything else is logged with the %+v verb and the process exits
// with status -1.
func checkError(err error) {
	if err == nil {
		return
	}
	log.Printf("%+v\n", err)
	os.Exit(-1)
}
- func main() {
- rand.Seed(int64(time.Now().Nanosecond()))
- if VERSION == "SELFBUILD" {
- // add more log flags for debugging
- log.SetFlags(log.LstdFlags | log.Lshortfile)
- }
- myApp := cli.NewApp()
- myApp.Name = "gfcptun"
- myApp.Usage = "server(with SMUX)"
- myApp.Version = VERSION
- myApp.Flags = []cli.Flag{
- cli.StringFlag{
- Name: "listen,l",
- Value: ":29900",
- Usage: "gfcp server listen address",
- },
- cli.StringFlag{
- Name: "target, t",
- Value: "127.0.0.1:12948",
- Usage: "target server address, or path/to/unix_socket",
- },
- cli.StringFlag{
- Name: "key",
- Value: "it's a secrect",
- Usage: "pre-shared secret between client and server",
- EnvVar: "GFCPTUN_KEY",
- },
- cli.StringFlag{
- Name: "mode",
- Value: "fast",
- Usage: "profiles: fast3, fast2, fast, normal, manual",
- },
- cli.IntFlag{
- Name: "mtu",
- Value: 1350,
- Usage: "set maximum transmission unit for UDP packets",
- },
- cli.IntFlag{
- Name: "sndwnd",
- Value: 1024,
- Usage: "set send window size(num of packets)",
- },
- cli.IntFlag{
- Name: "rcvwnd",
- Value: 1024,
- Usage: "set receive window size(num of packets)",
- },
- cli.IntFlag{
- Name: "datashard,ds",
- Value: 10,
- Usage: "set reed-solomon erasure coding - datashard",
- },
- cli.IntFlag{
- Name: "parityshard,ps",
- Value: 3,
- Usage: "set reed-solomon erasure coding - parityshard",
- },
- cli.IntFlag{
- Name: "dscp",
- Value: 0,
- Usage: "set DSCP(6bit)",
- },
- cli.BoolFlag{
- Name: "nocomp",
- Usage: "disable compression",
- },
- cli.BoolFlag{
- Name: "acknodelay",
- Usage: "flush ack immediately when a packet is received",
- Hidden: true,
- },
- cli.IntFlag{
- Name: "nodelay",
- Value: 0,
- Hidden: true,
- },
- cli.IntFlag{
- Name: "interval",
- Value: 50,
- Hidden: true,
- },
- cli.IntFlag{
- Name: "resend",
- Value: 0,
- Hidden: true,
- },
- cli.IntFlag{
- Name: "nc",
- Value: 0,
- Hidden: true,
- },
- cli.IntFlag{
- Name: "sockbuf",
- Value: 4194304, // socket buffer size in bytes
- Usage: "per-socket buffer in bytes",
- },
- cli.IntFlag{
- Name: "smuxver",
- Value: 1,
- Usage: "specify smux version, available 1,2",
- },
- cli.IntFlag{
- Name: "smuxbuf",
- Value: 4194304,
- Usage: "the overall de-mux buffer in bytes",
- },
- cli.IntFlag{
- Name: "streambuf",
- Value: 2097152,
- Usage: "per stream receive buffer in bytes, smux v2+",
- },
- cli.IntFlag{
- Name: "keepalive",
- Value: 10, // nat keepalive interval in seconds
- Usage: "seconds between heartbeats",
- },
- cli.StringFlag{
- Name: "snsilog",
- Value: "",
- Usage: "collect snsi to file, aware of timeformat in golang, like: ./snsi-20060102.log",
- },
- cli.IntFlag{
- Name: "snsiperiod",
- Value: 60,
- Usage: "snsi collect period, in seconds",
- },
- cli.BoolFlag{
- Name: "pprof",
- Usage: "start profiling server on :6060",
- },
- cli.StringFlag{
- Name: "log",
- Value: "",
- Usage: "specify a log file to output, default goes to stderr",
- },
- cli.BoolFlag{
- Name: "quiet",
- Usage: "to suppress the 'stream open/close' messages",
- },
- cli.BoolFlag{
- Name: "tcp",
- Usage: "to emulate a TCP connection(linux)",
- },
- cli.StringFlag{
- Name: "c",
- Value: "", // when the value is not empty, the config path must exists
- Usage: "config from json file, which will override the command from shell",
- },
- }
- myApp.Action = func(c *cli.Context) error {
- config := Config{}
- config.Listen = c.String("listen")
- config.Target = c.String("target")
- config.Key = c.String("key")
- config.Mode = c.String("mode")
- config.MTU = c.Int("mtu")
- config.SndWnd = c.Int("sndwnd")
- config.RcvWnd = c.Int("rcvwnd")
- config.DataShard = c.Int("datashard")
- config.ParityShard = c.Int("parityshard")
- config.DSCP = c.Int("dscp")
- config.NoComp = c.Bool("nocomp")
- config.AckNodelay = c.Bool("acknodelay")
- config.NoDelay = c.Int("nodelay")
- config.Interval = c.Int("interval")
- config.Resend = c.Int("resend")
- config.NoCongestion = c.Int("nc")
- config.SockBuf = c.Int("sockbuf")
- config.SmuxBuf = c.Int("smuxbuf")
- config.StreamBuf = c.Int("streambuf")
- config.SmuxVer = c.Int("smuxver")
- config.KeepAlive = c.Int("keepalive")
- config.Log = c.String("log")
- config.SnsiLog = c.String("snsilog")
- config.SnsiPeriod = c.Int("snsiperiod")
- config.Pprof = c.Bool("pprof")
- config.Quiet = c.Bool("quiet")
- config.TCP = c.Bool("tcp")
- if c.String("c") != "" {
- // Now only support json config file
- err := parseJSONConfig(&config, c.String("c"))
- checkError(err)
- }
- // log redirect
- if config.Log != "" {
- f, err := os.OpenFile(config.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o666)
- checkError(err)
- defer f.Close()
- log.SetOutput(f)
- }
- switch config.Mode {
- case "normal":
- config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 40, 2, 1
- case "fast":
- config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 0, 30, 2, 1
- case "fast2":
- config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 20, 2, 1
- case "fast3":
- config.NoDelay, config.Interval, config.Resend, config.NoCongestion = 1, 10, 2, 1
- }
- log.Println("version:", VERSION)
- log.Println("smux version:", config.SmuxVer)
- log.Println("listening on:", config.Listen)
- log.Println("target:", config.Target)
- log.Println("nodelay parameters:", config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
- log.Println("sndwnd:", config.SndWnd, "rcvwnd:", config.RcvWnd)
- log.Println("compression:", !config.NoComp)
- log.Println("mtu:", config.MTU)
- log.Println("datashard:", config.DataShard, "parityshard:", config.ParityShard)
- log.Println("acknodelay:", config.AckNodelay)
- log.Println("dscp:", config.DSCP)
- log.Println("sockbuf:", config.SockBuf)
- log.Println("smuxbuf:", config.SmuxBuf)
- log.Println("streambuf:", config.StreamBuf)
- log.Println("keepalive:", config.KeepAlive)
- log.Println("snsilog:", config.SnsiLog)
- log.Println("snsiperiod:", config.SnsiPeriod)
- log.Println("pprof:", config.Pprof)
- log.Println("quiet:", config.Quiet)
- log.Println("tcp:", config.TCP)
- // parameters check
- if config.SmuxVer > maxSmuxVer {
- log.Fatal("unsupported smux version:", config.SmuxVer)
- }
- go generic.SnsiLogger(config.SnsiLog, config.SnsiPeriod)
- if config.Pprof {
- go http.ListenAndServe(":6060", nil)
- }
- // main loop
- var wg sync.WaitGroup
- loop := func(lis *kcp.Listener) {
- defer wg.Done()
- if err := lis.SetDSCP(config.DSCP); err != nil {
- log.Println("SetDSCP:", err)
- }
- if err := lis.SetReadBuffer(config.SockBuf); err != nil {
- log.Println("SetReadBuffer:", err)
- }
- if err := lis.SetWriteBuffer(config.SockBuf); err != nil {
- log.Println("SetWriteBuffer:", err)
- }
- for {
- if conn, err := lis.AcceptGFCP(); err == nil {
- log.Println("remote address:", conn.RemoteAddr())
- conn.SetStreamMode(true)
- conn.SetWriteDelay(false)
- conn.SetNoDelay(config.NoDelay, config.Interval, config.Resend, config.NoCongestion)
- conn.SetMtu(config.MTU)
- conn.SetWindowSize(config.SndWnd, config.RcvWnd)
- conn.SetACKNoDelay(config.AckNodelay)
- if config.NoComp {
- go handleMux(conn, &config)
- } else {
- go handleMux(generic.NewCompStream(conn), &config)
- }
- } else {
- log.Printf("%+v", err)
- }
- }
- }
- if config.TCP { // tcp dual stack
- if conn, err := tcpraw.Listen("tcp", config.Listen); err == nil {
- lis, err := kcp.ServeConn(config.DataShard, config.ParityShard, conn)
- checkError(err)
- wg.Add(1)
- go loop(lis)
- } else {
- log.Println(err)
- }
- }
- // udp stack
- lis, err := kcp.ListenWithOptions(config.Listen, config.DataShard, config.ParityShard)
- checkError(err)
- wg.Add(1)
- go loop(lis)
- wg.Wait()
- return nil
- }
- myApp.Run(os.Args)
- }
|