【开源品鉴】FRP源码阅读(frpc源码)

frp 是一款高性能的反向代理应用,专注于内网穿透,支持多种协议和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文将深入探讨其源码,揭示其背后的实现原理。

1. 前言

frp 是一款高性能的反向代理应用,专注于内网穿透。它支持多种协议,包括 TCP、UDP、HTTP、HTTPS 等,并且具备 P2P 通信功能。使用 frp,您可以安全、便捷地将内网服务暴露到公网,通过拥有公网 IP 的节点进行中转,具体场景就是:将客户端部署到你的内网中,然后该客户端与你内网服务网络可达,当客户端与在公网的服务端连接后,我们就可以通过访问服务端的指定端口,去访问到内网服务。

目前 GitHub 已经有 80k 的 star,这么猛的项目,我决定阅读一番源码偷师一波。

2. pkg/auth

这个包负责客户端和服务端认证的代码,这里面一共用到了 2 种验证机制,一种是基于 token,就是预共享密钥,客户端和服务端实现配置一样的字符串密钥,第二种是 OAuth 2.0,依赖第三方授权服务器颁发的访问令牌,然后客户端带着令牌去访问服务端。

这里面有很多技巧值得学习:

2.1. 工厂函数

通过不同的配置生成对应的认证方式。

type Setter interface {
	SetLogin(*msg.Login) error
	SetPing(*msg.Ping) error
	SetNewWorkConn(*msg.NewWorkConn) error
}

// 根据客户端配置创建认证提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {
    switch cfg.Method {
        // token 认证模式
        case v1.AuthMethodToken:
        authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)
        // openid 认证模式
        case v1.AuthMethodOIDC:
        authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)
        default:
        panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))
    }
    return authProvider
}

2.2. 常量时间的字符串比较

正常情况来说,token 模式下,两边比较一下字符串是不是相等就完了,但其实这个是有安全隐患的,第一个就是攻击者可以进行重放攻击,一直进行密码爆破,第二个就是攻击者可以进行定时攻击,比如普通比较(如 ==)在发现第一个不匹配字节时会立即返回,攻击者可通过测量响应时间差异推断出匹配的字节位置,ConstantTimeCompare 始终遍历全部字节(即使已发现不匹配),使攻击者无法通过时间差获取敏感信息。

// token 和客户端上线的时间戳组成 key
func GetAuthKey(token string, timestamp int64) (key string) {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(token))
	md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))
	data := md5Ctx.Sum(nil)
	return hex.EncodeToString(data)
}

// 全量匹配字节
func ConstantTimeCompare(x, y []byte) int {
	if len(x) != len(y) {
		return 0
	}

	var v byte

	for i := 0; i < len(x); i++ {
		v |= x[i] ^ y[i]
	}

	return ConstantTimeByteEq(v, 0)
}

// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {
	return int((uint32(x^y) - 1) >> 31)
}

3. pkg/config

config 文件夹是 frp 配置管理的核心模块,涵盖了配置的加载、解析、验证、转换和命令行支持等功能。它确保了 frp 的灵活性和兼容性,同时为用户提供了多种配置方式。

3.1. 使用环境变量进行模板渲染

serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000

[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}


export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml

这个实现是采用了 template 模板库,其中 Envs 前缀是由字段名 Envs 决定的:

type Values struct {
    Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由来
}

func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {
	tmpl, err := template.New(「frp」).Funcs(template.FuncMap{
		「parseNumberRange」:     parseNumberRange,
		「parseNumberRangePair」: parseNumberRangePair,
	}).Parse(string(in))
	if err != nil {
		return nil, err
	}

	buffer := bytes.NewBufferString(「」)
	if err := tmpl.Execute(buffer, values); err != nil {
		return nil, err
	}
	return buffer.Bytes(), nil
}

// 将端口范围解析为 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {
  ... ... 
}

这里面有一些自定义的解析函数,比如说:

ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」

export PORT_RANGE = 「1000-1005」

// 这样 ports 就会被 template 的 parseNumberRange 函数解析并渲染为
// ports = 1000, 1001, 1002, 1003, 1004, 1005

3.2. 配置拆分

通过 includes 参数可以在主配置中包含其他配置文件,从而实现将代理配置拆分到多个文件中管理

# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]

上述配置在 frpc.toml 中通过 includes 额外包含了 ./confd 目录下所有的 toml 文件的代理配置内容,效果等价于将这两个文件合并成一个文件。

这个实现是采用了,循环读取文件内容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:

// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {
    return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)

