Documentation
¶
Overview ¶
Package serverless provides a WASM-based serverless function engine for the Orama Network. It enables users to deploy and execute Go functions (compiled to WASM) across all nodes, with support for HTTP/WebSocket triggers, cron jobs, database triggers, pub/sub triggers, one-time timers, retries with DLQ, and background jobs.
Index ¶
- Variables
- func CreateTimeoutContext(ctx context.Context, fn *Function, maxTimeout int) (context.Context, context.CancelFunc)
- func IsNotFound(err error) bool
- func IsResourceExhausted(err error) bool
- func IsRetryable(err error) bool
- func IsServiceUnavailable(err error) bool
- func IsUnauthorized(err error) bool
- type BatchInvokeRequest
- type BatchInvokeResponse
- type Config
- type ConfigError
- type CronTrigger
- type DBChangeEvent
- type DBOperation
- type DBTrigger
- type DBTriggerConfig
- type DLQMessage
- type DeployError
- type DeployRequest
- type DeployResult
- type Engine
- func (e *Engine) Close(ctx context.Context) error
- func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error)
- func (e *Engine) GetCacheStats() (size int, capacity int)
- func (e *Engine) Invalidate(wasmCID string)
- func (e *Engine) Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error
- type EngineOption
- type ExecutionError
- type Function
- type FunctionDefinition
- type FunctionExecutor
- type FunctionRegistry
- type FunctionStatus
- type GorillaWSConn
- type HostFunctionError
- type HostServices
- type InvocationContext
- type InvocationLogger
- type InvocationRecord
- type InvocationResult
- type InvocationStatus
- type InvokeRequest
- type InvokeResponse
- type Invoker
- func (i *Invoker) BatchInvoke(ctx context.Context, req *BatchInvokeRequest) (*BatchInvokeResponse, error)
- func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string, callerWallet string) (bool, error)
- func (i *Invoker) GetFunctionInfo(ctx context.Context, namespace, functionName string, version int) (*Function, error)
- func (i *Invoker) InvalidateCache(wasmCID string)
- func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)
- func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byte, ...) (*InvokeResponse, error)
- func (i *Invoker) ValidateInput(input []byte, maxSize int) error
- type Job
- type JobManager
- type JobStatus
- type LogEntry
- type PubSubTrigger
- type RateLimiter
- type Registry
- func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error
- func (r *Registry) Get(ctx context.Context, namespace, name string, version int) (*Function, error)
- func (r *Registry) GetByID(ctx context.Context, id string) (*Function, error)
- func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error)
- func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
- func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)
- func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, error)
- func (r *Registry) ListVersions(ctx context.Context, namespace, name string) ([]*Function, error)
- func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error
- func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)
- type RegistryConfig
- type RetryableError
- type SecretsManager
- type Timer
- type TriggerError
- type TriggerManager
- type TriggerType
- type ValidationError
- type WSManager
- func (m *WSManager) Broadcast(topic string, data []byte) error
- func (m *WSManager) Close()
- func (m *WSManager) GetClientTopics(clientID string) []string
- func (m *WSManager) GetConnectionCount() int
- func (m *WSManager) GetStats() *WSStats
- func (m *WSManager) GetTopicSubscriberCount(topic string) int
- func (m *WSManager) IsConnected(clientID string) bool
- func (m *WSManager) Register(clientID string, conn WebSocketConn)
- func (m *WSManager) Send(clientID string, data []byte) error
- func (m *WSManager) Subscribe(clientID, topic string)
- func (m *WSManager) Unregister(clientID string)
- func (m *WSManager) Unsubscribe(clientID, topic string)
- type WSStats
- type WebSocketConn
- type WebSocketManager
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFunctionNotFound is returned when a function does not exist. ErrFunctionNotFound = errors.New("function not found") // ErrFunctionExists is returned when attempting to create a function that already exists. ErrFunctionExists = errors.New("function already exists") // ErrVersionNotFound is returned when a specific function version does not exist. ErrVersionNotFound = errors.New("function version not found") // ErrSecretNotFound is returned when a secret does not exist. ErrSecretNotFound = errors.New("secret not found") // ErrJobNotFound is returned when a job does not exist. ErrJobNotFound = errors.New("job not found") // ErrTriggerNotFound is returned when a trigger does not exist. ErrTriggerNotFound = errors.New("trigger not found") // ErrTimerNotFound is returned when a timer does not exist. ErrTimerNotFound = errors.New("timer not found") ErrUnauthorized = errors.New("unauthorized") // ErrRateLimited is returned when the rate limit is exceeded. ErrRateLimited = errors.New("rate limit exceeded") // ErrInvalidWASM is returned when the WASM module is invalid. ErrInvalidWASM = errors.New("invalid WASM module") // ErrCompilationFailed is returned when WASM compilation fails. ErrCompilationFailed = errors.New("WASM compilation failed") // ErrExecutionFailed is returned when function execution fails. ErrExecutionFailed = errors.New("function execution failed") // ErrTimeout is returned when function execution times out. ErrTimeout = errors.New("function execution timeout") // ErrMemoryExceeded is returned when the function exceeds memory limits. ErrMemoryExceeded = errors.New("memory limit exceeded") // ErrInvalidInput is returned when function input is invalid. ErrInvalidInput = errors.New("invalid input") // ErrWSNotAvailable is returned when WebSocket operations are used outside WS context. ErrWSNotAvailable = errors.New("websocket operations not available in this context") // ErrWSClientNotFound is returned when a WebSocket client is not connected. ErrWSClientNotFound = errors.New("websocket client not found") // ErrInvalidCronExpression is returned when a cron expression is invalid. ErrInvalidCronExpression = errors.New("invalid cron expression") // ErrPayloadTooLarge is returned when a job payload exceeds the maximum size. ErrPayloadTooLarge = errors.New("payload too large") // ErrQueueFull is returned when the job queue is full. ErrQueueFull = errors.New("job queue is full") // ErrJobCancelled is returned when a job is cancelled. ErrJobCancelled = errors.New("job cancelled") ErrStorageUnavailable = errors.New("storage unavailable") ErrDatabaseUnavailable = errors.New("database unavailable") ErrCacheUnavailable = errors.New("cache unavailable") )
Sentinel errors for common conditions.
Functions ¶
func CreateTimeoutContext ¶
func CreateTimeoutContext(ctx context.Context, fn *Function, maxTimeout int) (context.Context, context.CancelFunc)
CreateTimeoutContext creates a context with timeout based on function configuration.
func IsNotFound ¶
IsNotFound checks if an error indicates a resource was not found.
func IsResourceExhausted ¶
IsResourceExhausted checks if an error indicates resource exhaustion.
func IsRetryable ¶
IsRetryable checks if an error should be retried.
func IsServiceUnavailable ¶
IsServiceUnavailable checks if an error indicates a service is unavailable.
func IsUnauthorized ¶
IsUnauthorized checks if an error indicates a lack of authorization.
Types ¶
type BatchInvokeRequest ¶
type BatchInvokeRequest struct {
Requests []*InvokeRequest `json:"requests"`
}
BatchInvokeRequest contains parameters for batch invocation.
type BatchInvokeResponse ¶
type BatchInvokeResponse struct {
Responses []*InvokeResponse `json:"responses"`
Duration time.Duration `json:"duration"`
}
BatchInvokeResponse contains results of batch invocation.
type Config ¶
type Config struct {
// Memory limits
DefaultMemoryLimitMB int `yaml:"default_memory_limit_mb"`
MaxMemoryLimitMB int `yaml:"max_memory_limit_mb"`
// Execution limits
DefaultTimeoutSeconds int `yaml:"default_timeout_seconds"`
MaxTimeoutSeconds int `yaml:"max_timeout_seconds"`
// Retry configuration
DefaultRetryCount int `yaml:"default_retry_count"`
MaxRetryCount int `yaml:"max_retry_count"`
DefaultRetryDelaySeconds int `yaml:"default_retry_delay_seconds"`
// Rate limiting (global)
GlobalRateLimitPerMinute int `yaml:"global_rate_limit_per_minute"`
// Background job configuration
JobWorkers int `yaml:"job_workers"`
JobPollInterval time.Duration `yaml:"job_poll_interval"`
JobMaxQueueSize int `yaml:"job_max_queue_size"`
JobMaxPayloadSize int `yaml:"job_max_payload_size"` // bytes
// Scheduler configuration
CronPollInterval time.Duration `yaml:"cron_poll_interval"`
TimerPollInterval time.Duration `yaml:"timer_poll_interval"`
DBPollInterval time.Duration `yaml:"db_poll_interval"`
// WASM compilation cache
ModuleCacheSize int `yaml:"module_cache_size"` // Number of compiled modules to cache
EnablePrewarm bool `yaml:"enable_prewarm"` // Pre-compile frequently used functions
// Secrets encryption
SecretsEncryptionKey string `yaml:"secrets_encryption_key"` // AES-256 key (32 bytes, hex-encoded)
// Logging
LogInvocations bool `yaml:"log_invocations"` // Log all invocations to database
LogRetention int `yaml:"log_retention"` // Days to retain logs
}
Config holds configuration for the serverless engine.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a configuration with sensible defaults.
func (*Config) ApplyDefaults ¶
func (c *Config) ApplyDefaults()
ApplyDefaults fills in zero values with defaults.
func (*Config) WithMemoryLimit ¶
WithMemoryLimit returns a copy with the memory limit set.
func (*Config) WithRateLimit ¶
WithRateLimit returns a copy with the rate limit set.
func (*Config) WithTimeout ¶
WithTimeout returns a copy with the timeout set.
type ConfigError ¶
ConfigError represents a configuration validation error.
func (*ConfigError) Error ¶
func (e *ConfigError) Error() string
type CronTrigger ¶
type CronTrigger struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
CronExpression string `json:"cron_expression"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
Enabled bool `json:"enabled"`
}
CronTrigger represents a cron-based trigger.
type DBChangeEvent ¶
type DBChangeEvent struct {
Table string `json:"table"`
Operation DBOperation `json:"operation"`
Row map[string]interface{} `json:"row"`
OldRow map[string]interface{} `json:"old_row,omitempty"`
}
DBChangeEvent is passed to functions triggered by database changes.
type DBOperation ¶
type DBOperation string
DBOperation represents the type of database operation that triggered a function.
const ( DBOperationInsert DBOperation = "INSERT" DBOperationUpdate DBOperation = "UPDATE" DBOperationDelete DBOperation = "DELETE" )
type DBTrigger ¶
type DBTrigger struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
TableName string `json:"table_name"`
Operation DBOperation `json:"operation"`
Condition string `json:"condition,omitempty"`
Enabled bool `json:"enabled"`
}
DBTrigger represents a database trigger.
type DBTriggerConfig ¶
type DBTriggerConfig struct {
Table string `json:"table"`
Operation DBOperation `json:"operation"`
Condition string `json:"condition,omitempty"`
}
DBTriggerConfig defines a database trigger configuration.
type DLQMessage ¶
type DLQMessage struct {
FunctionID string `json:"function_id"`
FunctionName string `json:"function_name"`
Namespace string `json:"namespace"`
RequestID string `json:"request_id"`
Input []byte `json:"input"`
Error string `json:"error"`
FailedAt time.Time `json:"failed_at"`
TriggerType TriggerType `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
}
DLQMessage represents a message sent to the dead letter queue.
type DeployError ¶
DeployError represents an error during function deployment.
func (*DeployError) Error ¶
func (e *DeployError) Error() string
func (*DeployError) Unwrap ¶
func (e *DeployError) Unwrap() error
type DeployRequest ¶
type DeployRequest struct {
Definition *FunctionDefinition `json:"definition"`
Source io.Reader `json:"-"` // Go source code or WASM bytes
IsWASM bool `json:"is_wasm"` // True if Source contains WASM bytes
}
DeployRequest represents a request to deploy a function.
type DeployResult ¶
type DeployResult struct {
Function *Function `json:"function"`
WASMCID string `json:"wasm_cid"`
Triggers []string `json:"triggers,omitempty"`
}
DeployResult represents the result of a deployment.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the core WASM execution engine using wazero. It manages compiled module caching and function execution.
func NewEngine ¶
func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger, opts ...EngineOption) (*Engine, error)
NewEngine creates a new WASM execution engine.
func (*Engine) Execute ¶
func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error)
Execute runs a function with the given input and returns the output.
func (*Engine) GetCacheStats ¶
GetCacheStats returns cache statistics.
func (*Engine) Invalidate ¶
Invalidate removes a compiled module from the cache.
type EngineOption ¶
type EngineOption func(*Engine)
EngineOption configures the Engine.
func WithInvocationLogger ¶
func WithInvocationLogger(logger InvocationLogger) EngineOption
WithInvocationLogger sets the invocation logger.
func WithRateLimiter ¶
func WithRateLimiter(limiter RateLimiter) EngineOption
WithRateLimiter sets the rate limiter.
type ExecutionError ¶
ExecutionError represents an error during function execution.
func (*ExecutionError) Error ¶
func (e *ExecutionError) Error() string
func (*ExecutionError) Unwrap ¶
func (e *ExecutionError) Unwrap() error
type Function ¶
type Function struct {
ID string `json:"id"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Version int `json:"version"`
WASMCID string `json:"wasm_cid"`
SourceCID string `json:"source_cid,omitempty"`
MemoryLimitMB int `json:"memory_limit_mb"`
TimeoutSeconds int `json:"timeout_seconds"`
IsPublic bool `json:"is_public"`
RetryCount int `json:"retry_count"`
RetryDelaySeconds int `json:"retry_delay_seconds"`
DLQTopic string `json:"dlq_topic,omitempty"`
Status FunctionStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by"`
}
Function represents a deployed serverless function.
type FunctionDefinition ¶
type FunctionDefinition struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Version int `json:"version,omitempty"`
MemoryLimitMB int `json:"memory_limit_mb,omitempty"`
TimeoutSeconds int `json:"timeout_seconds,omitempty"`
IsPublic bool `json:"is_public,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
RetryDelaySeconds int `json:"retry_delay_seconds,omitempty"`
DLQTopic string `json:"dlq_topic,omitempty"`
EnvVars map[string]string `json:"env_vars,omitempty"`
CronExpressions []string `json:"cron_expressions,omitempty"`
DBTriggers []DBTriggerConfig `json:"db_triggers,omitempty"`
PubSubTopics []string `json:"pubsub_topics,omitempty"`
}
FunctionDefinition contains the configuration for deploying a function.
type FunctionExecutor ¶
type FunctionExecutor interface {
// Execute runs a function with the given input and returns the output.
Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error)
// Precompile compiles a WASM module and caches it for faster execution.
Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error
// Invalidate removes a compiled module from the cache.
Invalidate(wasmCID string)
}
FunctionExecutor handles the actual execution of WASM functions.
type FunctionRegistry ¶
type FunctionRegistry interface {
// Register deploys a new function or updates an existing one.
// Returns the old function definition if it was updated, or nil if it was a new registration.
Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)
// Get retrieves a function by name and optional version.
// If version is 0, returns the latest version.
Get(ctx context.Context, namespace, name string, version int) (*Function, error)
// List returns all functions for a namespace.
List(ctx context.Context, namespace string) ([]*Function, error)
// Delete removes a function. If version is 0, removes all versions.
Delete(ctx context.Context, namespace, name string, version int) error
// GetWASMBytes retrieves the compiled WASM bytecode for a function.
GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)
// GetLogs retrieves logs for a function.
GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
}
FunctionRegistry manages function metadata and bytecode storage. Responsible for CRUD operations on function definitions.
type FunctionStatus ¶
type FunctionStatus string
FunctionStatus represents the current state of a deployed function.
const ( FunctionStatusActive FunctionStatus = "active" FunctionStatusInactive FunctionStatus = "inactive" FunctionStatusError FunctionStatus = "error" )
type GorillaWSConn ¶
GorillaWSConn wraps a gorilla/websocket.Conn to implement WebSocketConn.
func (*GorillaWSConn) Close ¶
func (c *GorillaWSConn) Close() error
Close closes the WebSocket connection.
func (*GorillaWSConn) ReadMessage ¶
func (c *GorillaWSConn) ReadMessage() (messageType int, p []byte, err error)
ReadMessage reads a message from the WebSocket connection.
func (*GorillaWSConn) WriteMessage ¶
func (c *GorillaWSConn) WriteMessage(messageType int, data []byte) error
WriteMessage writes a message to the WebSocket connection.
type HostFunctionError ¶
HostFunctionError represents an error in a host function call.
func (*HostFunctionError) Error ¶
func (e *HostFunctionError) Error() string
func (*HostFunctionError) Unwrap ¶
func (e *HostFunctionError) Unwrap() error
type HostServices ¶
type HostServices interface {
// Database operations
DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error)
DBExecute(ctx context.Context, query string, args []interface{}) (int64, error)
// Cache operations
CacheGet(ctx context.Context, key string) ([]byte, error)
CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error
CacheDelete(ctx context.Context, key string) error
CacheIncr(ctx context.Context, key string) (int64, error)
CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error)
// Storage operations
StoragePut(ctx context.Context, data []byte) (string, error)
StorageGet(ctx context.Context, cid string) ([]byte, error)
// PubSub operations
PubSubPublish(ctx context.Context, topic string, data []byte) error
// WebSocket operations (only valid in WS context)
WSSend(ctx context.Context, clientID string, data []byte) error
WSBroadcast(ctx context.Context, topic string, data []byte) error
// HTTP operations
HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error)
// Context operations
GetEnv(ctx context.Context, key string) (string, error)
GetSecret(ctx context.Context, name string) (string, error)
GetRequestID(ctx context.Context) string
GetCallerWallet(ctx context.Context) string
// Job operations
EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error)
ScheduleOnce(ctx context.Context, functionName string, runAt time.Time, payload []byte) (string, error)
// Logging
LogInfo(ctx context.Context, message string)
LogError(ctx context.Context, message string)
}
HostServices provides access to Orama services from within WASM functions. This interface is implemented by the host and exposed to WASM modules.
type InvocationContext ¶
type InvocationContext struct {
RequestID string `json:"request_id"`
FunctionID string `json:"function_id"`
FunctionName string `json:"function_name"`
Namespace string `json:"namespace"`
CallerWallet string `json:"caller_wallet,omitempty"`
TriggerType TriggerType `json:"trigger_type"`
WSClientID string `json:"ws_client_id,omitempty"`
EnvVars map[string]string `json:"env_vars,omitempty"`
}
InvocationContext provides context for a function invocation.
func EnsureInvocationContext ¶
func EnsureInvocationContext(ctx *InvocationContext, fn *Function) *InvocationContext
EnsureInvocationContext creates a default context if none is provided.
type InvocationLogger ¶
type InvocationLogger interface {
Log(ctx context.Context, inv *InvocationRecord) error
}
InvocationLogger logs function invocations (optional).
type InvocationRecord ¶
type InvocationRecord struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
RequestID string `json:"request_id"`
TriggerType TriggerType `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
InputSize int `json:"input_size"`
OutputSize int `json:"output_size"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
DurationMS int64 `json:"duration_ms"`
Status InvocationStatus `json:"status"`
ErrorMessage string `json:"error_message,omitempty"`
MemoryUsedMB float64 `json:"memory_used_mb"`
Logs []LogEntry `json:"logs,omitempty"`
}
InvocationRecord represents a logged invocation.
type InvocationResult ¶
type InvocationResult struct {
RequestID string `json:"request_id"`
Output []byte `json:"output,omitempty"`
Status InvocationStatus `json:"status"`
Error string `json:"error,omitempty"`
DurationMS int64 `json:"duration_ms"`
Logs []LogEntry `json:"logs,omitempty"`
}
InvocationResult represents the result of a function invocation.
type InvocationStatus ¶
type InvocationStatus string
InvocationStatus represents the result of a function invocation.
const ( InvocationStatusSuccess InvocationStatus = "success" InvocationStatusError InvocationStatus = "error" InvocationStatusTimeout InvocationStatus = "timeout" )
type InvokeRequest ¶
type InvokeRequest struct {
Namespace string `json:"namespace"`
FunctionName string `json:"function_name"`
Version int `json:"version,omitempty"` // 0 = latest
Input []byte `json:"input"`
TriggerType TriggerType `json:"trigger_type"`
CallerWallet string `json:"caller_wallet,omitempty"`
WSClientID string `json:"ws_client_id,omitempty"`
}
InvokeRequest contains the parameters for invoking a function.
type InvokeResponse ¶
type InvokeResponse struct {
RequestID string `json:"request_id"`
Output []byte `json:"output,omitempty"`
Status InvocationStatus `json:"status"`
Error string `json:"error,omitempty"`
DurationMS int64 `json:"duration_ms"`
Retries int `json:"retries,omitempty"`
}
InvokeResponse contains the result of a function invocation.
type Invoker ¶
type Invoker struct {
// contains filtered or unexported fields
}
Invoker handles function invocation with retry logic and DLQ support. It wraps the Engine to provide higher-level invocation semantics.
func NewInvoker ¶
func NewInvoker(engine *Engine, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger) *Invoker
NewInvoker creates a new function invoker.
func (*Invoker) BatchInvoke ¶
func (i *Invoker) BatchInvoke(ctx context.Context, req *BatchInvokeRequest) (*BatchInvokeResponse, error)
BatchInvoke executes multiple functions in parallel.
func (*Invoker) CanInvoke ¶
func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string, callerWallet string) (bool, error)
CanInvoke checks if a caller is authorized to invoke a function.
func (*Invoker) GetFunctionInfo ¶
func (i *Invoker) GetFunctionInfo(ctx context.Context, namespace, functionName string, version int) (*Function, error)
GetFunctionInfo returns basic info about a function for invocation.
func (*Invoker) InvalidateCache ¶
InvalidateCache removes a compiled module from the engine's cache.
func (*Invoker) Invoke ¶
func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)
Invoke executes a function with automatic retry logic.
func (*Invoker) InvokeByID ¶
func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byte, invCtx *InvocationContext) (*InvokeResponse, error)
InvokeByID invokes a function by its ID.
type Job ¶
type Job struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
Payload []byte `json:"payload,omitempty"`
Status JobStatus `json:"status"`
Progress int `json:"progress"`
Result []byte `json:"result,omitempty"`
Error string `json:"error,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
Job represents a background job.
type JobManager ¶
type JobManager interface {
// Enqueue adds a job to the queue for background execution.
Enqueue(ctx context.Context, functionID string, payload []byte) (string, error)
// GetStatus retrieves the current status of a job.
GetStatus(ctx context.Context, jobID string) (*Job, error)
// List returns jobs for a function.
List(ctx context.Context, functionID string, limit int) ([]*Job, error)
// Cancel attempts to cancel a pending or running job.
Cancel(ctx context.Context, jobID string) error
}
JobManager manages background job execution.
type LogEntry ¶
type LogEntry struct {
Level string `json:"level"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
}
LogEntry represents a log message from a function.
type PubSubTrigger ¶
type PubSubTrigger struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
Topic string `json:"topic"`
Enabled bool `json:"enabled"`
}
PubSubTrigger represents a pubsub trigger.
type RateLimiter ¶
RateLimiter checks if a request should be rate limited.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages function metadata in RQLite and bytecode in IPFS. It implements the FunctionRegistry interface.
func NewRegistry ¶
func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfig, logger *zap.Logger) *Registry
NewRegistry creates a new function registry.
func (*Registry) Get ¶
Get retrieves a function by name and optional version. If version is 0, returns the latest version.
func (*Registry) GetEnvVars ¶
GetEnvVars retrieves environment variables for a function.
func (*Registry) GetLogs ¶
func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
GetLogs retrieves logs for a function.
func (*Registry) GetWASMBytes ¶
GetWASMBytes retrieves the compiled WASM bytecode for a function.
func (*Registry) ListVersions ¶
ListVersions returns all versions of a function.
type RegistryConfig ¶
type RegistryConfig struct {
IPFSAPIURL string // IPFS API URL for content retrieval
}
RegistryConfig holds configuration for the Registry.
type RetryableError ¶
type RetryableError struct {
Cause error
RetryAfter int // Suggested retry delay in seconds
MaxRetries int // Maximum number of retries remaining
CurrentTry int // Current attempt number
}
RetryableError wraps an error that should be retried.
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (e *RetryableError) Unwrap() error
type SecretsManager ¶
type SecretsManager interface {
// Set stores an encrypted secret.
Set(ctx context.Context, namespace, name, value string) error
// Get retrieves a decrypted secret.
Get(ctx context.Context, namespace, name string) (string, error)
// List returns all secret names for a namespace (not values).
List(ctx context.Context, namespace string) ([]string, error)
// Delete removes a secret.
Delete(ctx context.Context, namespace, name string) error
}
SecretsManager handles secure storage and retrieval of secrets.
type Timer ¶
type Timer struct {
ID string `json:"id"`
FunctionID string `json:"function_id"`
RunAt time.Time `json:"run_at"`
Payload []byte `json:"payload,omitempty"`
Status JobStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
Timer represents a one-time scheduled execution.
type TriggerError ¶
TriggerError represents an error in trigger execution.
func (*TriggerError) Error ¶
func (e *TriggerError) Error() string
func (*TriggerError) Unwrap ¶
func (e *TriggerError) Unwrap() error
type TriggerManager ¶
type TriggerManager interface {
// AddCronTrigger adds a cron-based trigger to a function.
AddCronTrigger(ctx context.Context, functionID, cronExpr string) error
// AddDBTrigger adds a database trigger to a function.
AddDBTrigger(ctx context.Context, functionID, tableName string, operation DBOperation, condition string) error
// AddPubSubTrigger adds a pubsub trigger to a function.
AddPubSubTrigger(ctx context.Context, functionID, topic string) error
// ScheduleOnce schedules a one-time execution.
ScheduleOnce(ctx context.Context, functionID string, runAt time.Time, payload []byte) (string, error)
// RemoveTrigger removes a trigger by ID.
RemoveTrigger(ctx context.Context, triggerID string) error
}
TriggerManager manages function triggers (cron, database, pubsub, timer).
type TriggerType ¶
type TriggerType string
TriggerType identifies the type of event that triggered a function invocation.
const ( TriggerTypeHTTP TriggerType = "http" TriggerTypeWebSocket TriggerType = "websocket" TriggerTypeCron TriggerType = "cron" TriggerTypeDatabase TriggerType = "database" TriggerTypePubSub TriggerType = "pubsub" TriggerTypeTimer TriggerType = "timer" TriggerTypeJob TriggerType = "job" )
type ValidationError ¶
ValidationError represents an input validation error.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type WSManager ¶
type WSManager struct {
// contains filtered or unexported fields
}
WSManager manages WebSocket connections for serverless functions. It handles connection registration, message routing, and topic subscriptions.
func NewWSManager ¶
NewWSManager creates a new WebSocket manager.
func (*WSManager) Close ¶
func (m *WSManager) Close()
Close closes all connections and cleans up resources.
func (*WSManager) GetClientTopics ¶
GetClientTopics returns all topics a client is subscribed to.
func (*WSManager) GetConnectionCount ¶
GetConnectionCount returns the number of active connections.
func (*WSManager) GetTopicSubscriberCount ¶
GetTopicSubscriberCount returns the number of subscribers for a topic.
func (*WSManager) IsConnected ¶
IsConnected checks if a client is connected.
func (*WSManager) Register ¶
func (m *WSManager) Register(clientID string, conn WebSocketConn)
Register registers a new WebSocket connection.
func (*WSManager) Unregister ¶
Unregister removes a WebSocket connection and its subscriptions.
func (*WSManager) Unsubscribe ¶
Unsubscribe removes a client from a topic.
type WSStats ¶
type WSStats struct {
ConnectionCount int `json:"connection_count"`
TopicCount int `json:"topic_count"`
SubscriptionCount int `json:"subscription_count"`
TopicStats map[string]int `json:"topic_stats"` // topic -> subscriber count
}
Stats returns statistics about the WebSocket manager.
type WebSocketConn ¶
type WebSocketConn interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
Close() error
}
WebSocketConn abstracts a WebSocket connection for testability.
type WebSocketManager ¶
type WebSocketManager interface {
// Register registers a new WebSocket connection.
Register(clientID string, conn WebSocketConn)
// Unregister removes a WebSocket connection.
Unregister(clientID string)
// Send sends data to a specific client.
Send(clientID string, data []byte) error
// Broadcast sends data to all clients subscribed to a topic.
Broadcast(topic string, data []byte) error
// Subscribe adds a client to a topic.
Subscribe(clientID, topic string)
// Unsubscribe removes a client from a topic.
Unsubscribe(clientID, topic string)
}
WebSocketManager manages WebSocket connections for function streaming.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package registry manages function metadata in RQLite and bytecode in IPFS.
|
Package registry manages function metadata in RQLite and bytecode in IPFS. |