pointsub

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: Apache-2.0 Imports: 17 Imported by: 2

README

PointSub

PointSub 是一个基于 LibP2P 的网络通信库,提供了使用 LibP2P streams 替换 Go 标准网络栈的功能。

主要特性

  • 基于 LibP2P 的流式通信
  • 提供标准的 net.Conn 和 net.Listener 接口实现
  • 支持多路由、NAT 穿透和流复用
  • 使用 Peer ID 进行寻址,无需传统的 host:port 方式
  • 可配置的连接超时、并发控制、资源限制等
  • 支持同一个 Host 同时作为服务端和客户端使用
  • 支持多节点之间的全双工通信
  • 内置连接池管理和自动重试机制
  • 支持消息大小限制和压缩

使用限制

  • LibP2P hosts 不能自己连接自己
  • 每个 Host 需要唯一的 Peer ID
  • 同一个 Host 上的不同协议需要使用不同的 Protocol ID
  • 消息大小受 MaxBlockSize 限制(默认32MB)

配置选项

服务端配置 (ServerConfig)
  • ReadTimeout:读取超时时间(默认30秒)
  • WriteTimeout:写入超时时间(默认30秒)
  • MaxConcurrentConns:最大并发连接数(默认1000)
  • MaxBlockSize:最大消息块大小(默认32MB)
  • BufferPoolSize:缓冲池大小(默认4KB)
  • CleanupInterval:空闲连接清理间隔(默认5分钟)
  • EnableCompression:是否启用压缩(默认true)
客户端配置 (ClientConfig)
  • ReadTimeout:读取超时时间(默认30秒)
  • WriteTimeout:写入超时时间(默认30秒)
  • ConnectTimeout:连接超时时间(默认5秒)
  • MaxRetries:最大重试次数(默认3次)
  • RetryInterval:重试间隔时间(默认1秒)
  • MaxBlockSize:最大消息块大小(默认32MB)
  • MaxIdleConns:最大空闲连接数(默认100)
  • IdleConnTimeout:空闲连接超时时间(默认5分钟)
  • EnableCompression:是否启用压缩(默认true)

使用方式

服务端使用
  1. 创建 LibP2P Host:
    serverHost, err := libp2p.New()
    if err != nil {
    // 处理错误
    }
    defer serverHost.Close()
  1. 创建服务端实例:
    server, err := pointsub.NewServer(serverHost, pointsub.DefaultServerConfig())
    if err != nil {
    // 处理错误
    }
    defer server.Stop()
  1. 定义消息处理函数:
    handler := func(request []byte) ([]byte, error) {
    // 处理请求并返回响应
    return response, nil
    }
  1. 启动服务:
    protocolID := protocol.ID("/test/1.0.0")
    err = server.Start(protocolID, handler)
    if err != nil {
    // 处理错误
    }
客户端使用
  1. 创建 LibP2P Host:
    clientHost, err := libp2p.New()
    if err != nil {
    // 处理错误
    }
    defer clientHost.Close()
  1. 创建客户端实例:
    client, err := pointsub.NewClient(clientHost, pointsub.DefaultClientConfig())
    if err != nil {
    // 处理错误
    }
    defer client.Close()
  1. 连接到服务端:
    err = clientHost.Connect(context.Background(), serverHost.Peerstore().PeerInfo(serverHost.ID()))
    if err != nil {
    // 处理错误
    }
  1. 发送请求:
    response, err := client.Send(
    context.Background(),
    serverHost.ID(),
    protocolID,
    []byte("请求数据")
    )
    if err != nil {
    // 处理错误
    }

高级特性

多节点通信

PointSub 支持多个节点之间的全双工通信:

  • 每个节点可以同时作为服务端和客户端
  • 支持多对多的通信模式
  • 自动处理连接管理和资源清理
连接管理
  • 自动清理空闲连接
  • 连接池管理
  • 并发连接数限制
  • 自动重试机制
性能优化
  • 内置缓冲池管理
  • 消息大小限制
  • 可配置的压缩选项
  • 高效的内存使用

错误处理

PointSub 提供了完善的错误处理机制:

  • 网络错误自动重试
  • 超时控制
  • 资源限制保护
  • 优雅的错误恢复

监控和调试

连接信息获取

可以通过 Server 的 GetConnectionsInfo 方法获取当前活跃连接的详细信息:

    connInfo := server.GetConnectionsInfo()
    for , info := range connInfo {
    fmt.Printf("远程地址: %s, 最后活跃: %v, 空闲时间: %v\n",
    info.RemoteAddr, info.LastActive, info.IdleTime)
    }

最佳实践

  1. 合理配置超时时间
  2. 根据实际需求调整并发连接数
  3. 适当设置消息大小限制
  4. 在高并发场景下使用连接池
  5. 启用压缩以节省带宽
  6. 定期监控连接状态
  7. 实现适当的错误重试策略

Documentation

Overview

Package PointSub 提供了基于 libp2p 的流式处理功能

Package pointsub 提供点对点订阅功能的实现

Package PointSub 提供了基于 libp2p 的流式处理功能

Package PointSub 提供了基于 libp2p 的流式处理功能

Package PointSub 提供了使用 LibP2P streams 替换 Go 标准网络栈的功能

主要功能: - 接收一个 libp2p.Host 参数 - 提供 Dial() 和 Listen() 方法,返回 net.Conn 和 net.Listener 的实现

网络寻址: - 不使用传统的 "host:port" 寻址方式 - 使用 Peer ID 进行寻址 - 使用 libp2p 的 net.Stream 替代原始 TCP 连接 - 支持 LibP2P 的多路由、NAT 穿透和流复用功能

使用限制: - LibP2P hosts 不能自己连接自己 - 同一个 Host 不能同时作为服务端和客户端使用

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrServerAlreadyStarted = errors.New("服务器已经启动") // 服务器已启动错误
)

错误定义

View Source
var Network = "pointsub"

Network 定义了 PointSub 连接使用的网络类型名称 用于在调用 net.Addr.Network() 时返回 对应的 net.Addr.String() 将返回 peer ID

Functions

func Dial

func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error)

Dial 使用给定的主机打开到目标地址的流 参数:

  • ctx: 上下文对象,用于控制操作的生命周期
  • h: libp2p 主机对象
  • pid: 目标对等节点的 ID
  • tag: 协议标识符

返回值:

  • net.Conn: 标准网络连接接口
  • error: 错误信息

func Listen

func Listen(h host.Host, tag protocol.ID) (net.Listener, error)

Listen 提供一个标准的 net.Listener 接口用于接受"连接" 底层使用带有指定协议 ID 标签的 libp2p 流 参数:

  • h: libp2p 主机对象
  • tag: 协议标识符

返回值:

  • net.Listener: 标准网络监听器接口
  • error: 错误信息

func NewLimitedConn

func NewLimitedConn(conn net.Conn, maxSize int) net.Conn

NewLimitedConn 创建一个新的限制大小的连接 参数:

  • conn: 原始连接
  • maxSize: 最大大小限制

返回值:

  • net.Conn: 限制大小的连接

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 定义客户端结构体

func NewClient

func NewClient(h host.Host, config *ClientConfig) (*Client, error)

NewClient 创建新的客户端实例 参数: - h: libp2p主机实例 - config: 客户端配置,如果为nil则使用默认配置 返回值: - *Client: 新创建的客户端实例 - error: 如果配置无效则返回错误

func (*Client) Close

func (c *Client) Close() error

Close 关闭客户端 返回值: - error: 关闭过程中的错误

func (*Client) Send

func (c *Client) Send(ctx context.Context, peerID peer.ID, protocolID protocol.ID, request []byte) ([]byte, error)

Send 发送请求并接收响应 参数: - ctx: 上下文 - peerID: 目标节点ID - protocolID: 协议ID - request: 请求数据 返回值: - []byte: 响应数据 - error: 发送过程中的错误

type ClientConfig

