serverless

package
v0.90.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2026 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Overview

Package serverless provides a WASM-based serverless function engine for the Orama Network. It enables users to deploy and execute Go functions (compiled to WASM) across all nodes, with support for HTTP/WebSocket triggers, cron jobs, database triggers, pub/sub triggers, one-time timers, retries with DLQ, and background jobs.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for common conditions.
//
// Every value below is created with errors.New, so each one is a distinct
// error value that callers can match with errors.Is.
var (
	// ErrFunctionNotFound is returned when a function does not exist.
	ErrFunctionNotFound = errors.New("function not found")

	// ErrFunctionExists is returned when attempting to create a function that already exists.
	ErrFunctionExists = errors.New("function already exists")

	// ErrVersionNotFound is returned when a specific function version does not exist.
	ErrVersionNotFound = errors.New("function version not found")

	// ErrSecretNotFound is returned when a secret does not exist.
	ErrSecretNotFound = errors.New("secret not found")

	// ErrJobNotFound is returned when a job does not exist.
	ErrJobNotFound = errors.New("job not found")

	// ErrTriggerNotFound is returned when a trigger does not exist.
	ErrTriggerNotFound = errors.New("trigger not found")

	// ErrTimerNotFound is returned when a timer does not exist.
	ErrTimerNotFound = errors.New("timer not found")

	// ErrUnauthorized is returned when the caller is not authorized.
	ErrUnauthorized = errors.New("unauthorized")

	// ErrRateLimited is returned when the rate limit is exceeded.
	ErrRateLimited = errors.New("rate limit exceeded")

	// ErrInvalidWASM is returned when the WASM module is invalid.
	ErrInvalidWASM = errors.New("invalid WASM module")

	// ErrCompilationFailed is returned when WASM compilation fails.
	ErrCompilationFailed = errors.New("WASM compilation failed")

	// ErrExecutionFailed is returned when function execution fails.
	ErrExecutionFailed = errors.New("function execution failed")

	// ErrTimeout is returned when function execution times out.
	ErrTimeout = errors.New("function execution timeout")

	// ErrMemoryExceeded is returned when the function exceeds memory limits.
	ErrMemoryExceeded = errors.New("memory limit exceeded")

	// ErrInvalidInput is returned when function input is invalid.
	ErrInvalidInput = errors.New("invalid input")

	// ErrWSNotAvailable is returned when WebSocket operations are used outside a WebSocket context.
	ErrWSNotAvailable = errors.New("websocket operations not available in this context")

	// ErrWSClientNotFound is returned when a WebSocket client is not connected.
	ErrWSClientNotFound = errors.New("websocket client not found")

	// ErrInvalidCronExpression is returned when a cron expression is invalid.
	ErrInvalidCronExpression = errors.New("invalid cron expression")

	// ErrPayloadTooLarge is returned when a job payload exceeds the maximum size.
	ErrPayloadTooLarge = errors.New("payload too large")

	// ErrQueueFull is returned when the job queue is full.
	ErrQueueFull = errors.New("job queue is full")

	// ErrJobCancelled is returned when a job is cancelled.
	ErrJobCancelled = errors.New("job cancelled")

	// ErrStorageUnavailable is returned when IPFS storage is unavailable.
	ErrStorageUnavailable = errors.New("storage unavailable")

	// ErrDatabaseUnavailable is returned when the database is unavailable.
	ErrDatabaseUnavailable = errors.New("database unavailable")

	// ErrCacheUnavailable is returned when the cache is unavailable.
	ErrCacheUnavailable = errors.New("cache unavailable")
)

Sentinel errors for common conditions.

Functions

func CreateTimeoutContext

func CreateTimeoutContext(ctx context.Context, fn *Function, maxTimeout int) (context.Context, context.CancelFunc)

CreateTimeoutContext creates a context with timeout based on function configuration.

func IsNotFound

func IsNotFound(err error) bool

IsNotFound checks if an error indicates a resource was not found.

func IsResourceExhausted

func IsResourceExhausted(err error) bool

IsResourceExhausted checks if an error indicates resource exhaustion.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if an error should be retried.

func IsServiceUnavailable

func IsServiceUnavailable(err error) bool

IsServiceUnavailable checks if an error indicates a service is unavailable.

func IsUnauthorized

func IsUnauthorized(err error) bool

IsUnauthorized checks if an error indicates a lack of authorization.

Types

type BatchInvokeRequest

// BatchInvokeRequest contains parameters for batch invocation.
type BatchInvokeRequest struct {
	// Requests lists the individual invocations that make up the batch.
	Requests []*InvokeRequest `json:"requests"`
}

BatchInvokeRequest contains parameters for batch invocation.

type BatchInvokeResponse

// BatchInvokeResponse contains results of batch invocation.
type BatchInvokeResponse struct {
	// Responses holds the individual invocation results.
	// NOTE(review): presumably index-aligned with BatchInvokeRequest.Requests — confirm.
	Responses []*InvokeResponse `json:"responses"`
	// Duration is a time.Duration; encoding/json serializes it as an integer
	// count of nanoseconds.
	Duration  time.Duration     `json:"duration"`
}

BatchInvokeResponse contains results of batch invocation.

type Config

