Documentation ¶
Index ¶
- Constants
- Variables
- type Channel
- type ChannelSpec
- type MsgEntry
- type ProcessInfo
- type RateLimiter
- type Router
- func (r *Router) Append(channelID string, senderID string, sequence int64, inReplyTo int64, ...) error
- func (r *Router) AppendWithType(channelID string, senderID string, sequence int64, inReplyTo int64, ...) error
- func (r *Router) CleanupProcess(processID string)
- func (r *Router) Create(channel *Channel) error
- func (r *Router) CreateIfNotExists(channel *Channel) error
- func (r *Router) Get(channelID string) (*Channel, error)
- func (r *Router) GetByProcessAndName(processID string, name string) (*Channel, error)
- func (r *Router) GetChannelsByProcess(processID string) []*Channel
- func (r *Router) GetLogSize(channelID string) (int, error)
- func (r *Router) GetSequence(channelID string) (int64, error)
- func (r *Router) ReadAfter(channelID string, callerID string, afterIndex int64, limit int) ([]*MsgEntry, error)
- func (r *Router) SetExecutorID(channelID string, executorID string) error
- func (r *Router) SetExecutorIDForProcess(processID string, executorID string) error
- func (r *Router) SetMaxChannelsPerProcess(max int)
- func (r *Router) SetMaxLogEntries(max int)
- func (r *Router) SetRateLimitEnabled(enabled bool)
- func (r *Router) SetSubscriberBufferSize(size int)
- func (r *Router) Stats() (channelCount int, processCount int)
- func (r *Router) Subscribe(channelID string, callerID string) (chan *MsgEntry, error)
- func (r *Router) SubscriberCount(channelID string) int
- func (r *Router) Unsubscribe(channelID string, ch chan *MsgEntry)
- type Subscriber
Constants ¶
const (
MsgTypeData = "data" // Regular data message
MsgTypeEnd = "end" // End-of-stream marker - signals streaming complete
MsgTypeError = "error" // Error message
)
Message types for MsgEntry
Variables ¶
var (
ErrChannelNotFound = errors.New("channel not found")
ErrChannelExists = errors.New("channel already exists")
ErrRateLimitExceeded = errors.New("rate limit exceeded")
ErrMessageTooLarge = errors.New("message payload exceeds maximum size")
ErrChannelFull = errors.New("channel log is full")
ErrTooManyChannels = errors.New("process has too many channels")
ErrSubscriberTooSlow = errors.New("subscriber disconnected: buffer full")
)
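Because these are sentinel values created with errors.New, callers can compare against them with errors.Is, which also matches errors that wrap a sentinel. The sketch below redeclares two of the sentinels locally so that it compiles on its own. `lookup` and `classify` are hypothetical helpers, not part of this package, and whether the Router wraps these errors or returns them bare is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinels so the sketch compiles on its own;
// real code would use the values exported by the package.
var (
	ErrChannelNotFound   = errors.New("channel not found")
	ErrRateLimitExceeded = errors.New("rate limit exceeded")
)

// lookup is a hypothetical stand-in for a Router call that fails
// and wraps the sentinel with extra context.
func lookup(id string) error {
	return fmt.Errorf("lookup %q: %w", id, ErrChannelNotFound)
}

// classify maps an error to a coarse action a caller might take.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrChannelNotFound):
		return "missing"
	case errors.Is(err, ErrRateLimitExceeded):
		return "retry-later"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(lookup("ch-1"))) // prints "missing"
}
```

Using errors.Is rather than `==` keeps the check working if an intermediate layer adds context with `%w`.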
Functions ¶
This section is empty.
Types ¶
type Channel ¶
type Channel struct {
ID string `json:"id"`
ProcessID string `json:"processid"`
Name string `json:"name"`
SubmitterID string `json:"submitterid"` // Process submitter
ExecutorID string `json:"executorid"` // Assigned executor
Sequence int64 `json:"sequence"`
Log []*MsgEntry `json:"log"`
}
Channel represents an append-only message log
type ChannelSpec ¶
type ChannelSpec struct {
Name string `json:"name"`
}
ChannelSpec defines a channel in a FunctionSpec
type MsgEntry ¶
type MsgEntry struct {
Sequence int64 `json:"sequence"`
InReplyTo int64 `json:"inreplyto,omitempty"` // References sequence from other sender
Timestamp time.Time `json:"timestamp"`
SenderID string `json:"senderid"`
Payload []byte `json:"payload"`
Type string `json:"type,omitempty"` // Message type: "data", "end", "error"
Error string `json:"error,omitempty"` // Error details when Type="error" or subscriber disconnected
}
MsgEntry represents a single message in a channel
type ProcessInfo ¶
ProcessInfo contains the minimal process information needed for authorization
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a token bucket rate limiter
func NewRateLimiter ¶
func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter
NewRateLimiter creates a new token bucket rate limiter
func (*RateLimiter) Allow ¶
func (rl *RateLimiter) Allow() bool
Allow checks if a request is allowed and consumes a token if so
func (*RateLimiter) Tokens ¶
func (rl *RateLimiter) Tokens() float64
Tokens returns the current number of tokens (for testing)
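For readers unfamiliar with the technique, here is a minimal token bucket sketch. It is not this package's RateLimiter: the `bucket` type, its fields, and the choice of tokens per second as the unit of refillRate are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// bucket is a hypothetical token bucket, not the package's RateLimiter.
type bucket struct {
	mu         sync.Mutex
	tokens     float64
	maxTokens  float64
	refillRate float64 // tokens per second (assumed unit)
	last       time.Time
}

// newBucket starts with a full bucket.
func newBucket(maxTokens, refillRate float64) *bucket {
	return &bucket{
		tokens:     maxTokens,
		maxTokens:  maxTokens,
		refillRate: refillRate,
		last:       time.Now(),
	}
}

// refill adds tokens for the time elapsed since the previous call,
// capped at maxTokens. The caller must hold b.mu.
func (b *bucket) refill(now time.Time) {
	elapsed := now.Sub(b.last).Seconds()
	b.last = now
	b.tokens += elapsed * b.refillRate
	if b.tokens > b.maxTokens {
		b.tokens = b.maxTokens
	}
}

// allow consumes one token if available and reports whether it did.
func (b *bucket) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.refill(time.Now())
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	// A refill rate of 0 means no replenishment, so the result is deterministic.
	b := newBucket(2, 0)
	fmt.Println(b.allow(), b.allow(), b.allow()) // prints "true true false"
}
```

A refill rate of zero removes the dependence on wall-clock time, which is the same idea the NewRouterForTesting description below gives for its deterministic test behavior.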
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router manages channels in memory
func NewRouter ¶
func NewRouter() *Router
NewRouter creates a new channel router with rate limiting enabled
func NewRouterForTesting ¶ added in v1.9.7
func NewRouterForTesting() *Router
NewRouterForTesting creates a router for testing with no token refilling. This ensures deterministic behavior in tests by preventing time-based token replenishment.
func NewRouterWithoutRateLimit ¶
func NewRouterWithoutRateLimit() *Router
NewRouterWithoutRateLimit creates a router without rate limiting (for testing)
func (*Router) Append ¶
func (r *Router) Append(channelID string, senderID string, sequence int64, inReplyTo int64, payload []byte) error
Append adds a message to a channel with client-assigned sequence number
func (*Router) AppendWithType ¶
func (r *Router) AppendWithType(channelID string, senderID string, sequence int64, inReplyTo int64, payload []byte, msgType string) error
AppendWithType adds a typed message to a channel (e.g., "end" for end-of-stream)
func (*Router) CleanupProcess ¶
CleanupProcess removes all channels for a process
func (*Router) CreateIfNotExists ¶
CreateIfNotExists creates a channel only if it doesn't already exist. Returns nil on success or if the channel already exists (idempotent).
func (*Router) GetByProcessAndName ¶
GetByProcessAndName retrieves a channel by process ID and name
func (*Router) GetChannelsByProcess ¶
GetChannelsByProcess retrieves all channels for a process
func (*Router) GetLogSize ¶
GetLogSize returns the number of entries in a channel
func (*Router) GetSequence ¶
GetSequence returns the current sequence number for a channel
func (*Router) ReadAfter ¶
func (r *Router) ReadAfter(channelID string, callerID string, afterIndex int64, limit int) ([]*MsgEntry, error)
ReadAfter reads entries after a given index (position in the log). Since sequences are per-sender, reading is index-based rather than sequence-based. limit=0 means no limit.
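To illustrate why a log position is used instead of a sequence number, the sketch below interleaves two senders whose sequences both start at 1. `entry` and `readAfter` are hypothetical, and treating afterIndex as a zero-based, exclusive position (so -1 reads from the start) is an assumption of this sketch, not something the documentation above confirms.

```go
package main

import "fmt"

// entry is a cut-down, hypothetical stand-in for MsgEntry.
type entry struct {
	SenderID string
	Sequence int64
}

// readAfter returns up to limit entries whose log position is greater
// than afterIndex. limit == 0 means no limit. afterIndex is treated as
// a zero-based, exclusive position (an assumption of this sketch).
func readAfter(log []entry, afterIndex int64, limit int) []entry {
	start := afterIndex + 1
	if start < 0 {
		start = 0
	}
	if start >= int64(len(log)) {
		return nil
	}
	out := log[start:]
	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	// Copy so callers cannot mutate the underlying log.
	return append([]entry(nil), out...)
}

func main() {
	// Sequence 1 and 2 each appear twice, once per sender, so a
	// sequence number alone does not identify a position in the log.
	log := []entry{{"a", 1}, {"b", 1}, {"a", 2}, {"b", 2}}
	fmt.Println(readAfter(log, 0, 2)) // prints "[{b 1} {a 2}]"
}
```

A poller would remember the position of the last entry it received and pass that as afterIndex on the next call.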
func (*Router) SetExecutorID ¶
SetExecutorID updates the executor ID for a channel (called when process is assigned)
func (*Router) SetExecutorIDForProcess ¶
SetExecutorIDForProcess updates executor ID for all channels of a process
func (*Router) SetMaxChannelsPerProcess ¶
SetMaxChannelsPerProcess sets the maximum channels per process (for testing)
func (*Router) SetMaxLogEntries ¶
SetMaxLogEntries sets the maximum log entries per channel (for testing)
func (*Router) SetRateLimitEnabled ¶
SetRateLimitEnabled enables or disables rate limiting
func (*Router) SetSubscriberBufferSize ¶
SetSubscriberBufferSize sets the subscriber buffer size (for testing)
func (*Router) Subscribe ¶
Subscribe registers for push notifications on a channel. Returns a Go channel that receives entries as they're appended.
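The combination of a subscriber buffer size, ErrSubscriberTooSlow, and the "end" message type suggests a non-blocking fan-out in which slow consumers are disconnected. The following is a sketch of that general pattern only. `hub`, `msg`, `publish`, and `drain` are hypothetical names, the sketch is single-goroutine and omits the locking a real router would need, and how this package actually reports a dropped subscriber is not shown in the documentation above.

```go
package main

import "fmt"

// msg is a cut-down, hypothetical stand-in for MsgEntry.
type msg struct {
	Type    string
	Payload string
}

// hub is a hypothetical fan-out structure, not the package's Router.
type hub struct {
	subs map[chan *msg]struct{}
}

func newHub() *hub { return &hub{subs: make(map[chan *msg]struct{})} }

// subscribe registers a buffered Go channel of the given capacity.
func (h *hub) subscribe(buf int) chan *msg {
	ch := make(chan *msg, buf)
	h.subs[ch] = struct{}{}
	return ch
}

// publish delivers m to every subscriber without blocking. A subscriber
// whose buffer is full is removed and its channel is closed.
func (h *hub) publish(m *msg) {
	for ch := range h.subs {
		select {
		case ch <- m:
		default:
			delete(h.subs, ch)
			close(ch)
		}
	}
}

// drain reads until an "end" message or until the channel is closed,
// returning the payloads seen before that point.
func drain(ch chan *msg) []string {
	var out []string
	for m := range ch {
		if m.Type == "end" {
			break
		}
		out = append(out, m.Payload)
	}
	return out
}

func main() {
	h := newHub()
	fast := h.subscribe(4)
	slow := h.subscribe(1)
	h.publish(&msg{Type: "data", Payload: "a"})
	h.publish(&msg{Type: "data", Payload: "b"}) // slow's buffer is full here, so it is dropped
	h.publish(&msg{Type: "end"})
	fmt.Println(drain(fast), drain(slow), len(h.subs)) // prints "[a b] [a] 1"
}
```

The select with a default case is what keeps one stalled consumer from blocking the appender or the other subscribers; the cost is that a slow subscriber loses its stream and has to resubscribe or fall back to polling.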
func (*Router) SubscriberCount ¶
SubscriberCount returns the number of subscribers for a channel (for testing)
func (*Router) Unsubscribe ¶
Unsubscribe removes a subscriber from a channel
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents a channel subscriber waiting for new entries