libp2p

package
v1.9.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEventHandlerStopped = errors.New("event handler has been stopped")

Functions

func NewBackend

func NewBackend() backends.Backend

NewBackend creates a new libp2p backend implementation

func NewConnection

func NewConnection(rawConn interface{}) backends.RealtimeConnection

NewConnection creates a new libp2p connection wrapper

func NewEventHandler

func NewEventHandler(relayServer interface{}) backends.RealtimeEventHandler

NewEventHandler creates a new libp2p event handler

func NewFullBackend

func NewFullBackend() backends.FullBackend

NewFullBackend creates a new complete libp2p backend

func NewRealtimeBackend

func NewRealtimeBackend() backends.RealtimeBackend

NewRealtimeBackend creates a new libp2p realtime backend implementation

func NewSubscriptionController

func NewSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController

NewSubscriptionController creates a new libp2p subscription controller

func NewTestableEventHandler

func NewTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler

NewTestableEventHandler creates a new testable libp2p event handler

Types

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

Backend implements the backends.Backend interface for libp2p

func (*Backend) GetMode

func (l *Backend) GetMode() string

GetMode returns the backend mode

func (*Backend) Logger

func (l *Backend) Logger() backends.MiddlewareFunc

Logger returns a logger middleware (no-op for libp2p)

func (*Backend) NewEngine

func (l *Backend) NewEngine() backends.Engine

NewEngine creates a new libp2p engine (not applicable for libp2p)

func (*Backend) NewEngineWithDefaults

func (l *Backend) NewEngineWithDefaults() backends.Engine

NewEngineWithDefaults creates a new libp2p engine with defaults (not applicable for libp2p)

func (*Backend) NewServer

func (l *Backend) NewServer(port int, engine backends.Engine) backends.Server

NewServer creates a new libp2p server (not applicable in this pattern)

func (*Backend) NewServerWithAddr

func (l *Backend) NewServerWithAddr(addr string, engine backends.Engine) backends.Server

NewServerWithAddr creates a new libp2p server with address (not applicable in this pattern)

func (*Backend) Recovery

func (l *Backend) Recovery() backends.MiddlewareFunc

Recovery returns a recovery middleware (no-op for libp2p)

func (*Backend) SetMode

func (l *Backend) SetMode(mode string)

SetMode sets the backend mode

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection implements the backends.RealtimeConnection interface for libp2p streams

func (*Connection) Close

func (c *Connection) Close() error

Close closes the libp2p stream

func (*Connection) IsOpen

func (c *Connection) IsOpen() bool

IsOpen returns true if the stream is still open

func (*Connection) WriteMessage

func (c *Connection) WriteMessage(msgType int, data []byte) error

WriteMessage sends a message through the libp2p stream

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

EventHandler implements backends.RealtimeEventHandler for libp2p

func (*EventHandler) Signal

func (e *EventHandler) Signal(process *core.Process)

Signal sends a process event to all registered listeners

func (*EventHandler) Stop

func (e *EventHandler) Stop()

Stop stops the event handler

func (*EventHandler) Subscribe

func (e *EventHandler) Subscribe(executorType string, state int, processID string, ctx context.Context) (chan *core.Process, chan error)

Subscribe registers a subscription and returns channels for process events and errors

func (*EventHandler) WaitForProcess

func (e *EventHandler) WaitForProcess(executorType string, state int, processID string, ctx context.Context) (*core.Process, error)

WaitForProcess waits for a specific process state change

type FullBackend

type FullBackend struct {
	*Backend
	*RealtimeBackend
}

FullBackend implements both Backend and RealtimeBackend

type P2PRealtimeHandler

type P2PRealtimeHandler struct {
	// contains filtered or unexported fields
}

P2PRealtimeHandler handles realtime connections via libp2p pubsub

func NewP2PRealtimeHandler

func NewP2PRealtimeHandler(pubsub *pubsub.PubSub) *P2PRealtimeHandler

