pointsub

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 16 Imported by: 2

README

PointSub

PointSub 是一个基于 LibP2P 的网络通信库,提供了使用 LibP2P streams 替换 Go 标准网络栈的功能。

主要特性

  • 基于 LibP2P 的流式通信
  • 提供标准的 net.Conn 和 net.Listener 接口实现
  • 支持多路由、NAT 穿透和流复用
  • 使用 Peer ID 进行寻址,无需传统的 host:port 方式
  • 可配置的连接超时、并发控制、资源限制等
  • 支持同一个 Host 同时作为服务端和客户端使用
  • 支持多节点之间的全双工通信
  • 内置连接池管理和自动重试机制
  • 支持消息大小限制和压缩

使用限制

  • LibP2P hosts 不能自己连接自己
  • 客户端不能向自己发送请求
  • 每个 Host 需要唯一的 Peer ID
  • 同一个 Host 上的不同协议需要使用不同的 Protocol ID
  • 消息大小受 MaxBlockSize 限制(默认32MB)

配置选项

服务端配置 (ServerConfig)
  • ReadTimeout:读取超时时间(默认30秒)
  • WriteTimeout:写入超时时间(默认30秒)
  • MaxConcurrentConns:最大并发连接数(默认1000)
  • MaxBlockSize:最大消息块大小(默认32MB)
  • BufferPoolSize:缓冲池大小(默认4KB)
  • CleanupInterval:空闲连接清理间隔(默认5分钟)
  • EnableCompression:是否启用压缩(默认true)
客户端配置 (ClientConfig)
  • ReadTimeout:读取超时时间(默认30秒)
  • WriteTimeout:写入超时时间(默认30秒)
  • ConnectTimeout:连接超时时间(默认5秒)
  • MaxRetries:最大重试次数(默认3次)
  • RetryInterval:重试间隔时间(默认1秒)
  • MaxBlockSize:最大消息块大小(默认32MB)
  • MaxIdleConns:最大空闲连接数(默认100)
  • IdleConnTimeout:空闲连接超时时间(默认5分钟)
  • EnableCompression:是否启用压缩(默认true)

使用方式

服务端使用
  1. 创建 LibP2P Host:
    serverHost, err := libp2p.New()
    if err != nil {
    // 处理错误
    }
    defer serverHost.Close()
  1. 创建服务端实例:
    server, err := pointsub.NewServer(serverHost, pointsub.DefaultServerConfig())
    if err != nil {
    // 处理错误
    }
    defer server.Stop()
  1. 定义消息处理函数:
    handler := func(request []byte) ([]byte, error) {
    // 处理请求并返回响应
    return response, nil
    }
  1. 启动服务:
    protocolID := protocol.ID("/test/1.0.0")
    err = server.Start(protocolID, handler)
    if err != nil {
    // 处理错误
    }
客户端使用
  1. 创建 LibP2P Host:
    clientHost, err := libp2p.New()
    if err != nil {
    // 处理错误
    }
    defer clientHost.Close()
  1. 创建客户端实例:
    client, err := pointsub.NewClient(clientHost, pointsub.DefaultClientConfig())
    if err != nil {
    // 处理错误
    }
    defer client.Close()
  1. 连接到服务端:
    err = clientHost.Connect(context.Background(), serverHost.Peerstore().PeerInfo(serverHost.ID()))
    if err != nil {
    // 处理错误
    }
  1. 发送请求:
    response, err := client.Send(
    context.Background(),
    serverHost.ID(),
    protocolID,
    []byte("请求数据")
    )
    if err != nil {
    // 处理错误
    }

高级特性

多节点通信

PointSub 支持多个节点之间的全双工通信:

  • 每个节点可以同时作为服务端和客户端
  • 支持多对多的通信模式
  • 自动处理连接管理和资源清理
连接管理
  • 自动清理空闲连接
  • 连接池管理
  • 并发连接数限制
  • 自动重试机制
性能优化
  • 内置缓冲池管理
  • 消息大小限制
  • 可配置的压缩选项
  • 高效的内存使用

错误处理

PointSub 提供了完善的错误处理机制:

  • 网络错误自动重试
  • 超时控制
  • 资源限制保护
  • 优雅的错误恢复

监控和调试

连接信息获取

可以通过 Server 的 GetConnectionsInfo 方法获取当前活跃连接的详细信息:

    connInfo := server.GetConnectionsInfo()
    for , info := range connInfo {
    fmt.Printf("远程地址: %s, 最后活跃: %v, 空闲时间: %v\n",
    info.RemoteAddr, info.LastActive, info.IdleTime)
    }

最佳实践

  1. 合理配置超时时间
  2. 根据实际需求调整并发连接数
  3. 适当设置消息大小限制
  4. 在高并发场景下使用连接池
  5. 启用压缩以节省带宽
  6. 定期监控连接状态
  7. 实现适当的错误重试策略

Documentation

Overview

Package PointSub 提供了基于 libp2p 的流式处理功能

Package pointsub 提供点对点订阅功能的实现 该包实现了基于 LibP2P 的点对点通信功能,支持请求-响应模式的消息传递

Package PointSub 提供了基于 libp2p 的流式处理功能

Package PointSub 提供了基于 libp2p 的流式处理功能

Package PointSub 提供了使用 LibP2P streams 替换 Go 标准网络栈的功能

主要功能: - 接收一个 libp2p.Host 参数 - 提供 Dial() 和 Listen() 方法,返回 net.Conn 和 net.Listener 的实现

网络寻址: - 不使用传统的 "host:port" 寻址方式 - 使用 Peer ID 进行寻址 - 使用 libp2p 的 net.Stream 替代原始 TCP 连接 - 支持 LibP2P 的多路由、NAT 穿透和流复用功能

使用限制: - LibP2P hosts 不能自己连接自己 - 同一个 Host 不能同时作为服务端和客户端使用

Package pointsub 提供了点对点订阅功能的实现

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrServerAlreadyStarted = errors.New("服务器已经启动") // 服务器已启动错误
)

错误定义

View Source
var Network = "pointsub"

Network 定义了 PointSub 连接使用的网络类型名称 用于在调用 net.Addr.Network() 时返回 对应的 net.Addr.String() 将返回 peer ID

Functions

func Dial

func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error)

Dial 使用给定的主机打开到目标地址的流 参数:

  • ctx: 上下文对象,用于控制操作的生命周期
  • h: libp2p 主机对象
  • pid: 目标对等节点的 ID
  • tag: 协议标识符

返回值:

  • net.Conn: 标准网络连接接口
  • error: 错误信息

func Listen

func Listen(h host.Host, tag protocol.ID) (net.Listener, error)

Listen 提供一个标准的 net.Listener 接口用于接受"连接" 底层使用带有指定协议 ID 标签的 libp2p 流 参数:

  • h: libp2p 主机对象
  • tag: 协议标识符

返回值:

  • net.Listener: 标准网络监听器接口
  • error: 错误信息

func NewLimitedConn

func NewLimitedConn(conn net.Conn, maxSize int) net.Conn

NewLimitedConn 创建一个新的限制大小的连接 参数:

  • conn: 原始连接
  • maxSize: 最大大小限制

返回值:

  • net.Conn: 限制大小的连接

func SetLog added in v0.0.4

func SetLog(filename string, stderr ...bool)

SetLog 设置日志配置 该方法允许自定义日志输出的文件路径和是否输出到标准错误 参数: - filename: 日志文件路径,指定日志输出的目标文件 - stderr: 可选参数,是否同时输出到标准错误,默认为false

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 定义客户端结构体 该结构体封装了客户端的所有功能和状态

func NewClient

func NewClient(h host.Host, config *ClientConfig) (*Client, error)

NewClient 创建新的客户端实例 该函数初始化一个新的客户端,并启动必要的后台任务 参数: - h: libp2p主机实例,用于网络通信 - config: 客户端配置,如果为nil则使用默认配置 返回值: - *Client: 新创建的客户端实例 - error: 如果配置无效则返回错误

func (*Client) Close

func (c *Client) Close() error

Close 关闭客户端 该方法负责清理客户端资源,确保所有连接被正确关闭 返回值: - error: 关闭过程中的错误

func (*Client) Send

func (c *Client) Send(ctx context.Context, peerID peer.ID, protocolID protocol.ID, request []byte) ([]byte, error)

Send 发送请求并接收响应 该方法实现了完整的请求-响应通信流程,支持重试机制 参数: - ctx: 上下文,用于控制请求的生命周期 - peerID: 目标节点ID,指定请求发送的目标节点 - protocolID: 协议ID,指定使用的通信协议 - request: 请求数据,要发送的数据内容 返回值: - []byte: 响应数据,目标节点返回的响应内容 - error: 发送过程中的错误,如果发生错误则返回对应的错误信息

type ClientConfig

type ClientConfig struct {
	ReadTimeout       time.Duration // 读取超时时间,控制单次读取操作的最大等待时间
	WriteTimeout      time.Duration // 写入超时时间,控制单次写入操作的最大等待时间
	ConnectTimeout    time.Duration // 连接超时时间,控制建立连接的最大等待时间
	MaxRetries        int           // 最大重试次数,发送失败时的最大重试次数
	RetryInterval     time.Duration // 重试间隔时间,两次重试之间的等待时间
	MaxBlockSize      int           // 最大数据块大小,单次传输的最大字节数
	EnableCompression bool          // 是否启用压缩,控制是否对传输数据进行压缩
	MaxIdleConns      int           // 最大空闲连接数,连接池中保持的最大空闲连接数
	IdleConnTimeout   time.Duration // 空闲连接超时时间,空闲连接被清理前的最大存活时间
}