// Config holds configuration for the serverless engine.
//
// Every field carries a yaml tag, so the struct maps onto a YAML document
// using the snake_case keys shown in the tags. Units are spelled out in the
// field names (MB, Seconds, PerMinute) or in the trailing comments.
type Config struct {
	// Memory limits
	DefaultMemoryLimitMB int `yaml:"default_memory_limit_mb"`
	MaxMemoryLimitMB     int `yaml:"max_memory_limit_mb"`

	// Execution limits
	DefaultTimeoutSeconds int `yaml:"default_timeout_seconds"`
	MaxTimeoutSeconds     int `yaml:"max_timeout_seconds"`

	// Retry configuration
	DefaultRetryCount        int `yaml:"default_retry_count"`
	MaxRetryCount            int `yaml:"max_retry_count"`
	DefaultRetryDelaySeconds int `yaml:"default_retry_delay_seconds"`

	// Rate limiting (global)
	GlobalRateLimitPerMinute int `yaml:"global_rate_limit_per_minute"`

	// Background job configuration
	JobWorkers        int           `yaml:"job_workers"`
	JobPollInterval   time.Duration `yaml:"job_poll_interval"`
	JobMaxQueueSize   int           `yaml:"job_max_queue_size"`
	JobMaxPayloadSize int           `yaml:"job_max_payload_size"` // bytes

	// Scheduler configuration
	CronPollInterval  time.Duration `yaml:"cron_poll_interval"`
	TimerPollInterval time.Duration `yaml:"timer_poll_interval"`
	DBPollInterval    time.Duration `yaml:"db_poll_interval"`

	// WASM compilation cache
	ModuleCacheSize int  `yaml:"module_cache_size"` // Number of compiled modules to cache
	EnablePrewarm   bool `yaml:"enable_prewarm"`    // Pre-compile frequently used functions

	// Secrets encryption
	SecretsEncryptionKey string `yaml:"secrets_encryption_key"` // AES-256 key (32 bytes, hex-encoded)

	// Logging
	LogInvocations bool `yaml:"log_invocations"` // Log all invocations to database
	LogRetention   int  `yaml:"log_retention"`   // Days to retain logs
}

Config holds configuration for the serverless engine.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a configuration with sensible defaults.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults fills in zero values with defaults.

func (*Config) Validate

func (c *Config) Validate() []error

Validate checks the configuration for errors.

func (*Config) WithMemoryLimit

func (c *Config) WithMemoryLimit(defaultMB, maxMB int) *Config

WithMemoryLimit returns a copy with the memory limit set.

func (*Config) WithRateLimit

func (c *Config) WithRateLimit(perMinute int) *Config

WithRateLimit returns a copy with the rate limit set.

func (*Config) WithTimeout

func (c *Config) WithTimeout(defaultSec, maxSec int) *Config

WithTimeout returns a copy with the timeout set.

type ConfigError

// ConfigError represents a configuration validation error.
//
// Its Error method is declared on the pointer receiver, so *ConfigError (not
// ConfigError) satisfies the error interface.
type ConfigError struct {
	Field   string
	Message string
}

ConfigError represents a configuration validation error.

func (*ConfigError) Error

func (e *ConfigError) Error() string

type CronTrigger

// CronTrigger represents a cron-based trigger.
type CronTrigger struct {
	ID             string     `json:"id"`
	FunctionID     string     `json:"function_id"`
	CronExpression string     `json:"cron_expression"`
	// NextRunAt and LastRunAt are pointers tagged omitempty, so a nil value is
	// left out of the JSON output entirely.
	NextRunAt      *time.Time `json:"next_run_at,omitempty"`
	LastRunAt      *time.Time `json:"last_run_at,omitempty"`
	Enabled        bool       `json:"enabled"`
}

CronTrigger represents a cron-based trigger.

type DBChangeEvent

// DBChangeEvent is passed to functions triggered by database changes.
type DBChangeEvent struct {
	Table     string                 `json:"table"`
	Operation DBOperation            `json:"operation"`
	Row       map[string]interface{} `json:"row"`
	// OldRow is omitted from JSON when the map is nil or empty.
	// NOTE(review): presumably only populated for UPDATE/DELETE — confirm.
	OldRow    map[string]interface{} `json:"old_row,omitempty"`
}

DBChangeEvent is passed to functions triggered by database changes.

type DBOperation

// DBOperation represents the type of database operation that triggered a function.
type DBOperation string

DBOperation represents the type of database operation that triggered a function.

// Database operations a DBOperation value can name. The string values are the
// upper-case operation keywords.
const (
	DBOperationInsert DBOperation = "INSERT"
	DBOperationUpdate DBOperation = "UPDATE"
	DBOperationDelete DBOperation = "DELETE"
)

type DBTrigger

// DBTrigger represents a database trigger.
type DBTrigger struct {
	ID         string      `json:"id"`
	FunctionID string      `json:"function_id"`
	TableName  string      `json:"table_name"`
	Operation  DBOperation `json:"operation"`
	// Condition is optional; an empty string is omitted from JSON.
	// NOTE(review): the expression syntax is not visible here — confirm.
	Condition  string      `json:"condition,omitempty"`
	Enabled    bool        `json:"enabled"`
}

DBTrigger represents a database trigger.

type DBTriggerConfig

// DBTriggerConfig defines a database trigger configuration.
// FunctionDefinition.DBTriggers is a slice of this type.
type DBTriggerConfig struct {
	Table     string      `json:"table"`
	Operation DBOperation `json:"operation"`
	// Condition is optional; an empty string is omitted from JSON.
	Condition string      `json:"condition,omitempty"`
}

DBTriggerConfig defines a database trigger configuration.