NewP2PRealtimeHandler creates a new libp2p realtime handler

func (*P2PRealtimeHandler) HandleRealtimeRequest

func (h *P2PRealtimeHandler) HandleRealtimeRequest(c backends.Context, jsonString string)

HandleRealtimeRequest handles realtime subscription requests via pubsub

func (*P2PRealtimeHandler) PublishProcessUpdate

func (h *P2PRealtimeHandler) PublishProcessUpdate(process *core.Process) error

PublishProcessUpdate publishes a process update to the appropriate topic

type RealtimeBackend

type RealtimeBackend struct {
	*Backend
}

RealtimeBackend implements the backends.RealtimeBackend interface for libp2p

func (*RealtimeBackend) CreateConnection

func (r *RealtimeBackend) CreateConnection(rawConn interface{}) (backends.RealtimeConnection, error)

CreateConnection creates a libp2p connection from a raw connection

func (*RealtimeBackend) CreateEventHandler

func (r *RealtimeBackend) CreateEventHandler(relayServer interface{}) backends.RealtimeEventHandler

CreateEventHandler creates a libp2p event handler

func (*RealtimeBackend) CreateSubscriptionController

func (r *RealtimeBackend) CreateSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController

CreateSubscriptionController creates a libp2p subscription controller

func (*RealtimeBackend) CreateTestableEventHandler

func (r *RealtimeBackend) CreateTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler

CreateTestableEventHandler creates a testable libp2p event handler

type StreamContext

type StreamContext struct {
	// contains filtered or unexported fields
}

StreamContext adapts a libp2p stream to the backends.Context interface

func NewStreamContext

func NewStreamContext(stream network.Stream, pubsub *pubsub.PubSub) *StreamContext

NewStreamContext creates a new stream context

func (*StreamContext) Abort

func (c *StreamContext) Abort()

Abort is one of the flow control methods.

func (*StreamContext) AbortWithStatus

func (c *StreamContext) AbortWithStatus(code int)

func (*StreamContext) AbortWithStatusJSON

func (c *StreamContext) AbortWithStatusJSON(code int, jsonObj interface{})

func (*StreamContext) Bind

func (c *StreamContext) Bind(obj interface{}) error

Bind reads and unmarshals the request data

func (*StreamContext) BindJSON

func (c *StreamContext) BindJSON(obj interface{}) error

BindJSON reads and unmarshals JSON data

func (*StreamContext) Data

func (c *StreamContext) Data(code int, contentType string, data []byte)

Data writes raw data to the stream

func (*StreamContext) DefaultPostForm

func (c *StreamContext) DefaultPostForm(key, defaultValue string) string

DefaultPostForm returns a POST form value with default value

func (*StreamContext) DefaultQuery

func (c *StreamContext) DefaultQuery(key, defaultValue string) string

DefaultQuery returns a query parameter with default value

func (*StreamContext) Get

func (c *StreamContext) Get(key string) (value interface{}, exists bool)

func (*StreamContext) GetBool

func (c *StreamContext) GetBool(key string) bool

func (*StreamContext) GetFloat64

func (c *StreamContext) GetFloat64(key string) float64

func (*StreamContext) GetHeader

func (c *StreamContext) GetHeader(key string) string

GetHeader returns a header value (libp2p doesn't have headers, so it returns an empty string)

func (*StreamContext) GetInt

func (c *StreamContext) GetInt(key string) int

func (*StreamContext) GetInt64

func (c *StreamContext) GetInt64(key string) int64

func (*StreamContext) GetPeerID

func (c *StreamContext) GetPeerID() string

GetPeerID returns the peer ID of the remote peer

func (*StreamContext) GetPubSub

func (c *StreamContext) GetPubSub() *pubsub.PubSub

GetPubSub returns the pubsub instance

func (*StreamContext) GetStream

func (c *StreamContext) GetStream() network.Stream

GetStream returns the underlying stream

func (*StreamContext) GetString

func (c *StreamContext) GetString(key string) string

func (*StreamContext) Header

func (c *StreamContext) Header(key, value string)

Header sets a response header (no-op for libp2p)

func (*StreamContext) IsAborted

func (c *StreamContext) IsAborted() bool

func (*StreamContext) JSON

func (c *StreamContext) JSON(code int, obj interface{})

JSON writes a JSON response to the stream

func (*StreamContext) Next

func (c *StreamContext) Next()

func (*StreamContext) Param

func (c *StreamContext) Param(key string) string

Param returns a URL parameter (libp2p doesn't have URL params, so it returns an empty string)

func (*StreamContext) PostForm

func (c *StreamContext) PostForm(key string) string

PostForm returns a POST form value (libp2p doesn't have forms, so it returns an empty string)

func (*StreamContext) Query

func (c *StreamContext) Query(key string) string

Query returns a query parameter (libp2p doesn't have query params, so it returns an empty string)

func (*StreamContext) ReadBody

func (c *StreamContext) ReadBody() ([]byte, error)

ReadBody reads the request body

func (*StreamContext) Request

func (c *StreamContext) Request() *http.Request

Request returns a dummy HTTP request (libp2p doesn't have HTTP requests)

func (*StreamContext) SendError

func (c *StreamContext) SendError(message string)

SendError sends an error response

func (*StreamContext) Set

func (c *StreamContext) Set(key string, value interface{})

Set is one of the context storage methods.

func (*StreamContext) ShouldBind

func (c *StreamContext) ShouldBind(obj interface{}) error

ShouldBind is like Bind but returns error instead of aborting

func (*StreamContext) ShouldBindJSON

func (c *StreamContext) ShouldBindJSON(obj interface{}) error

ShouldBindJSON is like BindJSON but returns error instead of aborting

func (*StreamContext) Status

func (c *StreamContext) Status(code int)

Status sets the response status (no-op for libp2p)

func (*StreamContext) String

func (c *StreamContext) String(code int, format string, values ...interface{})

String writes a string response to the stream

func (*StreamContext) XML

func (c *StreamContext) XML(code int, obj interface{})

XML writes an XML response to the stream

type SubscriptionController

type SubscriptionController struct {
	// contains filtered or unexported fields
}

SubscriptionController manages libp2p-based subscriptions

func (*SubscriptionController) AddProcessSubscriber

func (s *SubscriptionController) AddProcessSubscriber(executorID string, process *core.Process, subscription *backends.RealtimeSubscription) error

AddProcessSubscriber adds a subscription for a specific process

func (*SubscriptionController) AddProcessesSubscriber

func (s *SubscriptionController) AddProcessesSubscriber(executorID string, subscription *backends.RealtimeSubscription) error

AddProcessesSubscriber adds a subscription for all processes of a certain type

func (*SubscriptionController) GetSubscriptionCount

func (s *SubscriptionController) GetSubscriptionCount() int

GetSubscriptionCount returns the number of active subscriptions (for monitoring)

func (*SubscriptionController) NotifySubscribers

func (s *SubscriptionController) NotifySubscribers(process *core.Process)

NotifySubscribers notifies all relevant subscribers about a process event

func (*SubscriptionController) RemoveSubscription

func (s *SubscriptionController) RemoveSubscription(executorID string, subscription *backends.RealtimeSubscription) error

RemoveSubscription removes a subscription (cleanup method)

type TestableEventHandler

type TestableEventHandler struct {
	*EventHandler
}

TestableEventHandler extends EventHandler for testing

func (*TestableEventHandler) HasStopped

func (t *TestableEventHandler) HasStopped() bool

HasStopped reports whether the handler has stopped (intended for use in tests)

func (*TestableEventHandler) NumberOfListeners

func (t *TestableEventHandler) NumberOfListeners(executorType string, state int) (int, int, int)

NumberOfListeners returns listener counts for testing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL