51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

区块链网络库 go-libp2p 源码剖析 Swarm

区块链网络库 go-libp2p 源码剖析 Swarm

go-libp2p的源码很庞杂,而且其中集合了各种第三库 + 项目自身的"故意"封装,看起来还是有一定的难度。本篇文章对Swarm功能流程进行梳理,把我自己的理解尽可能详细的展示出来。

该库代码个人感觉不太适合初学者或者完全没有使用过该库的人,上来就阅读源码会非常吃力。如果你是从事区块链开发,有一定的基础,可以学习下。

简述

libp2p是一个网络堆栈和库,从IPFS(分布式文件存储)项目中模块化出来以供其他工具使用。Libp2p是长期艰苦探索的产物(深入了解互联网的网络堆栈,以及过去大量的peer-to-peer协议)。在过去的15年中,构建大规模的peer-to-peer系统一直是复杂和困难的,libp2p是解决这个问题的一种方法。它是一个"网络堆栈"(一个协议套件)它清晰地分离关注点,并使复杂的应用程序只使用它们绝对需要的协议,而不放弃互操作性和可升级性。是现在很多做区块链应用使用的基础底层网络库。

引子

这段代码来自官方的echo 范例代码 https://github.com/libp2p/go-libp2p/tree/master/examples/echo演示了怎么创建节点和连接其他的节点

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 设定日志级别
	golog.SetAllLoggers(golog.LevelInfo) 

	// 解析命令行参数
	listenF := flag.Int("l", 0, "wait for incoming connections") // 监听端口
	targetF := flag.String("d", "", "target peer to dial") // 作为客户端的时候,被连接的服务端的地址
	insecureF := flag.Bool("insecure", false, "use an unencrypted connection")
	seedF := flag.Int64("seed", 0, "set random seed for id generation")
	flag.Parse()

	if *listenF == 0 {
		log.Fatal("Please provide a port to bind on with -l")
	}

	// 创建 host.Host 接口(本质就是 *BasicHost 结构体)
	ha, err := makeBasicHost(*listenF, *insecureF, *seedF)
	if err != nil {
		log.Fatal(err)
	}


	if *targetF == "" {  
        // hostA 设定流的处理 handler
		startListener(ctx, ha, *listenF, *insecureF)
		// Run until canceled.
		<-ctx.Done()
	} else {
		// hostB,连接对应的服务端 hostA
		runSender(ctx, ha, *targetF)
	}
}

func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) {
	var r io.Reader
	if randseed == 0 {
		r = rand.Reader
	} else {
		r = mrand.New(mrand.NewSource(randseed))
	}

	// 生成 RSA 私钥对(可以用来获取 peer.ID)
	priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
	if err != nil {
		return nil, err
	}

    // 设定基础参数
	opts := []libp2p.Option{

		// /ip4/127.0.0.1/tcp/0   如果 listenPort = 0 表示随机端口 , listenPort = 2222 表示固定端口 2222
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
		libp2p.Identity(priv), // 私钥
		libp2p.DisableRelay(),
	}

	if insecure {
		opts = append(opts, libp2p.NoSecurity)
	}

    // 创建对象 *BasicHost
	return libp2p.New(opts...)
}

func startListener(ctx context.Context, ha host.Host, listenPort int, insecure bool) {

    // 完整的地址
	fullAddr := getHostAddress(ha)
	log.Printf("I am %s\n", fullAddr)

	// 在 hostA上设置流处理函数handler  /echo/1.0.0 是用户自定义的协议名 
	ha.SetStreamHandler("/echo/1.0.0", func(s network.Stream) { // s 本质就是  *yamux.Stream
		log.Println("listener received new stream")
		if err := doEcho(s); err != nil {
			log.Println(err)
			s.Reset()
		} else {
			s.Close()
		}
	})

	log.Println("listening for connections")

	if insecure {
		log.Printf("Now run \"./echo -l %d -d %s -insecure\" on a different terminal\n", listenPort+1, fullAddr)
	} else {
		log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
	}
}

func getHostAddress(ha host.Host) string {
	//从 ha 中获取 peer.ID 字符串(其实就是一个对公钥进行hash计算出来的一个字符串)
	hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", ha.ID())) // /p2p/xxxx

	// ha 中存在的地址 /ip4/127.0.0.1/tcp/2222
	addr := ha.Addrs()[0]
	// 拼接 /ip4/127.0.0.1/tcp/2222  + /p2p/xxxx
	return addr.Encapsulate(hostAddr).String()
}


func runSender(ctx context.Context, ha host.Host, targetPeer string) {
	
    // 这里是 host B 的完整地址
    fullAddr := getHostAddress(ha)
	log.Printf("I am %s\n", fullAddr)

	// 将 targetPeer =  /ip4/127.0.0.1/tcp/2222/p2p/QmcX4KsBCoqyAfQ1hTVSgAB3RxMNaeRSZf3pAGAcxrKTVc 转成 multiaddr.
	maddr, err := ma.NewMultiaddr(targetPeer) 
	if err != nil {
		log.Println(err)
		return
	}

	// 从 multiaddr 获取 peer ID
	info, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		log.Println(err)
		return
	}

    // 将 a peer ID and a targetAddr 键值保存在内存缓存中
    // PermanentAddrTTL 缓存有效期
	ha.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) 

	log.Println("sender opening stream")

    // 从 host B 到 host A的连接中,创建一个流。最终的数据处理应该在 host A上(也就是上面 host A 设定的 /echo/1.0.0 的handler)
	s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0")
	if err != nil {
		log.Println(err)
		return
	}

	log.Println("sender saying hello")
    // 向 hostA 发送数据 Hello, world!
	_, err = s.Write([]byte("Hello, world!\n"))
	if err != nil {
		log.Println(err)
		return
	}

	out, err := io.ReadAll(s)
	if err != nil {
		log.Println(err)
		return
	}

	log.Printf("read reply: %q\n", out)
}

// doEcho reads a line of data a stream and writes it back
func doEcho(s network.Stream) error {
    // host A中处理数据
	buf := bufio.NewReader(s)
	str, err := buf.ReadString('\n')
	if err != nil {
		return err
	}

	log.Printf("read: %s", str)
	_, err = s.Write([]byte(str))
	return err
}

按照如下执行,可以看效果:

# 启动 hostA 
cd examples/echo
go run main.go -l 2222
# 此时 hostA 监听在 2222端口,等待着连接
# I am /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL


# 开启另外一个终端 启动 hostB
# Now run "./echo -l 2223 -d /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL" on a different terminal

cd examples/echo
go run main.go -l 2223 -d /ip4/127.0.0.1/tcp/2222/p2p/QmTDQTqY8GcLGeMFK2sm9R1oCy2dEKKvMT6Kigf96fjKQL

# 此时 hostB 监听在 2223 端口,等待着连接(同时向 hostA 2222 端口,发送了一个消息 Hello, world! )
# I am /ip4/127.0.0.1/tcp/2223/p2p/QmQzxCFuk1Pc8J6tsDChHdaJ3A7GRjxRGfeJ4HZJFTmmpm

这里的 hostA hostB并非绝对的服务端和客户端的关系,他们即可以都是服务端也可以都是客户端,只是这里演示的是 hostA是服务端,hostB作为客户端去连接hostA

源码解析

我们先从 makeBasicHost函数看起:

  • 构建opts []libp2p.Option切片,libp2p.Option本质是函数类型type Option func(cfg *Config) error(开源代码中很多这种设计,通过函数切片的方式进行初始化参数)

  • 最后实际调用的函数为 libp2p.New(opts...)

    func makeBasicHost(listenPort int, insecure bool, randseed int64) (host.Host, error) { //.....省略....

      opts := []libp2p.Option{
      	libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
      	libp2p.Identity(priv), // 私钥
      	libp2p.DisableRelay(),
      }
    
      if insecure {
      	opts = append(opts, libp2p.NoSecurity)
      }
    
      return libp2p.New(opts...)
    

    }

进入到 libp2p.New函数内部

  • 最终实际调用的是NewWithoutDefaults函数(注意⚠️:这里的FallbackDefaults这个是默认的配置函数)如果opts ...Option给的配置信息比较少,那缺省的会用默认的进行填充。

  • NewWithoutDefaults函数内部,将配置信息保存到var cfg Config

  • 调用 cfg.NewNode()开始进行节点的初始化+启动

    func New(opts ...Option) (host.Host, error) { return NewWithoutDefaults(append(opts, FallbackDefaults)...) // FallbackDefaults 默认的配置 }

    func NewWithoutDefaults(opts ...Option) (host.Host, error) { var cfg Config // 应用配置信息,将信息保存到 cfg 中 if err := cfg.Apply(opts...); err != nil { return nil, err } return cfg.NewNode() }

    // 对用户的配置进行检查,如果某些没有设定,就设定为默认的 var FallbackDefaults Option = func(cfg *Config) error {

      for _, def := range defaults {
      	if !def.fallback(cfg) { // 检查属性是否设定
      		continue
      	}
          // 设定属性
      	if err := cfg.Apply(def.opt); err != nil {
      		return err
      	}
      }
      return nil
    

    }

    // DefaultTransports 默认的传输层包括了 tcp quic websocket var DefaultTransports = ChainOptions( Transport(tcp.NewTCPTransport), Transport(quic.NewTransport), Transport(ws.New), Transport(webtransport.New), )

    // 这就是上面的 defaults var defaults = []struct { fallback func(cfg *Config) bool opt Option }{ { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.ListenAddrs == nil }, // 如果传输层 + 监听端口没有设定,就使用默认的 DefaultListenAddrs(本echo中有设定 ListenAddrs) opt: DefaultListenAddrs, }, { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.PSK == nil }, // 如果传输层没有制定,就使用默认的 DefaultTransports opt: DefaultTransports, // 上面👆 }, { fallback: func(cfg *Config) bool { return cfg.Transports == nil && cfg.PSK != nil }, opt: DefaultPrivateTransports, }, { fallback: func(cfg *Config) bool { return cfg.Muxers == nil }, opt: DefaultMuxers, }, { fallback: func(cfg *Config) bool { return !cfg.Insecure && cfg.SecurityTransports == nil }, opt: DefaultSecurity, }, { fallback: func(cfg *Config) bool { return cfg.PeerKey == nil }, opt: RandomIdentity, }, { fallback: func(cfg *Config) bool { return cfg.Peerstore == nil }, opt: DefaultPeerstore, }, { fallback: func(cfg *Config) bool { return !cfg.RelayCustom }, opt: DefaultEnableRelay, }, { fallback: func(cfg *Config) bool { return cfg.ResourceManager == nil }, opt: DefaultResourceManager, }, { fallback: func(cfg *Config) bool { return cfg.ConnManager == nil }, opt: DefaultConnectionManager, }, { fallback: func(cfg *Config) bool { return cfg.MultiaddrResolver == nil }, opt: DefaultMultiaddrResolver, }, { fallback: func(cfg *Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil }, opt: DefaultPrometheusRegisterer, }, }

进入 cfg.NewNode()内部: 中间有省略大量的代码,并非不重要(比如路由 + 自动nat + 自动relay,可以参考 https://blog.csdn.net/kk3909/category_10554659.html 研读)

  • cfg.makeSwarm 创建 swrm

  • bhost.NewHost 创建 *BasicHost 对象,同时将 swrm 保存为 type BasicHost struct结构体中的 network network.Network

  • cfg.addTransports(h) 完成依赖检测+初始化(使用了其他的第三方库go.uber.org/fx)。

  • cfg.addTransports(h)函数内部比较重要的逻辑,如下:

    • swrm.transports值的设定 (代码路径 net/swarm/swarm_transport.go 第76行)
    • upgrader 结构体中 muxers []StreamMuxer值的设定(代码路径 net/upgrader/upgrader.go 的第74行 New 函数)
  • h.Network().Listen 开始监听(也就是swrm开启监听,后续看到 h.Network()这个写法,就是代表的 swrm)

  • 最后返回 *BasicHost 对象,作为对 host.Host接口的实现(后续看到host.Host就要当作*BasicHost 对象)

    func (cfg *Config) NewNode() (host.Host, error) { // ...省略...

      // 创建 swarm
      swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
      if err != nil {
      	return nil, err
      }
    
      // 创建 *BasicHost (传入 swrm)
      h, err := bhost.NewHost(swrm, &bhost.HostOpts{
      	EventBus:             eventBus,
      	ConnManager:          cfg.ConnManager,
      	AddrsFactory:         cfg.AddrsFactory,
      	NATManager:           cfg.NATManager,
      	EnablePing:           !cfg.DisablePing,
      	UserAgent:            cfg.UserAgent,
      	ProtocolVersion:      cfg.ProtocolVersion,
      	EnableHolePunching:   cfg.EnableHolePunching,
      	HolePunchingOptions:  cfg.HolePunchingOptions,
      	EnableRelayService:   cfg.EnableRelayService,
      	RelayServiceOpts:     cfg.RelayServiceOpts,
      	EnableMetrics:        !cfg.DisableMetrics,
      	PrometheusRegisterer: cfg.PrometheusRegisterer,
      })
      if err != nil {
      	swrm.Close()
      	return nil, err
      }
    
      // 完成依赖检测+初始化(使用了第三方的库go.uber.org/fx 实现,具体原理是如何,不知道)
    
      /* 比较重要内部逻辑:
    
      swrm.AddTransport(t),将 t 保存到swrm.transports中,让 swrm 知道了有哪些传输层(代码路径 net/swarm/swarm_transport.go 第76行)
    
      fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))),
      fx.Supply(cfg.Muxers),
      完成了对 upgrader 结构体中 muxers []StreamMuxer 的设定,代码位于 (net/upgrader/upgrader.go 的第74行 New 函数)
      */
      if err := cfg.addTransports(h); err != nil {
      	h.Close()
      	return nil, err
      }
    
    
      /// Swarm 开启监听 tcp(因为我们的echo只设定tcp的监听地址)
      if err := h.Network().Listen(cfg.ListenAddrs...); err != nil {
      	h.Close()
      	return nil, err
      }
    
      // ... 省略 ...
    
      var ho host.Host
      ho = h
    
      // ... 省略 ...
    
      return ho, nil
    

    }

这里优先看下 cfg.makeSwarm函数内部逻辑(后续还需要回到NewNode()函数看其他的逻辑)

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {

    // 检查 Peerstore(这个值是在上面 FallbackDefaults 中设定的缺省值,肯定是有值的)
	if cfg.Peerstore == nil {
		return nil, fmt.Errorf("no peerstore specified")
	}

	// Check this early. Prevents us from even *starting* without verifying this.
	if pnet.ForcePrivateNetwork && len(cfg.PSK) == 0 {
		log.Error("tried to create a libp2p node with no Private" +
			" Network Protector but usage of Private Networks" +
			" is forced by the environment")
		// Note: This is *also* checked the upgrader itself, so it'll be
		// enforced even *if* you don't use the libp2p constructor.
		return nil, pnet.ErrNotInPrivateNetwork
	}

    // 检查RSA私钥 (这个值是在上面 FallbackDefaults 中设定的缺省值,肯定是有值的)
	if cfg.PeerKey == nil {
		return nil, fmt.Errorf("no peer key specified")
	}

	// 从公钥中生成 peer ID
	pid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic()) // 来源于 public key
	if err != nil {
		return nil, err
	}

    // 在内存中保存 peer ID + 私钥 
	if err := cfg.Peerstore.AddPrivKey(pid, cfg.PeerKey); err != nil {
		return nil, err
	}
    // 在内存中保存 peer ID + 公钥 
	if err := cfg.Peerstore.AddPubKey(pid, cfg.PeerKey.GetPublic()); err != nil {
		return nil, err
	}

	opts := cfg.SwarmOpts
	
	// 创建 *swarm.Swarm 对象
	return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

peer ID的生成规则额外提一下:peer ID 本质就是一个hash值字符串,最最原始的数据来源于公钥key,经过一系列编码,得到peer IDpeer ID是一个服务实例的唯一标识

再说一个点:因为服务每次重启的时候RandomIdentity,都会重新生成公私钥 ,所以每次的peer ID是不一样的(不理解的可以再看下代码,理解了,说明你确实有认真看代码)

// 这里是默认缺省的设定 cfg.PeerKey的逻辑
var RandomIdentity = func(cfg *Config) error {
	priv, _, err := crypto.GenerateEd25519Key(rand.Reader) // priv 每次重新生成
	if err != nil {
		return err
	}
	return cfg.Apply(Identity(priv)) // 这里设定 cfg.PeerKey 值为 priv
}

var defaults = []struct {
	fallback func(cfg *Config) bool
	opt      Option
}{
	
	{
		fallback: func(cfg *Config) bool { return cfg.PeerKey == nil },
		opt:      RandomIdentity,  // 上面👆的RandomIdentity,FallbackDefaults 中设定缺省值的函数
	},
	
}


// 这里是生成 peer ID 的逻辑

// 这里的 pk 值来源于 cfg.PeerKey.GetPublic()
// 而 cfg.PeerKey 的值,就是在上面👆 RandomIdentity函数中的设定的值 priv
func IDFromPublicKey(pk ic.PubKey) (ID, error) {
	b, err := ic.MarshalPublicKey(pk) // proto编码
	if err != nil {
		return "", err
	}
	var alg uint64 = mh.SHA2_256
	if AdvancedEnableInlining && len(b) <= maxInlineKeyLength {
		alg = mh.IDENTITY
	}

    // 对b字节流,经过hash编码
	hash, _ := mh.Sum(b, alg, -1) 
	return ID(hash), nil
}

// proto编码
func MarshalPublicKey(k PubKey) ([]byte, error) {
	pbmes, err := PublicKeyToProto(k) // 构建 pb.PublicKey 结构体
	if err != nil {
		return nil, err
	}

	return proto.Marshal(pbmes) // proto编码
}

// 构建 pb.PublicKey 结构体
func PublicKeyToProto(k PubKey) (*pb.PublicKey, error) {
	data, err := k.Raw()
	if err != nil {
		return nil, err
	}
	return &pb.PublicKey{
		Type: k.Type().Enum(),
		Data: data,
	}, nil
}

继续进入 swarm.NewSwarm内部:

  • 创建 &Swarm对象

  • 初始化 s.conns.m / s.listeners.m / s.transports.m

  • s.dsync / s.limiter真正用来进行拨号连接的核心逻辑;最终逻辑体现在 s.dialWorkerLoops.dialAddr函数上

    // NewSwarm constructs a Swarm. func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {

      // ... 省略 ...
      ctx, cancel := context.WithCancel(context.Background())
      s := &Swarm{
      	local:            local, // 当前实例的 peer.ID
      	peers:            peers, // 用来做存储用的
      	emitter:          emitter,
      	ctx:              ctx,
      	ctxCancel:        cancel,
      	dialTimeout:      defaultDialTimeout,      // 15s
      	dialTimeoutLocal: defaultDialTimeoutLocal, // 5s
      	maResolver:       madns.DefaultResolver,
      	dialRanker:       DefaultDialRanker, // 地址排序
    
      	// A black hole is a binary property. On a network if UDP dials are blocked or there is
      	// no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials
      	// is good enough.
      	udpBlackHoleConfig:  blackHoleConfig{Enabled: true, N: 100, MinSuccesses: 5},
      	ipv6BlackHoleConfig: blackHoleConfig{Enabled: true, N: 100, MinSuccesses: 5},
      }
    
      s.conns.m = make(map[peer.ID][]*Conn) // peer.ID 对应的 所有的已建立的连接
      s.listeners.m = make(map[transport.Listener]struct{}) // 传输层的监听句柄
      s.transports.m = make(map[int]transport.Transport) // 预定义的id 对应的传输层实现 (这是在cfg.addTransports(h)中赋值的)
      s.notifs.m = make(map[network.Notifiee]struct{})
      s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
    
      for _, opt := range opts {
      	if err := opt(s); err != nil {
      		return nil, err
      	}
      }
    
    
      //// 这里是当host作为客户端的时候,如何连接服务端的逻辑处理
      ////////////////////////////////////////////////////
      s.dsync = newDialSync(s.dialWorkerLoop) // 拨号协程 s.dialWorkerLoop,死循环检测拨号请求
      s.limiter = newDialLimiter(s.dialAddr) // 真正进行拨号的函数
      s.backf.init(s.ctx)                    // 启动后台任务,清理 peer.ID
      return s, nil
    

    }

这里额外补充下 s.transports.m的赋值逻辑(代码路径 net/swarm/swarm_transport.go 第76行):

  • 本质就是为了在 s.transports.m中记录 k/v

    func (s *Swarm) AddTransport(t transport.Transport) error {

      // 获取 t 传输层对应 id 标识(每个传输层都有预定义的id), tcp 传输层为 6
      protocols := t.Protocols()
      if len(protocols) == 0 {
      	return fmt.Errorf("useless transport handles no protocols: %T", t)
      }
    
      // 加锁
      s.transports.Lock()
      defer s.transports.Unlock()
      if s.transports.m == nil {
      	return ErrSwarmClosed
      }
      var registered []string
      for _, p := range protocols {
          // 看下该标识p(整数类型),在 s.transports.m中是否存在
      	if _, ok := s.transports.m[p]; ok {
      		proto := ma.ProtocolWithCode(p)
      		name := proto.Name
      		if name == "" {
      			name = fmt.Sprintf("unknown (%d)", p)
      		}
      		registered = append(registered, name)
      	}
      }
    
      // 是否重复注册
      if len(registered) > 0 {
      	return fmt.Errorf(
      		"transports already registered for protocol(s): %s",
      		strings.Join(registered, ", "),
      	)
      }
    
      // 否则,将 t 对应的标识p 和 t 保存为 k/v对
      for _, p := range protocols {
      	s.transports.m[p] = t
      }
      return nil
    

    }

这里t.Protocols() 标识信息来源于第三方库 go-multiaddr/protocols.go 中的定义

  • 比如 6 就表示 TCP

  • 比如 4 就表示 IP4

  • 并在 fun init()中 调用 func AddProtocol(p Protocol) error函数,将预定义的信息保存在 var protocolsByName = map[string]Protocol{} var protocolsByCode = map[int]Protocol{}

    // 预定义的整数值 const ( P_IP4 = 4 P_TCP = 6 P_DNS = 53 // 4 or 6 P_DNS4 = 54 P_DNS6 = 55 P_DNSADDR = 56 P_UDP = 273 P_DCCP = 33 P_IP6 = 41 P_IP6ZONE = 42 P_IPCIDR = 43 P_QUIC = 460 P_QUIC_V1 = 461 P_WEBTRANSPORT = 465 P_CERTHASH = 466 P_SCTP = 132 P_CIRCUIT = 290 P_UDT = 301 P_UTP = 302 P_UNIX = 400 P_P2P = 421 P_IPFS = P_P2P // alias for backwards compatibility P_HTTP = 480 P_HTTPS = 443 // deprecated alias for /tls/http P_ONION = 444 // also for backwards compatibility P_ONION3 = 445 P_GARLIC64 = 446 P_GARLIC32 = 447 P_P2P_WEBRTC_DIRECT = 276 // Deprecated. use webrtc-direct instead P_TLS = 448 P_SNI = 449 P_NOISE = 454 P_WS = 477 P_WSS = 478 // deprecated alias for /tls/ws P_PLAINTEXTV2 = 7367777 P_WEBRTC_DIRECT = 280 P_WEBRTC = 281 )

    var ( protoIP4 = Protocol{ Name: "ip4", Code: P_IP4, VCode: CodeToVarint(P_IP4), Size: 32, Path: false, Transcoder: TranscoderIP4, } protoTCP = Protocol{ Name: "tcp", Code: P_TCP, VCode: CodeToVarint(P_TCP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoDNS = Protocol{ Code: P_DNS, Size: LengthPrefixedVarSize, Name: "dns", VCode: CodeToVarint(P_DNS), Transcoder: TranscoderDns, } protoDNS4 = Protocol{ Code: P_DNS4, Size: LengthPrefixedVarSize, Name: "dns4", VCode: CodeToVarint(P_DNS4), Transcoder: TranscoderDns, } protoDNS6 = Protocol{ Code: P_DNS6, Size: LengthPrefixedVarSize, Name: "dns6", VCode: CodeToVarint(P_DNS6), Transcoder: TranscoderDns, } protoDNSADDR = Protocol{ Code: P_DNSADDR, Size: LengthPrefixedVarSize, Name: "dnsaddr", VCode: CodeToVarint(P_DNSADDR), Transcoder: TranscoderDns, } protoUDP = Protocol{ Name: "udp", Code: P_UDP, VCode: CodeToVarint(P_UDP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoDCCP = Protocol{ Name: "dccp", Code: P_DCCP, VCode: CodeToVarint(P_DCCP), Size: 16, Path: false, Transcoder: TranscoderPort, } protoIP6 = Protocol{ Name: "ip6", Code: P_IP6, VCode: CodeToVarint(P_IP6), Size: 128, Transcoder: TranscoderIP6, } protoIPCIDR = Protocol{ Name: "ipcidr", Code: P_IPCIDR, VCode: CodeToVarint(P_IPCIDR), Size: 8, Transcoder: TranscoderIPCIDR, } // these require varint protoIP6ZONE = Protocol{ Name: "ip6zone", Code: P_IP6ZONE, VCode: CodeToVarint(P_IP6ZONE), Size: LengthPrefixedVarSize, Path: false, Transcoder: TranscoderIP6Zone, } protoSCTP = Protocol{ Name: "sctp", Code: P_SCTP, VCode: CodeToVarint(P_SCTP), Size: 16, Transcoder: TranscoderPort, }

      protoCIRCUIT = Protocol{
      	Code:  P_CIRCUIT,
      	Size:  0,
      	Name:  "p2p-circuit",
      	VCode: CodeToVarint(P_CIRCUIT),
      }
    
      protoONION2 = Protocol{
      	Name:       "onion",
      	Code:       P_ONION,
      	VCode:      CodeToVarint(P_ONION),
      	Size:       96,
      	Transcoder: TranscoderOnion,
      }
      protoONION3 = Protocol{
      	Name:       "onion3",
      	Code:       P_ONION3,
      	VCode:      CodeToVarint(P_ONION3),
      	Size:       296,
      	Transcoder: TranscoderOnion3,
      }
      protoGARLIC64 = Protocol{
      	Name:       "garlic64",
      	Code:       P_GARLIC64,
      	VCode:      CodeToVarint(P_GARLIC64),
      	Size:       LengthPrefixedVarSize,
      	Transcoder: TranscoderGarlic64,
      }
      protoGARLIC32 = Protocol{
      	Name:       "garlic32",
      	Code:       P_GARLIC32,
      	VCode:      CodeToVarint(P_GARLIC32),
      	Size:       LengthPrefixedVarSize,
      	Transcoder: TranscoderGarlic32,
      }
      protoUTP = Protocol{
      	Name:  "utp",
      	Code:  P_UTP,
      	VCode: CodeToVarint(P_UTP),
      }
      protoUDT = Protocol{
      	Name:  "udt",
      	Code:  P_UDT,
      	VCode: CodeToVarint(P_UDT),
      }
      protoQUIC = Protocol{
      	Name:  "quic",
      	Code:  P_QUIC,
      	VCode: CodeToVarint(P_QUIC),
      }
      protoQUICV1 = Protocol{
      	Name:  "quic-v1",
      	Code:  P_QUIC_V1,
      	VCode: CodeToVarint(P_QUIC_V1),
      }
      protoWEBTRANSPORT = Protocol{
      	Name:  "webtransport",
      	Code:  P_WEBTRANSPORT,
      	VCode: CodeToVarint(P_WEBTRANSPORT),
      }
      protoCERTHASH = Protocol{
      	Name:       "certhash",
      	Code:       P_CERTHASH,
      	VCode:      CodeToVarint(P_CERTHASH),
      	Size:       LengthPrefixedVarSize,
      	Transcoder: TranscoderCertHash,
      }
      protoHTTP = Protocol{
      	Name:  "http",
      	Code:  P_HTTP,
      	VCode: CodeToVarint(P_HTTP),
      }
      protoHTTPS = Protocol{
      	Name:  "https",
      	Code:  P_HTTPS,
      	VCode: CodeToVarint(P_HTTPS),
      }
      protoP2P = Protocol{
      	Name:       "p2p",
      	Code:       P_P2P,
      	VCode:      CodeToVarint(P_P2P),
      	Size:       LengthPrefixedVarSize,
      	Transcoder: TranscoderP2P,
      }
      protoUNIX = Protocol{
      	Name:       "unix",
      	Code:       P_UNIX,
      	VCode:      CodeToVarint(P_UNIX),
      	Size:       LengthPrefixedVarSize,
      	Path:       true,
      	Transcoder: TranscoderUnix,
      }
      protoP2P_WEBRTC_DIRECT = Protocol{
      	Name:  "p2p-webrtc-direct",
      	Code:  P_P2P_WEBRTC_DIRECT,
      	VCode: CodeToVarint(P_P2P_WEBRTC_DIRECT),
      }
      protoTLS = Protocol{
      	Name:  "tls",
      	Code:  P_TLS,
      	VCode: CodeToVarint(P_TLS),
      }
      protoSNI = Protocol{
      	Name:       "sni",
      	Size:       LengthPrefixedVarSize,
      	Code:       P_SNI,
      	VCode:      CodeToVarint(P_SNI),
      	Transcoder: TranscoderDns,
      }
      protoNOISE = Protocol{
      	Name:  "noise",
      	Code:  P_NOISE,
      	VCode: CodeToVarint(P_NOISE),
      }
      protoPlaintextV2 = Protocol{
      	Name:  "plaintextv2",
      	Code:  P_PLAINTEXTV2,
      	VCode: CodeToVarint(P_PLAINTEXTV2),
      }
      protoWS = Protocol{
      	Name:  "ws",
      	Code:  P_WS,
      	VCode: CodeToVarint(P_WS),
      }
      protoWSS = Protocol{
      	Name:  "wss",
      	Code:  P_WSS,
      	VCode: CodeToVarint(P_WSS),
      }
      protoWebRTCDirect = Protocol{
      	Name:  "webrtc-direct",
      	Code:  P_WEBRTC_DIRECT,
      	VCode: CodeToVarint(P_WEBRTC_DIRECT),
      }
      protoWebRTC = Protocol{
      	Name:  "webrtc",
      	Code:  P_WEBRTC,
      	VCode: CodeToVarint(P_WEBRTC),
      }
    

    )

    // 将预定的信息,调用 AddProtocol 保存到 map中 func init() { for _, p := range []Protocol{ protoIP4, protoTCP, protoDNS, protoDNS4, protoDNS6, protoDNSADDR, protoUDP, protoDCCP, protoIP6, protoIP6ZONE, protoIPCIDR, protoSCTP, protoCIRCUIT, protoONION2, protoONION3, protoGARLIC64, protoGARLIC32, protoUTP, protoUDT, protoQUIC, protoQUICV1, protoWEBTRANSPORT, protoCERTHASH, protoHTTP, protoHTTPS, protoP2P, protoUNIX, protoP2P_WEBRTC_DIRECT, protoTLS, protoSNI, protoNOISE, protoWS, protoWSS, protoPlaintextV2, protoWebRTCDirect, protoWebRTC, } { if err := AddProtocol(p); err != nil { panic(err) } }

      // explicitly set both of these
      protocolsByName["p2p"] = protoP2P
      protocolsByName["ipfs"] = protoP2P
    

    }

到此整个的 cfg.makeSwarm函数,才算完成。当然也仅仅只是初始化了一个 *swarm.Swarm对象而已。真正的监听 or 拨号的逻辑还没有真正的执行。


服务端监听流程

再回到 NewNode函数,直接看 h.Network().Listen(cfg.ListenAddrs...)这块是关于服务端监听+如何处理请求的流程梳理

// network.Network 是接口,h.netword的实际值是 swrm
func (h *BasicHost) Network() network.Network {
	return h.network
}

所以,执行h.Network().Listen(cfg.ListenAddrs...)本质是执行 swrm.Listen(cfg.ListenAddrs...)

// Swarm中的 Listen
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {

    
	errs := make([]error, len(addrs))
	var succeeded int

    // 遍历在代码开始的时候设定的监听地址(我们只设定了 /ip4/127.0.0.1/tcp/2222 这样的一个地址)
	for i, a := range addrs {
		// 针对每一个地址,开启监听
		if err := s.AddListenAddr(a); err != nil {
			errs[i] = err
		} else {
			succeeded++
		}
	}

    // 监听失败的日志报错
	for i, e := range errs {
		if e != nil {
			log.Warnw("listening failed", "on", addrs[i], "error", errs[i])
		}
	}

	if succeeded == 0 && len(addrs) > 0 {
		return fmt.Errorf("failed to listen on any addresses: %s", errs)
	}

	return nil
}

继续进入 s.AddListenAddr(a)函数查看:

  • 先获取对应协议TransportForListening相对应的传输层对象 *TcpTransport

  • 然后开启监听 tpt.Listen(a)

  • 启动协程,死循环list.Accept()获取客户端的连接c

  • 每个客户端连接c,再启动一个协程,对c进行读写s.addConn(c, network.DirInbound)

    func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {

      // 当前的tcp协议,对应的传输层(本demo获取的传输层应该是 *TcpTransport )
      tpt := s.TransportForListening(a)
      if tpt == nil {
      	// TransportForListening will return nil if either:
      	// 1. No transport has been registered.
      	// 2. We're closed (so we've nulled out the transport map.
      	//
      	// Distinguish between these two cases to avoid confusing users.
      	select {
      	case <-s.ctx.Done():
      		return ErrSwarmClosed
      	default:
      		return ErrNoTransport
      	}
      }
    
      // 这里就是在调用 *TcpTransport.Listen,返回监听对象
      list, err := tpt.Listen(a)
      if err != nil {
      	return err
      }
    
      s.listeners.Lock()
      if s.listeners.m == nil {
      	s.listeners.Unlock()
      	list.Close()
      	return ErrSwarmClosed
      }
      s.refs.Add(1)
      // 保存 list 监听句柄
      s.listeners.m[list] = struct{}{} 
      s.listeners.cacheEOL = time.Time{}
      s.listeners.Unlock()
    
      maddr := list.Multiaddr()
    
      // signal to our notifiees on listen.
      s.notifyAll(func(n network.Notifiee) {
      	n.Listen(s, maddr)
      })
    
    
      // 启动协程
      go func() {
      	defer func() {
      		s.listeners.Lock()
      		_, ok := s.listeners.m[list]
      		if ok {
      			delete(s.listeners.m, list)
      			s.listeners.cacheEOL = time.Time{}
      		}
      		s.listeners.Unlock()
    
      		if ok {
      			list.Close()
      			log.Errorf("swarm listener unintentionally closed")
      		}
    
      		// signal to our notifiees on listen close.
      		s.notifyAll(func(n network.Notifiee) {
      			n.ListenClose(s, maddr)
      		})
      		s.refs.Done()
      	}()
      	for { // 死循环
    
              // 获取一个 客户端连接
      		c, err := list.Accept() // &transportConn
      		if err != nil {
      			if !errors.Is(err, transport.ErrListenerClosed) {
      				log.Errorf("swarm listener for %s accept error: %s", a, err)
      			}
      			return
      		}
      		canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
      		if s.metricsTracer != nil {
      			c = wrapWithMetrics(c, s.metricsTracer, time.Now(), network.DirInbound)
      		}
    
      		log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr())
      		s.refs.Add(1)
    
              // 再启动一个协程
      		go func() {
      			defer s.refs.Done()
    
                  // 单独和该客户端连接进行交互
      			_, err := s.addConn(c, network.DirInbound)
      			switch err {
      			case nil:
      			case ErrSwarmClosed:
      				// ignore.
      				return
      			default:
      				log.Warnw("adding connection failed", "to", a, "error", err)
      				return
      			}
      		}()
      	}
      }()
      return nil
    

    }

    // 获取地址格式,对应的传输层(本demo获取的传输层应该是 *TcpTransport ) func (s *Swarm) TransportForListening(a ma.Multiaddr) transport.Transport { protocols := a.Protocols() if len(protocols) == 0 { return nil }

      s.transports.RLock()
      defer s.transports.RUnlock()
      if len(s.transports.m) == 0 {
      	// make sure we're not just shutting down.
      	if s.transports.m != nil {
      		log.Error("you have no transports configured")
      	}
      	return nil
      }
    
      selected := s.transports.m[protocols[len(protocols)-1].Code]
    
      for _, p := range protocols {
      	transport, ok := s.transports.m[p.Code]
      	if !ok {
      		continue
      	}
      	if transport.Proxy() {
      		selected = transport
      	}
      }
      return selected
    

    }

这里的 *TcpTransport.Listen内部逻辑,有一层额外的设计,如下(感兴趣可以看下):

  • t.maListen(laddr)内部本质就是调用的 net.Listen,获取到最最最最最最原始的网络监听对象

  • 然后利用 t.upgrader.UpgradeListener(t, list) 对原始的监听做了一层升级设计(代码位于 p2p/net/upgrader/upgrader.go 107行)

    • 内部启动了一个协程 go l.handleIncoming(),死循环不断的调用 maconn, err := l.Listener.Accept()获取原始的客户端连接 maconn,然后再调用 conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope) 对客户端连接maconn,又进行了一次升级,其实就是返回了一个 &transportConn类型的 conn,最后将 conn对象保存到了 l.incoming <- conn通道中
  • 当外部调用 c, err := list.Accept()获取客户端连接的时候,其实是从 l.incoming中获取的 &transportConn类型的 conn (代码位于 p2p/net/upgrader/listener.go 163行)

    // Listen listens on the given multiaddr. func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {

      // 这个就是最最最最最最原始的网络监听对象
      list, err := t.maListen(laddr)
      if err != nil {
      	return nil, err
      }
      if t.enableMetrics {
      	list = newTracingListener(&tcpListener{list, 0})
      }
    
      // 进入这里,看更详细的逻辑(这里就不展示代码了)
      return t.upgrader.UpgradeListener(t, list), nil
    

    }

    func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) { // maListener if t.UseReuseport() { return t.reuse.Listen(laddr) } return manet.Listen(laddr) }

    func Listen(laddr ma.Multiaddr) (Listener, error) {

      lnet, lnaddr, err := DialArgs(laddr)
      if err != nil {
      	return nil, err
      }
       // 这个就是最最最最最最原始的网络监听对象
      nl, err := net.Listen(lnet, lnaddr)
      if err != nil {
      	return nil, err
      }
    
      return WrapNetListener(nl)
    

    }

    func WrapNetListener(nl net.Listener) (Listener, error) { // maListener if nla, ok := nl.(*netListenerAdapter); ok { return nla.Listener, nil }

      laddr, err := FromNetAddr(nl.Addr())
      if err != nil {
      	return nil, err
      }
    
      return &maListener{
      	Listener: nl,
      	laddr:    laddr,
      }, nil
    

    }


最后我们再来看下 s.addConn中实际是怎么读写数据的

func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
	
     // ... 省略....

	c := &Conn{
		conn:  tc, // &transportConn
		swarm: s,
		stat:  stat,
		id:    s.nextConnID.Add(1),
	}

    // ... 省略....

    // 这里保存了 和对方peer ID 已建立的连接(为了后续的连接复用)
	s.conns.m[p] = append(s.conns.m[p], c)

	// ... 省略....

    // 当host作为服务端,真正处理逻辑的地方(进行协议匹配,调用 handler函数进行用户的业务逻辑处理)
	c.start()
	return c, nil
}

进入c.start()

  • 这里的 c.conn.AcceptStream() 实际调用的是 var s *yamux.Session 的 AcceptStream() 方法,具体为什么,还是需要看下上面说到的 conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)内部的代码逻辑。关键代码位于 p2p/net/upgrader/upgrader.go 中的183行 muxer, smconn, err := u.setupMuxer(ctx, sconn, isServer, connScope.PeerScope()) 函数内部的逻辑。

  • 这里对流概念进行解释下:比如 hostA 和 hostB 建立了一个 socket连接,在这一个socket连接上,可以同时并发的发送多个不同的stream(简单可以认为每个stream就是一个完整的数据包),对于每个stream感觉上就像自己独享了这个socket连接(实际是复用的一个socket)。到了服务端,基于 stream id 可以将不同的stream分离开来,而并不会混淆在一起。再打个比方:比如一条公路,一般情况下一次过一辆车,利用率是不是太低了。那我把这条公路做点额外的设计,每次都可以让多辆车同时行驶,并且不会"撞"在一起,那对于每辆车来说,感觉这条路就是自己家开的。 这里利用了 github.com/libp2p/go-yamux/v4这个第三方库,实现了 stream的功能。如果你对http2.0有了解过,它里面的stream的实现和这里是类似的。这里是官方的解释 https://docs.libp2p.io/concepts/multiplex/overview/ 可以看下。

  • 最终实际执行业务逻辑的代码位于 h(s),这里的 h的值就是 n.SetStreamHandler(h.newStreamHandler)设置的值 h.newStreamHandler (代码位于 p2p/host/basic/basic_host.go 第 303行 )

    func (c *Conn) start() { go func() { defer c.swarm.refs.Done() defer c.Close()

      	for {
              // 这里我直接说结论,实际调用的是 var s *yamux.Session 的 AcceptStream() 方法,从当前连接中获取一个多路复用流 *yamux.Stream
      		ts, err := c.conn.AcceptStream()
      		if err != nil {
      			return
      		}
    
    
      		c.swarm.refs.Add(1)
      		go func() {
    
      			// 对 yamux.Stream进行了包装,成了 *swarm.Stream
      			s, err := c.addStream(ts, network.DirInbound, scope)
    
      			c.swarm.refs.Done()
    
                  // func (h *BasicHost) newStreamHandler(s network.Stream)
      			if h := c.swarm.StreamHandler(); h != nil {
                      // 这里的 s 是 *swarm.Stream
      				h(s) 
      			}
      			s.completeAcceptStreamGoroutine()
      		}()
      	}
      }()
    

    }

所以直接看下 h.newStreamHandler函数源码

  • 就是从流中读取协议

  • 遍历msm.handlerlock 看能否匹配echo中设定的协议/echo/1.0.0(这里是全文匹配 ),如果相同,返回 h.Handle

  • 最后执行这个 h.Handle函数 handle(protoID, s)

    func (h *BasicHost) newStreamHandler(s network.Stream) {

      // 从 Mux中获取 处理句柄
      protoID, handle, err := h.Mux().Negotiate(s)
    
      // 执行函数
      handle(protoID, s)
    

    }

    const ProtocolID = "/multistream/1.0.0"

    // 从 Mux中获取 处理句柄 func (msm *MultistreamMuxer[T]) Negotiate(rwc io.ReadWriteCloser) (proto T, handler HandlerFunc[T], err error) {

      _ = delimWriteBuffered(rwc, []byte(ProtocolID)) // 写入  ProtocolID
    
      line, err := ReadNextToken[T](rwc)              // 读取客户端发送来的 protocolID
      if err != nil {
      	return "", nil, err
      }
    
      if line != ProtocolID { // 协议的公共前缀头(固定的格式)
      	rwc.Close()
      	return "", nil, ErrIncorrectVersion
      }
    

    loop: for { // 读取并响应命令,直到它们发送一个有效的协议id (就是echo中的 /echo/1.0.0 ) tok, err := ReadNextTokenT // 从 rwc 中读取 if err != nil { return "", nil, err }

      	// 在这里匹配到对应的处理 函数
      	h := msm.findHandler(tok)
      	if h == nil {
      		if err := delimWriteBuffered(rwc, []byte("na")); err != nil {
      			return "", nil, err
      		}
      		continue loop
      	}
    
    
      	_ = delimWriteBuffered(rwc, []byte(tok))
    
      	return tok, h.Handle, nil // 返回 h.Handle
      }
    

    }

    // 匹配的逻辑 func (msm *MultistreamMuxer[T]) findHandler(proto T) *Handler[T] { msm.handlerlock.RLock() defer msm.handlerlock.RUnlock()

      // 遍历 msm.handlers 切片
      for _, h := range msm.handlers {
          // 执行判断函数
      	if h.MatchFunc(proto) {
      		return &h // 返回 handler
      	}
      }
    
      return nil
    

    }

echo范例中,设定协议实际调用ha.SetStreamHandler("/echo/1.0.0" func(){})的函数内部逻辑如下(这就和上面的匹配逻辑对应起来了):

  • msm.handlers 中保存记录

    func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { // 保存到 msm.handlers 切片中 h.Mux().AddHandler(pid, func(p protocol.ID, rwc io.ReadWriteCloser) error { is := rwc.(network.Stream) handler(is) return nil }) h.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ Added: []protocol.ID{pid}, }) }

    func (msm *MultistreamMuxer[T]) AddHandler(protocol T, handler HandlerFunc[T]) { // 在msm.handlers 中添加记录 msm.AddHandlerWithFunc(protocol, fulltextMatch(protocol), handler) }

    // 全文匹配 func fulltextMatch[T StringLike](s T) func(T) bool { return func(a T) bool { return a == s } }

    // 在msm.handlers 中添加记录 func (msm *MultistreamMuxer[T]) AddHandlerWithFunc(protocol T, match func(T) bool, handler HandlerFunc[T]) { msm.handlerlock.Lock() defer msm.handlerlock.Unlock()

      // 删除历史的
      msm.removeHandler(protocol)
      // 新增
      msm.handlers = append(msm.handlers, Handler[T]{
      	MatchFunc: match,    // 这里的匹配是全文匹配
      	Handle:    handler,  // 处理函数
      	AddName:   protocol, // 协议(路由)
      })
    

    }

至此服务端的基本流程已经梳理完毕

服务端这个流程其实和普通的http的处理流程大致是类似于的,都是先设定路由+处理函数,当请求来了以后,读取请求中的路由,和代码中设定好的路由进行匹配。

客户端拨号流程

需要大家再回到 echo的源码中这句话 s, err := ha.NewStream(context.Background(), info.ID, "/echo/1.0.0"),这里做的事情就是拨号

  • 拨号的逻辑在 h.Connecth.Network().NewStream都存在。

  • 这里以 s, err := h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p)函数来进行说明,本质都是调用 s.dialPeer 拨号函数

    func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {

      // GetNoDial 目的在于是否执行拨号的过程,设置了就不走,不设置,就执行拨号
      if nodial, _ := network.GetNoDial(ctx); !nodial { 
    
          // 这里进行拨号 
      	err := h.Connect(ctx, peer.AddrInfo{ID: p})
      	if err != nil {
      		return nil, err
      	}
      }
    
      // h.Network() 就是 swrm
      s, err := h.Network().NewStream(network.WithNoDial(ctx, "already dialed"), p)
      if err != nil {
      	// TODO: It would be nicer to get the actual error from the swarm,
      	// but this will require some more work.
      	if errors.Is(err, network.ErrNoConn) {
      		return nil, errors.New("connection failed")
      	}
      	return nil, fmt.Errorf("failed to open stream: %w", err)
      }
    
      //... 省略 ...
    

    }

前文提过 h.Network() 就是 swrm,所以直接看 *swarm.Swarm中的NewStream函数的实现。(其实这里你去看 h.Connect内部逻辑也行,最后都是在调用 s.dialPeer函数,进行拨号)

  • 从下文可知,拨号调用的是 s.dialPeer函数

    func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {

      numDials := 0
      for {
      	c := s.bestConnToPeer(p) // 查看 p 对应的已经存在的连接
      	if c == nil { // 如果不存在
    
      		if nodial, _ := network.GetNoDial(ctx); !nodial { // 不允许拨号为 false(也就是允许拨号)
      			numDials++
      			if numDials > DialAttempts {
      				return nil, errors.New("max dial attempts exceeded")
      			}
    
    
      			var err error
                  // 主动拨号
      			c, err = s.dialPeer(ctx, p) 
      			if err != nil {
      				return nil, err
      			}
      		} else {
      			return nil, network.ErrNoConn
      		}
      	}
    
      	//... 略 ...
    
      	// 在socket的基础上,创建一个流
      	str, err := c.NewStream(ctx)
      	if err != nil {
      		if c.conn.IsClosed() {
      			continue
      		}
      		return nil, err
      	}
      	return str, nil // 返回这个流
      }
    

    }

进入 s.dialPeer函数:

  • 做前置检查,实际拨号又调用了 s.dsync.Dial(ctx, p)s.dsync的值,在前面 NewSwarm中进行的初始化,不记得可以回看下。

    func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { log.Debugw("dialing peer", "from", s.local, "to", p) err := p.Validate() // 判断是否为空 字符串 if err != nil { return nil, err }

      if p == s.local { // 自己不允许连接自己
      	return nil, ErrDialToSelf
      }
    
      // 如果存在 peer.ID 对应的 *Conn,就直接返回
      conn := s.bestAcceptableConnToPeer(ctx, p) 
      if conn != nil {
      	return conn, nil
      }
    
      // 类似于黑名单 gater,做前置检查
      if s.gater != nil && !s.gater.InterceptPeerDial(p) { // 判断是否允许运行连接 到 peer.ID
      	log.Debugf("gater disallowed outbound connection to peer %s", p)
      	return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
      }
    
      // apply the DialPeer timeout
      ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx)) // 60s (拨号超时时间)
      defer cancel()
    
      // 利用 s.dsync 进行拨号, p 为对方的 peer.ID
      conn, err = s.dsync.Dial(ctx, p) 
      if err == nil {
          // 再判断一次,是不是要连接的 peer
      	if conn.RemotePeer() != p { 
      		conn.Close()
      		log.Errorw("Handshake failed to properly authenticate peer", "authenticated", conn.RemotePeer(), "expected", p)
      		return nil, fmt.Errorf("unexpected peer")
      	}
          // 拨号成功,返回 conn
      	return conn, nil
      }
    
      log.Debugf("network for %s finished dialing %s", s.local, p)
    
      if ctx.Err() != nil {
      	// Context error trumps any dial errors as it was likely the ultimate cause.
      	return nil, ctx.Err()
      }
    
      if s.ctx.Err() != nil {
      	// Ok, so the swarm is shutting down.
      	return nil, ErrSwarmClosed
      }
    
      return nil, err
    

    }

    // 如果和peer.ID 存在连接,找到最好的连接 func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {

      s.conns.RLock()
      defer s.conns.RUnlock()
    
      var best *Conn
      // 查询 s.conns.m 
      for _, c := range s.conns.m[p] {
          // 检查切片下的每个连接
      	if c.conn.IsClosed() {
    
      		continue
      	}
      	if best == nil || isBetterConn(c, best) {
      		best = c
      	}
      }
      return best
    

    }

    // 比较两个连接 func isBetterConn(a, b *Conn) bool { aTransient := a.Stat().Transient // 是否是临时连接 bTransient := b.Stat().Transient if aTransient != bTransient { return !aTransient }

      aDirect := isDirectConn(a) // 没经过代理
      bDirect := isDirectConn(b)
      if aDirect != bDirect {
      	return aDirect
      }
    
      a.streams.Lock()
      aLen := len(a.streams.m) // 已经打开了更多的 *Stream
      a.streams.Unlock()
    
      b.streams.Lock()
      bLen := len(b.streams.m)
      b.streams.Unlock()
    
      if aLen != bLen {
      	return aLen > bLen
      }
    
      // finally, pick the last connection.
      return true
    

    }

继续进入s.dsync.Dial源码:

  • 重点在 ds.getActiveDial(p)函数内部逻辑,针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作

    • 启动协程go ds.dialWorker(p, actd.reqch)消费 reqch 通道消息,生成 conn
  • ad.dial(ctx)负责往 reqch 通道中发送消息,并阻塞等待结果

    func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) { // 针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作 ad, err := ds.getActiveDial(p) if err != nil { return nil, err }

      // 利用 *activeDial,当想要一个 *Conn的时候,就往 *activeDial reqch 通道中发送消息,并阻塞等待 conn的生成
      conn, err := ad.dial(ctx)
    
      ds.mutex.Lock()
      defer ds.mutex.Unlock()
    
      ad.refCnt--
      if ad.refCnt == 0 { // 表示 当 *activeDial 对象不再被使用
      	if err == nil {
      		ad.cancelCause(errConcurrentDialSuccessful)
      	} else {
      		ad.cancelCause(err)
      	}
      	close(ad.reqch)
      	delete(ds.dials, p) // 删除掉
      }
    
      return conn, err
    

    }

    // 利用 *activeDial,当想要一个 *Conn的时候,就往 *activeDial reqch 通道中发送消息,并阻塞等待 conn的生成 func (ad *activeDial) dial(ctx context.Context) (*Conn, error) { dialCtx := ad.ctx

      // 获取ctx中是否要求,强制直接连接
      if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect {
      	dialCtx = network.WithForceDirectDial(dialCtx, reason)
      }
      // 同步连接
      if simConnect, isClient, reason := network.GetSimultaneousConnect(ctx); simConnect {
      	dialCtx = network.WithSimultaneousConnect(dialCtx, isClient, reason)
      }
    
      // 保存拨号完成后的结果
      resch := make(chan dialResponse, 1) 
      select {
      case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: // 发消息给 ad.reqch
      case <-ctx.Done():
      	return nil, ctx.Err()
      }
    
      /// 阻塞中.. 等待拨号的完成
      select {
      case res := <-resch: // 检查resch 是否有结果(有,说明拨号成功)
      	return res.conn, res.err
      case <-ctx.Done():
      	return nil, ctx.Err()
      }
    

    }

    func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) { ds.mutex.Lock() defer ds.mutex.Unlock()

      // 针对一个 peer.ID 只有一个 *activeDial 对象用来进行拨号操作
      actd, ok := ds.dials[p] 
      if !ok {
      	ctx, cancel := context.WithCancelCause(context.Background())
          // 新建 *activeDial
      	actd = &activeDial{
      		ctx:         ctx,
      		cancelCause: cancel,
      		reqch:       make(chan dialRequest),
      	}
    
          ///!!! 这里才是真正进行拨号的逻辑,消费 reqch,完成拨号连接
      	go ds.dialWorker(p, actd.reqch) 
      	ds.dials[p] = actd
      }
      // 复用原来的 *activeDial ,计数+1
      actd.refCnt++
      return actd, nil
    

    }

所以接下来看下协程 go ds.dialWorker(p, actd.reqch)里面怎么消费reqch消息的。

这里的 ds.dialWorker 是一个函数,而且是在 NewSwarm中设定的函数s.dialWorkerLoop(不记得去看下 NewSwarm)

所以直接查看 s.dialWorkerLoop的内部逻辑:

  • 创建了 *dialWorker 对象

  • 调用 w.loop()函数

    func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {

      // 创建了 *dialWorker 对象
      w := newDialWorker(s, p, reqch, nil)
      w.loop()
    

    }

    func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker { if cl == nil { cl = RealClock{} } return &dialWorker{ s: s, peer: p, reqch: reqch, pendingRequests: make(map[*pendRequest]struct{}), trackedDials: make(map[string]*addrDial), resch: make(chan tpt.DialUpdate), cl: cl, } }

继续看 w.loop()内部逻辑:

  • 就是一个 loop 死循环

  • 包括了3部分逻辑:

    • w.reqch队列中获取外部发送来的消息,并判断是否第一次拨号,如果是,将数据保存到 db中,并重置定时器;
    • 当定时器超时,从 db中获取待拨号的地址,调用 w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)进行实际的拨号,一旦拨号返回结果,保存在 w.resch
    • 读取 w.resch是否有结果,如果有结果,最终通过 pr.req.resch <- dialResponse{conn: conn}通道,通知外部已经拨号完成,可以结束阻塞了。不记得的可以看下上面的 func (ad *activeDial) dial中的注释
  • 总结下,就是数据经过 w.reqch -> db -> w.resch -> pr.req.resch 这一整套的流转

    func (w *dialWorker) loop() {

      startTime := w.cl.Now()
      // dialTimer 对象
      dialTimer := w.cl.InstantTimer(startTime.Add(math.MaxInt64))
      defer dialTimer.Stop()
    
      timerRunning := true
    
      // 重置定时器
      scheduleNextDial := func() {
      	if timerRunning && !dialTimer.Stop() {
      		<-dialTimer.Ch()
      	}
      	timerRunning = false
      	if dq.Len() > 0 {
      		if dialsInFlight == 0 && !w.connected {
      			// 如果无拨号,立刻触发定时器
      			dialTimer.Reset(startTime)
      		} else {
    
                  // 下一个拨号的延迟触发时间(作为定时器的触发时间)
      			resetTime := startTime.Add(dq.top().Delay) 
      			for _, ad := range w.trackedDials {
      				if !ad.expectedTCPUpgradeTime.IsZero() && ad.expectedTCPUpgradeTime.After(resetTime) {
      					resetTime = ad.expectedTCPUpgradeTime
      				}
      			}
      			dialTimer.Reset(resetTime)
      		}
      		timerRunning = true
      	}
      }
    
      totalDials := 0
    

    loop: for {

      	select {
              // 获取外部消息
      	case req, ok := <-w.reqch:
      		if !ok {
      			if w.s.metricsTracer != nil {
      				w.s.metricsTracer.DialCompleted(w.connected, totalDials)
      			}
      			return
      		}
    
              // 获取已经存在的连接
      		c := w.s.bestAcceptableConnToPeer(req.ctx, w.peer) 
      		if c != nil {
      			req.resch <- dialResponse{conn: c}
      			continue loop // 继续等待下一个消息
      		}
    
      		// 执行到这里,说明 peer.ID 没有已存在的 *Conn
              // 过滤出可联通的地址
      		addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer)
      		if err != nil {
      			req.resch <- dialResponse{
      				err: &DialError{
      					Peer:       w.peer,
      					DialErrors: addrErrs,
      					Cause:      err,
      				}}
      			continue loop
      		}
    
      		// 从拨号器中获取拨号这些地址的延迟时间
      		simConnect, _, _ := network.GetSimultaneousConnect(req.ctx)
      		addrRanking := w.rankAddrs(addrs, simConnect) // 地址排序
      		addrDelay := make(map[string]time.Duration, len(addrRanking))
    
      		// 创建挂起的请求对象
      		pr := &pendRequest{
      			req:   req,
      			addrs: make(map[string]struct{}, len(addrRanking)),
      			err:   &DialError{Peer: w.peer, DialErrors: addrErrs},
      		}
              // 保存 地址 + 地址拨号延迟时间
      		for _, adelay := range addrRanking {
      			pr.addrs[string(adelay.Addr.Bytes())] = struct{}{}
      			addrDelay[string(adelay.Addr.Bytes())] = adelay.Delay
      		}
    
      		// Check if dials to any of the addrs have completed already
      		// If they have errored, record the error in pr. If they have succeeded,
      		// respond with the connection.
      		// If they are pending, add them to tojoin.
      		// If we haven't seen any of the addresses before, add them to todial.
      		var todial []ma.Multiaddr
      		var tojoin []*addrDial
    
      		for _, adelay := range addrRanking {
                  // 判断地址是否已经加入到 todial(保证只拨打一次地址)
      			ad, ok := w.trackedDials[string(adelay.Addr.Bytes())]
      			if !ok {
      				todial = append(todial, adelay.Addr)
      				continue
      			}
    
      			if ad.conn != nil {
      				// dial to this addr was successful, complete the request
      				req.resch <- dialResponse{conn: ad.conn}
      				continue loop
      			}
    
      			if ad.err != nil {
      				// dial to this addr errored, accumulate the error
      				pr.err.recordErr(ad.addr, ad.err)
      				delete(pr.addrs, string(ad.addr.Bytes()))
      				continue
      			}
    
                  // 说明拨打还在进行中...
      			tojoin = append(tojoin, ad)
      		}
    
      		if len(todial) == 0 && len(tojoin) == 0 {
      			// all request applicable addrs have been dialed, we must have errored
      			pr.err.Cause = ErrAllDialsFailed
      			req.resch <- dialResponse{err: pr.err}
      			continue loop
      		}
    
      		// The request has some pending or new dials
      		w.pendingRequests[pr] = struct{}{}
    
      		for _, ad := range tojoin {
                  // 如果还没有拨号过该地址
      			if !ad.dialed {
      				// we haven't dialed this address. update the ad.ctx to have simultaneous connect values
      				// set correctly
      				if simConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); simConnect {
      					if simConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !simConnect {
      						ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason)
      						// 保存到 dq 中,等待拨号
      						dq.Add(network.AddrDelay{
      							Addr:  ad.addr,
      							Delay: addrDelay[string(ad.addr.Bytes())],
      						})
      					}
      				}
      			}
      			// add the request to the addrDial
      		}
    
              // 等待拨号
      		if len(todial) > 0 {
      			now := time.Now()
      			// these are new addresses, track them and add them to dq
      			for _, a := range todial {
      				w.trackedDials[string(a.Bytes())] = &addrDial{
      					addr:      a,
      					ctx:       req.ctx,
      					createdAt: now,
      				}
                     // 保存到 dq 中,等待拨号
      				dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]}) 
      			}
      		}
      		// 重制定时器时间
      		scheduleNextDial()
    
      	case <-dialTimer.Ch():
      		// It's time to dial the next batch of addresses.
      		// We don't check the delay of the addresses received from the queue here
      		// because if the timer triggered before the delay, it means that all
      		// the inflight dials have errored and we should dial the next batch of
      		// addresses
      		now := time.Now()
    
              // 定时器到时间点,从dq队列中获取数据
      		for _, adelay := range dq.NextBatch() { 
      			// spawn the dial
      			ad, ok := w.trackedDials[string(adelay.Addr.Bytes())]
      			if !ok {
      				log.Errorf("SWARM BUG: no entry for address %s in trackedDials", adelay.Addr)
      				continue
      			}
      			ad.dialed = true
      			ad.dialRankingDelay = now.Sub(ad.createdAt)
    
      			// 启动协程进行拨号,结果保存在 w.resch中
      			err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
      			if err != nil {
      				w.dispatchError(ad, err)
      			} else {
      				dialsInFlight++
      				totalDials++
      			}
      		}
      		timerRunning = false
      		// 重制定时器时间
      		scheduleNextDial()
    
      	case res := <-w.resch: // 又从 w.resch 中获取结果
    
      		ad, ok := w.trackedDials[string(res.Addr.Bytes())]
      		if !ok {
      			log.Errorf("SWARM BUG: no entry for address %s in trackedDials", res.Addr)
      			if res.Conn != nil {
      				res.Conn.Close()
      			}
      			dialsInFlight--
      			continue
      		}
    
      		// TCP Connection has been established. Wait for connection upgrade on this address
      		// before making new dials.
      		if res.Kind == tpt.UpdateKindHandshakeProgressed {
      			// Only wait for public addresses to complete dialing since private dials
      			// are quick any way
      			if manet.IsPublicAddr(res.Addr) {
      				ad.expectedTCPUpgradeTime = w.cl.Now().Add(PublicTCPDelay)
      			}
      			scheduleNextDial()
      			continue
      		}
      		dialsInFlight--
      		ad.expectedTCPUpgradeTime = time.Time{}
    
              // res.Conn 就是拨号后到有效 连接
      		if res.Conn != nil {
      			// 这里会将连接保存到 s.conns.m[p] = append(s.conns.m[p], c) 中,下次拨号可以复用连接,不用重复拨号
      			conn, err := w.s.addConn(res.Conn, network.DirOutbound)
      			if err != nil {
      				// oops no, we failed to add it to the swarm
      				res.Conn.Close()
      				w.dispatchError(ad, err)
      				continue loop
      			}
    
      			for pr := range w.pendingRequests {
      				if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
    
                          // 这里就是返回最终的结果,返回给外部的函数(外部函数正阻塞在 resch 等待结果...)
      					pr.req.resch <- dialResponse{conn: conn}
      					delete(w.pendingRequests, pr)
      				}
      			}
    
    
    
      			ad.conn = conn 
      			if !w.connected {
      				w.connected = true
      				if w.s.metricsTracer != nil {
      					w.s.metricsTracer.DialRankingDelay(ad.dialRankingDelay)
      				}
      			}
    
      			continue loop
      		}
    
      		// it must be an error -- add backoff if applicable and dispatch
      		// ErrDialRefusedBlackHole shouldn't end up here, just a safety check
      		if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected {
      			// we only add backoff if there has not been a successful connection
      			// for consistency with the old dialer behavior.
      			w.s.backf.AddBackoff(w.peer, res.Addr)
      		} else if res.Err == ErrDialRefusedBlackHole {
      			log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s",
      				w.peer, res.Addr)
      		}
    
      		w.dispatchError(ad, res.Err)
      		// Only schedule next dial on error.
      		// If we scheduleNextDial on success, we will end up making one dial more than
      		// required because the final successful dial will spawn one more dial
      		scheduleNextDial()
      	}
      }
    

    }

此时我们看下真正拨号的函数 w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)内部的逻辑:

  • 函数的调用链路如下: dialNextAddr -> limitedDial -> AddDialJob -> addCheckPeerLimit -> 启动协程go dl.executeDial(dj) -> dl.dialFunc

  • 按照这个调用链路可知:(为了提高并发行)拨号其实是单独启动一个协程进行的,并且还对拨号进行了限流,最后实际执行的拨号函数为dl.dialFunc

  • dl.dialFunc 是在 dialLimiter初始化的时候,赋值的 s.dialAddr。(不清楚的看下 NewSwarm函数)

    func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan transport.DialUpdate) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { if s.backf.Backoff(p, addr) { return ErrDialBackoff } }

      // 开始拨号
      s.limitedDial(ctx, p, addr, resch)
      return nil
    

    }

    // 开始拨号 func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan transport.DialUpdate) { timeout := s.dialTimeout if manet.IsPrivateAddr(a) && s.dialTimeoutLocal < s.dialTimeout { timeout = s.dialTimeoutLocal }

      // 新建拨号任务
      s.limiter.AddDialJob(&dialJob{
      	addr:    a,
      	peer:    p,
      	resp:    resp,
      	ctx:     ctx,
      	timeout: timeout,
      })
    

    }

    // 新建拨号任务,本质就是启动一个协程 func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.lk.Lock() defer dl.lk.Unlock()

      log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr)
      dl.addCheckPeerLimit(dj)
    

    }

    func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {

      // 这里做了限流,一个 peer 最多只能有 perPeerLimit 个任务在同时执行
      if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
      	log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
      		"peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
      		len(dl.waitingOnPeerLimit[dj.peer]))
      	wlist := dl.waitingOnPeerLimit[dj.peer]
      	dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
      	return
      }
      dl.activePerPeer[dj.peer]++
    
      // 检查
      dl.addCheckFdLimit(dj)
    

    }

    func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) { if dl.shouldConsumeFd(dj.addr) { // 总的拨号限制 160 if dl.fdConsuming >= dl.fdLimit { log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+ "limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd)) dl.waitingOnFd = append(dl.waitingOnFd, dj) return }

      	log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
      		dj.peer, dj.addr, dl.fdConsuming)
      	// take token
      	dl.fdConsuming++
      }
    
      log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
      	dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
    
      // 启动协程,对地址进行拨号,返回的 socket保存在了 dj.resp中
      go dl.executeDial(dj)
    

    }

    func (dl *dialLimiter) executeDial(j *dialJob) { defer dl.finishedDial(j) if j.cancelled() { return }

      dctx, cancel := context.WithTimeout(j.ctx, j.timeout)
      defer cancel()
    
      // 这里拨号
      con, err := dl.dialFunc(dctx, j.peer, j.addr, j.resp)
      kind := transport.UpdateKindDialSuccessful
      if err != nil {
      	kind = transport.UpdateKindDialFailed
      }
      select {
          // 拨号后的结果通过 resp 返回出去
      case j.resp <- transport.DialUpdate{Kind: kind, Conn: con, Addr: j.addr, Err: err}:
      case <-j.ctx.Done():
      	if con != nil {
      		con.Close()
      	}
      }
    

    }

这个就是最最最最最最最最终的拨号函数 dialAddr

func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, updCh chan<- transport.DialUpdate) (transport.CapableConn, error) {
	// 不能对自己拨号
	if s.local == p {
		return nil, ErrDialToSelf 
	}
	// Check before we start work
	if err := ctx.Err(); err != nil {
		log.Debugf("%s swarm not dialing. Context cancelled: %v. %s %s", s.local, err, p, addr)
		return nil, err
	}
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

    // 获取tcp协议的,传输控制层对象 *Tcptransport
	tpt := s.TransportForDialing(addr) 
	if tpt == nil {
		return nil, ErrNoTransport
	}

	start := time.Now()
	var connC transport.CapableConn // 就是一个socket连接
	var err error
	if du, ok := tpt.(transport.DialUpdater); ok {
		connC, err = du.DialWithUpdates(ctx, addr, p, updCh)
	} else {
		connC, err = tpt.Dial(ctx, addr, p) //对地址拨号
	}

	

	// 额外再检查一次,是否是要连接的 peer
	if connC.RemotePeer() != p { 
		connC.Close()
		err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", p, connC.RemotePeer(), tpt)
		log.Error(err)
		return nil, err
	}

	// 这里返回这个连接
	return connC, nil
}

总结

没啥好总结,按照上面的流程看完,我估计你应该头都要大了。。

我个人觉得值得学习的也就是生产者-消费者模型熟练的运用,其他的设计给我的感觉就是为了整合第三方的库,为了封装而封装出来很多的接口(写的人应该很爽,看的人确实很痛苦)

参考资料:

https://juejin.cn/post/7118036611169779743

https://blog.csdn.net/kk3909/category_10554659.html https://docs.libp2p.io/concepts/introduction/overview/

https://zhuanlan.zhihu.com/p/277593169

https://keenjin.github.io/2021/04/p2p/


赞(0)
未经允许不得转载:工具盒子 » 区块链网络库 go-libp2p 源码剖析 Swarm