type ClientConfig struct {
	ReadTimeout       time.Duration // 读取超时时间
	WriteTimeout      time.Duration // 写入超时时间
	ConnectTimeout    time.Duration // 连接超时时间
	MaxRetries        int           // 最大重试次数
	RetryInterval     time.Duration // 重试间隔时间
	MaxBlockSize      int           // 最大数据块大小
	EnableCompression bool          // 是否启用压缩
	MaxIdleConns      int           // 最大空闲连接数
	IdleConnTimeout   time.Duration // 空闲连接超时时间
}

ClientConfig 定义客户端配置参数

func DefaultClientConfig

func DefaultClientConfig() *ClientConfig

DefaultClientConfig 返回默认的客户端配置 返回值: - *ClientConfig: 包含默认配置值的ClientConfig实例

type ConnectionInfo

type ConnectionInfo struct {
	RemoteAddr string        // 远程地址
	LastActive time.Time     // 最后活跃时间
	IdleTime   time.Duration // 空闲时间
}

ConnectionInfo 连接信息结构体

type LimitedConn

type LimitedConn struct {
	net.Conn // 底层连接
	// contains filtered or unexported fields
}

LimitedConn 包装了一个带有大小限制的连接

func (*LimitedConn) Read

func (c *LimitedConn) Read(b []byte) (n int, err error)

Read 读取数据 参数:

  • b: 读取缓冲区

返回值:

  • n: 读取的字节数
  • err: 错误信息

func (*LimitedConn) Write

func (c *LimitedConn) Write(b []byte) (n int, err error)

Write 写入数据 参数:

  • b: 要写入的数据

返回值:

  • n: 写入的字节数
  • err: 错误信息

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server 定义服务结构 包含服务器运行所需的各种组件

func NewServer

func NewServer(h host.Host, config *ServerConfig) (*Server, error)

NewServer 创建新的服务器实例 参数:

  • h: libp2p 主机实例
  • config: 服务器配置,如果为nil则使用默认配置

返回值:

  • *Server: 新创建的服务器实例
  • error: 错误信息

func (*Server) GetConnectionsInfo

func (s *Server) GetConnectionsInfo() []ConnectionInfo

GetConnectionsInfo 获取当前活跃连接信息 返回值:

  • []ConnectionInfo: 连接信息列表

func (*Server) Start

func (s *Server) Start(protocolID protocol.ID, handler StreamHandler) error

Start 启动服务器 参数:

  • protocolID: 协议标识符
  • handler: 消息处理函数

返回值:

  • error: 错误信息

func (*Server) Stop

func (s *Server) Stop() error

Stop 停止服务器 返回值:

  • error: 错误信息

type ServerConfig

type ServerConfig struct {
	// 连接相关配置
	ReadTimeout  time.Duration // 读取超时时间
	WriteTimeout time.Duration // 写入超时时间
	MaxBlockSize int           // 最大消息大小

	// 并发控制
	MaxConcurrentConns int           // 最大并发连接数
	ConnectTimeout     time.Duration // 连接建立超时时间

	// 资源控制
	MaxRequestsPerConn int           // 每个连接最大请求数
	IdleTimeout        time.Duration // 空闲连接超时时间

	// 内存控制
	BufferPoolSize    int  // 缓冲池大小
	EnableCompression bool // 是否启用压缩

	// 连接清理相关配置
	CleanupInterval time.Duration // 清理检查间隔
	MaxIdleTime     time.Duration // 最大空闲时间

	// 告警相关配置
	MaxMemoryUsage uint64 // 最大内存使用量
}

ServerConfig 定义服务器配置 包含连接、并发、资源、内存和监控等相关配置项

func DefaultServerConfig

func DefaultServerConfig() *ServerConfig

DefaultServerConfig 返回默认的服务器配置 返回值:

  • *ServerConfig: 包含默认值的配置对象

func (*ServerConfig) Validate

func (c *ServerConfig) Validate() error

Validate 验证配置是否有效 返回值:

  • error: 错误信息

type StreamHandler

type StreamHandler func(request []byte) (response []byte, err error)

StreamHandler 定义了处理消息的函数类型 参数:

  • request: 请求消息字节切片

返回值:

  • []byte: 响应消息
  • error: 错误信息

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL