channel

package
v1.9.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MsgTypeData  = "data"  // Regular data message
	MsgTypeEnd   = "end"   // End-of-stream marker - signals streaming complete
	MsgTypeError = "error" // Error message
)

Message types for MsgEntry

Variables

View Source
var (
	ErrChannelNotFound   = errors.New("channel not found")
	ErrUnauthorized      = errors.New("unauthorized access to channel")
	ErrChannelExists     = errors.New("channel already exists")
	ErrRateLimitExceeded = errors.New("rate limit exceeded")
	ErrMessageTooLarge   = errors.New("message payload exceeds maximum size")
	ErrChannelFull       = errors.New("channel log is full")
	ErrTooManyChannels   = errors.New("process has too many channels")
	ErrSubscriberTooSlow = errors.New("subscriber disconnected: buffer full")
)

Functions

This section is empty.

Types

type Channel

type Channel struct {
	ID          string      `json:"id"`
	ProcessID   string      `json:"processid"`
	Name        string      `json:"name"`
	SubmitterID string      `json:"submitterid"` // Process submitter
	ExecutorID  string      `json:"executorid"`  // Assigned executor
	Sequence    int64       `json:"sequence"`
	Log         []*MsgEntry `json:"log"`
}

Channel represents an append-only message log

type ChannelSpec

type ChannelSpec struct {
	Name string `json:"name"`
}

ChannelSpec defines a channel in a FunctionSpec

type MsgEntry

type MsgEntry struct {
	Sequence  int64     `json:"sequence"`
	InReplyTo int64     `json:"inreplyto,omitempty"` // References sequence from other sender
	Timestamp time.Time `json:"timestamp"`
	SenderID  string    `json:"senderid"`
	Payload   []byte    `json:"payload"`
	Type      string    `json:"type,omitempty"`  // Message type: "data", "end", "error"
	Error     string    `json:"error,omitempty"` // Error details when Type="error" or subscriber disconnected
}

MsgEntry represents a single message in a channel

type ProcessInfo

type ProcessInfo struct {
	ID          string
	SubmitterID string
	ExecutorID  string
}

ProcessInfo contains the minimal process information needed for authorization

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a token bucket rate limiter

func NewRateLimiter

func NewRateLimiter(maxTokens float64, refillRate float64) *RateLimiter

NewRateLimiter creates a new token bucket rate limiter

func (*RateLimiter) Allow

func (rl *RateLimiter) Allow() bool

Allow checks if a request is allowed and consumes a token if so

func (*RateLimiter) Tokens

func (rl *RateLimiter) Tokens() float64

Tokens returns the current number of tokens (for testing)

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router manages channels in memory

func NewRouter

func NewRouter() *Router

NewRouter creates a new channel router with rate limiting enabled

func NewRouterForTesting added in v1.9.7

func NewRouterForTesting() *Router

NewRouterForTesting creates a router for testing with no token refilling This ensures deterministic behavior in tests by preventing time-based token replenishment

func NewRouterWithoutRateLimit

func NewRouterWithoutRateLimit() *Router

NewRouterWithoutRateLimit creates a router without rate limiting (for testing)

func (*Router) Append

func (r *Router) Append(channelID string, senderID string, sequence int64, inReplyTo int64, payload []byte) error

Append adds a message to a channel with client-assigned sequence number

func (*Router) AppendWithType

func (r *Router) AppendWithType(channelID string, senderID string, sequence int64, inReplyTo int64, payload []byte, msgType string) error

AppendWithType adds a typed message to a channel (e.g., "end" for end-of-stream)

func (*Router) CleanupProcess

func (r *Router) CleanupProcess(processID string)

CleanupProcess removes all channels for a process

func (*Router) Create

func (r *Router) Create(channel *Channel) error

Create creates a new channel

func (*Router) CreateIfNotExists

func (r *Router) CreateIfNotExists(channel *Channel) error

CreateIfNotExists creates a channel only if it doesn't already exist Returns nil on success or if channel already exists (idempotent)

func (*Router) Get

func (r *Router) Get(channelID string) (*Channel, error)

Get retrieves a channel by ID

func (*Router) GetByProcessAndName

func (r *Router) GetByProcessAndName(processID string, name string) (*Channel, error)

GetByProcessAndName retrieves a channel by process ID and name

func (*Router) GetChannelsByProcess

func (r *Router) GetChannelsByProcess(processID string) []*Channel

GetChannelsByProcess retrieves all channels for a process

func (*Router) GetLogSize

func (r *Router) GetLogSize(channelID string) (int, error)

GetLogSize returns the number of entries in a channel

func (*Router) GetSequence

func (r *Router) GetSequence(channelID string) (int64, error)

GetSequence returns the current sequence number for a channel

func (*Router) ReadAfter

func (r *Router) ReadAfter(channelID string, callerID string, afterIndex int64, limit int) ([]*MsgEntry, error)

ReadAfter reads entries after a given index (position in log) Since sequences are per-sender, we use index-based reading limit=0 means no limit

func (*Router) SetExecutorID

func (r *Router) SetExecutorID(channelID string, executorID string) error

SetExecutorID updates the executor ID for a channel (called when process is assigned)

func (*Router) SetExecutorIDForProcess

func (r *Router) SetExecutorIDForProcess(processID string, executorID string) error

SetExecutorIDForProcess updates executor ID for all channels of a process

func (*Router) SetMaxChannelsPerProcess

func (r *Router) SetMaxChannelsPerProcess(max int)

SetMaxChannelsPerProcess sets the maximum channels per process (for testing)

func (*Router) SetMaxLogEntries

func (r *Router) SetMaxLogEntries(max int)

SetMaxLogEntries sets the maximum log entries per channel (for testing)

func (*Router) SetRateLimitEnabled

func (r *Router) SetRateLimitEnabled(enabled bool)

SetRateLimitEnabled enables or disables rate limiting

func (*Router) SetSubscriberBufferSize

func (r *Router) SetSubscriberBufferSize(size int)

SetSubscriberBufferSize sets the subscriber buffer size (for testing)

func (*Router) Stats

func (r *Router) Stats() (channelCount int, processCount int)

Stats returns statistics about the router

func (*Router) Subscribe

func (r *Router) Subscribe(channelID string, callerID string) (chan *MsgEntry, error)

Subscribe registers for push notifications on a channel Returns a channel that receives entries as they're appended

func (*Router) SubscriberCount

func (r *Router) SubscriberCount(channelID string) int

SubscriberCount returns the number of subscribers for a channel (for testing)

func (*Router) Unsubscribe

func (r *Router) Unsubscribe(channelID string, ch chan *MsgEntry)

Unsubscribe removes a subscriber from a channel

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a channel subscriber waiting for new entries

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL