Documentation
¶
Index ¶
- Variables
- func NewBackend() backends.Backend
- func NewConnection(rawConn interface{}) backends.RealtimeConnection
- func NewEventHandler(relayServer interface{}) backends.RealtimeEventHandler
- func NewFullBackend() backends.FullBackend
- func NewRealtimeBackend() backends.RealtimeBackend
- func NewSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController
- func NewTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler
- type Backend
- func (l *Backend) GetMode() string
- func (l *Backend) Logger() backends.MiddlewareFunc
- func (l *Backend) NewEngine() backends.Engine
- func (l *Backend) NewEngineWithDefaults() backends.Engine
- func (l *Backend) NewServer(port int, engine backends.Engine) backends.Server
- func (l *Backend) NewServerWithAddr(addr string, engine backends.Engine) backends.Server
- func (l *Backend) Recovery() backends.MiddlewareFunc
- func (l *Backend) SetMode(mode string)
- type Connection
- type EventHandler
- func (e *EventHandler) Signal(process *core.Process)
- func (e *EventHandler) Stop()
- func (e *EventHandler) Subscribe(executorType string, state int, processID string, ctx context.Context) (chan *core.Process, chan error)
- func (e *EventHandler) WaitForProcess(executorType string, state int, processID string, ctx context.Context) (*core.Process, error)
- type FullBackend
- type P2PRealtimeHandler
- type RealtimeBackend
- func (r *RealtimeBackend) CreateConnection(rawConn interface{}) (backends.RealtimeConnection, error)
- func (r *RealtimeBackend) CreateEventHandler(relayServer interface{}) backends.RealtimeEventHandler
- func (r *RealtimeBackend) CreateSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController
- func (r *RealtimeBackend) CreateTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler
- type StreamContext
- func (c *StreamContext) Abort()
- func (c *StreamContext) AbortWithStatus(code int)
- func (c *StreamContext) AbortWithStatusJSON(code int, jsonObj interface{})
- func (c *StreamContext) Bind(obj interface{}) error
- func (c *StreamContext) BindJSON(obj interface{}) error
- func (c *StreamContext) Data(code int, contentType string, data []byte)
- func (c *StreamContext) DefaultPostForm(key, defaultValue string) string
- func (c *StreamContext) DefaultQuery(key, defaultValue string) string
- func (c *StreamContext) Get(key string) (value interface{}, exists bool)
- func (c *StreamContext) GetBool(key string) bool
- func (c *StreamContext) GetFloat64(key string) float64
- func (c *StreamContext) GetHeader(key string) string
- func (c *StreamContext) GetInt(key string) int
- func (c *StreamContext) GetInt64(key string) int64
- func (c *StreamContext) GetPeerID() string
- func (c *StreamContext) GetPubSub() *pubsub.PubSub
- func (c *StreamContext) GetStream() network.Stream
- func (c *StreamContext) GetString(key string) string
- func (c *StreamContext) Header(key, value string)
- func (c *StreamContext) IsAborted() bool
- func (c *StreamContext) JSON(code int, obj interface{})
- func (c *StreamContext) Next()
- func (c *StreamContext) Param(key string) string
- func (c *StreamContext) PostForm(key string) string
- func (c *StreamContext) Query(key string) string
- func (c *StreamContext) ReadBody() ([]byte, error)
- func (c *StreamContext) Request() *http.Request
- func (c *StreamContext) SendError(message string)
- func (c *StreamContext) Set(key string, value interface{})
- func (c *StreamContext) ShouldBind(obj interface{}) error
- func (c *StreamContext) ShouldBindJSON(obj interface{}) error
- func (c *StreamContext) Status(code int)
- func (c *StreamContext) String(code int, format string, values ...interface{})
- func (c *StreamContext) XML(code int, obj interface{})
- type SubscriptionController
- func (s *SubscriptionController) AddProcessSubscriber(executorID string, process *core.Process, subscription *backends.RealtimeSubscription) error
- func (s *SubscriptionController) AddProcessesSubscriber(executorID string, subscription *backends.RealtimeSubscription) error
- func (s *SubscriptionController) GetSubscriptionCount() int
- func (s *SubscriptionController) NotifySubscribers(process *core.Process)
- func (s *SubscriptionController) RemoveSubscription(executorID string, subscription *backends.RealtimeSubscription) error
- type TestableEventHandler
Constants ¶
This section is empty.
Variables ¶
var ErrEventHandlerStopped = errors.New("event handler has been stopped")
Functions ¶
func NewBackend ¶
func NewBackend() backends.Backend
NewBackend creates a new libp2p backend implementation
func NewConnection ¶
func NewConnection(rawConn interface{}) backends.RealtimeConnection
NewConnection creates a new libp2p connection wrapper
func NewEventHandler ¶
func NewEventHandler(relayServer interface{}) backends.RealtimeEventHandler
NewEventHandler creates a new libp2p event handler
func NewFullBackend ¶
func NewFullBackend() backends.FullBackend
NewFullBackend creates a new complete libp2p backend
func NewRealtimeBackend ¶
func NewRealtimeBackend() backends.RealtimeBackend
NewRealtimeBackend creates a new libp2p realtime backend implementation
func NewSubscriptionController ¶
func NewSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController
NewSubscriptionController creates a new libp2p subscription controller
func NewTestableEventHandler ¶
func NewTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler
NewTestableEventHandler creates a new testable libp2p event handler
Types ¶
type Backend ¶
type Backend struct {
// contains filtered or unexported fields
}
Backend implements the backends.Backend interface for libp2p
func (*Backend) Logger ¶
func (l *Backend) Logger() backends.MiddlewareFunc
Logger returns a logger middleware (no-op for libp2p)
func (*Backend) NewEngineWithDefaults ¶
func (l *Backend) NewEngineWithDefaults() backends.Engine
NewEngineWithDefaults creates a new libp2p engine with defaults (not applicable for libp2p)
func (*Backend) NewServerWithAddr ¶
func (l *Backend) NewServerWithAddr(addr string, engine backends.Engine) backends.Server
NewServerWithAddr creates a new libp2p server with address (not applicable in this pattern)
func (*Backend) Recovery ¶
func (l *Backend) Recovery() backends.MiddlewareFunc
Recovery returns a recovery middleware (no-op for libp2p)
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection implements the backends.RealtimeConnection interface for libp2p streams
func (*Connection) IsOpen ¶
func (c *Connection) IsOpen() bool
IsOpen returns true if the stream is still open
func (*Connection) WriteMessage ¶
func (c *Connection) WriteMessage(msgType int, data []byte) error
WriteMessage sends a message through the libp2p stream
type EventHandler ¶
type EventHandler struct {
// contains filtered or unexported fields
}
EventHandler implements backends.RealtimeEventHandler for libp2p
func (*EventHandler) Signal ¶
func (e *EventHandler) Signal(process *core.Process)
Signal sends a process event to all registered listeners
type FullBackend ¶
type FullBackend struct {
*Backend
*RealtimeBackend
}
FullBackend implements both Backend and RealtimeBackend
type P2PRealtimeHandler ¶
type P2PRealtimeHandler struct {
// contains filtered or unexported fields
}
P2PRealtimeHandler handles realtime connections via libp2p pubsub
func NewP2PRealtimeHandler ¶
func NewP2PRealtimeHandler(pubsub *pubsub.PubSub) *P2PRealtimeHandler
NewP2PRealtimeHandler creates a new libp2p realtime handler
func (*P2PRealtimeHandler) HandleRealtimeRequest ¶
func (h *P2PRealtimeHandler) HandleRealtimeRequest(c backends.Context, jsonString string)
HandleRealtimeRequest handles realtime subscription requests via pubsub
func (*P2PRealtimeHandler) PublishProcessUpdate ¶
func (h *P2PRealtimeHandler) PublishProcessUpdate(process *core.Process) error
PublishProcessUpdate publishes a process update to the appropriate topic
type RealtimeBackend ¶
type RealtimeBackend struct {
*Backend
}
RealtimeBackend implements the backends.RealtimeBackend interface for libp2p
func (*RealtimeBackend) CreateConnection ¶
func (r *RealtimeBackend) CreateConnection(rawConn interface{}) (backends.RealtimeConnection, error)
CreateConnection creates a libp2p connection from a raw connection
func (*RealtimeBackend) CreateEventHandler ¶
func (r *RealtimeBackend) CreateEventHandler(relayServer interface{}) backends.RealtimeEventHandler
CreateEventHandler creates a libp2p event handler
func (*RealtimeBackend) CreateSubscriptionController ¶
func (r *RealtimeBackend) CreateSubscriptionController(eventHandler backends.RealtimeEventHandler) backends.RealtimeSubscriptionController
CreateSubscriptionController creates a libp2p subscription controller
func (*RealtimeBackend) CreateTestableEventHandler ¶
func (r *RealtimeBackend) CreateTestableEventHandler(relayServer interface{}) backends.TestableRealtimeEventHandler
CreateTestableEventHandler creates a testable libp2p event handler
type StreamContext ¶
type StreamContext struct {
// contains filtered or unexported fields
}
StreamContext adapts a libp2p stream to the backends.Context interface
func NewStreamContext ¶
func NewStreamContext(stream network.Stream, pubsub *pubsub.PubSub) *StreamContext
NewStreamContext creates a new stream context
func (*StreamContext) AbortWithStatus ¶
func (c *StreamContext) AbortWithStatus(code int)
func (*StreamContext) AbortWithStatusJSON ¶
func (c *StreamContext) AbortWithStatusJSON(code int, jsonObj interface{})
func (*StreamContext) Bind ¶
func (c *StreamContext) Bind(obj interface{}) error
Bind reads and unmarshals the request data
func (*StreamContext) BindJSON ¶
func (c *StreamContext) BindJSON(obj interface{}) error
BindJSON reads and unmarshals JSON data
func (*StreamContext) Data ¶
func (c *StreamContext) Data(code int, contentType string, data []byte)
Data writes raw data to the stream
func (*StreamContext) DefaultPostForm ¶
func (c *StreamContext) DefaultPostForm(key, defaultValue string) string
DefaultPostForm returns a POST form value with default
func (*StreamContext) DefaultQuery ¶
func (c *StreamContext) DefaultQuery(key, defaultValue string) string
DefaultQuery returns a query parameter with default value
func (*StreamContext) Get ¶
func (c *StreamContext) Get(key string) (value interface{}, exists bool)
func (*StreamContext) GetBool ¶
func (c *StreamContext) GetBool(key string) bool
func (*StreamContext) GetFloat64 ¶
func (c *StreamContext) GetFloat64(key string) float64
func (*StreamContext) GetHeader ¶
func (c *StreamContext) GetHeader(key string) string
GetHeader returns a header value (libp2p doesn't have headers, return empty)
func (*StreamContext) GetInt ¶
func (c *StreamContext) GetInt(key string) int
func (*StreamContext) GetInt64 ¶
func (c *StreamContext) GetInt64(key string) int64
func (*StreamContext) GetPeerID ¶
func (c *StreamContext) GetPeerID() string
GetPeerID returns the peer ID of the remote peer
func (*StreamContext) GetPubSub ¶
func (c *StreamContext) GetPubSub() *pubsub.PubSub
GetPubSub returns the pubsub instance
func (*StreamContext) GetStream ¶
func (c *StreamContext) GetStream() network.Stream
GetStream returns the underlying stream
func (*StreamContext) GetString ¶
func (c *StreamContext) GetString(key string) string
func (*StreamContext) Header ¶
func (c *StreamContext) Header(key, value string)
Header sets a response header (no-op for libp2p)
func (*StreamContext) IsAborted ¶
func (c *StreamContext) IsAborted() bool
func (*StreamContext) JSON ¶
func (c *StreamContext) JSON(code int, obj interface{})
JSON writes a JSON response to the stream
func (*StreamContext) Next ¶
func (c *StreamContext) Next()
func (*StreamContext) Param ¶
func (c *StreamContext) Param(key string) string
Param returns a URL parameter (libp2p doesn't have URL params, return empty)
func (*StreamContext) PostForm ¶
func (c *StreamContext) PostForm(key string) string
PostForm returns a POST form value (libp2p doesn't have forms, return empty)
func (*StreamContext) Query ¶
func (c *StreamContext) Query(key string) string
Query returns a query parameter (libp2p doesn't have query params, return empty)
func (*StreamContext) ReadBody ¶
func (c *StreamContext) ReadBody() ([]byte, error)
ReadBody reads the request body
func (*StreamContext) Request ¶
func (c *StreamContext) Request() *http.Request
Request returns a dummy HTTP request (libp2p doesn't have HTTP requests)
func (*StreamContext) SendError ¶
func (c *StreamContext) SendError(message string)
SendError sends an error response
func (*StreamContext) Set ¶
func (c *StreamContext) Set(key string, value interface{})
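Set stores value under key in the context (first of the context storage methods: Set, Get, GetString, GetInt, GetInt64, GetFloat64, GetBool)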
func (*StreamContext) ShouldBind ¶
func (c *StreamContext) ShouldBind(obj interface{}) error
ShouldBind is like Bind but returns error instead of aborting
func (*StreamContext) ShouldBindJSON ¶
func (c *StreamContext) ShouldBindJSON(obj interface{}) error
ShouldBindJSON is like BindJSON but returns error instead of aborting
func (*StreamContext) Status ¶
func (c *StreamContext) Status(code int)
Status sets the response status (no-op for libp2p)
func (*StreamContext) String ¶
func (c *StreamContext) String(code int, format string, values ...interface{})
String writes a string response to the stream
func (*StreamContext) XML ¶
func (c *StreamContext) XML(code int, obj interface{})
XML writes an XML response to the stream
type SubscriptionController ¶
type SubscriptionController struct {
// contains filtered or unexported fields
}
SubscriptionController manages libp2p-based subscriptions
func (*SubscriptionController) AddProcessSubscriber ¶
func (s *SubscriptionController) AddProcessSubscriber(executorID string, process *core.Process, subscription *backends.RealtimeSubscription) error
AddProcessSubscriber adds a subscription for a specific process
func (*SubscriptionController) AddProcessesSubscriber ¶
func (s *SubscriptionController) AddProcessesSubscriber(executorID string, subscription *backends.RealtimeSubscription) error
AddProcessesSubscriber adds a subscription for all processes of a certain type
func (*SubscriptionController) GetSubscriptionCount ¶
func (s *SubscriptionController) GetSubscriptionCount() int
GetSubscriptionCount returns the number of active subscriptions (for monitoring)
func (*SubscriptionController) NotifySubscribers ¶
func (s *SubscriptionController) NotifySubscribers(process *core.Process)
NotifySubscribers notifies all relevant subscribers about a process event
func (*SubscriptionController) RemoveSubscription ¶
func (s *SubscriptionController) RemoveSubscription(executorID string, subscription *backends.RealtimeSubscription) error
RemoveSubscription removes a subscription (cleanup method)
type TestableEventHandler ¶
type TestableEventHandler struct {
*EventHandler
}
TestableEventHandler extends EventHandler for testing
func (*TestableEventHandler) HasStopped ¶
func (t *TestableEventHandler) HasStopped() bool
HasStopped reports whether the handler has stopped (intended for use in tests)
func (*TestableEventHandler) NumberOfListeners ¶
NumberOfListeners returns listener counts for testing