type DLQMessage

// DLQMessage represents a message sent to the dead letter queue.
type DLQMessage struct {
	FunctionID   string      `json:"function_id"`
	FunctionName string      `json:"function_name"`
	Namespace    string      `json:"namespace"`
	RequestID    string      `json:"request_id"`
	// Input is a []byte, which encoding/json emits as a base64-encoded string.
	Input        []byte      `json:"input"`
	// Error is a plain string rather than an error value.
	Error        string      `json:"error"`
	FailedAt     time.Time   `json:"failed_at"`
	TriggerType  TriggerType `json:"trigger_type"`
	CallerWallet string      `json:"caller_wallet,omitempty"`
}

DLQMessage represents a message sent to the dead letter queue.

type DeployError

// DeployError represents an error during function deployment.
//
// Error and Unwrap are declared on the pointer receiver, so *DeployError
// satisfies the error interface and can be traversed by errors.Is/errors.As.
// NOTE(review): Unwrap presumably returns Cause — confirm.
type DeployError struct {
	FunctionName string
	Cause        error
}

DeployError represents an error during function deployment.

func (*DeployError) Error

func (e *DeployError) Error() string

func (*DeployError) Unwrap

func (e *DeployError) Unwrap() error

type DeployRequest

// DeployRequest represents a request to deploy a function.
//
// Source is tagged json:"-" and is therefore never read from or written to
// JSON; it has to be attached to the request separately.
type DeployRequest struct {
	Definition *FunctionDefinition `json:"definition"`
	Source     io.Reader           `json:"-"`       // Go source code or WASM bytes
	IsWASM     bool                `json:"is_wasm"` // True if Source contains WASM bytes
}

DeployRequest represents a request to deploy a function.

type DeployResult

// DeployResult represents the result of a deployment.
type DeployResult struct {
	Function *Function `json:"function"`
	WASMCID  string    `json:"wasm_cid"`
	// Triggers is omitted from JSON when empty.
	// NOTE(review): presumably the IDs of the triggers created — confirm.
	Triggers []string  `json:"triggers,omitempty"`
}

DeployResult represents the result of a deployment.

type Engine

// Engine is the core WASM execution engine using wazero. It manages compiled
// module caching and function execution.
//
// Construct one with NewEngine. Its Execute, Precompile and Invalidate methods
// have the same signatures as the FunctionExecutor interface.
type Engine struct {
	// contains filtered or unexported fields
}

Engine is the core WASM execution engine using wazero. It manages compiled module caching and function execution.

func NewEngine

func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger, opts ...EngineOption) (*Engine, error)

NewEngine creates a new WASM execution engine.

func (*Engine) Close

func (e *Engine) Close(ctx context.Context) error

Close shuts down the engine and releases resources.

func (*Engine) Execute

func (e *Engine) Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error)

Execute runs a function with the given input and returns the output.

func (*Engine) GetCacheStats

func (e *Engine) GetCacheStats() (size int, capacity int)

GetCacheStats returns cache statistics.

func (*Engine) Invalidate

func (e *Engine) Invalidate(wasmCID string)

Invalidate removes a compiled module from the cache.

func (*Engine) Precompile

func (e *Engine) Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error

Precompile compiles a WASM module and caches it for faster execution.

type EngineOption

// EngineOption configures the Engine.
//
// Options are passed to NewEngine through its variadic opts parameter;
// WithInvocationLogger and WithRateLimiter return EngineOption values.
type EngineOption func(*Engine)

EngineOption configures the Engine.

func WithInvocationLogger

func WithInvocationLogger(logger InvocationLogger) EngineOption

WithInvocationLogger sets the invocation logger.

func WithRateLimiter

func WithRateLimiter(limiter RateLimiter) EngineOption

WithRateLimiter sets the rate limiter.

type ExecutionError

// ExecutionError represents an error during function execution.
//
// Error and Unwrap are declared on the pointer receiver, so *ExecutionError
// satisfies the error interface and can be traversed by errors.Is/errors.As.
// NOTE(review): Unwrap presumably returns Cause — confirm.
type ExecutionError struct {
	FunctionName string
	RequestID    string
	Cause        error
}

ExecutionError represents an error during function execution.

func (*ExecutionError) Error

func (e *ExecutionError) Error() string

func (*ExecutionError) Unwrap

func (e *ExecutionError) Unwrap() error

type Function

// Function represents a deployed serverless function.
//
// Most fields mirror those of FunctionDefinition; Function additionally
// carries identity, content identifiers, status and audit fields.
type Function struct {
	ID                string         `json:"id"`
	Name              string         `json:"name"`
	Namespace         string         `json:"namespace"`
	Version           int            `json:"version"`
	// WASMCID is the key accepted by FunctionRegistry.GetWASMBytes.
	// NOTE(review): presumably an IPFS content identifier — confirm.
	WASMCID           string         `json:"wasm_cid"`
	SourceCID         string         `json:"source_cid,omitempty"`
	MemoryLimitMB     int            `json:"memory_limit_mb"`
	TimeoutSeconds    int            `json:"timeout_seconds"`
	IsPublic          bool           `json:"is_public"`
	RetryCount        int            `json:"retry_count"`
	RetryDelaySeconds int            `json:"retry_delay_seconds"`
	DLQTopic          string         `json:"dlq_topic,omitempty"`
	Status            FunctionStatus `json:"status"`
	CreatedAt         time.Time      `json:"created_at"`
	UpdatedAt         time.Time      `json:"updated_at"`
	CreatedBy         string         `json:"created_by"`
}

Function represents a deployed serverless function.

type FunctionDefinition

// FunctionDefinition contains the configuration for deploying a function.
//
// Only Name and Namespace are always emitted in JSON; every other field is
// tagged omitempty and disappears when it holds its zero value.
type FunctionDefinition struct {
	Name              string            `json:"name"`
	Namespace         string            `json:"namespace"`
	Version           int               `json:"version,omitempty"`
	MemoryLimitMB     int               `json:"memory_limit_mb,omitempty"`
	TimeoutSeconds    int               `json:"timeout_seconds,omitempty"`
	IsPublic          bool              `json:"is_public,omitempty"`
	RetryCount        int               `json:"retry_count,omitempty"`
	RetryDelaySeconds int               `json:"retry_delay_seconds,omitempty"`
	DLQTopic          string            `json:"dlq_topic,omitempty"`
	EnvVars           map[string]string `json:"env_vars,omitempty"`
	// Trigger declarations: cron expressions, database triggers and pub/sub topics.
	CronExpressions   []string          `json:"cron_expressions,omitempty"`
	DBTriggers        []DBTriggerConfig `json:"db_triggers,omitempty"`
	PubSubTopics      []string          `json:"pubsub_topics,omitempty"`
}

FunctionDefinition contains the configuration for deploying a function.

type FunctionExecutor

// FunctionExecutor handles the actual execution of WASM functions.
//
// *Engine declares methods with these exact signatures.
type FunctionExecutor interface {
	// Execute runs a function with the given input and returns the output.
	Execute(ctx context.Context, fn *Function, input []byte, invCtx *InvocationContext) ([]byte, error)

	// Precompile compiles a WASM module and caches it for faster execution.
	Precompile(ctx context.Context, wasmCID string, wasmBytes []byte) error

	// Invalidate removes a compiled module from the cache.
	Invalidate(wasmCID string)
}

FunctionExecutor handles the actual execution of WASM functions.

type FunctionRegistry

// FunctionRegistry manages function metadata and bytecode storage.
// Responsible for CRUD operations on function definitions.
//
// *Registry declares methods with these exact signatures.
type FunctionRegistry interface {
	// Register deploys a new function or updates an existing one.
	// Returns the old function definition if it was updated, or nil if it was a new registration.
	// NOTE(review): the return type is *Function, not *FunctionDefinition —
	// confirm whether the previous or the newly registered function is returned.
	Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)

	// Get retrieves a function by name and optional version.
	// If version is 0, returns the latest version.
	Get(ctx context.Context, namespace, name string, version int) (*Function, error)

	// List returns all functions for a namespace.
	List(ctx context.Context, namespace string) ([]*Function, error)

	// Delete removes a function. If version is 0, removes all versions.
	Delete(ctx context.Context, namespace, name string, version int) error

	// GetWASMBytes retrieves the compiled WASM bytecode for a function.
	GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)

	// GetLogs retrieves logs for a function.
	GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)
}

FunctionRegistry manages function metadata and bytecode storage. Responsible for CRUD operations on function definitions.

type FunctionStatus

// FunctionStatus represents the current state of a deployed function.
type FunctionStatus string

FunctionStatus represents the current state of a deployed function.

// States a deployed function can be in, as stored in Function.Status.
const (
	FunctionStatusActive   FunctionStatus = "active"
	FunctionStatusInactive FunctionStatus = "inactive"
	FunctionStatusError    FunctionStatus = "error"
)

type GorillaWSConn

// GorillaWSConn wraps a gorilla/websocket.Conn to implement WebSocketConn.
//
// The connection is embedded, and Close, ReadMessage and WriteMessage are also
// declared explicitly on *GorillaWSConn.
type GorillaWSConn struct {
	*websocket.Conn
}

GorillaWSConn wraps a gorilla/websocket.Conn to implement WebSocketConn.

func (*GorillaWSConn) Close

func (c *GorillaWSConn) Close() error

Close closes the WebSocket connection.

func (*GorillaWSConn) ReadMessage

func (c *GorillaWSConn) ReadMessage() (messageType int, p []byte, err error)

ReadMessage reads a message from the WebSocket connection.

func (*GorillaWSConn) WriteMessage

func (c *GorillaWSConn) WriteMessage(messageType int, data []byte) error

WriteMessage writes a message to the WebSocket connection.

type HostFunctionError

// HostFunctionError represents an error in a host function call.
//
// Error and Unwrap are declared on the pointer receiver, so *HostFunctionError
// satisfies the error interface and can be traversed by errors.Is/errors.As.
// NOTE(review): Unwrap presumably returns Cause — confirm.
type HostFunctionError struct {
	// Function is a string.
	// NOTE(review): presumably the name of the host function that failed — confirm.
	Function string
	Cause    error
}

HostFunctionError represents an error in a host function call.

func (*HostFunctionError) Error

func (e *HostFunctionError) Error() string

func (*HostFunctionError) Unwrap

func (e *HostFunctionError) Unwrap() error

type HostServices

// HostServices provides access to Orama services from within WASM functions.
// This interface is implemented by the host and exposed to WASM modules.
//
// Every method takes a context.Context as its first parameter.
type HostServices interface {
	// Database operations
	// NOTE(review): the encoding of DBQuery's []byte result and the meaning of
	// DBExecute's int64 (presumably rows affected) are not visible here — confirm.
	DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error)
	DBExecute(ctx context.Context, query string, args []interface{}) (int64, error)

	// Cache operations
	CacheGet(ctx context.Context, key string) ([]byte, error)
	CacheSet(ctx context.Context, key string, value []byte, ttlSeconds int64) error
	CacheDelete(ctx context.Context, key string) error
	CacheIncr(ctx context.Context, key string) (int64, error)
	CacheIncrBy(ctx context.Context, key string, delta int64) (int64, error)

	// Storage operations
	// NOTE(review): StoragePut presumably returns the CID that StorageGet accepts — confirm.
	StoragePut(ctx context.Context, data []byte) (string, error)
	StorageGet(ctx context.Context, cid string) ([]byte, error)

	// PubSub operations
	PubSubPublish(ctx context.Context, topic string, data []byte) error

	// WebSocket operations (only valid in WS context)
	WSSend(ctx context.Context, clientID string, data []byte) error
	WSBroadcast(ctx context.Context, topic string, data []byte) error

	// HTTP operations
	HTTPFetch(ctx context.Context, method, url string, headers map[string]string, body []byte) ([]byte, error)

	// Context operations
	GetEnv(ctx context.Context, key string) (string, error)
	GetSecret(ctx context.Context, name string) (string, error)
	GetRequestID(ctx context.Context) string
	GetCallerWallet(ctx context.Context) string

	// Job operations
	// NOTE(review): the returned string is presumably the job/timer ID — confirm.
	EnqueueBackground(ctx context.Context, functionName string, payload []byte) (string, error)
	ScheduleOnce(ctx context.Context, functionName string, runAt time.Time, payload []byte) (string, error)

	// Logging
	LogInfo(ctx context.Context, message string)
	LogError(ctx context.Context, message string)
}

HostServices provides access to Orama services from within WASM functions. This interface is implemented by the host and exposed to WASM modules.

type InvocationContext

// InvocationContext provides context for a function invocation.
//
// It is passed to Engine.Execute and Invoker.InvokeByID; EnsureInvocationContext
// creates a default one if none is provided.
type InvocationContext struct {
	RequestID    string            `json:"request_id"`
	FunctionID   string            `json:"function_id"`
	FunctionName string            `json:"function_name"`
	Namespace    string            `json:"namespace"`
	CallerWallet string            `json:"caller_wallet,omitempty"`
	TriggerType  TriggerType       `json:"trigger_type"`
	// WSClientID is omitted from JSON when empty.
	// NOTE(review): presumably only set for WebSocket-triggered invocations — confirm.
	WSClientID   string            `json:"ws_client_id,omitempty"`
	EnvVars      map[string]string `json:"env_vars,omitempty"`
}

InvocationContext provides context for a function invocation.

func EnsureInvocationContext

func EnsureInvocationContext(ctx *InvocationContext, fn *Function) *InvocationContext

EnsureInvocationContext creates a default context if none is provided.

type InvocationLogger

// InvocationLogger logs function invocations (optional).
//
// An implementation is supplied to the Engine via WithInvocationLogger.
// *Registry declares a Log method with this signature.
type InvocationLogger interface {
	Log(ctx context.Context, inv *InvocationRecord) error
}

InvocationLogger logs function invocations (optional).

type InvocationRecord

// InvocationRecord represents a logged invocation.
type InvocationRecord struct {
	ID           string           `json:"id"`
	FunctionID   string           `json:"function_id"`
	RequestID    string           `json:"request_id"`
	TriggerType  TriggerType      `json:"trigger_type"`
	CallerWallet string           `json:"caller_wallet,omitempty"`
	// NOTE(review): InputSize/OutputSize are presumably byte counts — confirm.
	InputSize    int              `json:"input_size"`
	OutputSize   int              `json:"output_size"`
	StartedAt    time.Time        `json:"started_at"`
	CompletedAt  time.Time        `json:"completed_at"`
	// DurationMS is in milliseconds, per its name.
	DurationMS   int64            `json:"duration_ms"`
	Status       InvocationStatus `json:"status"`
	ErrorMessage string           `json:"error_message,omitempty"`
	// MemoryUsedMB is in megabytes, per its name.
	MemoryUsedMB float64          `json:"memory_used_mb"`
	Logs         []LogEntry       `json:"logs,omitempty"`
}

InvocationRecord represents a logged invocation.

type InvocationResult

// InvocationResult represents the result of a function invocation.
//
// It has the same fields as InvokeResponse except that it carries Logs where
// InvokeResponse carries Retries.
type InvocationResult struct {
	RequestID  string           `json:"request_id"`
	// Output is a []byte, which encoding/json emits as a base64-encoded string;
	// it is omitted when empty.
	Output     []byte           `json:"output,omitempty"`
	Status     InvocationStatus `json:"status"`
	Error      string           `json:"error,omitempty"`
	DurationMS int64            `json:"duration_ms"`
	Logs       []LogEntry       `json:"logs,omitempty"`
}

InvocationResult represents the result of a function invocation.

type InvocationStatus

// InvocationStatus represents the outcome status of a function invocation.
type InvocationStatus string

InvocationStatus represents the outcome status of a function invocation (success, error, or timeout).

// Outcomes an invocation can report in its Status field.
const (
	InvocationStatusSuccess InvocationStatus = "success"
	InvocationStatusError   InvocationStatus = "error"
	InvocationStatusTimeout InvocationStatus = "timeout"
)

type InvokeRequest

// InvokeRequest contains the parameters for invoking a function.
type InvokeRequest struct {
	Namespace    string      `json:"namespace"`
	FunctionName string      `json:"function_name"`
	Version      int         `json:"version,omitempty"` // 0 = latest
	// Input is a []byte, which encoding/json reads and writes as a
	// base64-encoded string.
	Input        []byte      `json:"input"`
	TriggerType  TriggerType `json:"trigger_type"`
	CallerWallet string      `json:"caller_wallet,omitempty"`
	WSClientID   string      `json:"ws_client_id,omitempty"`
}

InvokeRequest contains the parameters for invoking a function.

type InvokeResponse

// InvokeResponse contains the result of a function invocation.
type InvokeResponse struct {
	RequestID  string           `json:"request_id"`
	Output     []byte           `json:"output,omitempty"`
	Status     InvocationStatus `json:"status"`
	Error      string           `json:"error,omitempty"`
	// DurationMS is in milliseconds, per its name.
	DurationMS int64            `json:"duration_ms"`
	// Retries is omitted from JSON when zero.
	// NOTE(review): presumably the number of retry attempts performed — confirm.
	Retries    int              `json:"retries,omitempty"`
}

InvokeResponse contains the result of a function invocation.

type Invoker

// Invoker handles function invocation with retry logic and DLQ support.
// It wraps the Engine to provide higher-level invocation semantics.
//
// Construct one with NewInvoker.
type Invoker struct {
	// contains filtered or unexported fields
}

Invoker handles function invocation with retry logic and DLQ support. It wraps the Engine to provide higher-level invocation semantics.

func NewInvoker

func NewInvoker(engine *Engine, registry FunctionRegistry, hostServices HostServices, logger *zap.Logger) *Invoker

NewInvoker creates a new function invoker.

func (*Invoker) BatchInvoke

func (i *Invoker) BatchInvoke(ctx context.Context, req *BatchInvokeRequest) (*BatchInvokeResponse, error)

BatchInvoke executes multiple functions in parallel.

func (*Invoker) CanInvoke

func (i *Invoker) CanInvoke(ctx context.Context, namespace, functionName string, callerWallet string) (bool, error)

CanInvoke checks if a caller is authorized to invoke a function.

func (*Invoker) GetFunctionInfo

func (i *Invoker) GetFunctionInfo(ctx context.Context, namespace, functionName string, version int) (*Function, error)

GetFunctionInfo returns basic info about a function for invocation.

func (*Invoker) InvalidateCache

func (i *Invoker) InvalidateCache(wasmCID string)

InvalidateCache removes a compiled module from the engine's cache.

func (*Invoker) Invoke

func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)

Invoke executes a function with automatic retry logic.

func (*Invoker) InvokeByID

func (i *Invoker) InvokeByID(ctx context.Context, functionID string, input []byte, invCtx *InvocationContext) (*InvokeResponse, error)

InvokeByID invokes a function by its ID.

func (*Invoker) ValidateInput

func (i *Invoker) ValidateInput(input []byte, maxSize int) error

ValidateInput performs basic input validation.

type Job

// Job represents a background job.
type Job struct {
	ID          string     `json:"id"`
	FunctionID  string     `json:"function_id"`
	Payload     []byte     `json:"payload,omitempty"`
	Status      JobStatus  `json:"status"`
	// NOTE(review): the scale of Progress (e.g. 0-100) is not visible here — confirm.
	Progress    int        `json:"progress"`
	Result      []byte     `json:"result,omitempty"`
	Error       string     `json:"error,omitempty"`
	// StartedAt and CompletedAt are pointers tagged omitempty, so a nil value
	// is left out of the JSON output.
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	CreatedAt   time.Time  `json:"created_at"`
}

Job represents a background job.

type JobManager

// JobManager manages background job execution.
type JobManager interface {
	// Enqueue adds a job to the queue for background execution.
	// NOTE(review): the returned string is presumably the new job's ID — confirm.
	Enqueue(ctx context.Context, functionID string, payload []byte) (string, error)

	// GetStatus retrieves the current status of a job.
	GetStatus(ctx context.Context, jobID string) (*Job, error)

	// List returns jobs for a function.
	List(ctx context.Context, functionID string, limit int) ([]*Job, error)

	// Cancel attempts to cancel a pending or running job.
	Cancel(ctx context.Context, jobID string) error
}

JobManager manages background job execution.

type JobStatus

// JobStatus represents the current state of a background job.
// Timer.Status uses this type as well.
type JobStatus string

JobStatus represents the current state of a background job.

// States a background job can be in.
// NOTE(review): no cancelled state is defined here although JobManager.Cancel
// and ErrJobCancelled exist — confirm how a cancelled job is reported.
const (
	JobStatusPending   JobStatus = "pending"
	JobStatusRunning   JobStatus = "running"
	JobStatusCompleted JobStatus = "completed"
	JobStatusFailed    JobStatus = "failed"
)

type LogEntry

// LogEntry represents a log message from a function.
type LogEntry struct {
	// Level is a free-form string.
	// NOTE(review): presumably "info"/"error", matching HostServices.LogInfo and
	// LogError — confirm.
	Level     string    `json:"level"`
	Message   string    `json:"message"`
	Timestamp time.Time `json:"timestamp"`
}

LogEntry represents a log message from a function.

type PubSubTrigger

// PubSubTrigger represents a pubsub trigger.
type PubSubTrigger struct {
	ID         string `json:"id"`
	FunctionID string `json:"function_id"`
	Topic      string `json:"topic"`
	Enabled    bool   `json:"enabled"`
}

PubSubTrigger represents a pubsub trigger.

type RateLimiter

// RateLimiter checks if a request should be rate limited.
//
// An implementation is supplied to the Engine via WithRateLimiter.
type RateLimiter interface {
	// NOTE(review): presumably true means the request may proceed; the key
	// format is not visible here — confirm.
	Allow(ctx context.Context, key string) (bool, error)
}

RateLimiter checks if a request should be rate limited.

type Registry

// Registry manages function metadata in RQLite and bytecode in IPFS.
// It implements the FunctionRegistry interface.
//
// Construct one with NewRegistry. Besides the FunctionRegistry methods it also
// declares GetByID, GetEnvVars, ListVersions and Log.
type Registry struct {
	// contains filtered or unexported fields
}

Registry manages function metadata in RQLite and bytecode in IPFS. It implements the FunctionRegistry interface.

func NewRegistry

func NewRegistry(db rqlite.Client, ipfsClient ipfs.IPFSClient, cfg RegistryConfig, logger *zap.Logger) *Registry

NewRegistry creates a new function registry.

func (*Registry) Delete

func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error

Delete removes a function. If version is 0, removes all versions.

func (*Registry) Get

func (r *Registry) Get(ctx context.Context, namespace, name string, version int) (*Function, error)

Get retrieves a function by name and optional version. If version is 0, returns the latest version.

func (*Registry) GetByID

func (r *Registry) GetByID(ctx context.Context, id string) (*Function, error)

GetByID retrieves a function by its ID.

func (*Registry) GetEnvVars

func (r *Registry) GetEnvVars(ctx context.Context, functionID string) (map[string]string, error)

GetEnvVars retrieves environment variables for a function.

func (*Registry) GetLogs

func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error)

GetLogs retrieves logs for a function.

func (*Registry) GetWASMBytes

func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)

GetWASMBytes retrieves the compiled WASM bytecode for a function.

func (*Registry) List

func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, error)

List returns all functions for a namespace.

func (*Registry) ListVersions

func (r *Registry) ListVersions(ctx context.Context, namespace, name string) ([]*Function, error)

ListVersions returns all versions of a function.

func (*Registry) Log

func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error

Log records a function invocation and its logs to the database.

func (*Registry) Register

func (r *Registry) Register(ctx context.Context, fn *FunctionDefinition, wasmBytes []byte) (*Function, error)

Register deploys a new function or updates an existing one.

type RegistryConfig

// RegistryConfig holds configuration for the Registry.
// It is passed by value to NewRegistry.
type RegistryConfig struct {
	IPFSAPIURL string // IPFS API URL for content retrieval
}

RegistryConfig holds configuration for the Registry.

type RetryableError

// RetryableError wraps an error that should be retried.
//
// Error and Unwrap are declared on the pointer receiver, so *RetryableError
// satisfies the error interface and can be traversed by errors.Is/errors.As.
// NOTE(review): Unwrap presumably returns Cause — confirm.
type RetryableError struct {
	Cause      error
	RetryAfter int // Suggested retry delay in seconds
	// NOTE(review): "maximum" and "remaining" read as contradictory below —
	// confirm whether this is the total retry budget or the retries left.
	MaxRetries int // Maximum number of retries remaining
	CurrentTry int // Current attempt number
}

RetryableError wraps an error that should be retried.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error

type SecretsManager

// SecretsManager handles secure storage and retrieval of secrets.
// Every operation is scoped to a namespace.
type SecretsManager interface {
	// Set stores an encrypted secret.
	Set(ctx context.Context, namespace, name, value string) error

	// Get retrieves a decrypted secret.
	Get(ctx context.Context, namespace, name string) (string, error)

	// List returns all secret names for a namespace (not values).
	List(ctx context.Context, namespace string) ([]string, error)

	// Delete removes a secret.
	Delete(ctx context.Context, namespace, name string) error
}

SecretsManager handles secure storage and retrieval of secrets.

type Timer

// Timer represents a one-time scheduled execution.
type Timer struct {
	ID         string    `json:"id"`
	FunctionID string    `json:"function_id"`
	RunAt      time.Time `json:"run_at"`
	Payload    []byte    `json:"payload,omitempty"`
	// Status reuses the JobStatus type rather than a timer-specific one.
	Status     JobStatus `json:"status"`
	CreatedAt  time.Time `json:"created_at"`
}

Timer represents a one-time scheduled execution.

type TriggerError

// TriggerError represents an error in trigger execution.
//
// Error and Unwrap are declared on the pointer receiver, so *TriggerError
// satisfies the error interface and can be traversed by errors.Is/errors.As.
// NOTE(review): Unwrap presumably returns Cause — confirm.
type TriggerError struct {
	// TriggerType is a plain string here, not the package's TriggerType type.
	TriggerType string
	TriggerID   string
	FunctionID  string
	Cause       error
}

TriggerError represents an error in trigger execution.

func (*TriggerError) Error

func (e *TriggerError) Error() string

func (*TriggerError) Unwrap

func (e *TriggerError) Unwrap() error

type TriggerManager

// TriggerManager manages function triggers (cron, database, pubsub, timer).
type TriggerManager interface {
	// AddCronTrigger adds a cron-based trigger to a function.
	AddCronTrigger(ctx context.Context, functionID, cronExpr string) error

	// AddDBTrigger adds a database trigger to a function.
	AddDBTrigger(ctx context.Context, functionID, tableName string, operation DBOperation, condition string) error

	// AddPubSubTrigger adds a pubsub trigger to a function.
	AddPubSubTrigger(ctx context.Context, functionID, topic string) error

	// ScheduleOnce schedules a one-time execution.
	// NOTE(review): the returned string is presumably the timer ID — confirm.
	ScheduleOnce(ctx context.Context, functionID string, runAt time.Time, payload []byte) (string, error)

	// RemoveTrigger removes a trigger by ID.
	RemoveTrigger(ctx context.Context, triggerID string) error
}

TriggerManager manages function triggers (cron, database, pubsub, timer).

type TriggerType

// TriggerType identifies the type of event that triggered a function invocation.
type TriggerType string

TriggerType identifies the type of event that triggered a function invocation.

// Event sources that can trigger a function invocation.
const (
	TriggerTypeHTTP      TriggerType = "http"
	TriggerTypeWebSocket TriggerType = "websocket"
	TriggerTypeCron      TriggerType = "cron"
	TriggerTypeDatabase  TriggerType = "database"
	TriggerTypePubSub    TriggerType = "pubsub"
	TriggerTypeTimer     TriggerType = "timer"
	TriggerTypeJob       TriggerType = "job"
)

type ValidationError

// ValidationError represents an input validation error.
//
// Its Error method is declared on the pointer receiver, so *ValidationError
// (not ValidationError) satisfies the error interface.
type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents an input validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

type WSManager

// WSManager manages WebSocket connections for serverless functions. It handles
// connection registration, message routing, and topic subscriptions.
//
// Construct one with NewWSManager. Its Register, Unregister, Send, Broadcast,
// Subscribe and Unsubscribe methods have the same signatures as the
// WebSocketManager interface.
type WSManager struct {
	// contains filtered or unexported fields
}

WSManager manages WebSocket connections for serverless functions. It handles connection registration, message routing, and topic subscriptions.

func NewWSManager

func NewWSManager(logger *zap.Logger) *WSManager

NewWSManager creates a new WebSocket manager.

func (*WSManager) Broadcast

func (m *WSManager) Broadcast(topic string, data []byte) error

Broadcast sends data to all clients subscribed to a topic.

func (*WSManager) Close

func (m *WSManager) Close()

Close closes all connections and cleans up resources.

func (*WSManager) GetClientTopics

func (m *WSManager) GetClientTopics(clientID string) []string

GetClientTopics returns all topics a client is subscribed to.

func (*WSManager) GetConnectionCount

func (m *WSManager) GetConnectionCount() int

GetConnectionCount returns the number of active connections.

func (*WSManager) GetStats

func (m *WSManager) GetStats() *WSStats

GetStats returns current statistics.

func (*WSManager) GetTopicSubscriberCount

func (m *WSManager) GetTopicSubscriberCount(topic string) int

GetTopicSubscriberCount returns the number of subscribers for a topic.

func (*WSManager) IsConnected

func (m *WSManager) IsConnected(clientID string) bool

IsConnected checks if a client is connected.

func (*WSManager) Register

func (m *WSManager) Register(clientID string, conn WebSocketConn)

Register registers a new WebSocket connection.

func (*WSManager) Send

func (m *WSManager) Send(clientID string, data []byte) error

Send sends data to a specific client.

func (*WSManager) Subscribe

func (m *WSManager) Subscribe(clientID, topic string)

Subscribe adds a client to a topic.

func (*WSManager) Unregister

func (m *WSManager) Unregister(clientID string)

Unregister removes a WebSocket connection and its subscriptions.

func (*WSManager) Unsubscribe

func (m *WSManager) Unsubscribe(clientID, topic string)

Unsubscribe removes a client from a topic.

type WSStats

// WSStats holds statistics about the WebSocket manager, as returned by
// WSManager.GetStats.
type WSStats struct {
	ConnectionCount   int            `json:"connection_count"`
	TopicCount        int            `json:"topic_count"`
	SubscriptionCount int            `json:"subscription_count"`
	TopicStats        map[string]int `json:"topic_stats"` // topic -> subscriber count
}

WSStats holds statistics about the WebSocket manager.

type WebSocketConn

// WebSocketConn abstracts a WebSocket connection for testability.
//
// GorillaWSConn declares these three methods with the same signatures.
type WebSocketConn interface {
	WriteMessage(messageType int, data []byte) error
	ReadMessage() (messageType int, p []byte, err error)
	Close() error
}

WebSocketConn abstracts a WebSocket connection for testability.

type WebSocketManager

// WebSocketManager manages WebSocket connections for function streaming.
//
// *WSManager declares methods with these exact signatures.
type WebSocketManager interface {
	// Register registers a new WebSocket connection.
	Register(clientID string, conn WebSocketConn)

	// Unregister removes a WebSocket connection.
	Unregister(clientID string)

	// Send sends data to a specific client.
	Send(clientID string, data []byte) error

	// Broadcast sends data to all clients subscribed to a topic.
	Broadcast(topic string, data []byte) error

	// Subscribe adds a client to a topic.
	Subscribe(clientID, topic string)

	// Unsubscribe removes a client from a topic.
	Unsubscribe(clientID, topic string)
}

WebSocketManager manages WebSocket connections for function streaming.

Directories

Path Synopsis
registry — Package registry manages function metadata in RQLite and bytecode in IPFS.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL