pubsub

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 20, 2025 License: MIT Imports: 30 Imported by: 1

Documentation

Constants

const (
	// BeefProtocolID is the protocol ID for BEEF requests
	BeefProtocolID = "/bsv-overlay/beef/1.0.0"

	// TopicPrefix for BSV overlay topics
	TopicPrefix = "tm_"
)
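As a small illustration of how TopicPrefix might be used, the sketch below checks whether a topic name carries the overlay prefix. The helper isOverlayTopic is hypothetical and not part of this package; the constant is redeclared locally so the example stands alone.

```go
package main

import (
	"fmt"
	"strings"
)

// TopicPrefix mirrors the package constant of the same name.
const TopicPrefix = "tm_"

// isOverlayTopic is a hypothetical helper (not part of the package) that
// reports whether a topic name starts with the overlay topic prefix.
func isOverlayTopic(topic string) bool {
	return strings.HasPrefix(topic, TopicPrefix)
}

func main() {
	fmt.Println(isOverlayTopic("tm_example")) // true
	fmt.Println(isOverlayTopic("ls_example")) // false
}
```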

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelPubSub

type ChannelPubSub struct {
	// contains filtered or unexported fields
}

ChannelPubSub implements the PubSub interface using Go channels. It provides a no-dependency pub/sub solution for SQLite-based deployments. Recent events are handled by the storage layer via LookupOutpoints.
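The general idea of channel-based fan-out can be sketched as follows. This is not the ChannelPubSub source: miniBus, its methods, the buffer size of 16, and the drop-on-full policy are all assumptions made for illustration, and Event is a trimmed-down stand-in for the package type.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed-down stand-in for the package's Event type.
type Event struct {
	Topic  string
	Member string
	Score  float64
}

// miniBus is a hypothetical in-process broker; it is not ChannelPubSub.
type miniBus struct {
	mu   sync.RWMutex
	subs map[string][]chan Event // topic -> subscriber channels
}

func newMiniBus() *miniBus {
	return &miniBus{subs: make(map[string][]chan Event)}
}

// subscribe registers one buffered channel under every requested topic.
func (b *miniBus) subscribe(topics []string) <-chan Event {
	ch := make(chan Event, 16) // buffer size chosen arbitrarily for the sketch
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, t := range topics {
		b.subs[t] = append(b.subs[t], ch)
	}
	return ch
}

// publish sends the event to each subscriber of the topic and returns the
// number of deliveries. A full subscriber buffer drops the event rather
// than blocking the publisher (an assumed policy).
func (b *miniBus) publish(topic, member string, score float64) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- Event{Topic: topic, Member: member, Score: score}:
			delivered++
		default: // slow consumer: drop the event
		}
	}
	return delivered
}

func main() {
	bus := newMiniBus()
	events := bus.subscribe([]string{"tm_example"})
	bus.publish("tm_example", "example-member", 1)
	fmt.Println((<-events).Member) // example-member
}
```

Dropping on a full buffer keeps a slow subscriber from stalling publishers; whether ChannelPubSub makes the same trade-off is not stated in this documentation.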

func NewChannelPubSub

func NewChannelPubSub() *ChannelPubSub

NewChannelPubSub creates a new channel-based pub/sub implementation

func (*ChannelPubSub) Close

func (cp *ChannelPubSub) Close() error

Close closes the pub/sub system

func (*ChannelPubSub) Publish

func (cp *ChannelPubSub) Publish(ctx context.Context, topic string, data string, score ...float64) error

Publish sends data to all subscribers of a topic

func (*ChannelPubSub) Start

func (cp *ChannelPubSub) Start(ctx context.Context) error

Start initializes the pub/sub system

func (*ChannelPubSub) Stop

func (cp *ChannelPubSub) Stop() error

Stop stops the pub/sub system

func (*ChannelPubSub) Subscribe

func (cp *ChannelPubSub) Subscribe(ctx context.Context, topics []string) (<-chan Event, error)

Subscribe creates a subscription to the given topics

func (*ChannelPubSub) Unsubscribe

func (cp *ChannelPubSub) Unsubscribe(topics []string) error

Unsubscribe removes subscriptions (for compatibility)

type Event

type Event struct {
	Topic  string         `json:"topic"`
	Member string         `json:"member"` // Generic member data (usually outpoint string)
	Score  float64        `json:"score"`
	Source string         `json:"source"`         // "redis", "sse:peerURL", etc.
	Data   map[string]any `json:"data,omitempty"` // Additional event data
}

Event represents a unified event that can come from Redis or SSE sources
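To show what the JSON tags above produce on the wire, the sketch below redeclares Event locally with the same fields and tags and marshals one value. The helper encodeEvent and the sample field values are illustrative assumptions, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event copies the field layout and JSON tags documented above.
type Event struct {
	Topic  string         `json:"topic"`
	Member string         `json:"member"`
	Score  float64        `json:"score"`
	Source string         `json:"source"`
	Data   map[string]any `json:"data,omitempty"`
}

// encodeEvent is a hypothetical helper that marshals an Event to JSON text.
func encodeEvent(e Event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeEvent(Event{
		Topic:  "tm_example",
		Member: "example-member",
		Score:  42,
		Source: "redis",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// {"topic":"tm_example","member":"example-member","score":42,"source":"redis"}
}
```

Because Data is tagged omitempty, a nil map is left out of the encoded object entirely.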

type EventData

type EventData struct {
	Outpoint string  `json:"outpoint"`
	Score    float64 `json:"score"`
}

EventData represents a buffered event for reconnection (backward compatibility)

type LibP2PEvent added in v0.3.0

type LibP2PEvent struct {
	Topic  string
	Txid   chainhash.Hash
	PeerID peer.ID
}

LibP2PEvent represents an event received from libp2p

type LibP2PSync added in v0.3.0

type LibP2PSync struct {
	// contains filtered or unexported fields
}

LibP2PSync manages libp2p-based transaction synchronization

func NewLibP2PSync added in v0.3.0

func NewLibP2PSync(engine engine.OverlayEngineProvider, storage engine.Storage, beefStorage beef.BeefStorage) (*LibP2PSync, error)

NewLibP2PSync creates a new libp2p-based sync manager

func (*LibP2PSync) PublishTxid added in v0.3.0

func (s *LibP2PSync) PublishTxid(ctx context.Context, topic string, txid chainhash.Hash) error

PublishTxid publishes a txid to a topic

func (*LibP2PSync) Start added in v0.3.0

func (s *LibP2PSync) Start(ctx context.Context, topics []string) error

Start begins libp2p sync with configured topics

func (*LibP2PSync) Stop added in v0.3.0

func (s *LibP2PSync) Stop() error

Stop stops the libp2p sync

type PeerBroadcaster

type PeerBroadcaster struct {
	// contains filtered or unexported fields
}

PeerBroadcaster handles broadcasting submitted transactions to peer overlays

func NewPeerBroadcaster

func NewPeerBroadcaster(peerTopics map[string][]string) *PeerBroadcaster

NewPeerBroadcaster creates a new peer broadcaster with the given peer-to-topics mapping

func (*PeerBroadcaster) BroadcastTransaction

func (p *PeerBroadcaster) BroadcastTransaction(ctx context.Context, taggedBEEF overlay.TaggedBEEF) error

BroadcastTransaction broadcasts a TaggedBEEF to all configured peers for the topics
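How PeerBroadcaster picks its targets is not spelled out here. One plausible reading, sketched below under that assumption, is that a peer receives the broadcast when its topic list overlaps the topics of the submitted transaction. The helper peersForTopics, the use of peer URLs as map keys, and the example URLs are all hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// peersForTopics is a hypothetical helper: it returns, in sorted order, every
// peer whose topic list shares at least one entry with txTopics.
func peersForTopics(peerTopics map[string][]string, txTopics []string) []string {
	wanted := make(map[string]bool, len(txTopics))
	for _, t := range txTopics {
		wanted[t] = true
	}
	var peers []string
	for peer, topics := range peerTopics {
		for _, t := range topics {
			if wanted[t] {
				peers = append(peers, peer)
				break // one matching topic is enough for this peer
			}
		}
	}
	sort.Strings(peers) // map iteration order is random; sort for stable output
	return peers
}

func main() {
	// Assumed shape: peer URL -> topics hosted by that peer.
	peerTopics := map[string][]string{
		"https://peer-a.example": {"tm_alpha", "tm_beta"},
		"https://peer-b.example": {"tm_beta"},
		"https://peer-c.example": {"tm_gamma"},
	}
	fmt.Println(peersForTopics(peerTopics, []string{"tm_beta"}))
	// [https://peer-a.example https://peer-b.example]
}
```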

type PubSub

type PubSub interface {
	// Publishing functionality
	Publish(ctx context.Context, topic string, data string, score ...float64) error

	// Subscribing functionality
	Subscribe(ctx context.Context, topics []string) (<-chan Event, error)
	Unsubscribe(topics []string) error

	// Connection management
	Start(ctx context.Context) error
	Stop() error
	Close() error
}

PubSub interface for unified publishing and subscribing

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

RedisPubSub handles both publishing and subscribing to Redis

func NewRedisPubSub

func NewRedisPubSub(redisURL string) (*RedisPubSub, error)

NewRedisPubSub creates a new Redis pub/sub handler

func (*RedisPubSub) Close

func (r *RedisPubSub) Close() error

Close closes the Redis connection

func (*RedisPubSub) Publish

func (r *RedisPubSub) Publish(ctx context.Context, topic string, data string, score ...float64) error

Publish publishes an event to Redis

func (*RedisPubSub) Start

func (r *RedisPubSub) Start(ctx context.Context) error

Start starts the Redis pub/sub system

func (*RedisPubSub) Stop

func (r *RedisPubSub) Stop() error

Stop stops the Redis pub/sub system

func (*RedisPubSub) Subscribe

func (r *RedisPubSub) Subscribe(ctx context.Context, topics []string) (<-chan Event, error)

Subscribe subscribes to multiple topics and returns a channel of events

func (*RedisPubSub) Unsubscribe

func (r *RedisPubSub) Unsubscribe(topics []string) error

Unsubscribe unsubscribes from topics

type SSEConnection

type SSEConnection struct {
	// contains filtered or unexported fields
}

SSEConnection represents a single SSE connection to a peer

type SSEEvent

type SSEEvent struct {
	Topic    string
	Outpoint transaction.Outpoint
	PeerURL  string
}

SSEEvent represents an event received from SSE

type SSEManager

type SSEManager struct {
	// contains filtered or unexported fields
}

SSEManager provides SSE client management on top of any PubSub implementation

func NewSSEManager

func NewSSEManager(pubsub PubSub) *SSEManager

NewSSEManager creates a new SSE manager wrapping the given PubSub implementation

func (*SSEManager) AddSSEClient

func (s *SSEManager) AddSSEClient(topic string, client interface{}) error

AddSSEClient registers an SSE client for a specific topic

func (*SSEManager) Close

func (s *SSEManager) Close() error

Close closes the SSE manager and underlying PubSub

func (*SSEManager) Publish

func (s *SSEManager) Publish(ctx context.Context, topic string, data string, score ...float64) error

Publish exposes the Publish method of the underlying PubSub implementation

func (*SSEManager) RemoveSSEClient

func (s *SSEManager) RemoveSSEClient(topic string, client interface{}) error

RemoveSSEClient unregisters an SSE client from a topic

func (*SSEManager) Start

func (s *SSEManager) Start(ctx context.Context) error

Start begins listening for events and broadcasting to SSE clients

func (*SSEManager) Stop

func (s *SSEManager) Stop() error

Stop stops the SSE manager and underlying PubSub

type SSESync

type SSESync struct {
	// contains filtered or unexported fields
}

SSESync manages centralized SSE connections to peers and processes transactions

func NewSSESync

func NewSSESync(engine engine.OverlayEngineProvider, storage engine.Storage) *SSESync

NewSSESync creates a new centralized SSE sync manager

func (*SSESync) Start

func (s *SSESync) Start(ctx context.Context, peerTopics map[string][]string) error

Start begins SSE sync with configured peers

func (*SSESync) Stop

func (s *SSESync) Stop() error

Stop stops all SSE connections

type Storage

type Storage interface {
	DoesAppliedTransactionExist(ctx context.Context, tx *overlay.AppliedTransaction) (bool, error)
}

Storage interface for checking transaction existence

type StorageWithBeefByTopic added in v0.3.0

type StorageWithBeefByTopic interface {
	engine.Storage
	LoadBeefByTxidAndTopic(ctx context.Context, txid *chainhash.Hash, topic string) ([]byte, error)
}

StorageWithBeefByTopic defines the storage methods needed by LibP2PSync

type SubscriberConfig

type SubscriberConfig struct {
	// Redis configuration
	RedisURL string

	// SSE peer configuration
	SSEPeers map[string][]string // peer URL -> topics

	// Engine for transaction processing
	Engine interface {
		Submit(ctx context.Context, taggedBEEF any, mode any, onSteakReady any) (any, error)
		GetStorage() Storage
	}
}

SubscriberConfig configures different types of subscribers

type TransactionFetcher

type TransactionFetcher interface {
	FetchBEEF(ctx context.Context, peerURL, topic string, outpoint transaction.Outpoint) ([]byte, error)
}

TransactionFetcher interface for fetching BEEF from peers