... ... 

var buf []byte
// 循环读取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
//   ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {
    err = fmt.Errorf(「getIncludeContents error: %v」, err)
    return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)

// 将所有配置合并,然后将 toml 序列化为 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {
    return
}
return

3.3. 配置热加载

frpc reload -C ./frpc.toml 等待一段时间后,客户端将根据新的配置文件创建、更新或删除代理。

这里面也比较简单,主要逻辑在于配置校验,旧配置中与新配置里同名的且代理内容不一样的 proxy 停止,新增的配置的 proxy 再启动,也就是说老配置和新配置完全一样的是不动的

func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {
	xl := xlog.FromContextSafe(pm.ctx)
	proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {
		return C.GetBaseConfig().Name
	})
	pm.mu.Lock()
	defer pm.mu.Unlock()

	delPxyNames := make([]string, 0)
	for name, pxy := range pm.proxies {
		del := false
		cfg, ok := proxyCfgsMap[name]
		if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
			del = true
		}

		if del {
			delPxyNames = append(delPxyNames, name)
			delete(pm.proxies, name)
			pxy.Stop()
		}
	}
	if len(delPxyNames) > 0 {
		xl.Infof(「proxy removed: %s」, delPxyNames)
	}

	addPxyNames := make([]string, 0)
	for _, cfg := range proxyCfgs {
		name := cfg.GetBaseConfig().Name
		if _, ok := pm.proxies[name]; !ok {
			pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)
			if pm.inWorkConnCallback != nil {
				pxy.SetInWorkConnCallback(pm.inWorkConnCallback)
			}
			pm.proxies[name] = pxy
			addPxyNames = append(addPxyNames, name)

			pxy.Start()
		}
	}
	if len(addPxyNames) > 0 {
		xl.Infof(「proxy added: %s」, addPxyNames)
	}

4. 监控

frps 服务端支持两种监控系统:指标存在内存中,和指标输出到 Prometheus。主要监控以下指标:

type serverMetrics struct {
    // 记录当前连接到服务端的客户端数量。
    clientCount     Prometheus.Gauge
    // 记录当前代理的数量,按代理类型(如 TCP、HTTP)分类。
    proxyCount      *Prometheus.GaugeVec
    // 记录当前连接的数量,按代理类型(如 TCP、HTTP)分类。
    connectionCount *Prometheus.GaugeVec
    // 记录流入的总流量,按代理类型(如 TCP、HTTP)分类。
    trafficIn       *Prometheus.CounterVec
    // 记录流出的总流量,按代理类型(如 TCP、HTTP)分类。
    trafficOut      *Prometheus.CounterVec
}

内存监控没啥,但统计的增删改,这里用到了原子操作的技巧:

func (C *StandardCounter) Count() int32 {
	return atomic.LoadInt32(&C.count)
}

func (C *StandardCounter) Inc(count int32) {
	atomic.AddInt32(&C.count, count)
}

func (C *StandardCounter) Dec(count int32) {
	atomic.AddInt32(&C.count, -count)
}

对于不同类型的 proxy 的统计,frp 没有使用 syn map,而是用一把读写锁保平安

m.mu.Lock()
	defer m.mu.Unlock()
	counter, ok := m.info.ProxyTypeCounts[proxyType]
	if !ok {
		counter = metric.NewCounter()
	}
counter.Inc(1)

对于如何进行 Prometheus 监控,frp 的使用流程可以借鉴,整体来说分为以下几个步骤:

  1. 编码前,先定义指标,类似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name:      「traffic_out」,
Help:      「The total out traffic」,
  1. frp 注册 Prometheus 指标
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{
    Namespace: namespace,
    Subsystem: serverSubsystem,
    Name:      「traffic_out」,
    Help:      「The total out traffic」,
}, []string{「name」, 「type」}),
}

Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
  1. frp 暴露 HTTP 服务,一般是/metric,promhttp 提供一个 HTTP 处理器,用于暴露所有注册的 Prometheus 指标。
if svr.cfg.EnablePrometheus {
    subRouter.Handle(「/metrics」, promhttp.Handler())
}
  1. 配置 Prometheus 定时抓取这个 HTTP 路径,舒服了
全球:
  scrape_interval: 15s # 每 15 秒抓取一次数据

scrape_configs:
  - job_name: 「frp_server」
    static_configs:
      - targets: [「localhost:8080」] # 替换为 frp 服务端暴露的 /metrics 端点

5. 通信安全

当 frpc 和 frps 之间启用了 TLS 之后,流量会被全局加密,不再需要配置单个代理上的加密,新版本中已经默认启用。每一个代理都可以选择是否启用加密和压缩的功能。

在每一个代理的配置中使用如下参数指定:

[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true

5.1. 加密

通过设置 transport.useEncryption = true,将 frpc 与 frps 之间的通信内容加密传输,将会有效防止传输内容被截取。

这个加密它使用了装饰器模式,传入普通的 IO,WithEncryption 后就会得到一个可以加密的 IO

remote, err = libio.WithEncryption(remote, encKey)
if err != nil {
    workConn.
    xl.Errorf(「create encryption stream error: %v」, err)
    return
}

我们接下来看如何加密的:

总体加密算法采用 aes-128-cfb,aes 是一个对称加密,主要靠 key 和 iv 两个值

// pbkdf2 会生成一个用于 aes 加密的 key
// 入参 key 为:配置的 token
// DefaultSalt 为字符串默认值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)

// iv 是用 rand 函数生成的安全加密的随机数
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
    return nil, err
}

// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
//   - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
//   - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
//   - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
//   - On NetBSD, Reader uses the kern.arandom sysctl.
//   - On Windows, Reader uses the ProcessPrng API.
//   - On js/wasm, Reader uses the Web Crypto API.
//   - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader

这样后续的 IO 操作都会自带加密了。

5.2. 压缩

压缩也是同理,搞一个压缩的 IO 装饰器就好了。

如果传输的报文长度较长,通过设置 transport.useCompression = true 对传输内容进行压缩,可以有效减小 frpc 与 frps 之间的网络流量,加快流量转发速度,但是会额外消耗一些 CPU 资源。

压缩算法采用 snappy 库

sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {
        _ = sw.Close()
        return rwc.Close()
    })
}

5.3. 自定义 TLS

这个其实就是使用自签发的 CA,去生成密钥和证书,然后客户端和服务端加载起来后,可以进行双向或者单向验证,进行 HTTPS 握手,后续流量也是 HTTPS 加密的。

客户端单向校验服务端:

# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」

服务端单向校验客户端:

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」

# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

双向验证

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

介绍这个之前,我们先回顾以下 TLS 握手的过程,hhh:



okk,那我们看 frp 是如何实现 tls 的:

// 获取 TLS 配置,作为 dial 选项
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))

...

// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(
    C.ctx,
    net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),
    dialOptions...,
)

// 加载服务端的 ca,证书+key
// 核心是 tls 库 tls.LoadX509KeyPair(certfile, keyfile),去管理证书和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {
    base := &tls.Config{}

    if certPath == <<>> || keyPath == <<>> {
        // server will generate tls conf by itself
        cert := newRandomTLSKeyPair()
        base.Certificates = []tls.Certificate{*cert}
    } else {
        // 调的是这个 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)
        cert, err := newCustomTLSKeyPair(certPath, keyPath)
        if err != nil {
            return nil, err
        }

        base.Certificates = []tls.Certificate{*cert}
    }

    if caPath != '' {
        // ca 证书
        pool, err := newCertPool(caPath)
        if err != nil {
            return nil, err
        }
        // 校验客户端
        base.ClientAuth = tls.RequireAndVerifyClientCert
        base.ClientCAs = pool
    }

    return base, nil
}

// 加载客户端的 ca,证书+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {
    base := &tls.Config{}

    if certPath != '' && keyPath != '' {
        cert, err := newCustomTLSKeyPair(certPath, keyPath)
        if err != nil {
            return nil, err
        }

        base.Certificates = []tls.Certificate{*cert}
    }

    base.ServerName = serverName

    if caPath != '' {
        pool, err := newCertPool(caPath)
        if err != nil {
            return nil, err
        }

        base.RootCAs = pool
        // 校验服务端
        base.InsecureSkipVerify = false
    } else {
        base.InsecureSkipVerify = true
    }

    return base, nil
}

// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {
    pool := x509.NewCertPool()

    cacrt, err := os.ReadFile(caPath)
    if err != nil {
        return nil, err
    }

    pool.AppendCertsFromPEM(caCrt)

    return pool, nil
}

6. 代理配置

6.1. proxy

代理是 frp 的核心,这里详细介绍一下它的流程。

frpc 和 frps 的整体流程,里面可以抽象为 3 种连接,整体我画了一张图:

  1. 用户连接 (User Connection):
  • 这是外部用户连接到 FRP 服务端(frps)特定端口的连接,也就是说想要访问内网服务的,例如,当运维访问 frps.example.com:8080 时建立的连接就是用户连接,它实际访问的是客户侧某个管理平台
  • 在 frps 端,这个连接由 handleUserTCPConnection 函数处理。
  • 工作连接 (Work Connection):
    • 这是 frps 和 frpc 之间预先建立的连接,用于传输用户连接的数据。
    • frps 在需要处理用户连接时会从连接池中获取一个可用的工作连接。
    • 如果池中没有可用的工作连接,frps 会通知 frpc 创建新的工作连接。
    • 工作连接是 frps 和 frpc 之间的隧道,用户数据通过这个隧道在外部用户和内部服务之间传输。
  • 本地连接 (Local Connection):
    • 在 frp 的上下文中,远程连接通常指的是 frpc 连接到内部服务的连接
    • 例如,当 frpc 收到从工作连接传来的数据时,它会创建一个连接到配置中指定的本地服务(如 localhost:80),这个连接就是远程连接。

    下面是 FRP 数据流的完整过程:

    1. 外部用户(用户连接) -> frps 监听端口
    2. frps 从工作连接池中获取一个 工作连接(frps <-> frpc)
    3. frps 将用户连接与工作连接绑定(通过双向数据转发)
    4. frpc 接收到来自工作连接的数据,然后建立一个 远程连接(frpc -> 内部服务)
    5. frpc 将工作连接与远程连接绑定(通过双向数据转发)


    下面来看看关键代码实现:

    // 用户连接 (User Connection):
    // frps 侧
    // tcp 代理启动
    func (pxy *TCPProxy) Run() (string, error) {
        if pxy.cfg.LoadBalancer.Group != <<>> {
            // 获取组监听器(实际共享端口)
            l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
            pxy.listeners = append(pxy.listeners, l)
            
            // 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
            pxy.startCommonTCPListenersHandler() 
        }
        // ...
    }
    // 用户链接处理
    func (pxy *BaseProxy) startCommonTCPListenersHandler() {
        for _, listener := range pxy.listeners {
            Go func(l net.Listener) {
                for {
                    conn, err := l.Accept() // 此处调用 TCPGroupListener.Accept()
                    Go pxy.handleUserTCPConnection(conn) // 处理连接
                }
            }(listener)
        }
    }
    
    
    // 工作连接 (Work Connection):
    // frps 侧
    // 从连接池中获取一个已建立的到 FRP 客户端的连接
    // 内部实现路径:pxy.GetWorkConn() → pxy.workConnManager.Get()
    // 底层通过 FRP 协议发送 NewWorkConn 消息到客户端建立隧道,这部分就是内部服务不一样的地方
    // -> GetWorkConn
    workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
    if err != nil {
        return
    }
    defer workConn.Close()
    
    var local io.ReadWriteCloser = workConn
    // 启动双向数据转发 
    inCount, outCount, _ := libio.Join(local, userConn)
    
    // 在取出工作连接后,frps 会立即向 frpc 发送 msg.ReqWorkConn 消息,请求新的工作连接。
    _ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
    // 如果连接池为空,frps 会等待 frpc 创建新的工作连接并发送过来。
    select {
    case workConn, ok = <-ctl.workConnCh:
        if !ok {
            err = pkgerr.ErrCtlClosed
            xl.Warnf(「no work connections available, %v」, err)
            return
        }
    case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
        err = fmt.Errorf(「timeout trying to get work connection」)
        xl.Warnf(「%v」, err)
        return
    }
    
    
    // 本地连接 (Local Connection):
    // frpc 侧
    // handleReqWorkConn
    // HandleWorkConn
    // HandleTCPWorkConnection
    unc (ctl *Control) handleReqWorkConn(_ msg.Message) {
    	xl := ctl.xl
    	workConn, err := ctl.connectServer()
    	if err != nil {
    		xl.Warnf(「start new connection to server error: %v」, err)
    		return
    	}
    
    	m := &msg.NewWorkConn{
    		RunID: ctl.sessionCtx.RunID,
    	}
    	if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {
    		xl.Warnf(「error during NewWorkConn authentication: %v」, err)
    		workConn.Close()
    		return
    	}
    	if err = msg.WriteMsg(workConn, m); err != nil {
    		xl.Warnf(「work connection write to server error: %v」, err)
    		workConn.Close()
    		return
    	}
    
    	var startMsg msg.StartWorkConn
    	if err = msg.ReadMsgInto(workConn, &startMsg); err != nil {
    		xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)
    		workConn.Close()
    		return
    	}
    	if startMsg.Error != 「」 {
    		xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)
    		workConn.Close()
    		return
    	}
    
    	// dispatch this work connection to related proxy
    	ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
    }
    
    remote = workConn
    ... ... 
    localConn, err := libnet.Dial(
        net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
        libnet.WithTimeout(10*time.Second),
    )
    ... ... 
    _, _, errs := libio.Join(localConn, remote)
    

    双向转发的实现灰常简洁,值得学习:

    func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {
    	var wait sync.WaitGroup
    	recordErrs := make([]error, 2)
    	pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
    		defer wait.Done()
    		defer CosClose()
    		defer from.Close()
    
    		buf := pool.GetBuf(16 * 1024)
    		defer pool.PutBuf(buf)
    		*count, recordErrs[number] = io.CopyBuffer(to, from, buf)
    	}
    
    	wait.Add(2)
    	Go pipe(0, c1, c2, &inCount)
    	Go pipe(1, c2, c1, &outCount)
    	wait.Wait()
    
    	for _, e := range recordErrs {
    		if e != nil {
    			errors = append(errors, e)
    		}
    	}
    	return
    }
    

    6.2. 负载均衡

    你可以将多个相同类型的代理加入到同一个 group 中,以实现负载均衡的能力,当用户连接 frps 服务器的 80 端口时,frps 会将接收到的用户连接随机分发给其中一个存活的代理。这可以确保即使一台 frpc 机器挂掉,仍然有其他节点能够提供服务。

    # frpc.toml
    [[proxies]]
    name = 「test1」
    type = 「tcp」
    localPort = 8080
    remotePort = 80
    loadBalancer.group = 「web」
    loadBalancer.groupKey = 「123」
    
    [[proxies]]
    name = 「test2」
    type = 「tcp」
    localPort = 8081
    remotePort = 80
    loadBalancer.group = 「web」
    loadBalancer.groupKey = 「123」
    

    这个负载均衡的实现的关键结构体是 TCPGroupCtl *group.TCPGroupCtl:

    // 管理 TCP 代理的分组逻辑,包括分组的创建、监听、连接分发等功能。
    TCPGroupCtl *group.TCPGroupCtl
    
    // 主要有三大功能
    
    // 1. 分组管理:
    // 将多个 TCP 代理分组到一起,形成一个逻辑组。
    // 每个组可以共享一个端口,分发连接到组内的代理。
    
    // 2. 负载均衡:
    // 根据一定的规则随机分发,将链接分发到组内的代理。
    
    // 3. 资源管理:
    // 负责监听和关闭组内的连接。
    // 管理组的生命周期。
    // tcp 代理分组
    // 分组内统一监听,共享一个 remote port 的 coon,这个我们叫 remote conn,就是用户 connection
    func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {
        tgc.mu.Lock()
        tcpGroup, ok := tgc.groups[group]
        if !ok {
            tcpGroup = NewTCPGroup(tgc)
            tgc.groups[group] = tcpGroup
        }
        tgc.mu.Unlock()
    
        return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
    }
    
    // 代理加入组
    func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {
        tg.mu.Lock()
        defer tg.mu.Unlock()
        
        // 首次加入组:创建真实监听
        if len(tg.lns) == 0 {
            realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申请端口
            tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))
            
            tg.realPort = realPort
            tg.tcpLn = tcpLn
            Go tg.worker() // 启动连接分发协程
            ...
            }
    }
    
    // 当新连接到达共享端口时,会被放入全局通道(acceptCh),
    // 组内所有代理通过竞争机制获取链接,实现负载均衡
    func (tg *TCPGroup) worker() {
        for {
            conn, err := tg.tcpLn.Accept() // 接收新连接
            tg.acceptCh <- conn            // 放入全局通道
        }
    }
    func (ln *TCPGroupListener) Accept() (net.Conn, error) {
        select {
        case <-ln.closeCh:
            return nil, ErrListenerClosed
        case conn := <-ln.group.acceptCh: // 从全局通道竞争获取连接
            return conn, nil
        }
    }
    
    // tcp 代理启动
    func (pxy *TCPProxy) Run() (string, error) {
        if pxy.cfg.LoadBalancer.Group != 「」 {
            // 获取组监听器(实际共享端口)
            l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
            pxy.listeners = append(pxy.listeners, l)
            
            // 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
            pxy.startCommonTCPListenersHandler() 
        }
        // ...
    }
    

    6.3. 健康检查

    通过给代理配置健康检查参数,可以在要反向代理的服务出现故障时,将该服务从 frps 中摘除。结合负载均衡的功能,这可用于实现高可用架构,避免服务单点故障。

    [[proxies]]
    name = 「test1」
    type = 「tcp」
    localPort = 22
    remotePort = 6000
    # 启用健康检查,类型为 tcp
    healthCheck.type = 「tcp」
    # 建立连接超时时间为 3 秒
    healthCheck.timeoutSeconds = 3
    # 连续 3 次检查失败,此 proxy 会被摘除
    healthCheck.maxFailed = 3
    # 每隔 10 秒进行一次健康检查
    healthCheck.intervalSeconds = 10
    

    这个配置被加载到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig

    type HealthCheckConfig struct {
    	// Type specifies what protocol to use for health checking.
    	// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health
    	// checking will not be performed.
    	//
    	// If the type is 「tcp」, a connection will be attempted to the target
    	// server. If a connection cannot be established, the health check fails.
    	//
    	// If the type is 「HTTP」, a GET request will be made to the endpoint
    	// specified by HealthCheckURL. If the response is not a 200, the health
    	// check fails.
    	Type string `json:「type」` // tcp | HTTP
    	// TimeoutSeconds specifies the number of seconds to wait for a health
    	// check attempt to connect. If the timeout is reached, this counts as a
    	// health check failure. By default, this value is 3.
    	TimeoutSeconds int `json:「timeoutSeconds,omitempty」`
    	// MaxFailed specifies the number of allowed failures before the
    	// is stopped. By default, this value is 1.
    	MaxFailed int `json:「maxFailed,omitempty」`
    	// IntervalSeconds specifies the time in seconds between health
    	// checks. By default, this value is 10.
    	IntervalSeconds int `json:「intervalSeconds」`
    	// Path specifies the path to send health checks to if the
    	// health check type is 「HTTP」.
    	Path string `json:「path,omitempty」`
    	// HTTPHeaders specifies the headers to send with the health request, if
    	// the health check type is 「HTTP」.
    	HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
    }
    

    这部分代码非常独立,相当于起了一个定时的 monitor,去监控代理的 proxy 是否有效,连续检查失败,此 proxy 会被摘除

    func (monitor *Monitor) checkWorker() {
        for
            err := monitor.doCheck(doCtx)
            ... ... 
            time.Sleep(monitor.interval)
        }   
    }
    
    func (monitor *Monitor) doCheck(ctx context.Context) error {
    	switch monitor.checkType {
    	case 「tcp」:
    		return monitor.doTCPCheck(ctx)
    	case 「HTTP」:
    		return monitor.doHTTPCheck(ctx)
    	default:
    		return ErrHealthCheckType
    	}
    }
    
    func (monitor *Monitor) doTCPCheck(ctx context.Context) error {
    	// if tcp address is not specified, always return nil
    	if monitor.addr == 「」 {
    		return nil
    	}
    
    	var d net.Dialer
    	conn, err := d.DialContext(ctx, 「tcp」, monitor.addr)
    	if err != nil {
    		return err
    	}
    	conn.Close()
    	return nil
    }
    

    6.4. 代理限速

    # frpc.toml
    [[proxies]]
    name = 「ssh」
    type = 「tcp」
    localPort = 22
    remotePort = 6000
    transport.bandwidthLimit = 「1MB」
    

    核心代码,依然是获取 tcp 连接时,加一个限速的装饰器:

    var limiter *rate.Limiter
    limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
    if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {
        limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
    }
    
    if pxy.GetLimiter() != nil {
        local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
            return local.Close()
        })
    }
    

    limit 使用的是原生的 rate 包:

    func (r *Reader) Read(p []byte) (n int, err error) {
        // 1. 获取令牌桶的突发容量
    	b := r.limiter.Burst()
    
        // 2. 如果请求的读取量超过突发容量,调整读取大小
    	if b < len(p) {
    		p = p[:b]
    	}
    
        // 3. 执行实际读取操作
    	n, err = r.r.Read(p)
    	if err != nil {
             // 4. 如果读取过程中出错,直接返回
    		return
    	}
    
         // 5. 根据实际读取的字节数消耗令牌
    	err = r.limiter.WaitN(context.Background(), n)
    	if err != nil {
    		return
    	}
    	return
    }
    

    7. 参考文献

    HTTPS://gofrp.org/zh-cn/docs/

    HTTPS://blog.csdn.net/u012175637/article/details/84138925

    HTTPS://cloud.tencent.com/developer/article/2093328

    原文链接:,转发请注明来源!