区块链网络库 go-libp2p 源码剖析 Swarm
go-libp2p
的源码很庞杂,而且其中集合了各种第三库 + 项目自身的"故意"封装,看起来还是有一定的难度。本篇文章对Swarm
功能流程进行梳理,把我自己的理解尽可能详细的展示出来。
该库代码个人感觉不太适合初学者或者完全没有使用过该库的人,上来就阅读源码会非常吃力。如果你是从事区块链开发,有一定的基础,可以学习下。
简述
libp2p
是一个网络堆栈和库,从IPFS
(分布式文件存储)项目中模块化出来以供其他工具使用。Libp2p
是长期艰苦探索的产物(深入了解互联网的网络堆栈,以及过去大量的peer-to-peer
协议)。在过去的15年中,构建大规模的peer-to-peer
系统一直是复杂和困难的,libp2p
是解决这个问题的一种方法。它是一个"网络堆栈"(一个协议套件)它清晰地分离关注点,并使复杂的应用程序只使用它们绝对需要的协议,而不放弃互操作性和可升级性。是现在很多做区块链应用使用的基础底层网络库。
引子
这段代码来自官方的echo
范例代码 https://github.com/libp2p/go-libp2p/tree/master/examples/echo
演示了怎么创建节点和连接其他的节点
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 设定日志级别
golog.SetAllLoggers(golog.LevelInfo)
// 解析命令行参数
listenF := flag.Int("l", 0, "wait for incoming connections") // 监听端口
targetF := flag.String("d", "", "target peer to dial") // 作为客户端的时候,被连接的服务端的地址
insecureF := flag.Bool("insecure", false, "use an unencrypted connection")
seedF := flag.Int64("seed", 0, "set random seed for id generation")
flag.Parse()
if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l")
}
// 创建 host.Host 接口(本质就是 *BasicHost 结构体)
ha, err := makeBasicHost(*listenF, *insecureF, *seedF)
if err != nil {
log.Fatal(err)
}
if *targetF == "" {
// hostA 设定流的处理 handler
startListener(ctx, ha, *listenF, *insecureF)
// Run until canceled.
<-ctx.Done()
} else {
// hostB,连接对应的服务端 hostA
runSender(ctx, ha, *targetF)
}
}
func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) {
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// 生成 RSA 私钥对(可以用来获取 peer.ID)
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
// 设定基础参数
opts := []libp2p.Option{
// /ip4/127.0.0.1/tcp/0 如果 listenPort = 0 表示随机端口 , listenPort = 2222 表示固定端口 2222
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
libp2p.Identity(priv), // 私钥
libp2p.DisableRelay(),
}
if insecure {
opts = append(opts, libp2p.NoSecurity)
}
// 创建对象 *BasicHost
return libp2p.New(opts...)
}
func startListener(ctx context.Context, ha host.Host, listenPort int, insecure bool) {
// 完整的地址
fullAddr := getHostAddress(ha)
log.Printf("I am %s\n", fullAddr)
// 在 hostA上设置流处理函数handler /echo/1.0.0 是用户自定义的协议名
ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) { // s 本质就是 *yamux.Stream
log.Println("listener received new stream")
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
log.Println("listening for connections")
if insecure {
log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr)
} else {
log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
}
}
func getHostAddress(ha host.Host) string {
//从 ha 中获取 peer.ID 字符串(其实就是一个对公钥进行hash计算出来的一个字符串)
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", ha.ID())) // /p2p/xxxx
// ha 中存在的地址 /ip4/127.0.0.1/tcp/2222
addr := ha.Addrs()[0]
// 拼接 /ip4/127.0.0.1/tcp/2222 + /p2p/xxxx
return addr.Encapsulate(hostAddr).String()
}
func runSender(ctx context.Context, ha host.Host, targetPeer string) {
// 这里是 host B 的完整地址
fullAddr := getHostAddress(ha)
log.Printf("I am %s\n", fullAddr)
// 将 targetPeer = /ip4/127.0.0.1/tcp/2222/p2p/QmcX4KsBCoqyAfQ1hTVSgAB3RxMNaeRSZf3pAGAcxrKTVc 转成 multiaddr.
maddr, err := ma.NewMultiaddr(targetPeer)
if err != nil {
log.Println(err)
return
}
// 从 multiaddr 获取 peer ID
info, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
log.Println(err)
return
}
// 将 a peer ID and a targetAddr 键值保存在内存缓存中
// PermanentAddrTTL 缓存有效期
ha.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
log.Println("sender opening stream")
// 从 host B 到 host A的连接中,创建一个流。最终的数据处理应该在 host A上(也就是上面 host A 设定的 /echo/1.0.0 的handler)
s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
if err != nil {
log.Println(err)
return
}
log.Println("sender saying hello")
// 向 hostA 发送数据 Hello, world!
_, err = s.Write([]byte("Hello, world!\n"))
if err != nil {
log.Println(err)
return
}
out, err := io.ReadAll(s)
if err != nil {
log.Println(err)
return
}
log.Printf("read reply: %q\n", out)
}
// doEcho reads a line of data a stream and writes it back
func doEcho(s network.Stream) error {
// host A中处理数据
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
return err
}
log.Printf("read: %s", str)
_, err = s.Write([]byte(str))
return err
}
按照如下执行,可以看效果:
# 启动 hostA
cd examples/echo
go run main.go -l 2222
# 此时 hostA 监听在 2222端口,等待着连接
# I am /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL
# 开启另外一个终端 启动 hostB
# Now run "./echo -l 2223 -d /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL" on a different terminal
cd examples/echo
go run main.go -l 2223 -d /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL
# 此时 hostB 监听在 2223 端口,等待着连接(同时向 hostA 2222 端口,发送了一个消息 Hello, world! )
# I am /ip4/127.0.0.1/tcp/2223/p2p/QmQzxCFuk1Pc8J6tsDChHdaJ3A7GRjxRGfeJ4HZJFTmmpm
这里的 hostA hostB
并非绝对的服务端和客户端的关系,他们即可以都是服务端也可以都是客户端,只是这里演示的是 hostA
是服务端,hostB
作为客户端去连接hostA
。
源码解析
我们先从 makeBasicHost
函数看起:
-
构建
opts []libp2p.Option
切片,libp2p.Option
本质是函数类型type Option func(cfg *Config) error
(开源代码中很多这种设计,通过函数切片的方式进行初始化参数) -
最后实际调用的函数为
libp2p.New(opts...)
func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { //.....省略....
opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), libp2p.Identity(priv), // 私钥 libp2p.DisableRelay(), } if insecure { opts = append(opts, libp2p.NoSecurity) } return libp2p.New(opts...)
}
进入到 libp2p.New
函数内部
-
最终实际调用的是
NewWithoutDefaults
函数(注意⚠️:这里的FallbackDefaults
这个是默认的配置函数)如果opts ...Option
给的配置信息比较少,那缺省的会用默认的进行填充。 -
在
NewWithoutDefaults
函数内部,将配置信息保存到var cfg Config
中 -
调用
cfg.NewNode()
开始进行节点的初始化+启动func New(opts ...Option) (host.Host, error) { return NewWithoutDefaults(append(opts, FallbackDefaults)...) // FallbackDefaults 默认的配置 }
func NewWithoutDefaults(opts ...Option) (host.Host, error) { var cfg Config // 应用配置信息,将信息保存到 cfg 中 if err := cfg.Apply(opts...); err != nil { return nil, err } return cfg.NewNode() }
// 对用户的配置进行检查,如果某些没有设定,就设定为默认的 var FallbackDefaults Option = func(cfg *Config) error {
for _, def := range defaults { if !def.fallback(cfg) { // 检查属性是否设定 continue } // 设定属性 if err := cfg.Apply(def.opt); err != nil { return err } } return nil
}
// DefaultTransports 默认的传输层包括了 tcp quic websocket var DefaultTransports = ChainOptions( Transport(tcp.NewTCPTransport), Transport(quic.NewTransport), Transport(ws.New), Transport(webtransport.New), )
// 这就是上面的 defaults var defaults = []struct { fallback func(cfg *Config) bool opt Option }{ { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil }, // 如果传输层 + 监听端口没有设定,就使用默认的 DefaultListenAddrs(本echo中有设定 ListenAddrs) opt: DefaultListenAddrs, }, { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.PSK == nil }, // 如果传输层没有制定,就使用默认的 DefaultTransports opt: DefaultTransports, // 上面? }, { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.PSK != nil }, opt: DefaultPrivateTransports, }, { fallback: func(cfg *Config) bool { return cfg.Muxers == nil }, opt: DefaultMuxers, }, { fallback: func(cfg *Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil }, opt: DefaultSecurity, }, { fallback: func(cfg *Config) bool { return cfg.PeerKey == nil }, opt: RandomIdentity, }, { fallback: func(cfg *Config) bool { return cfg.Peerstore == nil }, opt: DefaultPeerstore, }, { fallback: func(cfg *Config) bool { return !cfg.RelayCustom }, opt: DefaultEnableRelay, }, { fallback: func(cfg *Config) bool { return cfg.ResourceManager == nil }, opt: DefaultResourceManager, }, { fallback: func(cfg *Config) bool { return cfg.ConnManager == nil }, opt: DefaultConnectionManager, }, { fallback: func(cfg *Config) bool { return cfg.MultiaddrResolver == nil }, opt: DefaultMultiaddrResolver, }, { fallback: func(cfg *Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil }, opt: DefaultPrometheusRegisterer, }, }
进入 cfg.NewNode()
内部: 中间有省略大量的代码,并非不重要(比如路由 + 自动nat
+ 自动relay
,可以参考 https://blog.csdn.net/kk3909/category_10554659.html 研读)
-
cfg.makeSwarm
创建swrm
-
bhost.NewHost
创建*BasicHost
对象,同时将swrm
保存为type BasicHost struct
结构体中的network network.Network
-
cfg.addTransports(h)
完成依赖检测+初始化(使用了其他的第三方库go.uber.org/fx
)。 -
cfg.addTransports(h)
函数内部比较重要的逻辑,如下: -
swrm.transports
值的设定 (代码路径net/swarm/swarm_transport.go
第76行)
-
upgrader
结构体中muxers []StreamMuxer
值的设定(代码路径net/upgrader/upgrader.go
的第74行New
函数)
-
h.Network().Listen
开始监听(也就是swrm
开启监听,后续看到h.Network()
这个写法,就是代表的swrm
) -
最后返回
*BasicHost
对象,作为对host.Host
接口的实现(后续看到host.Host
就要当作*BasicHost
对象)func (cfg *Config) NewNode() (host.Host, error) { // ...省略...
// 创建 swarm swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) if err != nil { return nil, err } // 创建 *BasicHost (传入 swrm) h, err := bhost.NewHost(swrm, &bhost.HostOpts{ EventBus: eventBus, ConnManager: cfg.ConnManager, AddrsFactory: cfg.AddrsFactory, NATManager: cfg.NATManager, EnablePing: !cfg.DisablePing, UserAgent: cfg.UserAgent, ProtocolVersion: cfg.ProtocolVersion, EnableHolePunching: cfg.EnableHolePunching, HolePunchingOptions: cfg.HolePunchingOptions, EnableRelayService: cfg.EnableRelayService, RelayServiceOpts: cfg.RelayServiceOpts, EnableMetrics: !cfg.DisableMetrics, PrometheusRegisterer: cfg.PrometheusRegisterer, }) if err != nil { swrm.Close() return nil, err } // 完成依赖检测+初始化(使用了第三方的库go.uber.org/fx 实现,具体原理是如何,不知道) /* 比较重要内部逻辑: swrm.AddTransport(t),将 t 保存到swrm.transports中,让 swrm 知道了有哪些传输层(代码路径 net/swarm/swarm_transport.go 第76行) fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))), fx.Supply(cfg.Muxers), 完成了对 upgrader 结构体中 muxers []StreamMuxer 的设定,代码位于 (net/upgrader/upgrader.go 的第74行 New 函数) */ if err := cfg.addTransports(h); err != nil { h.Close() return nil, err } /// Swarm 开启监听 tcp(因为我们的echo只设定tcp的监听地址) if err := h.Network().Listen(cfg.ListenAddrs...); err != nil { h.Close() return nil, err } // ... 省略 ... var ho host.Host ho = h // ... 省略 ... return ho, nil
}
这里优先看下 cfg.makeSwarm
函数内部逻辑(后续还需要回到NewNode()
函数看其他的逻辑)
func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
// 检查 Peerstore(这个值是在上面 FallbackDefaults 中设定的缺省值,肯定是有值的)
if cfg.Peerstore == nil {
return nil, fmt.Errorf("no peerstore specified")
}
// Check this early. Prevents us from even *starting* without verifying this.
if pnet.ForcePrivateNetwork && len(cfg.PSK) == 0 {
log.Error("tried to create a libp2p node with no Private" +
" Network Protector but usage of Private Networks" +
" is forced by the environment")
// Note: This is *also* checked the upgrader itself, so it'll be
// enforced even *if* you don't use the libp2p constructor.
return nil, pnet.ErrNotInPrivateNetwork
}
// 检查RSA私钥 (这个值是在上面 FallbackDefaults 中设定的缺省值,肯定是有值的)
if cfg.PeerKey == nil {
return nil, fmt.Errorf("no peer key specified")
}
// 从公钥中生成 peer ID
pid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic()) // 来源于 public key
if err != nil {
return nil, err
}
// 在内存中保存 peer ID + 私钥
if err := cfg.Peerstore.AddPrivKey(pid, cfg.PeerKey); err != nil {
return nil, err
}
// 在内存中保存 peer ID + 公钥
if err := cfg.Peerstore.AddPubKey(pid, cfg.PeerKey.GetPublic()); err != nil {
return nil, err
}
opts := cfg.SwarmOpts
// 创建 *swarm.Swarm 对象
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}
peer ID
的生成规则额外提一下:peer ID
本质就是一个hash
值字符串,最最原始的数据来源于公钥key
,经过一系列编码,得到peer ID
。peer ID
是一个服务实例的唯一标识
再说一个点:因为服务每次重启的时候RandomIdentity
,都会重新生成公私钥 ,所以每次的peer ID
是不一样的(不理解的可以再看下代码,理解了,说明你确实有认真看代码)
// 这里是默认缺省的设定 cfg.PeerKey的逻辑
var RandomIdentity = func(cfg *Config) error {
priv, _, err := crypto.GenerateEd25519Key(rand.Reader) // priv 每次重新生成
if err != nil {
return err
}
return cfg.Apply(Identity(priv)) // 这里设定 cfg.PeerKey 值为 priv
}
var defaults = []struct {
fallback func(cfg *Config) bool
opt Option
}{
{
fallback: func(cfg *Config) bool { return cfg.PeerKey == nil },
opt: RandomIdentity, // 上面?的RandomIdentity,FallbackDefaults 中设定缺省值的函数
},
}
// 这里是生成 peer ID 的逻辑
// 这里的 pk 值来源于 cfg.PeerKey.GetPublic()
// 而 cfg.PeerKey 的值,就是在上面? RandomIdentity函数中的设定的值 priv
func IDFromPublicKey(pk ic.PubKey) (ID, error) {
b, err := ic.MarshalPublicKey(pk) // proto编码
if err != nil {
return "", err
}
var alg uint64 = mh.SHA2_256
if AdvancedEnableInlining && len(b) <= maxInlineKeyLength {
alg = mh.IDENTITY
}
// 对b字节流,经过hash编码
hash, _ := mh.Sum(b, alg, -1)
return ID(hash), nil
}
// proto编码
func MarshalPublicKey(k PubKey) ([]byte, error) {
pbmes, err := PublicKeyToProto(k) // 构建 pb.PublicKey 结构体
if err != nil {
return nil, err
}
return proto.Marshal(pbmes) // proto编码
}
// 构建 pb.PublicKey 结构体
func PublicKeyToProto(k PubKey) (*pb.PublicKey, error) {
data, err := k.Raw()
if err != nil {
return nil, err
}
return &pb.PublicKey{
Type: k.Type().Enum(),
Data: data,
}, nil
}
继续进入 swarm.NewSwarm
内部:
-
创建
&Swarm
对象 -
初始化
s.conns.m / s.listeners.m / s.transports.m
-
s.dsync / s.limiter
真正用来进行拨号连接的核心逻辑;最终逻辑体现在s.dialWorkerLoop
和s.dialAddr
函数上// NewSwarm constructs a Swarm. func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
// ... 省略 ... ctx, cancel := context.WithCancel(context.Background()) s := &Swarm{ local: local, // 当前实例的 peer.ID peers: peers, // 用来做存储用的 emitter: emitter, ctx: ctx, ctxCancel: cancel, dialTimeout: defaultDialTimeout, // 15s dialTimeoutLocal: defaultDialTimeoutLocal, // 5s maResolver: madns.DefaultResolver, dialRanker: DefaultDialRanker, // 地址排序 // A black hole is a binary property. On a network if UDP dials are blocked or there is // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials // is good enough. udpBlackHoleConfig: blackHoleConfig{Enabled: true, N: 100, MinSuccesses: 5}, ipv6BlackHoleConfig: blackHoleConfig{Enabled: true, N: 100, MinSuccesses: 5}, } s.conns.m = make(map[peer.ID][]*Conn) // peer.ID 对应的 所有的已建立的连接 s.listeners.m = make(map[transport.Listener]struct{}) // 传输层的监听句柄 s.transports.m = make(map[int]transport.Transport) // 预定义的id 对应的传输层实现 (这是在cfg.addTransports(h)中赋值的) s.notifs.m = make(map[network.Notifiee]struct{}) s.directConnNotifs.m = make(map[peer.ID][]chan struct{}) for _, opt := range opts { if err := opt(s); err != nil { return nil, err } } //// 这里是当host作为客户端的时候,如何连接服务端的逻辑处理 //////////////////////////////////////////////////// s.dsync = newDialSync(s.dialWorkerLoop) // 拨号协程 s.dialWorkerLoop,死循环检测拨号请求 s.limiter = newDialLimiter(s.dialAddr) // 真正进行拨号的函数 s.backf.init(s.ctx) // 启动后台任务,清理 peer.ID return s, nil
}
这里额外补充下 s.transports.m
的赋值逻辑(代码路径 net/swarm/swarm_transport.go
第76行):
-
本质就是为了在
s.transports.m
中记录k/v
对func (s *Swarm) AddTransport(t transport.Transport) error {
// 获取 t 传输层对应 id 标识(每个传输层都有预定义的id), tcp 传输层为 6 protocols := t.Protocols() if len(protocols) == 0 { return fmt.Errorf("useless transport handles no protocols: %T", t) } // 加锁 s.transports.Lock() defer s.transports.Unlock() if s.transports.m == nil { return ErrSwarmClosed } var registered []string for _, p := range protocols { // 看下该标识p(整数类型),在 s.transports.m中是否存在 if _, ok := s.transports.m[p]; ok { proto := ma.ProtocolWithCode(p) name := proto.Name if name == "" { name = fmt.Sprintf("unknown (%d)", p) } registered = append(registered, name) } } // 是否重复注册 if len(registered) > 0 { return fmt.Errorf( "transports already registered for protocol(s): %s", strings.Join(registered, ", "), ) } // 否则,将 t 对应的标识p 和 t 保存为 k/v对 for _, p := range protocols { s.transports.m[p] = t } return nil
}
这里t.Protocols()
标识信息来源于第三方库 go-multiaddr/protocols.go
中的定义
-
比如 6 就表示 TCP
-
比如 4 就表示 IP4
-
并在
fun init()
中 调用func AddProtocol(p Protocol) error
函数,将预定义的信息保存在var protocolsByName = map[string]Protocol{} var protocolsByCode = map[int]Protocol{}
中// 预定义的整数值 const ( P_IP4 = 4 P_TCP = 6 P_DNS = 53 // 4 or 6 P_DNS4 = 54 P_DNS6 = 55 P_DNSADDR = 56 P_UDP = 273 P_DCCP = 33 P_IP6 = 41 P_IP6ZONE = 42 P_IPCIDR = 43 P_QUIC = 460 P_QUIC_V1 = 461 P_WEBTRANSPORT = 465 P_CERTHASH = 466 P_SCTP = 132 P_CIRCUIT = 290 P_UDT = 301 P_UTP = 302 P_UNIX = 400 P_P2P = 421 P_IPFS = P_P2P // alias for backwards compatibility P_HTTP = 480 P_HTTPS = 443 // deprecated alias for /tls/http P_ONION = 444 // also for backwards compatibility P_ONION3 = 445 P_GARLIC64 = 446 P_GARLIC32 = 447 P_P2P_WEBRTC_DIRECT = 276 // Deprecated. use webrtc-direct instead P_TLS = 448 P_SNI = 449 P_NOISE = 454 P_WS = 477 P_WSS = 478 // deprecated alias for /tls/ws P_PLAINTEXTV2 = 7367777 P_WEBRTC_DIRECT = 280 P_WEBRTC = 281 )
var ( protoIP4 = Protocol{ Name: "ip4", Code: P_IP4, VCode: CodeToVarint(P_IP4), Size: 32, Path: false, Transcoder: TranscoderIP4, } protoTCP = Protocol{ Name: "tcp", Code: P_TCP, VCode: CodeToVarint(P_TCP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoDNS = Protocol{ Code: P_DNS, Size: LengthPrefixedVarSize, Name: "dns", VCode: CodeToVarint(P_DNS), Transcoder: TranscoderDns, } protoDNS4 = Protocol{ Code: P_DNS4, Size: LengthPrefixedVarSize, Name: "dns4", VCode: CodeToVarint(P_DNS4), Transcoder: TranscoderDns, } protoDNS6 = Protocol{ Code: P_DNS6, Size: LengthPrefixedVarSize, Name: "dns6", VCode: CodeToVarint(P_DNS6), Transcoder: TranscoderDns, } protoDNSADDR = Protocol{ Code: P_DNSADDR, Size: LengthPrefixedVarSize, Name: "dnsaddr", VCode: CodeToVarint(P_DNSADDR), Transcoder: TranscoderDns, } protoUDP = Protocol{ Name: "udp", Code: P_UDP, VCode: CodeToVarint(P_UDP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoDCCP = Protocol{ Name: "dccp", Code: P_DCCP, VCode: CodeToVarint(P_DCCP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoIP6 = Protocol{ Name: "ip6", Code: P_IP6, VCode: CodeToVarint(P_IP6), Size: 128, Transcoder: TranscoderIP6, } protoIPCIDR = Protocol{ Name: "ipcidr", Code: P_IPCIDR, VCode: CodeToVarint(P_IPCIDR), Size: 8, Transcoder: TranscoderIPCIDR, } // these require varint protoIP6ZONE = Protocol{ Name: "ip6zone", Code: P_IP6ZONE, VCode: CodeToVarint(P_IP6ZONE), Size: LengthPrefixedVarSize, Path: false, Transcoder: TranscoderIP6Zone, } protoSCTP = Protocol{ Name: "sctp", Code: P_SCTP, VCode: CodeToVarint(P_SCTP), Size: 16, Transcoder: TranscoderPort, }
protoCIRCUIT = Protocol{ Code: P_CIRCUIT, Size: 0, Name: "p2p-circuit", VCode: CodeToVarint(P_CIRCUIT), } protoONION2 = Protocol{ Name: "onion", Code: P_ONION, VCode: CodeToVarint(P_ONION), Size: 96, Transcoder: TranscoderOnion, } protoONION3 = Protocol{ Name: "onion3", Code: P_ONION3, VCode: CodeToVarint(P_ONION3), Size: 296, Transcoder: TranscoderOnion3, } protoGARLIC64 = Protocol{ Name: "garlic64", Code: P_GARLIC64, VCode: CodeToVarint(P_GARLIC64), Size: LengthPrefixedVarSize, Transcoder: TranscoderGarlic64, } protoGARLIC32 = Protocol{ Name: "garlic32", Code: P_GARLIC32, VCode: CodeToVarint(P_GARLIC32), Size: LengthPrefixedVarSize, Transcoder: TranscoderGarlic32, } protoUTP = Protocol{ Name: "utp", Code: P_UTP, VCode: CodeToVarint(P_UTP), } protoUDT = Protocol{ Name: "udt", Code: P_UDT, VCode: CodeToVarint(P_UDT), } protoQUIC = Protocol{ Name: "quic", Code: P_QUIC, VCode: CodeToVarint(P_QUIC), } protoQUICV1 = Protocol{ Name: "quic-v1", Code: P_QUIC_V1, VCode: CodeToVarint(P_QUIC_V1), } protoWEBTRANSPORT = Protocol{ Name: "webtransport", Code: P_WEBTRANSPORT, VCode: CodeToVarint(P_WEBTRANSPORT), } protoCERTHASH = Protocol{ Name: "certhash", Code: P_CERTHASH, VCode: CodeToVarint(P_CERTHASH), Size: LengthPrefixedVarSize, Transcoder: TranscoderCertHash, } protoHTTP = Protocol{ Name: "http", Code: P_HTTP, VCode: CodeToVarint(P_HTTP), } protoHTTPS = Protocol{ Name: "https", Code: P_HTTPS, VCode: CodeToVarint(P_HTTPS), } protoP2P = Protocol{ Name: "p2p", Code: P_P2P, VCode: CodeToVarint(P_P2P), Size: LengthPrefixedVarSize, Transcoder: TranscoderP2P, } protoUNIX = Protocol{ Name: "unix", Code: P_UNIX, VCode: CodeToVarint(P_UNIX), Size: LengthPrefixedVarSize, Path: true, Transcoder: TranscoderUnix, } protoP2P_WEBRTC_DIRECT = Protocol{ Name: "p2p-webrtc-direct", Code: P_P2P_WEBRTC_DIRECT, VCode: CodeToVarint(P_P2P_WEBRTC_DIRECT), } protoTLS = Protocol{ Name: "tls", Code: P_TLS, VCode: CodeToVarint(P_TLS), } protoSNI = Protocol{ Name: "sni", Size: LengthPrefixedVarSize, Code: P_SNI, VCode: CodeToVarint(P_SNI), Transcoder: TranscoderDns, } protoNOISE = Protocol{ Name: "noise", Code: P_NOISE, VCode: CodeToVarint(P_NOISE), } protoPlaintextV2 = Protocol{ Name: "plaintextv2", Code: P_PLAINTEXTV2, VCode: CodeToVarint(P_PLAINTEXTV2), } protoWS = Protocol{ Name: "ws", Code: P_WS, VCode: CodeToVarint(P_WS), } protoWSS = Protocol{ Name: "wss", Code: P_WSS, VCode: CodeToVarint(P_WSS), } protoWebRTCDirect = Protocol{ Name: "webrtc-direct", Code: P_WEBRTC_DIRECT, VCode: CodeToVarint(P_WEBRTC_DIRECT), } protoWebRTC = Protocol{ Name: "webrtc", Code: P_WEBRTC, VCode: CodeToVarint(P_WEBRTC), }
)
// 将预定的信息,调用 AddProtocol 保存到 map中 func init() { for _, p := range []Protocol{ protoIP4, protoTCP, protoDNS, protoDNS4, protoDNS6, protoDNSADDR, protoUDP, protoDCCP, protoIP6, protoIP6ZONE, protoIPCIDR, protoSCTP, protoCIRCUIT, protoONION2, protoONION3, protoGARLIC64, protoGARLIC32, protoUTP, protoUDT, protoQUIC, protoQUICV1, protoWEBTRANSPORT, protoCERTHASH, protoHTTP, protoHTTPS, protoP2P, protoUNIX, protoP2P_WEBRTC_DIRECT, protoTLS, protoSNI, protoNOISE, protoWS, protoWSS, protoPlaintextV2, protoWebRTCDirect, protoWebRTC, } { if err := AddProtocol(p); err != nil { panic(err) } }
// explicitly set both of these protocolsByName["p2p"] = protoP2P protocolsByName["ipfs"] = protoP2P
}
到此整个的 cfg.makeSwarm
函数,才算完成。当然也仅仅只是初始化了一个 *swarm.Swarm
对象而已。真正的监听 or 拨号的逻辑还没有真正的执行。
服务端监听流程
再回到 NewNode
函数,直接看 h.Network().Listen(cfg.ListenAddrs...)
这块是关于服务端监听+如何处理请求的流程梳理
// network.Network 是接口,h.netword的实际值是 swrm
func (h *BasicHost) Network() network.Network {
return h.network
}
所以,执行h.Network().Listen(cfg.ListenAddrs...)
本质是执行 swrm.Listen(cfg.ListenAddrs...)
// Swarm中的 Listen
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
errs := make([]error, len(addrs))
var succeeded int
// 遍历在代码开始的时候设定的监听地址(我们只设定了 /ip4/127.0.0.1/tcp/2222 这样的一个地址)
for i, a := range addrs {
// 针对每一个地址,开启监听
if err := s.AddListenAddr(a); err != nil {
errs[i] = err
} else {
succeeded++
}
}
// 监听失败的日志报错
for i, e := range errs {
if e != nil {
log.Warnw("listening failed", "on", addrs[i], "error", errs[i])
}
}
if succeeded == 0 && len(addrs) > 0 {
return fmt.Errorf("failed to listen on any addresses: %s", errs)
}
return nil
}
继续进入 s.AddListenAddr(a)
函数查看:
-
先获取对应协议
TransportForListening
相对应的传输层对象*TcpTransport
-
然后开启监听
tpt.Listen(a)
-
启动协程,死循环
list.Accept()
获取客户端的连接c
-
每个客户端连接
c
,再启动一个协程,对c
进行读写s.addConn(c, network.DirInbound)
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
// 当前的tcp协议,对应的传输层(本demo获取的传输层应该是 *TcpTransport ) tpt := s.TransportForListening(a) if tpt == nil { // TransportForListening will return nil if either: // 1. No transport has been registered. // 2. We're closed (so we've nulled out the transport map. // // Distinguish between these two cases to avoid confusing users. select { case <-s.ctx.Done(): return ErrSwarmClosed default: return ErrNoTransport } } // 这里就是在调用 *TcpTransport.Listen,返回监听对象 list, err := tpt.Listen(a) if err != nil { return err } s.listeners.Lock() if s.listeners.m == nil { s.listeners.Unlock() list.Close() return ErrSwarmClosed } s.refs.Add(1) // 保存 list 监听句柄 s.listeners.m[list] = struct{}{} s.listeners.cacheEOL = time.Time{} s.listeners.Unlock() maddr := list.Multiaddr() // signal to our notifiees on listen. s.notifyAll(func(n network.Notifiee) { n.Listen(s, maddr) }) // 启动协程 go func() { defer func() { s.listeners.Lock() _, ok := s.listeners.m[list] if ok { delete(s.listeners.m, list) s.listeners.cacheEOL = time.Time{} } s.listeners.Unlock() if ok { list.Close() log.Errorf("swarm listener unintentionally closed") } // signal to our notifiees on listen close. s.notifyAll(func(n network.Notifiee) { n.ListenClose(s, maddr) }) s.refs.Done() }() for { // 死循环 // 获取一个 客户端连接 c, err := list.Accept() // &transportConn if err != nil { if !errors.Is(err, transport.ErrListenerClosed) { log.Errorf("swarm listener for %s accept error: %s", a, err) } return } canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound") if s.metricsTracer != nil { c = wrapWithMetrics(c, s.metricsTracer, time.Now(), network.DirInbound) } log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr()) s.refs.Add(1) // 再启动一个协程 go func() { defer s.refs.Done() // 单独和该客户端连接进行交互 _, err := s.addConn(c, network.DirInbound) switch err { case nil: case ErrSwarmClosed: // ignore. return default: log.Warnw("adding connection failed", "to", a, "error", err) return } }() } }() return nil
}
// 获取地址格式,对应的传输层(本demo获取的传输层应该是 *TcpTransport ) func (s *Swarm) TransportForListening(a ma.Multiaddr) transport.Transport { protocols := a.Protocols() if len(protocols) == 0 { return nil }
s.transports.RLock() defer s.transports.RUnlock() if len(s.transports.m) == 0 { // make sure we're not just shutting down. if s.transports.m != nil { log.Error("you have no transports configured") } return nil } selected := s.transports.m[protocols[len(protocols)-1].Code] for _, p := range protocols { transport, ok := s.transports.m[p.Code] if !ok { continue } if transport.Proxy() { selected = transport } } return selected
}
这里的 *TcpTransport.Listen
内部逻辑,有一层额外的设计,如下(感兴趣可以看下):
-
t.maListen(laddr)
内部本质就是调用的net.Listen
,获取到最最最最最最原始的网络监听对象 -
然后利用
t.upgrader.UpgradeListener(t, list)
对原始的监听做了一层升级设计(代码位于p2p/net/upgrader/upgrader.go
107行) -
- 内部启动了一个协程
go l.handleIncoming()
,死循环不断的调用maconn, err := l.Listener.Accept()
获取原始的客户端连接maconn
,然后再调用conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)
对客户端连接maconn
,又进行了一次升级,其实就是返回了一个&transportConn
类型的conn
,最后将conn
对象保存到了l.incoming <- conn
通道中
- 内部启动了一个协程
-
当外部调用
c, err := list.Accept()
获取客户端连接的时候,其实是从l.incoming
中获取的&transportConn
类型的conn
(代码位于p2p/net/upgrader/listener.go
163行)// Listen listens on the given multiaddr. func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
// 这个就是最最最最最最原始的网络监听对象 list, err := t.maListen(laddr) if err != nil { return nil, err } if t.enableMetrics { list = newTracingListener(&tcpListener{list, 0}) } // 进入这里,看更详细的逻辑(这里就不展示代码了) return t.upgrader.UpgradeListener(t, list), nil
}
func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) { // maListener if t.UseReuseport() { return t.reuse.Listen(laddr) } return manet.Listen(laddr) }
func Listen(laddr ma.Multiaddr) (Listener, error) {
lnet, lnaddr, err := DialArgs(laddr) if err != nil { return nil, err } // 这个就是最最最最最最原始的网络监听对象 nl, err := net.Listen(lnet, lnaddr) if err != nil { return nil, err } return WrapNetListener(nl)
}
func WrapNetListener(nl net.Listener) (Listener, error) { // maListener if nla, ok := nl.(*netListenerAdapter); ok { return nla.Listener, nil }
laddr, err := FromNetAddr(nl.Addr()) if err != nil { return nil, err } return &maListener{ Listener: nl, laddr: laddr, }, nil
}
最后我们再来看下 s.addConn
中实际是怎么读写数据的
func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
// ... 省略....
c := &Conn{
conn: tc, // &transportConn
swarm: s,
stat: stat,
id: s.nextConnID.Add(1),
}
// ... 省略....
// 这里保存了 和对方peer ID 已建立的连接(为了后续的连接复用)
s.conns.m[p] = append(s.conns.m[p], c)
// ... 省略....
// 当host作为服务端,真正处理逻辑的地方(进行协议匹配,调用 handler函数进行用户的业务逻辑处理)
c.start()
return c, nil
}
进入c.start()
-
这里的
c.conn.AcceptStream()
实际调用的是var s *yamux.Session 的 AcceptStream() 方法
,具体为什么,还是需要看下上面说到的conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)
内部的代码逻辑。关键代码位于p2p/net/upgrader/upgrader.go
中的183行muxer, smconn, err := u.setupMuxer(ctx, sconn, isServer, connScope.PeerScope())
函数内部的逻辑。 -
这里对流概念进行解释下:比如 hostA 和 hostB 建立了一个 socket连接,在这一个socket连接上,可以同时并发的发送多个不同的stream(简单可以认为每个stream就是一个完整的数据包),对于每个stream感觉上就像自己独享了这个socket连接(实际是复用的一个socket)。到了服务端,基于 stream id 可以将不同的stream分离开来,而并不会混淆在一起。再打个比方:比如一条公路,一般情况下一次过一辆车,利用率是不是太低了。那我把这条公路做点额外的设计,每次都可以让多辆车同时行驶,并且不会"撞"在一起,那对于每辆车来说,感觉这条路就是自己家开的。 这里利用了
github.com/libp2p/go-yamux/v4
这个第三方库,实现了 stream的功能。如果你对http2.0
有了解过,它里面的stream的实现和这里是类似的。这里是官方的解释https://docs.libp2p.io/concepts/multiplex/overview/
可以看下。 -
最终实际执行业务逻辑的代码位于
h(s)
,这里的h
的值就是n.SetStreamHandler(h.newStreamHandler)
设置的值h.newStreamHandler
(代码位于p2p/host/basic/basic_host.go
第 303行 )func (c *Conn) start() { go func() { defer c.swarm.refs.Done() defer c.Close()
for { // 这里我直接说结论,实际调用的是 var s *yamux.Session 的 AcceptStream() 方法,从当前连接中获取一个多路复用流 *yamux.Stream ts, err := c.conn.AcceptStream() if err != nil { return } c.swarm.refs.Add(1) go func() { // 对 yamux.Stream进行了包装,成了 *swarm.Stream s, err := c.addStream(ts, network.DirInbound, scope) c.swarm.refs.Done() // func (h *BasicHost) newStreamHandler(s network.Stream) if h := c.swarm.StreamHandler(); h != nil { // 这里的 s 是 *swarm.Stream h(s) } s.completeAcceptStreamGoroutine() }() } }()
}
所以直接看下 h.newStreamHandler
函数源码
-
就是从流中读取协议
-
遍历
msm.handlerlock
看能否匹配echo
中设定的协议/echo/1.0.0
(这里是全文匹配 ),如果相同,返回h.Handle
-
最后执行这个
h.Handle
函数handle(protoID, s)
func (h *BasicHost) newStreamHandler(s network.Stream) {
// 从 Mux中获取 处理句柄 protoID, handle, err := h.Mux().Negotiate(s) // 执行函数 handle(protoID, s)
}
const ProtocolID = "/multistream/1.0.0"
// 从 Mux中获取 处理句柄 func (msm *MultistreamMuxer[T]) Negotiate(rwc io.ReadWriteCloser) (proto T, handler HandlerFunc[T], err error) {
_ = delimWriteBuffered(rwc, []byte(ProtocolID)) // 写入 ProtocolID line, err := ReadNextToken[T](rwc) // 读取客户端发送来的 protocolID if err != nil { return "", nil, err } if line != ProtocolID { // 协议的公共前缀头(固定的格式) rwc.Close() return "", nil, ErrIncorrectVersion }
loop: for { // 读取并响应命令,直到它们发送一个有效的协议id (就是echo中的 /echo/1.0.0 ) tok, err := ReadNextTokenT // 从 rwc 中读取 if err != nil { return "", nil, err }
// 在这里匹配到对应的处理 函数 h := msm.findHandler(tok) if h == nil { if err := delimWriteBuffered(rwc, []byte("na")); err != nil { return "", nil, err } continue loop } _ = delimWriteBuffered(rwc, []byte(tok)) return tok, h.Handle, nil // 返回 h.Handle }
}
// 匹配的逻辑 func (msm *MultistreamMuxer[T]) findHandler(proto T) *Handler[T] { msm.handlerlock.RLock() defer msm.handlerlock.RUnlock()
// 遍历 msm.handlers 切片 for _, h := range msm.handlers { // 执行判断函数 if h.MatchFunc(proto) { return &h // 返回 handler } } return nil
}
echo
范例中,设定协议实际调用ha.SetStreamHandler("/echo/1.0.0" func(){})
的函数内部逻辑如下(这就和上面的匹配逻辑对应起来了):
-
向
msm.handlers
中保存记录func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { // 保存到 msm.handlers 切片中 h.Mux().AddHandler(pid, func(p protocol.ID, rwc io.ReadWriteCloser) error { is := rwc.(network.Stream) handler(is) return nil }) h.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ Added: []protocol.ID{pid}, }) }
func (msm *MultistreamMuxer[T]) AddHandler(protocol T, handler HandlerFunc[T]) { // 在msm.handlers 中添加记录 msm.AddHandlerWithFunc(protocol, fulltextMatch(protocol), handler) }
// 全文匹配 func fulltextMatch[T StringLike](s T) func(T) bool { return func(a T) bool { return a == s } }
// 在msm.handlers 中添加记录 func (msm *MultistreamMuxer[T]) AddHandlerWithFunc(protocol T, match func(T) bool, handler HandlerFunc[T]) { msm.handlerlock.Lock() defer msm.handlerlock.Unlock()
// 删除历史的 msm.removeHandler(protocol) // 新增 msm.handlers = append(msm.handlers, Handler[T]{ MatchFunc: match, // 这里的匹配是全文匹配 Handle: handler, // 处理函数 AddName: protocol, // 协议(路由) })
}
至此服务端的基本流程已经梳理完毕
服务端这个流程其实和普通的http
的处理流程大致是类似于的,都是先设定路由+处理函数,当请求来了以后,读取请求中的路由,和代码中设定好的路由进行匹配。
客户端拨号流程
需要大家再回到 echo
的源码中这句话 s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
,这里做的事情就是拨号
-
拨号的逻辑在
h.Connect
和h.Network().NewStream
都存在。 -
这里以
s, err := h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p)
函数来进行说明,本质都是调用s.dialPeer
拨号函数func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {
// GetNoDial 目的在于是否执行拨号的过程,设置了就不走,不设置,就执行拨号 if nodial, _ := network.GetNoDial(ctx); !nodial { // 这里进行拨号 err := h.Connect(ctx, peer.AddrInfo{ID: p}) if err != nil { return nil, err } } // h.Network() 就是 swrm s, err := h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p) if err != nil { // TODO: It would be nicer to get the actual error from the swarm, // but this will require some more work. if errors.Is(err, network.ErrNoConn) { return nil, errors.New("connection failed") } return nil, fmt.Errorf("failed to open stream: %w", err) } //... 省略 ...
}
前文提过 h.Network() 就是 swrm
,所以直接看 *swarm.Swarm
中的NewStream
函数的实现。(其实这里你去看 h.Connect
内部逻辑也行,最后都是在调用 s.dialPeer
函数,进行拨号)
-
从下文可知,拨号调用的是
s.dialPeer
函数func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
numDials := 0 for { c := s.bestConnToPeer(p) // 查看 p 对应的已经存在的连接 if c == nil { // 如果不存在 if nodial, _ := network.GetNoDial(ctx); !nodial { // 不允许拨号为 false(也就是允许拨号) numDials++ if numDials > DialAttempts { return nil, errors.New("max dial attempts exceeded") } var err error // 主动拨号 c, err = s.dialPeer(ctx, p) if err != nil { return nil, err } } else { return nil, network.ErrNoConn } } //... 略 ... // 在socket的基础上,创建一个流 str, err := c.NewStream(ctx) if err != nil { if c.conn.IsClosed() { continue } return nil, err } return str, nil // 返回这个流 }
}
进入 s.dialPeer
函数:
-
做前置检查,实际拨号又调用了
s.dsync.Dial(ctx, p)
。s.dsync
的值,在前面NewSwarm
中进行的初始化,不记得可以回看下。func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { log.Debugw("dialing peer", "from", s.local, "to", p) err := p.Validate() // 判断是否为空 字符串 if err != nil { return nil, err }
if p == s.local { // 自己不允许连接自己 return nil, ErrDialToSelf } // 如果存在 peer.ID 对应的 *Conn,就直接返回 conn := s.bestAcceptableConnToPeer(ctx, p) if conn != nil { return conn, nil } // 类似于黑名单 gater,做前置检查 if s.gater != nil && !s.gater.InterceptPeerDial(p) { // 判断是否允许运行连接 到 peer.ID log.Debugf("gater disallowed outbound connection to peer %s", p) return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection} } // apply the DialPeer timeout ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx)) // 60s (拨号超时时间) defer cancel() // 利用 s.dsync 进行拨号, p 为对方的 peer.ID conn, err = s.dsync.Dial(ctx, p) if err == nil { // 再判断一次,是不是要连接的 peer if conn.RemotePeer() != p { conn.Close() log.Errorw("Handshake failed to properly authenticate peer", "authenticated", conn.RemotePeer(), "expected", p) return nil, fmt.Errorf("unexpected peer") } // 拨号成功,返回 conn return conn, nil } log.Debugf("network for %s finished dialing %s", s.local, p) if ctx.Err() != nil { // Context error trumps any dial errors as it was likely the ultimate cause. return nil, ctx.Err() } if s.ctx.Err() != nil { // Ok, so the swarm is shutting down. return nil, ErrSwarmClosed } return nil, err
}
// 如果和peer.ID 存在连接,找到最好的连接 func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
s.conns.RLock() defer s.conns.RUnlock() var best *Conn // 查询 s.conns.m for _, c := range s.conns.m[p] { // 检查切片下的每个连接 if c.conn.IsClosed() { continue } if best == nil || isBetterConn(c, best) { best = c } } return best
}
// 比较两个连接 func isBetterConn(a, b *Conn) bool { aTransient := a.Stat().Transient // 是否是临时连接 bTransient := b.Stat().Transient if aTransient != bTransient { return !aTransient }
aDirect := isDirectConn(a) // 没经过代理 bDirect := isDirectConn(b) if aDirect != bDirect { return aDirect } a.streams.Lock() aLen := len(a.streams.m) // 已经打开了更多的 *Stream a.streams.Unlock() b.streams.Lock() bLen := len(b.streams.m) b.streams.Unlock() if aLen != bLen { return aLen > bLen } // finally, pick the last connection. return true
}
继续进入s.dsync.Dial
源码:
-
重点在
ds.getActiveDial(p)
函数内部逻辑,针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作 -
- 启动协程
go ds.dialWorker(p, actd.reqch)
消费reqch
通道消息,生成conn
- 启动协程
-
ad.dial(ctx)
负责往reqch
通道中发送消息,并阻塞等待结果func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) { // 针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作 ad, err := ds.getActiveDial(p) if err != nil { return nil, err }
// 利用 *activeDial,当想要一个 *Conn的时候,就往 *activeDial reqch 通道中发送消息,并阻塞等待 conn的生成 conn, err := ad.dial(ctx) ds.mutex.Lock() defer ds.mutex.Unlock() ad.refCnt-- if ad.refCnt == 0 { // 表示 当 *activeDial 对象不再被使用 if err == nil { ad.cancelCause(errConcurrentDialSuccessful) } else { ad.cancelCause(err) } close(ad.reqch) delete(ds.dials, p) // 删除掉 } return conn, err
}
// 利用 *activeDial,当想要一个 *Conn的时候,就往 *activeDial reqch 通道中发送消息,并阻塞等待 conn的生成 func (ad *activeDial) dial(ctx context.Context) (*Conn, error) { dialCtx := ad.ctx
// 获取ctx中是否要求,强制直接连接 if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect { dialCtx = network.WithForceDirectDial(dialCtx, reason) } // 同步连接 if simConnect, isClient, reason := network.GetSimultaneousConnect(ctx); simConnect { dialCtx = network.WithSimultaneousConnect(dialCtx, isClient, reason) } // 保存拨号完成后的结果 resch := make(chan dialResponse, 1) select { case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: // 发消息给 ad.reqch case <-ctx.Done(): return nil, ctx.Err() } /// 阻塞中.. 等待拨号的完成 select { case res := <-resch: // 检查resch 是否有结果(有,说明拨号成功) return res.conn, res.err case <-ctx.Done(): return nil, ctx.Err() }
}
func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) { ds.mutex.Lock() defer ds.mutex.Unlock()
// 针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作 actd, ok := ds.dials[p] if !ok { ctx, cancel := context.WithCancelCause(context.Background()) // 新建 *activeDial actd = &activeDial{ ctx: ctx, cancelCause: cancel, reqch: make(chan dialRequest), } ///!!! 这里才是真正进行拨号的逻辑,消费 reqch,完成拨号连接 go ds.dialWorker(p, actd.reqch) ds.dials[p] = actd } // 复用原来的 *activeDial ,计数+1 actd.refCnt++ return actd, nil
}
所以接下来看下协程 go ds.dialWorker(p, actd.reqch)
里面怎么消费reqch
消息的。
这里的 ds.dialWorker
是一个函数,而且是在 NewSwarm
中设定的函数s.dialWorkerLoop
(不记得去看下 NewSwarm
)
所以直接查看 s.dialWorkerLoop
的内部逻辑:
-
创建了
*dialWorker
对象 -
调用
w.loop()
函数func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
// 创建了 *dialWorker 对象 w := newDialWorker(s, p, reqch, nil) w.loop()
}
func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker { if cl == nil { cl = RealClock{} } return &dialWorker{ s: s, peer: p, reqch: reqch, pendingRequests: make(map[*pendRequest]struct{}), trackedDials: make(map[string]*addrDial), resch: make(chan tpt.DialUpdate), cl: cl, } }
继续看 w.loop()
内部逻辑:
-
就是一个
loop
死循环 -
包括了3部分逻辑:
-
- 从
w.reqch
队列中获取外部发送来的消息,并判断是否第一次拨号,如果是,将数据保存到db
中,并重置定时器;
- 从
-
- 当定时器超时,从
db
中获取待拨号的地址,调用w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
进行实际的拨号,一旦拨号返回结果,保存在w.resch
中
- 当定时器超时,从
-
- 读取
w.resch
是否有结果,如果有结果,最终通过pr.req.resch <- dialResponse{conn: conn}
通道,通知外部已经拨号完成,可以结束阻塞了。不记得的可以看下上面的func (ad *activeDial) dial
中的注释
- 读取
-
总结下,就是数据经过
w.reqch -> db -> w.resch -> pr.req.resch
这一整套的流转func (w *dialWorker) loop() {
startTime := w.cl.Now() // dialTimer 对象 dialTimer := w.cl.InstantTimer(startTime.Add(math.MaxInt64)) defer dialTimer.Stop() timerRunning := true // 重置定时器 scheduleNextDial := func() { if timerRunning && !dialTimer.Stop() { <-dialTimer.Ch() } timerRunning = false if dq.Len() > 0 { if dialsInFlight == 0 && !w.connected { // 如果无拨号,立刻触发定时器 dialTimer.Reset(startTime) } else { // 下一个拨号的延迟触发时间(作为定时器的触发时间) resetTime := startTime.Add(dq.top().Delay) for _, ad := range w.trackedDials { if !ad.expectedTCPUpgradeTime.IsZero() && ad.expectedTCPUpgradeTime.After(resetTime) { resetTime = ad.expectedTCPUpgradeTime } } dialTimer.Reset(resetTime) } timerRunning = true } } totalDials := 0
loop: for {
select { // 获取外部消息 case req, ok := <-w.reqch: if !ok { if w.s.metricsTracer != nil { w.s.metricsTracer.DialCompleted(w.connected, totalDials) } return } // 获取已经存在的连接 c := w.s.bestAcceptableConnToPeer(req.ctx, w.peer) if c != nil { req.resch <- dialResponse{conn: c} continue loop // 继续等待下一个消息 } // 执行到这里,说明 peer.ID 没有已存在的 *Conn // 过滤出可联通的地址 addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer) if err != nil { req.resch <- dialResponse{ err: &DialError{ Peer: w.peer, DialErrors: addrErrs, Cause: err, }} continue loop } // 从拨号器中获取拨号这些地址的延迟时间 simConnect, _, _ := network.GetSimultaneousConnect(req.ctx) addrRanking := w.rankAddrs(addrs, simConnect) // 地址排序 addrDelay := make(map[string]time.Duration, len(addrRanking)) // 创建挂起的请求对象 pr := &pendRequest{ req: req, addrs: make(map[string]struct{}, len(addrRanking)), err: &DialError{Peer: w.peer, DialErrors: addrErrs}, } // 保存 地址 + 地址拨号延迟时间 for _, adelay := range addrRanking { pr.addrs[string(adelay.Addr.Bytes())] = struct{}{} addrDelay[string(adelay.Addr.Bytes())] = adelay.Delay } // Check if dials to any of the addrs have completed already // If they have errored, record the error in pr. If they have succeeded, // respond with the connection. // If they are pending, add them to tojoin. // If we haven't seen any of the addresses before, add them to todial. var todial []ma.Multiaddr var tojoin []*addrDial for _, adelay := range addrRanking { // 判断地址是否已经加入到 todial(保证只拨打一次地址) ad, ok := w.trackedDials[string(adelay.Addr.Bytes())] if !ok { todial = append(todial, adelay.Addr) continue } if ad.conn != nil { // dial to this addr was successful, complete the request req.resch <- dialResponse{conn: ad.conn} continue loop } if ad.err != nil { // dial to this addr errored, accumulate the error pr.err.recordErr(ad.addr, ad.err) delete(pr.addrs, string(ad.addr.Bytes())) continue } // 说明拨打还在进行中... tojoin = append(tojoin, ad) } if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored pr.err.Cause = ErrAllDialsFailed req.resch <- dialResponse{err: pr.err} continue loop } // The request has some pending or new dials w.pendingRequests[pr] = struct{}{} for _, ad := range tojoin { // 如果还没有拨号过该地址 if !ad.dialed { // we haven't dialed this address. update the ad.ctx to have simultaneous connect values // set correctly if simConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); simConnect { if simConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !simConnect { ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason) // 保存到 dq 中,等待拨号 dq.Add(network.AddrDelay{ Addr: ad.addr, Delay: addrDelay[string(ad.addr.Bytes())], }) } } } // add the request to the addrDial } // 等待拨号 if len(todial) > 0 { now := time.Now() // these are new addresses, track them and add them to dq for _, a := range todial { w.trackedDials[string(a.Bytes())] = &addrDial{ addr: a, ctx: req.ctx, createdAt: now, } // 保存到 dq 中,等待拨号 dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]}) } } // 重制定时器时间 scheduleNextDial() case <-dialTimer.Ch(): // It's time to dial the next batch of addresses. // We don't check the delay of the addresses received from the queue here // because if the timer triggered before the delay, it means that all // the inflight dials have errored and we should dial the next batch of // addresses now := time.Now() // 定时器到时间点,从dq队列中获取数据 for _, adelay := range dq.NextBatch() { // spawn the dial ad, ok := w.trackedDials[string(adelay.Addr.Bytes())] if !ok { log.Errorf("SWARM BUG: no entry for address %s in trackedDials", adelay.Addr) continue } ad.dialed = true ad.dialRankingDelay = now.Sub(ad.createdAt) // 启动协程进行拨号,结果保存在 w.resch中 err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch) if err != nil { w.dispatchError(ad, err) } else { dialsInFlight++ totalDials++ } } timerRunning = false // 重制定时器时间 scheduleNextDial() case res := <-w.resch: // 又从 w.resch 中获取结果 ad, ok := w.trackedDials[string(res.Addr.Bytes())] if !ok { log.Errorf("SWARM BUG: no entry for address %s in trackedDials", res.Addr) if res.Conn != nil { res.Conn.Close() } dialsInFlight-- continue } // TCP Connection has been established. Wait for connection upgrade on this address // before making new dials. if res.Kind == tpt.UpdateKindHandshakeProgressed { // Only wait for public addresses to complete dialing since private dials // are quick any way if manet.IsPublicAddr(res.Addr) { ad.expectedTCPUpgradeTime = w.cl.Now().Add(PublicTCPDelay) } scheduleNextDial() continue } dialsInFlight-- ad.expectedTCPUpgradeTime = time.Time{} // res.Conn 就是拨号后到有效 连接 if res.Conn != nil { // 这里会将连接保存到 s.conns.m[p] = append(s.conns.m[p], c) 中,下次拨号可以复用连接,不用重复拨号 conn, err := w.s.addConn(res.Conn, network.DirOutbound) if err != nil { // oops no, we failed to add it to the swarm res.Conn.Close() w.dispatchError(ad, err) continue loop } for pr := range w.pendingRequests { if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok { // 这里就是返回最终的结果,返回给外部的函数(外部函数正阻塞在 resch 等待结果...) pr.req.resch <- dialResponse{conn: conn} delete(w.pendingRequests, pr) } } ad.conn = conn if !w.connected { w.connected = true if w.s.metricsTracer != nil { w.s.metricsTracer.DialRankingDelay(ad.dialRankingDelay) } } continue loop } // it must be an error -- add backoff if applicable and dispatch // ErrDialRefusedBlackHole shouldn't end up here, just a safety check if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected { // we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. w.s.backf.AddBackoff(w.peer, res.Addr) } else if res.Err == ErrDialRefusedBlackHole { log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", w.peer, res.Addr) } w.dispatchError(ad, res.Err) // Only schedule next dial on error. // If we scheduleNextDial on success, we will end up making one dial more than // required because the final successful dial will spawn one more dial scheduleNextDial() } }
}
此时我们看下真正拨号的函数 w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
内部的逻辑:
-
函数的调用链路如下:
dialNextAddr
->limitedDial
->AddDialJob
->addCheckPeerLimit
-> 启动协程go dl.executeDial(dj)
->dl.dialFunc
-
按照这个调用链路可知:(为了提高并发行)拨号其实是单独启动一个协程进行的,并且还对拨号进行了限流,最后实际执行的拨号函数为
dl.dialFunc
-
而
dl.dialFunc
是在dialLimiter
初始化的时候,赋值的s.dialAddr
。(不清楚的看下NewSwarm
函数)func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan transport.DialUpdate) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { if s.backf.Backoff(p, addr) { return ErrDialBackoff } }
// 开始拨号 s.limitedDial(ctx, p, addr, resch) return nil
}
// 开始拨号 func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan transport.DialUpdate) { timeout := s.dialTimeout if manet.IsPrivateAddr(a) && s.dialTimeoutLocal < s.dialTimeout { timeout = s.dialTimeoutLocal }
// 新建拨号任务 s.limiter.AddDialJob(&dialJob{ addr: a, peer: p, resp: resp, ctx: ctx, timeout: timeout, })
}
// 新建拨号任务,本质就是启动一个协程 func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.lk.Lock() defer dl.lk.Unlock()
log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr) dl.addCheckPeerLimit(dj)
}
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
// 这里做了限流,一个 peer 最多只能有 perPeerLimit 个任务在同时执行 if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+ "peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit, len(dl.waitingOnPeerLimit[dj.peer])) wlist := dl.waitingOnPeerLimit[dj.peer] dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) return } dl.activePerPeer[dj.peer]++ // 检查 dl.addCheckFdLimit(dj)
}
func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) { if dl.shouldConsumeFd(dj.addr) { // 总的拨号限制 160 if dl.fdConsuming >= dl.fdLimit { log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+ "limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd)) dl.waitingOnFd = append(dl.waitingOnFd, dj) return }
log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d", dj.peer, dj.addr, dl.fdConsuming) // take token dl.fdConsuming++ } log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd)) // 启动协程,对地址进行拨号,返回的 socket保存在了 dj.resp中 go dl.executeDial(dj)
}
func (dl *dialLimiter) executeDial(j *dialJob) { defer dl.finishedDial(j) if j.cancelled() { return }
dctx, cancel := context.WithTimeout(j.ctx, j.timeout) defer cancel() // 这里拨号 con, err := dl.dialFunc(dctx, j.peer, j.addr, j.resp) kind := transport.UpdateKindDialSuccessful if err != nil { kind = transport.UpdateKindDialFailed } select { // 拨号后的结果通过 resp 返回出去 case j.resp <- transport.DialUpdate{Kind: kind, Conn: con, Addr: j.addr, Err: err}: case <-j.ctx.Done(): if con != nil { con.Close() } }
}
这个就是最最最最最最最最终的拨号函数 dialAddr
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) {
// 不能对自己拨号
if s.local == p {
return nil, ErrDialToSelf
}
// Check before we start work
if err := ctx.Err(); err != nil {
log.Debugf("%s swarm not dialing. Context cancelled: %v. %s %s", s.local, err, p, addr)
return nil, err
}
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
// 获取tcp协议的,传输控制层对象 *Tcptransport
tpt := s.TransportForDialing(addr)
if tpt == nil {
return nil, ErrNoTransport
}
start := time.Now()
var connC transport.CapableConn // 就是一个socket连接
var err error
if du, ok := tpt.(transport.DialUpdater); ok {
connC, err = du.DialWithUpdates(ctx, addr, p, updCh)
} else {
connC, err = tpt.Dial(ctx, addr, p) //对地址拨号
}
// 额外再检查一次,是否是要连接的 peer
if connC.RemotePeer() != p {
connC.Close()
err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
log.Error(err)
return nil, err
}
// 这里返回这个连接
return connC, nil
}
总结
没啥好总结,按照上面的流程看完,我估计你应该头都要大了。。
我个人觉得值得学习的也就是生产者-消费者模型熟练的运用,其他的设计给我的感觉就是为了整合第三方的库,为了封装而封装出来很多的接口(写的人应该很爽,看的人确实很痛苦)
参考资料:
https://juejin.cn/post/7118036611169779743
https://blog.csdn.net/kk3909/category_10554659.html https://docs.libp2p.io/concepts/introduction/overview/
https://zhuanlan.zhihu.com/p/277593169
https://keenjin.github.io/2021/04/p2p/