ClientConfig 定义客户端配置参数 包含了客户端运行所需的各项配置选项

func DefaultClientConfig

func DefaultClientConfig() *ClientConfig

DefaultClientConfig 返回默认的客户端配置 该函数返回一个使用推荐默认值的配置实例 返回值: - *ClientConfig: 包含默认配置值的 ClientConfig 实例

type ConnectionInfo

type ConnectionInfo struct {
	RemoteAddr string        // 远程地址
	LastActive time.Time     // 最后活跃时间
	IdleTime   time.Duration // 空闲时间
}

ConnectionInfo 连接信息结构体

type LimitedConn

type LimitedConn struct {
	net.Conn // 底层连接
	// contains filtered or unexported fields
}

LimitedConn 包装了一个带有大小限制的连接

func (*LimitedConn) Read

func (c *LimitedConn) Read(b []byte) (n int, err error)

Read 读取数据 参数:

  • b: 读取缓冲区

返回值:

  • n: 读取的字节数
  • err: 错误信息

func (*LimitedConn) Write

func (c *LimitedConn) Write(b []byte) (n int, err error)

Write 写入数据 参数:

  • b: 要写入的数据

返回值:

  • n: 写入的字节数
  • err: 错误信息

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server 定义服务结构 包含服务器运行所需的各种组件

func NewServer

func NewServer(h host.Host, config *ServerConfig) (*Server, error)

NewServer 创建新的服务器实例 参数:

  • h: libp2p 主机实例
  • config: 服务器配置,如果为nil则使用默认配置

返回值:

  • *Server: 新创建的服务器实例
  • error: 错误信息

func (*Server) GetConnectionsInfo

func (s *Server) GetConnectionsInfo() []ConnectionInfo

GetConnectionsInfo 获取当前活跃连接信息 返回值:

  • []ConnectionInfo: 连接信息列表

func (*Server) Start

func (s *Server) Start(protocolID protocol.ID, handler StreamHandler) error

Start 启动服务器 参数:

  • protocolID: 协议标识符
  • handler: 消息处理函数

返回值:

  • error: 错误信息

func (*Server) Stop

func (s *Server) Stop() error

Stop 停止服务器 返回值:

  • error: 错误信息

type ServerConfig

type ServerConfig struct {
	// 连接相关配置
	ReadTimeout  time.Duration // 读取超时时间,控制单次读取操作的最大时间
	WriteTimeout time.Duration // 写入超时时间,控制单次写入操作的最大时间
	MaxBlockSize int           // 最大消息大小,限制单条消息的最大字节数

	// 并发控制
	MaxConcurrentConns int           // 最大并发连接数,限制同时处理的最大连接数
	ConnectTimeout     time.Duration // 连接建立超时时间,控制连接建立的最大等待时间

	// 资源控制
	MaxRequestsPerConn int           // 每个连接最大请求数,限制单个连接可处理的最大请求数
	IdleTimeout        time.Duration // 空闲连接超时时间,超过此时间的空闲连接将被关闭

	// 内存控制
	BufferPoolSize    int  // 缓冲池大小,控制内存池中单个缓冲区的大小
	EnableCompression bool // 是否启用压缩,控制是否对消息进行压缩处理

	// 连接清理相关配置
	CleanupInterval time.Duration // 清理检查间隔,定期检查并清理空闲连接的时间间隔
	MaxIdleTime     time.Duration // 最大空闲时间,连接允许的最大空闲时间

	// 告警相关配置
	MaxMemoryUsage uint64 // 最大内存使用量,限制服务器的最大内存使用量
}

ServerConfig 定义服务器配置 包含连接、并发、资源、内存和监控等相关配置项

func DefaultServerConfig

func DefaultServerConfig() *ServerConfig

DefaultServerConfig 返回默认的服务器配置 返回值:

  • *ServerConfig: 包含默认值的配置对象

func (*ServerConfig) Validate

func (c *ServerConfig) Validate() error

Validate 验证配置是否有效 返回值:

  • error: 错误信息

type StreamHandler

type StreamHandler func(request []byte) (response []byte, err error)

StreamHandler 定义了处理消息的函数类型 参数:

  • request: 请求消息字节切片

返回值:

  • []byte: 响应消息
  • error: 错误信息

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL