Documentation
¶
Overview ¶
Package plugins provides the plugin system for StreamSpace API.
The api_registry component enables plugins to register custom HTTP API endpoints that are dynamically mounted into the main API router. This allows plugins to extend the API surface without modifying core code.
Architecture:
┌─────────────────────────────────────────────────────────────┐
│ Main API Router (Gin) │
│ /api/sessions, /api/users, /api/templates, etc. │
└──────────────────────────┬──────────────────────────────────┘
│ AttachToRouter()
↓
┌─────────────────────────────────────────────────────────────┐
│ APIRegistry │
│ - Stores plugin endpoint registrations │
│ - Enforces namespace isolation (/api/plugins/{name}/...) │
│ - Thread-safe registration/unregistration │
└──────────────────────────┬──────────────────────────────────┘
│ Manages
↓
┌─────────────────────────────────────────────────────────────┐
│ PluginEndpoint Records │
│ plugin-slack: POST /api/plugins/slack/send │
│ plugin-billing: GET /api/plugins/billing/invoices │
│ plugin-sentry: POST /api/plugins/sentry/report │
└─────────────────────────────────────────────────────────────┘
Endpoint Lifecycle:
- Plugin calls api.RegisterEndpoint() during OnLoad()
- APIRegistry stores endpoint with namespace prefix
- AttachToRouter() mounts all endpoints to main router
- Requests to /api/plugins/{name}/... route to plugin handlers
- Plugin calls api.Unregister() or runtime unloads plugin
- Endpoints are removed from registry (router cleanup on restart)
Namespace Isolation:
All plugin endpoints are automatically prefixed with /api/plugins/{pluginName}/ to prevent conflicts between plugins and with core API routes.
// Plugin code
api.RegisterEndpoint(EndpointOptions{
Method: "POST",
Path: "/send", // Plugin provides relative path
Handler: sendHandler,
})
// Results in: POST /api/plugins/slack/send
Thread Safety:
The registry uses sync.RWMutex for thread-safe concurrent access:
- Register/Unregister: Exclusive lock (write)
- GetEndpoints/AttachToRouter: Shared lock (read)
- Safe for plugins to register during parallel OnLoad() calls
Middleware Support:
Endpoints can specify middleware chains (authentication, rate limiting, etc.):
api.RegisterEndpoint(EndpointOptions{
Method: "POST",
Path: "/admin/settings",
Handler: settingsHandler,
Middleware: []gin.HandlerFunc{authMiddleware, adminOnlyMiddleware},
})
Permission Model:
Endpoints can declare required permissions for documentation/UI purposes. Actual enforcement happens in middleware, not the registry:
api.RegisterEndpoint(EndpointOptions{
Permissions: []string{"plugin.slack.send", "sessions.read"},
})
Cleanup on Unload:
When a plugin is unloaded:
- UnregisterAll(pluginName) removes all endpoints for that plugin
- Prevents orphaned routes from unloaded plugins
- Router rebuild required to apply changes (done on restart)
Performance:
- Registration: O(1) map insertion
- Lookup: O(1) map access
- AttachToRouter: O(n) iteration over all endpoints
- Memory: ~200 bytes per endpoint registration
Future Enhancements:
- Dynamic route reloading without restart
- Endpoint versioning (/api/plugins/slack/v1/send)
- Rate limiting per plugin
- Request/response logging and metrics
- OpenAPI/Swagger spec generation from registered endpoints
Package plugins provides the plugin system for StreamSpace API.
The base_plugin component provides default no-op implementations of the PluginHandler interface, following the "convention over configuration" pattern.
Design Pattern - Embedding for Selective Override:
Instead of requiring plugins to implement all 13 lifecycle hook methods, plugins can embed BasePlugin and only override the hooks they need:
type SlackPlugin struct {
plugins.BasePlugin // Embeds all default implementations
}
// Only override hooks you need
func (p *SlackPlugin) OnLoad(ctx *PluginContext) error {
// Custom initialization
return nil
}
func (p *SlackPlugin) OnSessionCreated(ctx *PluginContext, session interface{}) error {
// Send Slack notification
return nil
}
// All other hooks (OnUserLogin, OnSessionDeleted, etc.) use default no-op
This pattern:
- Reduces boilerplate code in plugins
- Makes plugins easier to write and maintain
- Provides forward compatibility when new hooks are added
- Follows Go's composition-over-inheritance model
Hook Categories:
Plugin Lifecycle: - OnLoad: Plugin initialization - OnUnload: Plugin cleanup - OnEnable: Plugin enabled - OnDisable: Plugin disabled
Session Hooks: - OnSessionCreated, OnSessionStarted, OnSessionStopped - OnSessionHibernated, OnSessionWoken, OnSessionDeleted
User Hooks: - OnUserCreated, OnUserUpdated, OnUserDeleted - OnUserLogin, OnUserLogout
Built-in Plugin Registry:
This file also provides a global registry for built-in plugins. Built-in plugins are compiled into the binary and automatically discovered at startup:
// In plugin code (e.g., slack_plugin.go)
func init() {
plugins.RegisterBuiltinPlugin("slack", &SlackPlugin{})
}
// Runtime automatically loads all registered built-ins
Built-in vs Dynamic:
- Built-in: Compiled into binary, always available, faster startup
- Dynamic: Loaded from .so files, can be added without recompile
Package plugins - database.go ¶
This file implements database access for plugins, providing two tiers of data storage: full SQL access and simple key-value storage.
Plugins can use these interfaces to persist data, query the main database, and maintain state across restarts without managing database connections.
Why Plugins Need Database Access ¶
**Use Cases**:
- Analytics: Store metrics, aggregated statistics, custom reports
- Monitoring: Track historical data, threshold violations, alerts
- Integrations: Cache external API responses, sync mappings
- Session Extensions: Store custom session metadata, tags, annotations
- User Preferences: Save plugin-specific user settings
**Without Database** (alternatives):
- In-memory: Lost on restart, not shared across API replicas
- File storage: Difficult to query, no transactions, concurrency issues
- External DB: Extra infrastructure, connection management overhead
**With Database** (this implementation):
- Persistent across restarts
- Shared across API replicas
- ACID transactions
- SQL query capabilities
- Simple key-value API for basic needs
Architecture: Two Storage Tiers ¶
┌─────────────────────────────────────────────────────────┐
│ Plugin │
└──────────┬──────────────────────────┬───────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌──────────────────────┐
│ PluginDatabase │ │ PluginStorage │
│ (Full SQL access) │ │ (Key-value store) │
├──────────────────────┤ ├──────────────────────┤
│ - Exec() │ │ - Get(key) │
│ - Query() │ │ - Set(key, value) │
│ - Transaction() │ │ - Delete(key) │
│ - CreateTable() │ │ - Keys(prefix) │
└──────────┬───────────┘ └──────────┬───────────┘
│ │
└────────────┬─────────────┘
▼
┌──────────────────────────┐
│ PostgreSQL Database │
│ - plugin_*_* tables │
│ - plugin_storage table │
└──────────────────────────┘
**Tier 1: PluginDatabase** (SQL access):
- Use when: Complex queries, joins, aggregations needed
- Examples: Analytics queries, report generation, data mining
- Namespace: Tables prefixed with `plugin_{pluginName}_`
- Power: Full SQL capabilities
**Tier 2: PluginStorage** (key-value):
- Use when: Simple get/set operations sufficient
- Examples: Cache, preferences, flags, counters
- Namespace: Rows filtered by `plugin_name` column
- Simplicity: No SQL required
Namespace Isolation ¶
**Why namespace plugin data?**
- Prevents naming conflicts (Plugin A "users" vs. Plugin B "users")
- Enables cleanup (drop all `plugin_X_*` tables on uninstall)
- Security: Plugins can't access other plugins' data
- Monitoring: Track storage per plugin
**PluginDatabase Namespacing** (table prefix):
Plugin: streamspace-analytics
CreateTable("metrics", "id SERIAL, value INT")
→ Creates table: plugin_streamspace_analytics_metrics
**PluginStorage Namespacing** (row filter):
Plugin: streamspace-analytics
Set("last_sync", "2025-01-15")
→ INSERT INTO plugin_storage (plugin_name, key, value)
VALUES ('streamspace-analytics', 'last_sync', '"2025-01-15"')
Transaction Support ¶
PluginDatabase provides transaction support for atomic operations:
db.Transaction(func(tx *sql.Tx) error {
// Multiple operations in transaction
tx.Exec("UPDATE plugin_analytics_metrics SET count = count + 1")
tx.Exec("INSERT INTO plugin_analytics_log ...")
return nil // Commit
// return err // Rollback
})
**Why transactions?**
- Atomicity: All-or-nothing (prevents partial updates)
- Consistency: Enforce constraints across operations
- Isolation: Concurrent plugins don't see intermediate state
PluginStorage Format ¶
**Schema**:
CREATE TABLE plugin_storage (
plugin_name TEXT NOT NULL,
key TEXT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (plugin_name, key)
)
**Why JSONB value type?**
- Stores any data type (string, number, object, array)
- Efficient querying (JSONB operators: ->, ->>, @>, etc.)
- No schema evolution (flexible structure)
- Example: {"count": 42, "lastSync": "2025-01-15", "enabled": true}
**Primary Key** (plugin_name, key):
- Ensures unique keys within plugin namespace
- Enables efficient Get/Set/Delete (index lookup)
- Prevents duplicate keys
Performance Characteristics ¶
**PluginDatabase**:
- Exec: O(query complexity) - same as raw SQL
- Query: O(result size) - depends on SELECT
- Transaction: +1ms overhead (BEGIN/COMMIT)
- CreateTable: One-time operation (typically in OnLoad)
**PluginStorage**:
- Get: O(1) - indexed lookup on (plugin_name, key)
- Set: O(1) - UPSERT with indexed columns
- Delete: O(1) - indexed DELETE
- Keys: O(n) - full scan of plugin's rows (use sparingly)
- Typical latency: 1-2ms per operation
Known Limitations ¶
**No query builder**: Plugins write raw SQL (SQL injection risk if not careful) - Mitigation: Always use parameterized queries ($1, $2, ...) - Future: Provide query builder library
**No automatic migrations**: Plugin must handle schema changes - Example: Add column, migrate data, drop old column - Future: Migration framework for plugins
**No distributed transactions**: Can't atomically update storage + external API - Workaround: Use compensation logic (undo on failure) - Future: Two-phase commit support
**PluginStorage not indexed by value**: Can't query "all keys where value = X" - Workaround: Use PluginDatabase for complex queries - PluginStorage designed for simple get/set only
**No quota enforcement**: Plugin can consume unlimited storage - Future: Per-plugin storage quotas - Workaround: Monitor disk usage, set limits externally
Security Considerations ¶
**SQL Injection**:
- Plugin code can execute arbitrary SQL
- Must use parameterized queries: db.Exec("SELECT * FROM t WHERE id = $1", id)
- Never interpolate user input: db.Exec("SELECT * FROM t WHERE id = " + id) ❌
**Access Control**:
- Plugins can access entire database (not sandboxed)
- Trust model: Plugins are trusted code (same as runtime)
- Future: Database-level permissions (CREATE USER per plugin)
**Data Validation**:
- No automatic validation of JSONB values
- Plugin responsible for schema validation
- Future: JSON Schema validation
See also:
- api/internal/plugins/runtime.go: Plugin lifecycle management
- api/internal/db/database.go: Main database connection
Package plugins - discovery.go ¶
This file implements plugin discovery for both built-in and dynamic plugins.
Plugin Discovery System ¶
StreamSpace supports two types of plugins:
- **Built-in plugins**: Compiled into the binary using Go's init() pattern
- **Dynamic plugins**: Loaded at runtime from .so files using Go's plugin package
This dual-plugin architecture enables:
- Core plugins shipped with the application (built-in)
- Third-party plugins installed by users (dynamic)
- Hot-reload of dynamic plugins without restarting
- Plugin sandboxing (future: dynamic plugins in containers)
Built-in Plugins ¶
Built-in plugins are registered using the global registry (registry.go) and imported directly into the API binary. They are:
- **Faster**: No file I/O or symbol resolution overhead
- **More reliable**: Guaranteed to be available (no missing .so files)
- **Type-safe**: Compile-time checking of interface implementation
- **Smaller**: No duplicate code between plugin and API
Examples: streamspace-analytics, streamspace-audit, streamspace-billing
Registration:
// In plugin package
func init() {
plugins.Register("analytics", NewAnalyticsPlugin)
}
// In API main.go
import _ "github.com/streamspace/plugins/analytics"
Dynamic Plugins ¶
Dynamic plugins are compiled as Go shared objects (.so files) and loaded at runtime using Go's plugin package. They must:
- Be built with the same Go version as the API server
- Export a "NewPlugin" function with signature: func() PluginHandler
- Be placed in a plugin directory (/plugins, ./plugins, etc.)
Building a dynamic plugin:
go build -buildmode=plugin -o my-plugin.so my-plugin.go
Plugin structure:
package main
import "github.com/streamspace-dev/streamspace/api/internal/plugins"
type MyPlugin struct{}
func (p *MyPlugin) OnLoad(ctx *plugins.PluginContext) error {
// Plugin initialization
return nil
}
// ... other PluginHandler methods
// Required export
func NewPlugin() plugins.PluginHandler {
return &MyPlugin{}
}
Discovery Process ¶
When the runtime starts, plugin discovery happens in this order:
- **Built-in plugins**: Already registered in global registry
- **Dynamic plugins**: Filesystem scan for .so files
- **Merge lists**: Combined list of available plugins
- **Load requested**: Only load plugins that are enabled in database
Flow diagram:
┌─────────────────────────────────────────────────────────┐
│ Plugin Discovery Start │
└──────────────────────┬──────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────────┐
│ Built-in │ │ Dynamic Plugin │
│ Plugins │ │ Scan │
│ (registry) │ │ (.so files) │
└────────┬────────┘ └─────────┬───────────┘
│ │
└─────────────┬───────────────┘
▼
┌──────────────────────────────┐
│ Merge Plugin Lists │
│ (built-in + dynamic) │
└──────────────┬───────────────┘
│
▼
┌──────────────────────────────┐
│ Filter by Enabled Status │
│ (query database) │
└──────────────┬───────────────┘
│
▼
┌──────────────────────────────┐
│ Load Selected Plugins │
│ into Runtime │
└──────────────────────────────┘
Plugin Directories ¶
Dynamic plugins are searched in multiple directories (in order):
- /plugins - Container/production deployment
- ./plugins - Local development
- /usr/local/share/streamspace/plugins - System-wide install
Directory structure:
/plugins/
├── analytics.so # Direct placement
├── streamspace-billing.so # With prefix
└── custom-plugin/ # Subdirectory
└── custom-plugin.so
Plugin Loading Strategy ¶
The discovery system uses lazy loading:
- Discovery finds all available plugins (cheap scan)
- Loading only happens for enabled plugins (expensive operation)
- Dynamic plugins are cached after first load (avoid re-open)
Why lazy loading?
- Faster startup (don't load disabled plugins)
- Lower memory usage (only active plugins in memory)
- Supports large plugin directories (100+ plugins)
Caching Behavior ¶
Dynamic plugins are cached after loading:
- First LoadPlugin: Opens .so file, resolves symbols
- Subsequent calls: Reuse cached plugin.Plugin object
- Cache persists for lifetime of discovery instance
This avoids:
- Repeated file I/O
- Symbol resolution overhead
- Memory duplication
Error Handling ¶
Discovery is resilient to errors:
- Missing directories: Silently skipped
- Unreadable files: Logged and skipped
- Invalid plugins: Logged but don't abort discovery
- Symbol resolution errors: Returned to caller
This ensures that one broken plugin doesn't prevent others from loading.
Go Plugin Package Limitations ¶
Dynamic plugin loading uses Go's plugin package, which has limitations:
- **Linux only**: Go plugins only work on Linux (not Windows/Mac)
- **Version matching**: Plugin must be built with exact same Go version
- **No unload**: Once loaded, plugins can't be unloaded (memory leak)
- **Symbol export**: Must export exactly "NewPlugin" with correct signature
- **Dependency hell**: Plugin and API must use compatible package versions
Future alternatives being considered:
- WebAssembly plugins (cross-platform, sandboxed)
- gRPC-based plugins (out-of-process, language-agnostic)
- Lua/JavaScript embedding (lightweight scripting)
Performance Characteristics ¶
Discovery performance:
- Built-in plugin lookup: O(1) hash map access (~1μs)
- Dynamic plugin scan: O(n) filesystem walk (~10ms for 100 plugins)
- Plugin load (dynamic): ~50ms per plugin (file I/O + symbol resolution)
Memory usage:
- Built-in plugin: ~0 bytes (already in binary)
- Dynamic plugin cache: ~10 KB per plugin (plugin.Plugin struct)
Security Considerations ¶
Dynamic plugins run with full API privileges:
- Same memory space as API server
- No sandboxing or isolation
- Can access all Go packages
- Malicious plugins can compromise entire system
Security recommendations:
- Only load trusted plugins (verify signatures)
- Use built-in plugins for critical functionality
- Future: Container-based plugin sandboxing
- Future: Capability-based security model
Package plugins - event_bus.go
This file implements the event bus for plugin event distribution.
The EventBus provides a publish-subscribe (pub/sub) pattern for delivering platform events to plugins. It enables loose coupling between the platform and plugins, allowing plugins to react to events without being directly called.
Architecture ¶
The event bus follows a classic pub/sub pattern:
┌─────────────────────────────────────────────────────────┐
│ Platform Code │
│ (API handlers, controllers, background workers) │
└──────────────────────┬──────────────────────────────────┘
│ EmitEvent("session.created", data)
▼
┌─────────────────────────────────────────────────────────┐
│ Event Bus │
│ - Maintains subscriber registry (event → handlers) │
│ - Routes events to all matching subscribers │
│ - Executes handlers in parallel goroutines │
│ - Recovers from handler panics (isolation) │
└──────────┬──────────┬──────────┬──────────┬────────────┘
▼ ▼ ▼ ▼
Plugin A Plugin B Plugin C Plugin D
(Analytics) (Billing) (Audit) (Slack)
Event Delivery Model ¶
**Asynchronous by default**:
- Emit() returns immediately, handlers run in background
- No blocking on slow plugins (e.g., network calls)
- Suitable for most use cases (fire-and-forget)
**Synchronous option**:
- EmitSync() waits for all handlers to complete
- Returns errors from all handlers
- Use when event ordering matters or errors must be handled
Subscription Management ¶
Subscribers are tracked using a compound key: "eventType:pluginName"
- Allows multiple handlers per event (different plugins)
- Enables efficient cleanup when plugin unloads (UnsubscribeAll)
- Prevents key collisions between plugins
Example subscriber registry:
subscribers = map[string][]EventHandler{
"session.created:analytics": [handler1, handler2],
"session.created:billing": [handler3],
"user.login:audit": [handler4],
}
Concurrency Model ¶
The event bus is designed for high-concurrency environments:
- **RWMutex**: Protects subscriber registry
- **Concurrent reads**: Multiple Emit() calls can read subscribers simultaneously
- **Goroutine per handler**: Each handler runs in isolation
- **Panic recovery**: Handler panics don't crash the event bus
Performance characteristics:
- Emit latency: <1ms (just spawns goroutines)
- EmitSync latency: Depends on slowest handler
- Memory overhead: ~2 KB per goroutine
Error Handling ¶
The event bus is resilient to handler failures:
- **Handler errors**: Logged but don't affect other handlers
- **Handler panics**: Recovered with stack trace logged
- **No cascading failures**: One plugin can't break others
Example: If 5 plugins subscribe to "session.created" and 2 of them panic, the other 3 still process the event successfully.
Event Namespacing ¶
Platform events vs. plugin events:
- **Platform events**: Emitted by StreamSpace code (session.*, user.*)
- **Plugin events**: Emitted by plugins, prefixed with "plugin.{name}.*"
Example plugin event: "plugin.analytics.report_generated"
Performance Optimization ¶
The event bus is optimized for high-throughput event processing:
- **Lazy handler collection**: Handlers collected under read lock
- **Lock-free execution**: Handlers run after lock is released
- **No buffering**: Events processed immediately (no queue)
Benchmark data (1000 events/sec, 10 subscribers per event):
- CPU usage: ~5% (mostly handler execution, not event bus overhead)
- Memory: ~20 MB for 10,000 in-flight goroutines
- Latency p50: <1ms, p99: <5ms
Known Limitations ¶
- **No event persistence**: Events lost if no subscribers (not a queue)
- **No replay**: Can't re-deliver events after they're emitted
- **No filtering**: All subscribers receive all events of that type
- **No ordering across types**: session.created may process before user.created
Future enhancements:
- Event filtering (e.g., only sessions for user X)
- Event persistence for audit log
- Replay capability for debugging
- Priority-based delivery
Package plugins provides the plugin system for StreamSpace API.
The logger component provides structured JSON logging for plugins with automatic plugin name tagging. This enables centralized log aggregation and filtering.
Design Rationale - Why Structured Logging:
Traditional logging:
log.Printf("Plugin %s: User %s logged in", pluginName, userID)
Output: "Plugin slack: User user123 logged in"
Problem: Hard to parse, filter, and aggregate
Structured logging:
logger.Info("User logged in", map[string]interface{}{
"user_id": "user123",
})
Output: {"plugin":"slack","level":"INFO","message":"User logged in","data":{"user_id":"user123"},"timestamp":"2025-01-15T10:30:00Z"}
Benefits: Machine-parsable, queryable, aggregatable
Log Aggregation Benefits:
Filter by plugin: jq 'select(.plugin == "slack")' logs.json
Filter by level: jq 'select(.level == "ERROR")' logs.json
Extract structured data: jq '.data.user_id' logs.json
Time-series analysis: jq 'select(.timestamp > "2025-01-15T10:00:00Z")' logs.json
Log Levels:
- DEBUG: Detailed diagnostic information
- INFO: General informational messages
- WARN: Warning messages (potential issues)
- ERROR: Error messages (handled errors)
- FATAL: Fatal errors (plugin should stop, but doesn't exit process)
Field Helpers:
The logger supports pre-configured fields via WithField/WithFields:
userLogger := logger.WithField("user_id", "user123")
userLogger.Info("Session started")
userLogger.Info("Session stopped")
// Both logs include "user_id": "user123"
Integration with Log Aggregation Systems:
- Elasticsearch: Ingest JSON logs directly
- Splunk: Parse JSON with automatic field extraction
- CloudWatch: JSON format enables CloudWatch Insights queries
- Datadog: Structured logs enable faceted search
Performance:
- JSON marshaling: ~500ns per log entry
- No reflection overhead (manual struct creation)
- Async write to stdout (buffered by Go runtime)
Package plugins - marketplace.go ¶
This file implements the plugin marketplace for discovery, installation, and updates.
The marketplace provides a centralized location for users to discover and install community and official plugins from external repositories (GitHub, private registries).
Why a Plugin Marketplace is Important ¶
**Discovery**: Users need a way to find plugins without manual searching
- Catalog of 100+ available plugins
- Category-based browsing (Analytics, Security, Integrations)
- Search by tags, keywords, features
**Ease of Installation**: One-click install instead of manual deployment
- Automatic download from repository
- Dependency resolution (future)
- Configuration wizard (future)
**Updates**: Centralized version management
- Update notifications when new versions available
- Automatic updates (opt-in)
- Changelog and release notes
**Security**: Vetted plugins from trusted sources
- Official plugins signed by StreamSpace
- Community plugins with ratings/reviews
- Security scanning (future)
Architecture: Repository-Based Distribution ¶
┌─────────────────────────────────────────────────────────┐
│ GitHub Repository │
│ (streamspace-plugins) │
│ - catalog.json: List of all available plugins │
│ - Each plugin: manifest.json, code, README │
└──────────────────────┬──────────────────────────────────┘
│ HTTPS (raw.githubusercontent.com)
▼
┌─────────────────────────────────────────────────────────┐
│ Plugin Marketplace (This File) │
│ 1. Fetch catalog.json (cached 15 min) │
│ 2. Parse available plugins │
│ 3. Download .tar.gz or individual files │
│ 4. Extract to /plugins/{name}/ │
│ 5. Register in database (installed_plugins table) │
└──────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Plugin Runtime │
│ - LoadPlugin() to initialize │
│ - OnLoad() hook called │
│ - Plugin becomes active │
└─────────────────────────────────────────────────────────┘
Catalog Structure ¶
The catalog.json file in the repository lists all available plugins:
[
{
"name": "streamspace-analytics",
"version": "1.2.3",
"displayName": "Analytics Dashboard",
"description": "Real-time session analytics and reporting",
"author": "StreamSpace Team",
"category": "Analytics",
"tags": ["analytics", "dashboard", "reporting"],
"iconUrl": "https://...",
"downloadUrl": "https://github.com/.../releases/download/...",
"manifest": { /* plugin capabilities */ }
}
]
Installation Flow ¶
- **User clicks "Install"** in UI → POST /api/plugins/install
- **Marketplace.SyncCatalog()**: Fetch latest catalog (if cache expired)
- **Marketplace.GetPlugin()**: Lookup plugin in catalog
- **Marketplace.downloadPlugin()**: Download .tar.gz from GitHub releases
- **Marketplace.extractTarGz()**: Extract to /plugins/{name}/
- **Marketplace.registerPluginInDatabase()**: Insert into installed_plugins
- **Runtime.LoadPlugin()**: Load plugin into runtime (if enabled)
- **User sees "Installed" badge** in UI
Caching Strategy ¶
The catalog is cached to reduce GitHub API calls:
- Cache TTL: 15 minutes (configurable)
- Invalidated on: Manual refresh, API rate limit errors
- Stored in: Memory map (availablePlugins)
- Persistent copy: catalog_plugins database table
This prevents hitting GitHub's rate limit (60 requests/hour unauthenticated).
Download Methods ¶
**Method 1: GitHub Releases (.tar.gz)**:
- Preferred for official plugins
- Example: https://github.com/foo/bar/releases/download/v1.0.0/plugin.tar.gz
- Contains: manifest.json, code files, README.md, LICENSE
- Integrity: SHA256 checksum (future)
**Method 2: Raw GitHub Content** (fallback):
- For development/testing
- Downloads individual files (manifest.json, plugin.go, README.md)
- Example: https://raw.githubusercontent.com/foo/bar/main/manifest.json
- No versioning (always latest)
Security Considerations ¶
**Current Implementation** (minimal security):
- Downloads over HTTPS (prevents MITM)
- No signature verification
- No malware scanning
- Trusts repository content
**Future Enhancements**:
- GPG signature verification
- SHA256 checksum validation
- Virus/malware scanning (ClamAV)
- Sandboxed execution
- Permission system (plugin can only access X)
Known Limitations ¶
- **No dependency resolution**: Plugins can't depend on other plugins
- **No rollback**: Can't easily uninstall/revert to previous version
- **No sandboxing**: Plugins run in same process (can access everything)
- **No private registries**: Only supports GitHub public repos (OAuth future)
- **No version constraints**: Can't specify "plugin X requires version Y"
See also:
- api/internal/plugins/runtime.go: Plugin loading and lifecycle
- api/internal/handlers/plugins.go: API endpoints for marketplace
- ui/src/pages/PluginCatalog.tsx: Marketplace UI
Package plugins - registry.go ¶
This file implements the global plugin registry for automatic plugin discovery.
The global registry provides a centralized location for plugins to register themselves at initialization time, enabling automatic plugin discovery without explicit configuration or hardcoded plugin lists.
Auto-Registration Pattern ¶
Plugins register themselves using Go's init() function pattern:
// In plugin file: plugins/my-plugin/main.go
package main
import "github.com/streamspace-dev/streamspace/api/internal/plugins"
func init() {
plugins.Register("my-plugin", func() plugins.PluginHandler {
return &MyPlugin{}
})
}
This registration happens automatically when the plugin package is imported, without requiring explicit registration calls in application code.
Benefits of Auto-Registration ¶
- **No hardcoded plugin lists**: Add new plugin = just import it
- **Compile-time discovery**: Plugins discovered at build time
- **Type safety**: Factory functions enforce PluginHandler interface
- **Clean initialization**: No manual "register all plugins" code
How It Works ¶
The registration flow:
┌─────────────────────────────────────────────────────────┐
│ 1. Go Program Startup │
│ - All imported packages' init() functions run │
└──────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 2. Plugin init() Functions Execute │
│ - Each plugin calls plugins.Register() │
│ - Factory functions stored in globalRegistry │
└──────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 3. Runtime Startup │
│ - Runtime queries globalRegistry.GetAll() │
│ - Calls factory functions to create plugin instances│
│ - Plugins loaded into runtime │
└─────────────────────────────────────────────────────────┘
Factory Function Pattern ¶
Plugins are registered using factory functions, not instances:
type PluginFactory func() PluginHandler
Why factory functions?
- Allows runtime to create fresh instances (stateless)
- Supports multiple instances if needed
- Enables testing with mock implementations
- Defer initialization until runtime starts
Example factory:
func MyPluginFactory() plugins.PluginHandler {
return &MyPlugin{
config: make(map[string]interface{}),
state: "initialized",
}
}
Global vs. Local Registries ¶
**Global Registry** (this file):
- Package-level singleton
- Populated at program startup (init functions)
- Used for built-in plugins
- Thread-safe for concurrent access
**Discovery Registry** (discovery.go):
- Instance-level registry
- Combines global registry + catalog plugins
- Handles external plugins from database
- Used by runtime for plugin loading
Thread Safety ¶
The global registry is thread-safe:
- RWMutex protects the plugins map
- Multiple goroutines can call Register() concurrently
- Readers (Get, GetAll) don't block each other
- Safe to access during and after initialization
Duplicate Registration ¶
If a plugin is registered twice:
- Warning is logged to console
- Second registration overwrites the first
- This allows hot-reload scenarios (reload = re-register)
Known Limitations ¶
- **No unregister**: Once registered, plugins can't be removed
- **No versioning**: Can't register multiple versions of same plugin
- **Build-time only**: Can't dynamically register plugins at runtime
- **No dependencies**: Can't express plugin dependencies
Future enhancements:
- Support for plugin versioning (multiple versions co-existing)
- Dependency graph resolution
- Runtime dynamic registration (hot plugin upload)
- Unregister for cleanup/testing
Package plugins implements the StreamSpace plugin system runtime.
The plugin runtime is the core execution environment that manages the complete lifecycle of plugins, from loading to unloading, and provides the foundation for platform extensibility.
Architecture Overview ¶
The plugin system follows a modular architecture with clear separation of concerns:
┌─────────────────────────────────────────────────────────────┐
│ Plugin Runtime │
│ - Lifecycle Management (Load/Unload/Enable/Disable) │
│ - Event Distribution (Pub/Sub to 16 platform events) │
│ - Resource Isolation (Per-plugin namespacing) │
│ - Concurrency Control (Thread-safe plugin execution) │
└──────────────┬──────────────────────────────────────────────┘
│
┌───────┴────────┬──────────────┬─────────────┐
▼ ▼ ▼ ▼
EventBus APIRegistry UIRegistry Scheduler
(Pub/Sub) (REST APIs) (UI Hooks) (Cron Jobs)
Plugin Lifecycle ¶
Plugins go through a well-defined lifecycle managed by the runtime:
- **Discovery**: Plugin manifest loaded from catalog_plugins table
- **Installation**: Plugin entry created in installed_plugins table
- **Loading**: Plugin code loaded into memory, context initialized
- **OnLoad Hook**: Plugin performs one-time initialization
- **Enabling**: Plugin marked as enabled, starts receiving events
- **OnEnable Hook**: Plugin activates background workers, registers APIs
- **Runtime**: Plugin handles events, serves API requests, runs jobs
- **Disabling**: Plugin stops receiving new events (OnDisable hook)
- **OnUnload Hook**: Plugin cleans up resources
10. **Unloading**: Plugin removed from memory, all resources released
Concurrency Model ¶
The runtime is designed for high-concurrency environments with multiple plugins processing events simultaneously:
- **Read-Write Mutex**: Protects the plugins map for concurrent access
- **Goroutine per Event**: Each event handler runs in a separate goroutine
- **Panic Recovery**: Plugin panics are isolated and logged, not affecting other plugins or the platform
- **No Blocking**: Event emission is fully asynchronous (fire-and-forget)
Example: When a session is created, the runtime emits a "session.created" event to 10 loaded plugins in parallel. If one plugin panics or takes 30s to process, other plugins are unaffected.
Resource Isolation ¶
Each plugin runs in its own isolated context with namespaced resources:
- **Database Tables**: Plugin tables prefixed with "plugin_{name}_"
- **API Routes**: Plugin routes prefixed with "/api/plugins/{name}/"
- **UI Components**: Plugin UI components namespaced in React
- **Event Handlers**: Plugin event subscriptions tracked separately
- **Scheduled Jobs**: Plugin cron jobs tagged with plugin name
- **Logs**: Plugin logs prefixed with "[Plugin: {name}]"
This isolation ensures:
- Plugins cannot interfere with each other
- Unloading a plugin cleanly removes all its resources
- Plugin failures don't cascade to other plugins
- Security boundaries between plugin code
Event System ¶
The runtime provides 16 platform events that plugins can subscribe to:
**Session Events** (6 events):
- session.created: New session requested (before pod created)
- session.started: Session pod running and ready
- session.stopped: Session gracefully stopped by user
- session.hibernated: Session scaled to zero (auto-hibernation)
- session.woken: Hibernated session resumed (scaled back to 1)
- session.deleted: Session permanently deleted
**User Events** (5 events):
- user.created: New user account created
- user.updated: User profile or settings changed
- user.deleted: User account deleted
- user.login: User authenticated successfully
- user.logout: User session ended
Event handlers are called asynchronously and receive the full object (Session or User model) as the data parameter.
Performance Characteristics ¶
The runtime is optimized for low-latency event processing:
- **Event Emission**: O(1) - no blocking, events queued immediately
- **Plugin Lookup**: O(1) - hash map lookup with RWMutex
- **Context Creation**: O(1) - pre-allocated context objects
- **Memory Overhead**: ~1-2 MB per loaded plugin (varies by plugin)
Benchmark data (100 plugins loaded, 1000 events/sec):
- Event emission latency: <1ms p50, <5ms p99
- Plugin load time: 10-50ms per plugin
- Memory usage: 150 MB for 100 plugins
Error Handling Strategy ¶
The runtime follows a "fail gracefully" approach:
- **Plugin Load Errors**: Logged and skipped, other plugins continue loading
- **Event Handler Errors**: Logged but don't affect other handlers
- **Plugin Panics**: Recovered with stack trace logged
- **Unload Errors**: Logged but unload continues (best-effort cleanup)
This ensures platform stability even when plugins misbehave.
Security Considerations ¶
The runtime provides several security boundaries:
- **Database Isolation**: Plugins can only access their own tables via PluginDatabase API (no direct database access)
- **API Authentication**: Plugin API routes inherit platform auth middleware
- **Resource Limits**: Future: CPU/memory limits per plugin (cgroups)
- **Sandbox Mode**: Future: Run untrusted plugins in containers
Current limitations:
- Plugins run in the same process (shared memory space)
- No CPU/memory limits enforced yet
- Plugin code must be trusted (no sandboxing)
Usage Example ¶
// Initialize runtime with database connection
runtime := NewRuntime(database)
// Start runtime and load enabled plugins
if err := runtime.Start(ctx); err != nil {
log.Fatal(err)
}
// Emit events as platform actions occur
runtime.EmitEvent("session.created", sessionData)
runtime.EmitEvent("user.login", userData)
// Gracefully shutdown runtime
defer runtime.Stop(ctx)
Related Documentation ¶
- PLUGIN_DEVELOPMENT.md: Guide for creating custom plugins
- docs/PLUGIN_API.md: Complete API reference for plugin developers
- api/internal/plugins/discovery.go: Plugin discovery and installation
- api/internal/plugins/event_bus.go: Event distribution implementation
Known Limitations ¶
- **No Hot Reload**: Plugins must be unloaded and reloaded to update code
- **No Dependency Management**: Plugins cannot depend on other plugins
- **No Version Constraints**: Installing multiple versions not supported
- **No Resource Limits**: Plugins can consume unlimited CPU/memory
- **In-Process Only**: Plugins run in API process (no out-of-process plugins)
Future enhancements planned for Phase 6:
- Hot reload with zero downtime
- Plugin dependency graph resolution
- Resource quotas per plugin
- Out-of-process plugin execution via gRPC
- WebAssembly plugin support for sandboxing
Package plugins provides the plugin system for StreamSpace API.
The runtime_v2 component is the central orchestrator that manages the entire plugin lifecycle, from discovery to loading, execution, and cleanup.
Design Rationale - Why RuntimeV2:
RuntimeV2 is an evolution of the original Runtime that adds:
- Automatic discovery of available plugins (filesystem + built-in)
- Database-driven plugin loading (loads only enabled plugins)
- Auto-start capability (plugins load on API startup)
- Integrated event bus for inter-plugin communication
- Centralized registries (API, UI, Events, Scheduler)
Plugin Lifecycle Flow:
┌─────────────────────────────────────────────────────────────┐
│ 1. DISCOVERY │
│ - Scan plugin directories for .so files │
│ - Enumerate built-in plugins │
│ - Build catalog of available plugins │
└────────────────────────┬────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 2. DATABASE QUERY │
│ - SELECT * FROM installed_plugins WHERE enabled = true │
│ - Load plugin configuration from database │
│ - Load plugin manifest (metadata, permissions, etc.) │
└────────────────────────┬────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 3. PLUGIN LOADING │
│ - Load plugin handler via discovery system │
│ - Create PluginContext with all helper components │
│ - Initialize plugin instance │
│ - Call OnLoad() lifecycle hook │
└────────────────────────┬────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 4. RUNTIME EXECUTION │
│ - Handle lifecycle events (sessions, users, etc.) │
│ - Execute scheduled jobs via cron scheduler │
│ - Process API requests via registered endpoints │
│ - Render UI components via registered components │
└────────────────────────┬────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 5. SHUTDOWN │
│ - Call OnUnload() lifecycle hook for each plugin │
│ - Remove scheduled jobs │
│ - Unregister API endpoints │
│ - Unregister UI components │
│ - Cleanup event subscriptions │
└─────────────────────────────────────────────────────────────┘
Event-Driven Architecture:
RuntimeV2 acts as an event hub, broadcasting system events to all loaded plugins. This enables plugins to react to platform events without tight coupling:
// When a session is created, runtime broadcasts to all plugins:
runtime.EmitEvent("session.created", sessionData)
// Each loaded plugin receives the event via its OnSessionCreated hook:
func (p *MyPlugin) OnSessionCreated(ctx *PluginContext, session interface{}) error {
// React to session creation
return nil
}
Event Types:
- session.created, session.started, session.stopped
- session.hibernated, session.woken, session.deleted
- user.created, user.updated, user.deleted
- user.login, user.logout
Automatic Discovery vs Manual Loading:
RuntimeV2 supports two plugin loading modes:
Auto-start (default): Automatically loads all enabled plugins from database - Best for: Production deployments - Use case: Plugins are managed via UI/API, enabled state in database - Example: Admin enables "slack-notifications" via UI → loads on restart
Manual loading: Plugins must be loaded via API calls - Best for: Development, testing, debugging - Use case: Fine-grained control over plugin loading - Example: Load specific plugin version for testing
Database Schema Integration:
RuntimeV2 relies on two main database tables:
installed_plugins: - id, name, version, enabled, config, catalog_plugin_id - Tracks which plugins are installed and their configuration - enabled=true → plugin loads on startup (auto-start mode) catalog_plugins: - id, name, version, manifest, source_url, ... - Plugin catalog metadata (description, icon, permissions, etc.) - Linked to installed_plugins via catalog_plugin_id
Plugin Context Components:
Each loaded plugin receives a PluginContext with access to:
- Database: Namespaced table access (plugin_name_*)
- Events: Pub/sub event system (subscribe to platform events)
- API: HTTP endpoint registration (/api/plugins/{name}/*)
- UI: Component registration (widgets, pages, menu items)
- Storage: Key-value storage (plugin configuration)
- Logger: Structured JSON logging with plugin name tagging
- Scheduler: Cron-based job scheduling
Thread Safety:
RuntimeV2 uses sync.RWMutex for thread-safe plugin registry access:
- Read lock: GetPlugin, ListPlugins (concurrent reads allowed)
- Write lock: LoadPlugin, UnloadPlugin (exclusive access)
- Event emission: Read lock + goroutines (non-blocking)
Performance Characteristics:
- Discovery: O(n) filesystem scan + O(m) built-in enumeration
- Database query: Single SELECT with indexed enabled column
- Plugin loading: Sequential, ~100-500ms per plugin (OnLoad hook latency)
- Event emission: O(n) plugins, each in separate goroutine (non-blocking)
- Shutdown: Sequential unload, ~50-200ms per plugin (OnUnload hook latency)
Example Usage:
// Create runtime with plugin directories
runtime := NewRuntimeV2(database, "/opt/plugins", "/usr/local/plugins")
// Optional: Disable auto-start for development
runtime.SetAutoStart(false)
// Optional: Register built-in plugins
runtime.RegisterBuiltinPlugin("analytics", &AnalyticsPlugin{})
// Start runtime (auto-loads enabled plugins from database)
if err := runtime.Start(ctx); err != nil {
log.Fatal(err)
}
// Emit events during platform operation
runtime.EmitEvent("session.created", sessionData)
// Graceful shutdown
defer runtime.Stop(ctx)
Package plugins - scheduler.go ¶
This file implements cron-based job scheduling for plugins, enabling plugins to run periodic tasks without blocking the main event loop.
The scheduler provides a simple API for plugins to schedule recurring jobs using standard cron expressions or convenient interval shortcuts.
Why Plugins Need Scheduling ¶
**Use Cases for Plugin Scheduling**:
- Analytics: Generate hourly reports, aggregate statistics
- Monitoring: Check system health every 5 minutes, send alerts
- Cleanup: Delete old data daily, purge expired sessions
- Sync: Pull data from external APIs every 15 minutes
- Notifications: Send daily summary emails
**Without Scheduling** (manual implementation):
- Plugin must create goroutine + time.Ticker
- Hard to manage multiple jobs (one goroutine per job)
- No built-in error recovery (panic kills goroutine)
- Difficult to cleanup on plugin unload
- No easy way to list/remove jobs
**With Scheduler** (this implementation):
- Simple API: scheduler.Schedule("daily-report", "@daily", func)
- Cron library handles timing (accurate, efficient)
- Automatic error recovery (panics logged, job continues)
- RemoveAll() on plugin unload (cleanup guaranteed)
- ListJobs() for debugging
Architecture: Per-Plugin Scheduler ¶
┌─────────────────────────────────────────────────────────┐
│ Global Cron Instance (shared across all plugins) │
│ - Single background goroutine │
│ - Manages all scheduled jobs │
│ - Runs jobs at specified times │
└──────────────────────┬──────────────────────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Plugin A │ │ Plugin B │ │ Plugin C │
│ Scheduler │ │ Scheduler │ │ Scheduler │
├──────────────┤ ├──────────────┤ ├──────────────┤
│ Jobs: │ │ Jobs: │ │ Jobs: │
│ - cleanup │ │ - sync │ │ - monitor │
│ - report │ │ - backup │ │ - alert │
└──────────────┘ └──────────────┘ └──────────────┘
**Why one scheduler per plugin?**
- Namespace isolation: Each plugin manages own jobs
- Easy cleanup: RemoveAll() removes only plugin's jobs
- Prevents naming conflicts: Plugin A "sync" vs. Plugin B "sync"
- Simplifies plugin code (don't need to prefix job names)
Cron Expression Format ¶
Standard 5-field cron syntax (minute hour day month weekday):
┌───────────── minute (0-59) │ ┌─────────── hour (0-23) │ │ ┌───────── day of month (1-31) │ │ │ ┌─────── month (1-12) │ │ │ │ ┌───── day of week (0-6, Sunday=0) │ │ │ │ │ * * * * *
**Examples**:
- "*/5 * * * *" → Every 5 minutes
- "0 * * * *" → Every hour (at minute 0)
- "0 0 * * *" → Daily at midnight
- "0 0 * * 0" → Weekly on Sunday at midnight
- "0 9,17 * * 1-5" → Weekdays at 9 AM and 5 PM
**Special strings**:
- "@hourly" → 0 * * * * (every hour)
- "@daily" → 0 0 * * * (every day at midnight)
- "@weekly" → 0 0 * * 0 (every Sunday at midnight)
- "@monthly" → 0 0 1 * * (first day of month at midnight)
Error Handling and Recovery ¶
**Job Panic Recovery**:
- Every job wrapped with defer/recover
- Panics logged but don't crash scheduler
- Job continues to run on next schedule
- Example: Job panics at 10:00, still runs at 10:05
**Why auto-recovery?**
- Plugin bugs shouldn't break scheduling
- Allows plugin debugging in production
- Scheduler remains reliable
- Alternative: Let panic kill goroutine (breaks all scheduled jobs)
Thread Safety ¶
The underlying cron library is thread-safe:
- Multiple plugins can call Schedule() concurrently
- Safe to add/remove jobs while cron is running
- RWMutex protects internal job registry
Performance Characteristics ¶
- Cron overhead: ~1ms CPU per tick (minimal)
- Memory: ~100 bytes per scheduled job
- Accuracy: ±1 second (good enough for most use cases)
- Max jobs: Unlimited (tested with 10,000+ jobs)
Known Limitations ¶
**No distributed scheduling**: Jobs run on single API instance - Problem: Multiple API replicas all run same jobs (duplicate work) - Future: Add distributed locking (Redis, PostgreSQL advisory locks)
**No job history**: Can't see when job last ran or if it failed - Future: Store job run history in database
**No job dependencies**: Can't chain jobs (run B after A completes) - Workaround: Use event bus to trigger dependent jobs
**Timezone issues**: All times in server timezone - Future: Support per-job timezone configuration
See also:
- api/internal/plugins/runtime.go: Plugin lifecycle management
- github.com/robfig/cron: Underlying cron library
Package plugins provides the plugin system for StreamSpace API.
The ui_registry component enables plugins to register custom UI components that are dynamically integrated into the React frontend. This allows plugins to extend the user interface without modifying core UI code.
Architecture:
┌─────────────────────────────────────────────────────────────┐
│ React Frontend (Browser) │
│ Fetches UI metadata from /api/plugins/ui/components │
└──────────────────────────┬──────────────────────────────────┘
│ HTTP API
↓
┌─────────────────────────────────────────────────────────────┐
│ UIRegistry │
│ - Widgets: Dashboard cards (session stats, alerts) │
│ - Pages: Full pages (/plugins/slack/messages) │
│ - AdminPages: Admin panel pages (/admin/plugins/slack) │
│ - MenuItems: Navigation menu entries │
│ - AdminWidgets: Admin dashboard widgets │
└──────────────────────────┬──────────────────────────────────┘
│ Registered by
↓
┌─────────────────────────────────────────────────────────────┐
│ Plugin OnLoad() │
│ ui.RegisterWidget({title: "Slack Stats", ...}) │
│ ui.RegisterPage({path: "/messages", ...}) │
│ ui.RegisterMenuItem({label: "Slack", ...}) │
└─────────────────────────────────────────────────────────────┘
UI Component Types:
Widgets: Dashboard cards on user home page - Position: top, sidebar, bottom - Width: full, half, third - Example: "Session Activity", "Quota Usage"
Pages: Full user-facing pages - Custom routes under /plugins/{name}/ - Example: /plugins/slack/messages
AdminPages: Admin panel pages - Custom routes under /admin/plugins/{name}/ - Example: /admin/plugins/slack/settings
MenuItems: Navigation menu entries - Appear in main navigation menu - Link to plugin pages or external URLs
AdminWidgets: Admin dashboard widgets - Similar to widgets but for admin dashboard - Example: "Plugin Health", "License Status"
Component Lifecycle:
- Plugin calls ui.RegisterWidget() during OnLoad()
- UIRegistry stores component metadata
- Frontend calls /api/plugins/ui/components
- Registry returns all component metadata as JSON
- React renders components dynamically
- Plugin unload removes components via UnregisterAll()
Dynamic UI Loading:
The frontend fetches component metadata and renders them dynamically:
// Frontend code
fetch('/api/plugins/ui/components')
.then(res => res.json())
.then(data => {
renderWidgets(data.widgets)
registerPages(data.pages)
updateMenu(data.menuItems)
})
Component Rendering:
Plugins can provide: - Component name (React component string) - Inline HTML/React JSX - URL to external React component bundle The frontend uses dynamic import() to load plugin components.
Thread Safety:
The registry uses sync.RWMutex for thread-safe concurrent access:
- Register methods: Exclusive lock (write)
- Get methods: Shared lock (read)
- Safe for plugins to register during parallel OnLoad() calls
Permissions:
UI components can declare required permissions. The frontend queries user permissions and only renders components the user can access:
ui.RegisterWidget(WidgetOptions{
Title: "Admin Stats",
Permissions: []string{"admin.read"}, // Only visible to admins
})
Component Cleanup:
When a plugin is unloaded:
- UnregisterAll(pluginName) removes all UI components
- Frontend polls for updates and removes components
- Prevents orphaned UI elements from unloaded plugins
Performance:
- Registration: O(1) map insertion
- Lookup: O(1) map access
- GetAll operations: O(n) iteration
- Memory: ~300 bytes per component registration
Future Enhancements:
- Hot reloading without frontend refresh
- Component versioning
- Server-side rendering (SSR) for plugin UIs
- Plugin UI theming and customization
- WebSocket-based real-time component updates
Index ¶
- func ListBuiltinPlugins() []string
- func Register(name string, factory PluginFactory)
- func RegisterBuiltinPlugin(name string, plugin PluginHandler)
- type APIRegistry
- func (r *APIRegistry) AttachToRouter(router *gin.RouterGroup)
- func (r *APIRegistry) GetEndpoints() []*PluginEndpoint
- func (r *APIRegistry) GetPluginEndpoints(pluginName string) []*PluginEndpoint
- func (r *APIRegistry) Register(pluginName string, endpoint *PluginEndpoint) error
- func (r *APIRegistry) Unregister(pluginName string, method string, path string)
- func (r *APIRegistry) UnregisterAll(pluginName string)
- type AdminPageOptions
- type BasePlugin
- func (p *BasePlugin) OnDisable(ctx *PluginContext) error
- func (p *BasePlugin) OnEnable(ctx *PluginContext) error
- func (p *BasePlugin) OnLoad(ctx *PluginContext) error
- func (p *BasePlugin) OnSessionCreated(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnSessionDeleted(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnSessionHibernated(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnSessionStarted(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnSessionStopped(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnSessionWoken(ctx *PluginContext, session interface{}) error
- func (p *BasePlugin) OnUnload(ctx *PluginContext) error
- func (p *BasePlugin) OnUserCreated(ctx *PluginContext, user interface{}) error
- func (p *BasePlugin) OnUserDeleted(ctx *PluginContext, user interface{}) error
- func (p *BasePlugin) OnUserLogin(ctx *PluginContext, user interface{}) error
- func (p *BasePlugin) OnUserLogout(ctx *PluginContext, user interface{}) error
- func (p *BasePlugin) OnUserUpdated(ctx *PluginContext, user interface{}) error
- type EndpointOptions
- type EventBus
- func (bus *EventBus) Emit(eventType string, data interface{})
- func (bus *EventBus) EmitSync(eventType string, data interface{}) []error
- func (bus *EventBus) Subscribe(eventType string, pluginName string, handler EventHandler)
- func (bus *EventBus) Unsubscribe(eventType string, pluginName string)
- func (bus *EventBus) UnsubscribeAll(pluginName string)
- type EventHandler
- type GlobalPluginRegistry
- type LoadedPlugin
- type LogEntry
- type MarketplacePlugin
- type MenuItemOptions
- type PageOptions
- type PluginAPI
- func (pa *PluginAPI) DELETE(path string, handler gin.HandlerFunc, permissions ...string) error
- func (pa *PluginAPI) GET(path string, handler gin.HandlerFunc, permissions ...string) error
- func (pa *PluginAPI) PATCH(path string, handler gin.HandlerFunc, permissions ...string) error
- func (pa *PluginAPI) POST(path string, handler gin.HandlerFunc, permissions ...string) error
- func (pa *PluginAPI) PUT(path string, handler gin.HandlerFunc, permissions ...string) error
- func (pa *PluginAPI) RegisterEndpoint(opts EndpointOptions) error
- func (pa *PluginAPI) Unregister(method string, path string)
- type PluginContext
- type PluginDatabase
- func (pd *PluginDatabase) CreateTable(tableName string, schema string) error
- func (pd *PluginDatabase) Exec(query string, args ...interface{}) (sql.Result, error)
- func (pd *PluginDatabase) Migrate(migrationSQL string) error
- func (pd *PluginDatabase) Query(query string, args ...interface{}) (*sql.Rows, error)
- func (pd *PluginDatabase) QueryRow(query string, args ...interface{}) *sql.Row
- func (pd *PluginDatabase) Transaction(fn func(*sql.Tx) error) error
- type PluginDiscovery
- func (pd *PluginDiscovery) DiscoverAll() ([]string, error)
- func (pd *PluginDiscovery) IsBuiltin(name string) bool
- func (pd *PluginDiscovery) ListBuiltin() []string
- func (pd *PluginDiscovery) ListDynamic() []string
- func (pd *PluginDiscovery) LoadPlugin(name string) (PluginHandler, error)
- func (pd *PluginDiscovery) RegisterBuiltin(name string, factory PluginFactory)
- type PluginEndpoint
- type PluginEvents
- type PluginFactory
- type PluginHandler
- type PluginInstance
- type PluginLogger
- func (pl *PluginLogger) Debug(message string, data ...map[string]interface{})
- func (pl *PluginLogger) Error(message string, data ...map[string]interface{})
- func (pl *PluginLogger) Fatal(message string, data ...map[string]interface{})
- func (pl *PluginLogger) Info(message string, data ...map[string]interface{})
- func (pl *PluginLogger) Warn(message string, data ...map[string]interface{})
- func (pl *PluginLogger) WithField(key string, value interface{}) *PluginLoggerWithFields
- func (pl *PluginLogger) WithFields(fields map[string]interface{}) *PluginLoggerWithFields
- type PluginLoggerWithFields
- func (plwf *PluginLoggerWithFields) Debug(message string, data ...map[string]interface{})
- func (plwf *PluginLoggerWithFields) Error(message string, data ...map[string]interface{})
- func (plwf *PluginLoggerWithFields) Fatal(message string, data ...map[string]interface{})
- func (plwf *PluginLoggerWithFields) Info(message string, data ...map[string]interface{})
- func (plwf *PluginLoggerWithFields) Warn(message string, data ...map[string]interface{})
- type PluginMarketplace
- func (m *PluginMarketplace) GetPlugin(ctx context.Context, name string) (*MarketplacePlugin, error)
- func (m *PluginMarketplace) InstallPlugin(ctx context.Context, name string, config map[string]interface{}) error
- func (m *PluginMarketplace) ListAvailable(ctx context.Context) ([]*MarketplacePlugin, error)
- func (m *PluginMarketplace) SyncCatalog(ctx context.Context) error
- func (m *PluginMarketplace) UninstallPlugin(ctx context.Context, name string) error
- type PluginScheduler
- func (ps *PluginScheduler) IsScheduled(jobName string) bool
- func (ps *PluginScheduler) ListJobs() []string
- func (ps *PluginScheduler) Remove(jobName string)
- func (ps *PluginScheduler) RemoveAll()
- func (ps *PluginScheduler) Schedule(jobName string, cronExpr string, job func()) error
- func (ps *PluginScheduler) ScheduleInterval(jobName string, interval string, job func()) error
- type PluginStorage
- type PluginUI
- func (pu *PluginUI) RegisterAdminPage(opts AdminPageOptions) error
- func (pu *PluginUI) RegisterAdminWidget(opts WidgetOptions) error
- func (pu *PluginUI) RegisterMenuItem(opts MenuItemOptions) error
- func (pu *PluginUI) RegisterPage(opts PageOptions) error
- func (pu *PluginUI) RegisterWidget(opts WidgetOptions) error
- type Runtime
- func (r *Runtime) EmitEvent(eventType string, data interface{})
- func (r *Runtime) GetAPIRegistry() *APIRegistry
- func (r *Runtime) GetEventBus() *EventBus
- func (r *Runtime) GetPlugin(name string) (*LoadedPlugin, error)
- func (r *Runtime) GetUIRegistry() *UIRegistry
- func (r *Runtime) ListPlugins() []*LoadedPlugin
- func (r *Runtime) LoadPlugin(ctx context.Context, name, version string, config map[string]interface{}, ...) error
- func (r *Runtime) Start(ctx context.Context) error
- func (r *Runtime) Stop(ctx context.Context) error
- func (r *Runtime) UnloadPlugin(ctx context.Context, name string) error
- type RuntimeV2
- func (r *RuntimeV2) EmitEvent(eventType string, data interface{})
- func (r *RuntimeV2) GetAPIRegistry() *APIRegistry
- func (r *RuntimeV2) GetEventBus() *EventBus
- func (r *RuntimeV2) GetPlugin(name string) (*LoadedPlugin, error)
- func (r *RuntimeV2) GetUIRegistry() *UIRegistry
- func (r *RuntimeV2) ListAvailablePlugins() []string
- func (r *RuntimeV2) ListPlugins() []*LoadedPlugin
- func (r *RuntimeV2) LoadPluginByName(ctx context.Context, name string) error
- func (r *RuntimeV2) LoadPluginWithConfig(ctx context.Context, name, version string, config map[string]interface{}, ...) error
- func (r *RuntimeV2) RegisterBuiltinPlugin(name string, factory PluginFactory)
- func (r *RuntimeV2) ReloadPlugin(ctx context.Context, name string) error
- func (r *RuntimeV2) SetAutoStart(enabled bool)
- func (r *RuntimeV2) Start(ctx context.Context) error
- func (r *RuntimeV2) Stop(ctx context.Context) error
- func (r *RuntimeV2) UnloadPlugin(ctx context.Context, name string) error
- type UIAdminPage
- type UIMenuItem
- type UIPage
- type UIRegistry
- func (r *UIRegistry) GetAdminPages() []*UIAdminPage
- func (r *UIRegistry) GetAdminWidgets() []*UIWidget
- func (r *UIRegistry) GetMenuItems() []*UIMenuItem
- func (r *UIRegistry) GetPages() []*UIPage
- func (r *UIRegistry) GetWidgets() []*UIWidget
- func (r *UIRegistry) RegisterAdminPage(pluginName string, page *UIAdminPage) error
- func (r *UIRegistry) RegisterAdminWidget(pluginName string, widget *UIWidget) error
- func (r *UIRegistry) RegisterMenuItem(pluginName string, item *UIMenuItem) error
- func (r *UIRegistry) RegisterPage(pluginName string, page *UIPage) error
- func (r *UIRegistry) RegisterWidget(pluginName string, widget *UIWidget) error
- func (r *UIRegistry) UnregisterAll(pluginName string)
- type UIWidget
- type WidgetOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ListBuiltinPlugins ¶
func ListBuiltinPlugins() []string
ListBuiltinPlugins returns names of all registered built-in plugins.
Used by discovery system to enumerate available built-ins.
func Register ¶
func Register(name string, factory PluginFactory)
Register registers a plugin globally (called from plugin init())
func RegisterBuiltinPlugin ¶
func RegisterBuiltinPlugin(name string, plugin PluginHandler)
RegisterBuiltinPlugin registers a plugin as built-in.
This should be called from init() functions in plugin packages:
func init() {
plugins.RegisterBuiltinPlugin("slack", &SlackPlugin{})
}
Thread Safety: Not thread-safe. Should only be called during init.
Types ¶
type APIRegistry ¶
type APIRegistry struct {
// contains filtered or unexported fields
}
APIRegistry manages plugin API endpoint registrations.
The registry provides centralized management of all plugin-contributed API endpoints, ensuring namespace isolation and thread-safe registration.
Key responsibilities:
- Store endpoint registrations with plugin attribution
- Enforce /api/plugins/{name}/ namespace prefix
- Prevent endpoint conflicts between plugins
- Provide thread-safe concurrent access
- Support bulk cleanup on plugin unload
Registry Structure:
endpoints: map[string]*PluginEndpoint
Key format: "{pluginName}:{method}:{path}"
Example: "slack:POST:/api/plugins/slack/send"
Value: Full endpoint metadata
Concurrency Model:
Register/Unregister: Write lock (exclusive) GetEndpoints/Attach: Read lock (shared) Multiple plugins can query concurrently Registration is serialized to prevent conflicts
func NewAPIRegistry ¶
func NewAPIRegistry() *APIRegistry
NewAPIRegistry creates a new API registry.
Returns an initialized registry ready to accept plugin endpoint registrations.
Usage:
registry := NewAPIRegistry() runtime.apiRegistry = registry
func (*APIRegistry) AttachToRouter ¶
func (r *APIRegistry) AttachToRouter(router *gin.RouterGroup)
AttachToRouter attaches all registered endpoints to a Gin router.
This method mounts all plugin endpoints to the main API router. It should be called once during API server initialization, after all plugins have registered their endpoints.
Parameters:
- router: Gin router group to mount endpoints on
Behavior:
For each registered endpoint: 1. Build middleware chain (endpoint.Middleware + endpoint.Handler) 2. Register with router: router.Handle(method, path, handlers...) 3. Log the attachment
Thread Safety:
Acquires shared read lock. Safe to call while plugins are querying. Should not be called concurrently with Register() during startup.
Middleware Chain:
The handler chain is built as: [middleware1, middleware2, ..., handler] Middleware executes in array order before the handler.
Example:
router := gin.Default()
apiGroup := router.Group("/api")
registry.AttachToRouter(apiGroup)
// All plugin endpoints now available under /api/plugins/...
Note:
This does not support dynamic route reloading. Endpoint changes require application restart to take effect.
func (*APIRegistry) GetEndpoints ¶
func (r *APIRegistry) GetEndpoints() []*PluginEndpoint
GetEndpoints returns all registered endpoints across all plugins.
Returns a snapshot of all endpoints currently registered. The returned slice is safe to iterate without holding locks.
Returns:
- []*PluginEndpoint: Slice of all registered endpoints
Thread Safety:
Acquires shared read lock. Multiple callers can execute concurrently. Returned slice is a copy, safe to modify.
Use Cases:
- Generate API documentation
- List all plugin endpoints in admin UI
- Export endpoint catalog for testing
Example:
endpoints := registry.GetEndpoints()
for _, ep := range endpoints {
fmt.Printf("%s %s - %s\n", ep.Method, ep.Path, ep.Description)
}
func (*APIRegistry) GetPluginEndpoints ¶
func (r *APIRegistry) GetPluginEndpoints(pluginName string) []*PluginEndpoint
GetPluginEndpoints returns endpoints for a specific plugin.
Filters the endpoint registry to return only endpoints owned by the specified plugin. Useful for plugin-specific introspection.
Parameters:
- pluginName: Name of the plugin to query
Returns:
- []*PluginEndpoint: Endpoints registered by that plugin
Thread Safety:
Acquires shared read lock. Safe for concurrent calls.
Performance:
O(n) iteration over all endpoints with filtering. For large registries, consider adding an index by plugin.
Example:
slackEndpoints := registry.GetPluginEndpoints("slack")
fmt.Printf("Slack plugin has %d endpoints\n", len(slackEndpoints))
func (*APIRegistry) Register ¶
func (r *APIRegistry) Register(pluginName string, endpoint *PluginEndpoint) error
Register registers a plugin API endpoint in the registry.
This method stores the endpoint metadata and associates it with the plugin. The endpoint will be mounted to the router when AttachToRouter() is called.
Parameters:
- pluginName: Name of the plugin registering the endpoint
- endpoint: Endpoint metadata (method, path, handler, etc.)
Returns:
- error: Conflict error if endpoint already registered, nil on success
Thread Safety:
This method acquires an exclusive write lock. It's safe to call concurrently from multiple plugins during startup.
Conflict Detection:
Endpoints are uniquely identified by (pluginName, method, path). Attempting to register a duplicate returns an error.
Example:
err := registry.Register("slack", &PluginEndpoint{
Method: "POST",
Path: "/api/plugins/slack/send",
Handler: sendHandler,
})
func (*APIRegistry) Unregister ¶
func (r *APIRegistry) Unregister(pluginName string, method string, path string)
Unregister removes a specific plugin API endpoint from the registry.
This method removes a single endpoint by its method and path. The endpoint will no longer be available after the next router rebuild (typically on restart).
Parameters:
- pluginName: Name of the plugin that owns the endpoint
- method: HTTP method (GET, POST, etc.)
- path: Full URL path including namespace prefix
Thread Safety:
Acquires exclusive write lock. Safe for concurrent calls.
Note:
This does not immediately remove the route from the Gin router. Router rebuilding happens on application restart.
Example:
registry.Unregister("slack", "POST", "/api/plugins/slack/send")
func (*APIRegistry) UnregisterAll ¶
func (r *APIRegistry) UnregisterAll(pluginName string)
UnregisterAll removes all endpoints for a plugin.
This method is called during plugin unload to clean up all endpoints registered by that plugin. Prevents orphaned routes after unload.
Parameters:
- pluginName: Name of the plugin to clean up
Thread Safety:
Acquires exclusive write lock. Safe for concurrent calls.
Implementation:
Uses two-pass approach to avoid modifying map during iteration: 1. Collect keys to delete 2. Delete collected keys
Example:
// During plugin unload
registry.UnregisterAll("slack")
// All endpoints like /api/plugins/slack/* are removed
type AdminPageOptions ¶
type AdminPageOptions struct {
ID string
Title string
Path string
Component string
Icon string
MenuLabel string
Permissions []string
Order int
}
AdminPageOptions contains options for registering an admin page.
Fields:
- Order: Position in admin menu (lower = earlier)
type BasePlugin ¶
type BasePlugin struct {
// Name is the plugin identifier.
// Set during registration, not by plugin code.
Name string
}
BasePlugin provides default no-op implementations for the PluginHandler interface.
Plugins can embed this struct to inherit default implementations and only override the lifecycle hooks they actually need.
Benefits:
- Reduces boilerplate: Don't implement unused hooks
- Forward compatibility: New hooks added to interface don't break existing plugins
- Convention over configuration: Most plugins only need 2-3 hooks
Usage:
type MyPlugin struct {
plugins.BasePlugin
}
// Override only what you need
func (p *MyPlugin) OnLoad(ctx *PluginContext) error {
// Initialize plugin
return nil
}
All hook methods return nil (success) by default.
func (*BasePlugin) OnDisable ¶
func (p *BasePlugin) OnDisable(ctx *PluginContext) error
OnDisable is called when the plugin is disabled. Default: no-op. Override to pause plugin services.
func (*BasePlugin) OnEnable ¶
func (p *BasePlugin) OnEnable(ctx *PluginContext) error
OnEnable is called when the plugin is enabled. Default: no-op. Override to start plugin services.
func (*BasePlugin) OnLoad ¶
func (p *BasePlugin) OnLoad(ctx *PluginContext) error
OnLoad is called when the plugin is first loaded. Default: no-op. Override to initialize plugin resources.
func (*BasePlugin) OnSessionCreated ¶
func (p *BasePlugin) OnSessionCreated(ctx *PluginContext, session interface{}) error
OnSessionCreated is called when a new session is created. Default: no-op. Override to track session creation or send notifications.
func (*BasePlugin) OnSessionDeleted ¶
func (p *BasePlugin) OnSessionDeleted(ctx *PluginContext, session interface{}) error
OnSessionDeleted is called when a session is permanently deleted. Default: no-op. Override to clean up or log deletion.
func (*BasePlugin) OnSessionHibernated ¶
func (p *BasePlugin) OnSessionHibernated(ctx *PluginContext, session interface{}) error
OnSessionHibernated is called when a session is hibernated (scale to zero). Default: no-op. Override to react to hibernation.
func (*BasePlugin) OnSessionStarted ¶
func (p *BasePlugin) OnSessionStarted(ctx *PluginContext, session interface{}) error
OnSessionStarted is called when a session starts (transitions to running). Default: no-op. Override to react to session startup.
func (*BasePlugin) OnSessionStopped ¶
func (p *BasePlugin) OnSessionStopped(ctx *PluginContext, session interface{}) error
OnSessionStopped is called when a session stops. Default: no-op. Override to clean up session-specific resources.
func (*BasePlugin) OnSessionWoken ¶
func (p *BasePlugin) OnSessionWoken(ctx *PluginContext, session interface{}) error
OnSessionWoken is called when a hibernated session wakes up. Default: no-op. Override to react to session wake.
func (*BasePlugin) OnUnload ¶
func (p *BasePlugin) OnUnload(ctx *PluginContext) error
OnUnload is called when the plugin is being unloaded. Default: no-op. Override to clean up plugin resources.
func (*BasePlugin) OnUserCreated ¶
func (p *BasePlugin) OnUserCreated(ctx *PluginContext, user interface{}) error
OnUserCreated is called when a new user account is created. Default: no-op. Override to provision user-specific resources.
func (*BasePlugin) OnUserDeleted ¶
func (p *BasePlugin) OnUserDeleted(ctx *PluginContext, user interface{}) error
OnUserDeleted is called when a user account is deleted. Default: no-op. Override to clean up user data.
func (*BasePlugin) OnUserLogin ¶
func (p *BasePlugin) OnUserLogin(ctx *PluginContext, user interface{}) error
OnUserLogin is called when a user logs in. Default: no-op. Override to track login events.
func (*BasePlugin) OnUserLogout ¶
func (p *BasePlugin) OnUserLogout(ctx *PluginContext, user interface{}) error
OnUserLogout is called when a user logs out. Default: no-op. Override to clean up session data.
func (*BasePlugin) OnUserUpdated ¶
func (p *BasePlugin) OnUserUpdated(ctx *PluginContext, user interface{}) error
OnUserUpdated is called when a user profile is updated. Default: no-op. Override to sync user data.
type EndpointOptions ¶
type EndpointOptions struct {
Method string
Path string
Handler gin.HandlerFunc
Middleware []gin.HandlerFunc
Permissions []string
Description string
}
EndpointOptions contains options for registering an endpoint.
This struct provides a flexible API for endpoint registration with optional middleware, permissions, and documentation.
Fields:
- Method: HTTP method (GET, POST, PUT, PATCH, DELETE)
- Path: Relative path (will be prefixed with /api/plugins/{name})
- Handler: Gin handler function
- Middleware: Optional middleware chain
- Permissions: Permission strings for documentation
- Description: Human-readable endpoint description
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages event distribution to plugins using a pub/sub pattern.
The EventBus is the central message broker for plugin events. It maintains a registry of event subscribers and routes events to all matching handlers.
Key features:
- Thread-safe subscription management
- Asynchronous event delivery (non-blocking)
- Synchronous delivery option (EmitSync)
- Automatic panic recovery (handler failures isolated)
- Per-plugin cleanup (UnsubscribeAll)
Typical usage:
bus := NewEventBus()
// Plugin subscribes to events
bus.Subscribe("session.created", "my-plugin", func(data interface{}) error {
session := data.(*models.Session)
log.Printf("Session created: %s", session.ID)
return nil
})
// Platform emits events
bus.Emit("session.created", sessionData)
Concurrency: All methods are thread-safe and safe for concurrent use.
func NewEventBus ¶
func NewEventBus() *EventBus
NewEventBus creates a new event bus for plugin event distribution.
Returns an initialized EventBus with an empty subscriber registry. The event bus is ready to use immediately - no additional setup required.
Thread safety: The returned event bus is safe for concurrent use.
func (*EventBus) Emit ¶
Emit publishes an event to all subscribers asynchronously.
This is the primary method for delivering events to plugins. It immediately spawns goroutines for all matching event handlers and returns without waiting for them to complete (fire-and-forget pattern).
Event matching:
- Finds all subscriber keys that start with the eventType
- Example: "session.created" matches "session.created:analytics", "session.created:billing"
- Each matching handler is invoked in a separate goroutine
Execution model:
- **Asynchronous**: Returns immediately, doesn't wait for handlers
- **Parallel**: All handlers run concurrently in separate goroutines
- **Non-blocking**: Slow handlers don't delay event emission
- **Isolated**: Handler errors/panics don't affect other handlers
Error handling:
- Handler errors are logged to console (not returned to caller)
- Handler panics are recovered and logged with stack trace
- No errors bubble up to caller (fire-and-forget semantics)
Performance:
- Emit latency: <1ms (just spawns goroutines)
- No waiting for handler completion
- Memory overhead: ~2 KB per goroutine (handler stack)
Use cases:
- Notifying plugins about platform events (session.*, user.*)
- Broadcasting state changes to interested parties
- Triggering asynchronous side effects (analytics, notifications)
When NOT to use:
- When you need to know if handlers succeeded (use EmitSync instead)
- When event ordering matters (use EmitSync for synchronous delivery)
- When handler return values are needed (use direct function calls)
Example usage:
// After creating a session
bus.Emit("session.created", &models.Session{
ID: "sess-123",
UserID: "user-456",
})
// The function returns immediately while handlers run in background
log.Println("Event emitted, continuing...")
Thread safety:
- Safe to call concurrently from multiple goroutines
- Uses read lock to collect handlers (concurrent reads allowed)
- Lock released before executing handlers (no blocking)
See also:
- EmitSync(): Synchronous version that waits for all handlers
- Subscribe(): Register event handlers
func (*EventBus) EmitSync ¶
EmitSync publishes an event and waits for all handlers to complete synchronously.
Unlike Emit(), this method blocks until all event handlers have finished executing and returns any errors that occurred. Use this when you need to:
- Ensure handlers complete before continuing
- Collect errors from handlers for error handling
- Maintain event ordering guarantees
Execution model:
- **Synchronous**: Blocks until all handlers complete
- **Parallel**: Handlers still run in separate goroutines
- **Wait for completion**: Uses sync.WaitGroup to wait for all
- **Error collection**: Returns slice of all errors from handlers
Error handling:
- All handler errors are collected and returned
- Panics are recovered and converted to errors
- Caller can inspect errors to determine if any handler failed
- Empty slice returned if all handlers succeeded
Performance implications:
- Latency equals slowest handler (blocking behavior)
- If one handler takes 5s, EmitSync blocks for 5s
- Use with caution in request paths (can cause timeouts)
- Better suited for background jobs or admin operations
Use cases:
- Validation hooks where all validators must pass
- Ordered state transitions (e.g., session cleanup)
- Admin operations where errors must be reported
- Testing event handlers (wait for completion)
Example usage:
// Emit event and check for errors
errors := bus.EmitSync("session.deleted", session)
if len(errors) > 0 {
log.Printf("Warning: %d plugins failed to process deletion", len(errors))
for i, err := range errors {
log.Printf(" Handler %d error: %v", i, err)
}
}
Comparison with Emit():
// Async (fire-and-forget)
bus.Emit("event", data) // Returns immediately
doOtherWork() // Handlers run in background
// Sync (wait for completion)
errors := bus.EmitSync("event", data) // Blocks until done
if len(errors) > 0 { // Can check results
handleErrors(errors)
}
Thread safety:
- Safe to call concurrently from multiple goroutines
- Uses read lock to collect handlers
- Error slice protected by mutex during collection
See also:
- Emit(): Asynchronous version (recommended for most use cases)
- Subscribe(): Register event handlers
func (*EventBus) Subscribe ¶
func (bus *EventBus) Subscribe(eventType string, pluginName string, handler EventHandler)
Subscribe registers an event handler for a specific event type.
Plugins use this method to subscribe to platform events (session.*, user.*) or custom plugin events (plugin.{name}.*). Multiple handlers can be registered for the same event type by different plugins.
Parameters:
- eventType: The event to subscribe to (e.g., "session.created")
- pluginName: The plugin registering the handler (for tracking/cleanup)
- handler: The function to call when the event is emitted
Subscription key:
- Internally uses compound key "eventType:pluginName"
- Allows multiple plugins to subscribe to same event
- Enables efficient cleanup via UnsubscribeAll(pluginName)
Multiple subscriptions:
- A plugin can register multiple handlers for the same event
- Handlers are appended to the list and all will be called
- Order of handler execution is not guaranteed
Thread safety:
- Safe to call concurrently from multiple goroutines
- Uses write lock to protect subscriber registry
Example usage:
// In plugin's OnLoad hook
ctx.Events.Subscribe("session.created", func(data interface{}) error {
session := data.(*models.Session)
log.Printf("Session %s created for user %s", session.ID, session.UserID)
return nil
})
func (*EventBus) Unsubscribe ¶
Unsubscribe removes a handler
func (*EventBus) UnsubscribeAll ¶
UnsubscribeAll removes all handlers for a plugin
type EventHandler ¶
type EventHandler func(data interface{}) error
EventHandler is a function that handles an event.
Event handlers are registered by plugins to receive platform events. Handlers receive the event data as an interface{} and must type assert to the appropriate model type (e.g., *models.Session, *models.User).
Error handling:
- Returning an error logs the error but doesn't stop event delivery
- Panicking is caught and logged by the event bus
- Errors don't affect other handlers or the platform
Concurrency:
- Handlers may be called concurrently for different events
- Handler must be thread-safe if it accesses shared state
- Use mutexes or channels to synchronize state changes
Performance:
- Handlers should complete quickly (< 100ms target)
- For long-running work, spawn a background goroutine
- Avoid blocking operations without timeouts
type GlobalPluginRegistry ¶
type GlobalPluginRegistry struct {
// contains filtered or unexported fields
}
GlobalPluginRegistry manages global plugin registration and discovery.
This registry maintains a map of plugin names to factory functions, enabling automatic plugin discovery at runtime startup. Plugins register themselves using Go's init() pattern for zero-configuration discovery.
Thread safety:
- All methods are thread-safe using RWMutex
- Safe for concurrent registration and access
- Multiple readers don't block each other
Typical usage:
// Plugin registration (in plugin's init)
func init() {
plugins.Register("my-plugin", NewMyPlugin)
}
// Runtime discovery
registry := plugins.GetGlobalRegistry()
allPlugins := registry.GetAll()
for name, factory := range allPlugins {
handler := factory()
// Load handler into runtime
}
func GetGlobalRegistry ¶
func GetGlobalRegistry() *GlobalPluginRegistry
GetGlobalRegistry returns the global plugin registry
func (*GlobalPluginRegistry) ApplyToDiscovery ¶
func (r *GlobalPluginRegistry) ApplyToDiscovery(discovery *PluginDiscovery)
ApplyToDiscovery applies all globally registered plugins to a discovery instance
func (*GlobalPluginRegistry) Get ¶
func (r *GlobalPluginRegistry) Get(name string) (PluginFactory, bool)
Get retrieves a specific plugin factory
func (*GlobalPluginRegistry) GetAll ¶
func (r *GlobalPluginRegistry) GetAll() map[string]PluginFactory
GetAll returns all registered plugins
func (*GlobalPluginRegistry) List ¶
func (r *GlobalPluginRegistry) List() []string
List returns all registered plugin names
type LoadedPlugin ¶
type LoadedPlugin struct {
// ID is the database primary key from the installed_plugins table.
// Used to track plugin state and configuration in the database.
ID int
// Name is the unique identifier for the plugin (e.g., "streamspace-analytics").
// Must match the plugin's directory name and be URL-safe (lowercase, hyphens).
Name string
// Version is the semantic version string (e.g., "1.2.3").
// Used for compatibility checking and upgrade detection.
Version string
// Enabled controls whether the plugin receives events and processes requests.
// When false, the plugin remains loaded but dormant (no event handlers called).
Enabled bool
// Config contains user-provided configuration values for the plugin.
// Stored as JSON in the database, deserialized into map for runtime access.
// Examples: API keys, feature flags, threshold values.
Config map[string]interface{}
// Manifest describes the plugin's capabilities, requirements, and metadata.
// Loaded from the catalog_plugins table during installation.
// Includes: display name, description, category, author, permissions.
Manifest models.PluginManifest
// Handler is the plugin's implementation of the PluginHandler interface.
// Contains lifecycle hooks (OnLoad, OnUnload) and event handlers.
Handler PluginHandler
// Instance holds the plugin's runtime context and isolated resources.
// Provides access to: storage, logger, scheduler, events API.
Instance *PluginInstance
// LoadedAt is the timestamp when the plugin was loaded into the runtime.
// Used for uptime monitoring and debugging load order issues.
LoadedAt time.Time
// IsBuiltin indicates whether the plugin is bundled with StreamSpace.
// Builtin plugins cannot be uninstalled and may have elevated permissions.
IsBuiltin bool
}
LoadedPlugin represents a plugin that has been loaded into the runtime.
A LoadedPlugin contains all the metadata, configuration, and runtime state for an active plugin. The plugin remains in memory and actively processes events until it is explicitly unloaded.
State transitions:
- Created when LoadPlugin() is called
- Enabled flag controls event processing
- Destroyed when UnloadPlugin() is called
Resource tracking:
- LoadedAt timestamp for uptime monitoring
- Instance holds plugin-specific runtime state
- Config stores user-provided configuration values
- Manifest contains plugin metadata and capabilities
Memory lifecycle:
- LoadedPlugin struct: ~1 KB (excluding Handler)
- Config map: Varies by plugin (typically 1-10 KB)
- Handler: Varies by plugin implementation
- Instance: ~100 KB (includes logger buffers, storage cache)
type LogEntry ¶
type LogEntry struct {
Plugin string `json:"plugin"`
Level string `json:"level"`
Message string `json:"message"`
Data map[string]interface{} `json:"data,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
LogEntry represents a structured log entry in JSON format.
All log entries follow this consistent structure for machine parsing:
- plugin: Source plugin name
- level: Log level (DEBUG, INFO, WARN, ERROR, FATAL)
- message: Human-readable message
- data: Optional structured fields (omitted if empty)
- timestamp: ISO 8601 timestamp
type MarketplacePlugin ¶
type MarketplacePlugin struct {
Name string `json:"name"`
Version string `json:"version"`
DisplayName string `json:"displayName"`
Description string `json:"description"`
Author string `json:"author"`
Category string `json:"category"`
Tags []string `json:"tags"`
IconURL string `json:"iconUrl"`
Manifest models.PluginManifest `json:"manifest"`
DownloadURL string `json:"downloadUrl"`
Installed bool `json:"installed"`
Enabled bool `json:"enabled"`
}
MarketplacePlugin represents a plugin available in the marketplace.
This struct combines plugin metadata from the catalog with installation status from the local database, providing a complete view of each plugin.
**Metadata fields** (from catalog.json):
- Name, Version, DisplayName, Description: Basic plugin info
- Author, Category, Tags: Discoverability and attribution
- IconURL: Visual representation in UI
- Manifest: Detailed capabilities and permissions
- DownloadURL: Where to fetch the plugin package
**Status fields** (from database):
- Installed: Whether plugin is installed locally
- Enabled: Whether plugin is currently active
This combination allows the UI to show "Install", "Installed", or "Update Available" buttons dynamically without extra database queries.
type MenuItemOptions ¶
type MenuItemOptions struct {
ID string
Label string
Path string
Icon string
Component string
Order int
Permissions []string
}
MenuItemOptions contains options for registering a menu item.
Fields:
- Label: Menu text
- Path: URL to navigate to
- Order: Position in menu (lower = earlier, use 1000+ for plugins)
type PageOptions ¶
type PageOptions struct {
ID string
Title string
Path string
Component string
Icon string
MenuLabel string
Permissions []string
}
PageOptions contains options for registering a page.
Fields:
- ID, Title, Path, Component, Icon: Page metadata
- MenuLabel: If set, page appears in navigation menu
- Permissions: Required permissions to access
type PluginAPI ¶
type PluginAPI struct {
// contains filtered or unexported fields
}
PluginAPI provides API registration interface for plugins.
This is the plugin-facing API that abstracts the underlying APIRegistry. Each plugin receives a PluginAPI instance pre-configured with its name, ensuring automatic namespace isolation.
Design Pattern:
Instead of giving plugins direct access to the global registry, we provide a scoped interface that automatically applies the plugin's namespace prefix. This prevents plugins from interfering with each other's routes.
Example Usage in Plugin:
func (p *SlackPlugin) OnLoad(ctx *PluginContext) error {
// ctx.API is pre-configured for this plugin
return ctx.API.POST("/send", p.handleSend, "plugin.slack.send")
}
// Results in: POST /api/plugins/slack/send
func NewPluginAPI ¶
func NewPluginAPI(registry *APIRegistry, pluginName string) *PluginAPI
NewPluginAPI creates a new plugin API instance.
Creates a scoped API interface for a specific plugin, with automatic namespace isolation. This is called by the plugin runtime during initialization, not by plugins directly.
Parameters:
- registry: The global API registry
- pluginName: Name of the plugin (used for namespacing)
Returns:
- *PluginAPI: Scoped API instance for the plugin
Example:
// In plugin runtime pluginCtx.API = NewPluginAPI(runtime.apiRegistry, "slack")
func (*PluginAPI) DELETE ¶
DELETE registers a DELETE endpoint.
Convenience method for registering DELETE endpoints for resource deletion.
Parameters:
- path: Relative path (e.g., "/webhooks/:id")
- handler: Gin handler function
- permissions: Optional permission strings (variadic)
Returns:
- error: Registration error if endpoint conflicts, nil on success
Example:
err := api.DELETE("/webhooks/:id", deleteWebhookHandler, "plugin.slack.webhooks.delete")
// Results in: DELETE /api/plugins/slack/webhooks/:id
func (*PluginAPI) GET ¶
GET registers a GET endpoint.
Convenience method for registering GET endpoints with minimal configuration. Automatically applies plugin namespace prefix.
Parameters:
- path: Relative path (e.g., "/messages")
- handler: Gin handler function
- permissions: Optional permission strings (variadic)
Returns:
- error: Registration error if endpoint conflicts, nil on success
Example:
err := api.GET("/messages", listMessagesHandler, "plugin.slack.read")
// Results in: GET /api/plugins/slack/messages
func (*PluginAPI) PATCH ¶
PATCH registers a PATCH endpoint.
Convenience method for registering PATCH endpoints for partial updates.
Parameters:
- path: Relative path (e.g., "/settings")
- handler: Gin handler function
- permissions: Optional permission strings (variadic)
Returns:
- error: Registration error if endpoint conflicts, nil on success
Example:
err := api.PATCH("/settings", patchSettingsHandler, "plugin.slack.settings.write")
// Results in: PATCH /api/plugins/slack/settings
func (*PluginAPI) POST ¶
POST registers a POST endpoint.
Convenience method for registering POST endpoints with minimal configuration.
Parameters:
- path: Relative path (e.g., "/send")
- handler: Gin handler function
- permissions: Optional permission strings (variadic)
Returns:
- error: Registration error if endpoint conflicts, nil on success
Example:
err := api.POST("/send", sendMessageHandler, "plugin.slack.send")
// Results in: POST /api/plugins/slack/send
func (*PluginAPI) PUT ¶
PUT registers a PUT endpoint.
Convenience method for registering PUT endpoints for resource updates.
Parameters:
- path: Relative path (e.g., "/config")
- handler: Gin handler function
- permissions: Optional permission strings (variadic)
Returns:
- error: Registration error if endpoint conflicts, nil on success
Example:
err := api.PUT("/config", updateConfigHandler, "plugin.slack.config.write")
// Results in: PUT /api/plugins/slack/config
func (*PluginAPI) RegisterEndpoint ¶
func (pa *PluginAPI) RegisterEndpoint(opts EndpointOptions) error
RegisterEndpoint registers an API endpoint with full options.
This is the low-level registration method that supports all endpoint configuration options. Most plugins should use the convenience methods (GET, POST, etc.) instead.
Parameters:
- opts: Complete endpoint configuration
Returns:
- error: Registration error if endpoint conflicts, nil on success
Automatic Namespace Prefix:
The path is automatically prefixed with /api/plugins/{pluginName}/.
Plugin provides: "/send"
Results in: "/api/plugins/slack/send"
Example:
err := api.RegisterEndpoint(EndpointOptions{
Method: "POST",
Path: "/send",
Handler: sendHandler,
Middleware: []gin.HandlerFunc{authMiddleware},
Permissions: []string{"plugin.slack.send"},
Description: "Send a Slack message",
})
func (*PluginAPI) Unregister ¶
Unregister removes an endpoint.
Removes a previously registered endpoint by method and path. The path should be the relative path used during registration, not the full path.
Parameters:
- method: HTTP method (GET, POST, etc.)
- path: Relative path (e.g., "/send", not "/api/plugins/slack/send")
Example:
// Register
api.POST("/send", handler)
// Later, unregister
api.Unregister("POST", "/send")
type PluginContext ¶
type PluginContext struct {
PluginName string
Config map[string]interface{}
Manifest models.PluginManifest
// Platform APIs
Database *PluginDatabase
Events *PluginEvents
API *PluginAPI
UI *PluginUI
Storage *PluginStorage
Logger *PluginLogger
Scheduler *PluginScheduler
// contains filtered or unexported fields
}
PluginContext provides plugins with access to platform APIs and resources.
The PluginContext is the primary interface between plugin code and the StreamSpace platform. It provides controlled access to platform functionality while maintaining security boundaries and resource isolation.
Available APIs ¶
**Database**: Plugin-scoped database access
- Create tables prefixed with "plugin_{name}_"
- Execute queries within plugin's schema namespace
- Automatic connection pooling and transaction management
**Events**: Subscribe to platform events and emit custom events
- Subscribe to session.*, user.* events
- Emit custom events namespaced as "plugin.{name}.*"
- Events delivered asynchronously (non-blocking)
**API**: Register REST API endpoints
- Routes prefixed with "/api/plugins/{name}/"
- Automatic auth middleware (JWT validation)
- Request/response helpers
**UI**: Register React components and UI hooks
- Inject components into dashboard, admin panel
- Add navigation menu items
- Extend forms with custom fields
**Storage**: Simple key-value store for plugin data
- Namespaced to plugin (keys cannot conflict)
- JSON serialization of values
- Backed by database (persistent across restarts)
**Logger**: Structured logging with plugin prefix
- Automatic log level filtering (debug, info, warn, error)
- Contextual fields for correlation
- Centralized log aggregation
**Scheduler**: Cron-based scheduled jobs
- Standard cron syntax (e.g., "0 * * * *" for hourly)
- Jobs run in background goroutines
- Automatic cleanup on plugin unload
Security Boundaries ¶
The context enforces several security constraints:
- Database: Cannot access tables outside plugin namespace
- API: Routes inherit platform authentication
- Storage: Keys isolated to plugin (no cross-plugin access)
- Events: Cannot intercept or modify other plugin's events
Concurrency ¶
The context is safe for concurrent access:
- Multiple event handlers can use the same context
- Database connection pool handles concurrent queries
- Event subscriptions are thread-safe
- Storage operations are atomic (per-key basis)
Example Usage ¶
func (p *MyPlugin) OnLoad(ctx *PluginContext) error {
// Access configuration
apiKey := ctx.Config["api_key"].(string)
// Register API endpoint
ctx.API.GET("/status", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "ok"})
})
// Subscribe to events
ctx.Events.On("session.created", func(data interface{}) error {
session := data.(*models.Session)
ctx.Logger.Info("New session", "id", session.ID)
return nil
})
// Schedule periodic task
ctx.Scheduler.Schedule("0 * * * *", func() {
ctx.Logger.Info("Hourly task executed")
})
// Store plugin state
ctx.Storage.Set("last_run", time.Now())
return nil
}
type PluginDatabase ¶
type PluginDatabase struct {
// contains filtered or unexported fields
}
PluginDatabase provides full SQL database access for plugins.
This struct wraps the platform's database connection, providing plugins with the ability to execute SQL statements, run queries, and manage transactions.
**Fields**:
- db: Platform database connection (shared across all plugins)
- pluginName: Plugin identifier (used for table namespacing)
**Capabilities**:
- Execute SQL: INSERT, UPDATE, DELETE, DDL
- Query data: SELECT with result iteration
- Transactions: Atomic multi-statement operations
- Schema management: CREATE TABLE with namespace prefix
**Lifecycle**:
- Created: When plugin is loaded (passed to OnLoad)
- Used: Throughout plugin lifetime
- No cleanup: Database connection managed by platform
func NewPluginDatabase ¶
func NewPluginDatabase(database *db.Database, pluginName string) *PluginDatabase
NewPluginDatabase creates a new plugin database instance.
This constructor is called by the runtime when loading a plugin, providing a database interface scoped to that plugin's namespace.
**Why pass database instead of connection string?**
- Connection pooling: All plugins share single connection pool
- Lifecycle management: Platform handles connection lifecycle
- Configuration: No need for plugins to know DB credentials
- Monitoring: Platform can track queries from all plugins
**Plugin Name Usage**:
- Table prefixing: CreateTable("metrics") → plugin_{pluginName}_metrics
- Logging: Database errors tagged with plugin name
- Monitoring: Query metrics grouped by plugin
**Example Usage** (in runtime):
for _, plugin := range plugins {
db := NewPluginDatabase(platformDB, plugin.Name)
plugin.OnLoad(..., db, ...) // Plugin receives database
}
Parameters:
- database: Platform database connection
- pluginName: Plugin identifier
Returns initialized database wrapper.
func (*PluginDatabase) CreateTable ¶
func (pd *PluginDatabase) CreateTable(tableName string, schema string) error
CreateTable creates a table for the plugin with automatic namespacing.
This is a convenience method that automatically prefixes the table name with `plugin_{pluginName}_` to prevent naming conflicts.
**Namespace Prefix**:
- Plugin: streamspace-analytics
- CreateTable("metrics", "...")
- Creates: plugin_streamspace_analytics_metrics
**Why Automatic Prefixing?**
- Prevents collisions: Multiple plugins can have "metrics" table
- Cleanup: Easy to find all tables for a plugin (LIKE 'plugin_X_%')
- Security: Clear ownership of tables
**Example Usage**:
// Create metrics table
err := db.CreateTable("metrics", `
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
value INT NOT NULL,
timestamp TIMESTAMP DEFAULT NOW()
`)
// Creates: plugin_streamspace_analytics_metrics
// Create index separately
db.Exec(`
CREATE INDEX IF NOT EXISTS idx_metrics_session
ON plugin_streamspace_analytics_metrics (session_id)
`)
**Schema Parameter**:
- Column definitions only (no CREATE TABLE or table name)
- Example: "id SERIAL PRIMARY KEY, name TEXT"
- Constraints can be included: "id INT UNIQUE, FOREIGN KEY (...)"
**IF NOT EXISTS**:
- Automatically added to CREATE TABLE statement
- Safe to call multiple times (idempotent)
- No error if table already exists
**When to Use vs. Migrate**:
- CreateTable: Simple single-table creation
- Migrate: Complex migrations, indexes, multiple tables
**Limitations**:
- Can only create one table per call
- Can't create indexes (use Exec or Migrate)
- No automatic cleanup on plugin uninstall
**Cleanup on Uninstall** (manual):
// In plugin OnUnload or uninstall handler
db.Exec("DROP TABLE IF EXISTS plugin_streamspace_analytics_metrics CASCADE")
**Full Control Alternative** (manual prefixing):
// Use Migrate for full control
db.Migrate(`
CREATE TABLE IF NOT EXISTS plugin_streamspace_analytics_metrics (...)
CREATE INDEX ...
`)
Parameters:
- tableName: Base table name (will be prefixed automatically)
- schema: Column definitions (without CREATE TABLE or table name)
Returns error if table creation fails, nil on success.
func (*PluginDatabase) Exec ¶
func (pd *PluginDatabase) Exec(query string, args ...interface{}) (sql.Result, error)
Exec executes a SQL statement (INSERT, UPDATE, DELETE, DDL).
This method is used for SQL statements that don't return rows, such as data modification or schema changes.
**Use Cases**:
- INSERT: Add new rows to plugin tables
- UPDATE: Modify existing data
- DELETE: Remove rows
- DDL: CREATE INDEX, ALTER TABLE, etc.
**Example Usage**:
// Insert metric
result, err := db.Exec(`
INSERT INTO plugin_analytics_metrics (session_id, value, timestamp)
VALUES ($1, $2, NOW())
`, sessionID, value)
// Update counter
db.Exec(`
UPDATE plugin_analytics_counters
SET count = count + 1
WHERE name = $1
`, counterName)
// Create index
db.Exec(`
CREATE INDEX IF NOT EXISTS idx_metrics_session
ON plugin_analytics_metrics (session_id)
`)
**Return Value** (sql.Result):
- LastInsertId(): ID of inserted row (if table has SERIAL column)
- RowsAffected(): Number of rows modified
**SQL Injection Prevention**:
- ✅ Use parameterized queries: Exec("SELECT * FROM t WHERE id = $1", id)
- ❌ Never concatenate: Exec("SELECT * FROM t WHERE id = " + id)
- PostgreSQL uses $1, $2, ... for parameters (not ?)
**Error Handling**:
- Syntax errors: Returns parse error
- Constraint violations: Returns constraint error (unique, foreign key)
- Connection errors: Returns network/timeout error
**Performance**:
- Prepared internally (first call parses, subsequent calls use cached plan)
- Typical latency: 1-5ms depending on query complexity
Parameters:
- query: SQL statement with $1, $2, ... placeholders
- args: Values to substitute for placeholders
Returns sql.Result with affected rows count, or error.
func (*PluginDatabase) Migrate ¶
func (pd *PluginDatabase) Migrate(migrationSQL string) error
Migrate executes a migration SQL script for plugin table setup.
This method is typically called in plugin's OnLoad to ensure required database schema exists before the plugin starts operating.
**Use Cases**:
- Initial setup: Create tables, indexes, functions
- Schema upgrades: Add columns, modify constraints
- Data migrations: Transform existing data
**Example Usage** (in plugin OnLoad):
func (p *MyPlugin) OnLoad(db *PluginDatabase, ...) error {
migrationSQL := `
CREATE TABLE IF NOT EXISTS plugin_analytics_metrics (
id SERIAL PRIMARY KEY,
session_id TEXT NOT NULL,
value INT NOT NULL,
timestamp TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_metrics_session
ON plugin_analytics_metrics (session_id);
CREATE INDEX IF NOT EXISTS idx_metrics_timestamp
ON plugin_analytics_metrics (timestamp);
`
return db.Migrate(migrationSQL)
}
**Why "IF NOT EXISTS"?**
- Idempotent: Safe to run multiple times (plugin reload)
- No-op if schema already exists
- Prevents errors on restart
**Manual Table Names**:
- Unlike CreateTable(), this doesn't auto-prefix
- Plugin must manually use `plugin_{pluginName}_` prefix
- Provides full control for complex migrations
**Multi-Statement Support**:
- Can contain multiple statements separated by semicolons
- All executed in sequence
- First error stops execution (no transaction)
**Error Handling**:
- SQL syntax error: Returns parse error
- Constraint violation: Returns constraint error
- Migration fails: Plugin OnLoad fails, plugin not loaded
**No Transaction**:
- Statements executed individually (not in transaction)
- Partial success possible (some statements succeed, later ones fail)
- DDL statements auto-commit in PostgreSQL anyway
**Migration Strategy** (version tracking):
// Not provided by this API - plugin must implement
CREATE TABLE IF NOT EXISTS plugin_analytics_migrations (
version INT PRIMARY KEY,
applied_at TIMESTAMP DEFAULT NOW()
);
// Check if migration already applied
var exists bool
db.QueryRow("SELECT EXISTS(SELECT 1 FROM plugin_analytics_migrations WHERE version = $1)", 2).Scan(&exists)
if !exists {
// Run migration 2
db.Migrate("ALTER TABLE plugin_analytics_metrics ADD COLUMN user_id TEXT")
db.Exec("INSERT INTO plugin_analytics_migrations (version) VALUES ($1)", 2)
}
Parameters:
- migrationSQL: SQL script to execute (can contain multiple statements)
Returns error if migration fails, nil on success.
func (*PluginDatabase) Query ¶
func (pd *PluginDatabase) Query(query string, args ...interface{}) (*sql.Rows, error)
Query executes a SQL query that returns rows.
This method is used for SELECT statements, returning an iterator over result rows that must be closed after use.
**Use Cases**:
- SELECT: Retrieve data from plugin tables
- Aggregations: COUNT, SUM, AVG, GROUP BY
- Joins: Combine data from multiple tables
- Analytics: Complex queries for reports
**Example Usage**:
// Query metrics
rows, err := db.Query(`
SELECT session_id, value, timestamp
FROM plugin_analytics_metrics
WHERE timestamp > $1
ORDER BY timestamp DESC
LIMIT 100
`, time.Now().Add(-24 * time.Hour))
if err != nil {
return err
}
defer rows.Close() // ⚠️ Important: Always close rows
// Iterate results
for rows.Next() {
var sessionID string
var value int
var timestamp time.Time
if err := rows.Scan(&sessionID, &value, ×tamp); err != nil {
return err
}
// Process row
}
if err := rows.Err(); err != nil {
return err
}
**Why defer rows.Close()?**
- Releases database connection back to pool
- Prevents connection leaks (exhausting pool)
- Failure to close = connection remains locked until GC
- Critical: Always close, even on error
**Result Iteration Pattern**:
- Check query error
- defer rows.Close()
- Loop with rows.Next()
- Scan columns into variables
- Check rows.Err() after loop
**Error Handling**:
- Query error: Returns immediately, rows is nil
- Scan error: Row skipped, continue or return
- rows.Err(): Catches iteration errors after loop
**Performance**:
- Lazy evaluation: Rows fetched as needed (not all at once)
- Memory: O(1) per row (not O(n) for entire result set)
- Use LIMIT to prevent unbounded queries
Parameters:
- query: SELECT statement with $1, $2, ... placeholders
- args: Values to substitute for placeholders
Returns sql.Rows iterator (must be closed) or error.
func (*PluginDatabase) QueryRow ¶
func (pd *PluginDatabase) QueryRow(query string, args ...interface{}) *sql.Row
QueryRow executes a SQL query that returns at most one row.
This is a convenience method for queries expected to return a single row, such as lookups by primary key or aggregations.
**Use Cases**:
- Get by ID: SELECT * FROM table WHERE id = $1
- Count: SELECT COUNT(*) FROM table
- Exists check: SELECT EXISTS(SELECT 1 FROM table WHERE ...)
- Aggregations: SELECT MAX(value) FROM table
**Why QueryRow instead of Query?**
- Simpler: No need to call Next() or Close()
- No resource leak: Automatically cleaned up after Scan()
- Clear intent: Signals expectation of single row
**Example Usage**:
// Get counter value
var count int
err := db.QueryRow(`
SELECT count
FROM plugin_analytics_counters
WHERE name = $1
`, "sessions").Scan(&count)
if err == sql.ErrNoRows {
// Handle not found
count = 0
} else if err != nil {
return err
}
// Check if record exists
var exists bool
db.QueryRow(`
SELECT EXISTS(
SELECT 1 FROM plugin_analytics_metrics
WHERE session_id = $1
)
`, sessionID).Scan(&exists)
**Error Handling**:
- No rows: Scan() returns sql.ErrNoRows (not an error from QueryRow)
- Query error: Scan() returns the error
- Scan type mismatch: Scan() returns conversion error
**Why no error return?**
- Error deferred to Scan() call
- Allows chaining: db.QueryRow(...).Scan(...)
- Consistent with database/sql standard library
**Multiple Rows**:
- If query returns multiple rows: Only first row scanned
- Remaining rows discarded (connection not released until Scan)
- Use Query() if you need all rows
Parameters:
- query: SELECT statement expected to return 0-1 rows
- args: Values to substitute for placeholders
Returns sql.Row (must call Scan to get values and error).
func (*PluginDatabase) Transaction ¶
func (pd *PluginDatabase) Transaction(fn func(*sql.Tx) error) error
Transaction executes a function within a database transaction.
This method provides ACID guarantees for multiple SQL operations, ensuring they either all succeed (commit) or all fail (rollback).
**Why Use Transactions?**
**Atomicity** (all-or-nothing):
- Either all operations succeed, or none do
- Example: Transfer balance (decrement A, increment B) - both or neither
**Consistency** (constraints enforced):
- Database constraints checked at commit time
- Foreign keys, unique constraints, check constraints
**Isolation** (concurrent safety):
- Other transactions don't see intermediate state
- Prevents read-after-write inconsistencies
**Durability** (crash recovery):
- Committed changes survive system crashes
- Write-ahead logging ensures recovery
**Example Usage**:
// Transfer counter value atomically
err := db.Transaction(func(tx *sql.Tx) error {
// Decrement source counter
_, err := tx.Exec(`
UPDATE plugin_analytics_counters
SET count = count - $1
WHERE name = $2
`, amount, "source")
if err != nil {
return err // Rollback
}
// Increment destination counter
_, err = tx.Exec(`
UPDATE plugin_analytics_counters
SET count = count + $1
WHERE name = $2
`, amount, "destination")
if err != nil {
return err // Rollback
}
return nil // Commit
})
**Rollback Conditions**:
- Function returns error → ROLLBACK
- Function panics → ROLLBACK (panic re-raised after rollback)
- Function returns nil → COMMIT
**Panic Recovery**:
- defer/recover catches panics
- Ensures rollback even on panic
- Panic re-raised after rollback (doesn't hide panic)
**Error Handling**:
- tx.Begin() fails: Return error immediately
- Function returns error: Rollback, return function error
- tx.Commit() fails: Return commit error
- Rollback fails: Log but return function error (rollback failure rare)
**Why not manual BEGIN/COMMIT?**
- Automatic rollback on error (can't forget)
- Panic-safe (manual ROLLBACK might be skipped)
- Cleaner code (no if err != nil { tx.Rollback(); return err })
**Nested Transactions**:
- Not supported (PostgreSQL limitation)
- Calling Transaction() inside function creates new transaction (independent)
- Use savepoints if nesting needed (not exposed in this API)
**Performance**:
- BEGIN overhead: ~0.5ms
- COMMIT overhead: ~1ms (WAL flush)
- Use for multiple statements, overkill for single statement
Parameters:
- fn: Function containing SQL operations to execute in transaction
Returns error from function, commit, or rollback (whichever fails first).
type PluginDiscovery ¶
type PluginDiscovery struct {
// contains filtered or unexported fields
}
PluginDiscovery handles automatic plugin discovery and loading.
The discovery system manages two types of plugins:
- Built-in plugins: Compiled into the binary, registered via global registry
- Dynamic plugins: Loaded at runtime from .so files
Discovery provides:
- Automatic plugin scanning (filesystem + registry)
- Lazy loading (only load enabled plugins)
- Plugin caching (avoid re-loading .so files)
- Unified interface for both plugin types
Thread safety:
- Discovery is not thread-safe
- Create one instance per runtime
- Don't share across goroutines
Typical usage:
// Create discovery with custom plugin directories
discovery := NewPluginDiscovery("/plugins", "./local-plugins")
// Register built-in plugins from global registry
globalRegistry.ApplyToDiscovery(discovery)
// Discover all available plugins
plugins, _ := discovery.DiscoverAll()
// Load specific plugin
handler, _ := discovery.LoadPlugin("analytics")
func NewPluginDiscovery ¶
func NewPluginDiscovery(pluginDirs ...string) *PluginDiscovery
NewPluginDiscovery creates a new plugin discovery instance
func (*PluginDiscovery) DiscoverAll ¶
func (pd *PluginDiscovery) DiscoverAll() ([]string, error)
DiscoverAll discovers all available plugins (built-in and dynamic)
func (*PluginDiscovery) IsBuiltin ¶
func (pd *PluginDiscovery) IsBuiltin(name string) bool
IsBuiltin checks if a plugin is built-in
func (*PluginDiscovery) ListBuiltin ¶
func (pd *PluginDiscovery) ListBuiltin() []string
ListBuiltin returns all built-in plugin names
func (*PluginDiscovery) ListDynamic ¶
func (pd *PluginDiscovery) ListDynamic() []string
ListDynamic returns all discovered dynamic plugin names
func (*PluginDiscovery) LoadPlugin ¶
func (pd *PluginDiscovery) LoadPlugin(name string) (PluginHandler, error)
LoadPlugin loads a plugin by name (built-in or dynamic)
func (*PluginDiscovery) RegisterBuiltin ¶
func (pd *PluginDiscovery) RegisterBuiltin(name string, factory PluginFactory)
RegisterBuiltin registers a built-in plugin factory
type PluginEndpoint ¶
type PluginEndpoint struct {
// PluginName identifies which plugin registered this endpoint.
// Used for cleanup when plugin is unloaded.
PluginName string
// Method is the HTTP method (GET, POST, PUT, PATCH, DELETE, etc.)
Method string
// Path is the full URL path including namespace prefix.
// Format: /api/plugins/{pluginName}/{relative-path}
// Example: /api/plugins/slack/send
Path string
// Handler is the Gin handler function that processes requests.
// Receives gin.Context with request data, writes response.
Handler gin.HandlerFunc
// Middleware is an optional chain of middleware functions.
// Executed before the handler in array order.
// Common uses: authentication, rate limiting, logging.
Middleware []gin.HandlerFunc
// Permissions lists required permissions for this endpoint.
// Used for documentation and UI permission checks.
// Actual enforcement must happen in middleware.
Permissions []string
// Description provides human-readable documentation.
// Used in API documentation and admin UI.
Description string
}
PluginEndpoint represents a registered plugin API endpoint.
Each endpoint contains all metadata needed to mount it to the Gin router:
- HTTP method and path
- Handler function
- Middleware chain
- Permission requirements
- Documentation description
Endpoints are namespaced under /api/plugins/{pluginName}/ to ensure isolation.
Example:
&PluginEndpoint{
PluginName: "slack",
Method: "POST",
Path: "/api/plugins/slack/send", // Full path with namespace
Handler: sendMessageHandler,
Middleware: []gin.HandlerFunc{authMiddleware},
Permissions: []string{"plugin.slack.send"},
Description: "Send a Slack message to a channel",
}
type PluginEvents ¶
type PluginEvents struct {
// contains filtered or unexported fields
}
PluginEvents provides event API for plugins
func NewPluginEvents ¶
func NewPluginEvents(bus *EventBus, pluginName string) *PluginEvents
NewPluginEvents creates a new plugin events instance
func (*PluginEvents) Emit ¶
func (pe *PluginEvents) Emit(eventType string, data interface{})
Emit emits an event (plugins can emit custom events)
func (*PluginEvents) Off ¶
func (pe *PluginEvents) Off(eventType string)
Off removes an event handler
func (*PluginEvents) On ¶
func (pe *PluginEvents) On(eventType string, handler func(data interface{}) error)
On registers an event handler
type PluginFactory ¶
type PluginFactory func() PluginHandler
PluginFactory is a function that creates a new plugin instance
type PluginHandler ¶
type PluginHandler interface {
// Lifecycle hooks
OnLoad(ctx *PluginContext) error
OnUnload(ctx *PluginContext) error
OnEnable(ctx *PluginContext) error
OnDisable(ctx *PluginContext) error
// Event handlers (optional)
OnSessionCreated(ctx *PluginContext, session interface{}) error
OnSessionStarted(ctx *PluginContext, session interface{}) error
OnSessionStopped(ctx *PluginContext, session interface{}) error
OnSessionHibernated(ctx *PluginContext, session interface{}) error
OnSessionWoken(ctx *PluginContext, session interface{}) error
OnSessionDeleted(ctx *PluginContext, session interface{}) error
OnUserCreated(ctx *PluginContext, user interface{}) error
OnUserUpdated(ctx *PluginContext, user interface{}) error
OnUserDeleted(ctx *PluginContext, user interface{}) error
OnUserLogin(ctx *PluginContext, user interface{}) error
OnUserLogout(ctx *PluginContext, user interface{}) error
}
PluginHandler is the interface that all plugins must implement.
This interface defines the contract between the plugin runtime and plugin code. Plugins implement these hooks to respond to lifecycle events and platform events.
Lifecycle Hooks ¶
**OnLoad(ctx)**: Called once when plugin is loaded into memory
- Initialize data structures, validate configuration
- Register API routes, UI components, scheduled jobs
- Connect to external services (databases, APIs)
- Return error to abort load and prevent plugin from starting
**OnUnload(ctx)**: Called when plugin is being removed from runtime
- Close database connections, network sockets
- Cancel background goroutines
- Flush buffered data, save state
- Errors are logged but unload continues (best-effort cleanup)
**OnEnable(ctx)**: Called when plugin is enabled (future use)
- Resume event processing
- Start background workers
**OnDisable(ctx)**: Called when plugin is disabled (future use)
- Pause event processing
- Stop background workers
Event Hooks ¶
Event hooks are optional - plugins can implement only the events they need. Return nil from unwanted hooks (default no-op implementation).
**Session Events**: Track session lifecycle for analytics, monitoring, cleanup
- OnSessionCreated: Before Kubernetes pod is created
- OnSessionStarted: Pod is running, user can connect
- OnSessionStopped: User stopped session gracefully
- OnSessionHibernated: Auto-scaled to zero (cost optimization)
- OnSessionWoken: Resumed from hibernation
- OnSessionDeleted: Permanently removed, cleanup resources
**User Events**: Track user activity for analytics, notifications, compliance
- OnUserCreated: New user registration
- OnUserUpdated: Profile changed, settings modified
- OnUserDeleted: Account deletion, GDPR compliance
- OnUserLogin: Authentication successful
- OnUserLogout: Session ended
Error Handling ¶
Event hook errors are logged but don't affect other plugins or platform:
- If OnSessionCreated returns error, other plugins still process event
- If plugin panics in event handler, panic is recovered and logged
- Only OnLoad errors prevent plugin from loading
Concurrency ¶
Event handlers may be called concurrently:
- Multiple events processed in parallel goroutines
- Plugin must handle concurrent access to shared state
- Use mutexes or channels to synchronize state changes
Performance ¶
Event handlers should be fast (< 100ms):
- Offload heavy work to background goroutines
- Use ctx.Scheduler.Schedule() for periodic tasks
- Avoid blocking operations (use timeouts)
Example Implementation ¶
type MyPlugin struct{}
func (p *MyPlugin) OnLoad(ctx *PluginContext) error {
// Initialize plugin
ctx.Logger.Info("MyPlugin loaded")
return nil
}
func (p *MyPlugin) OnSessionCreated(ctx *PluginContext, session interface{}) error {
// Handle session creation
s := session.(*models.Session)
ctx.Logger.Info("Session created", "id", s.ID)
return nil
}
// Return nil for unused hooks
func (p *MyPlugin) OnUserDeleted(ctx *PluginContext, user interface{}) error {
return nil
}
func GetBuiltinPlugin ¶
func GetBuiltinPlugin(name string) PluginHandler
GetBuiltinPlugin retrieves a built-in plugin by name.
Returns nil if plugin not found.
type PluginInstance ¶
type PluginInstance struct {
// Context provides access to platform APIs (database, events, etc.)
// Shared across all plugin hook invocations.
Context *PluginContext
// Storage is the plugin's isolated key-value store.
// Data persisted to database in "plugin_{name}_storage" table.
Storage *PluginStorage
// Logger is the plugin's namespaced logger.
// All log messages prefixed with "[Plugin: {name}]".
Logger *PluginLogger
// Scheduler manages the plugin's cron jobs.
// Jobs automatically removed when plugin is unloaded.
Scheduler *PluginScheduler
}
PluginInstance holds the runtime state and isolated resources for a plugin.
Each loaded plugin gets its own Instance with namespaced resources that cannot interfere with other plugins. The Instance is created during LoadPlugin and destroyed during UnloadPlugin.
Resource isolation:
- Storage: Plugin-specific key-value store (isolated namespace)
- Logger: Prefixed logger with plugin name
- Scheduler: Cron jobs tagged with plugin name (auto-cleanup on unload)
Memory allocation:
- Context: ~1 KB (pointers to shared resources)
- Storage: ~50 KB (includes in-memory cache)
- Logger: ~10 KB (circular buffer for recent logs)
- Scheduler: ~5 KB (cron job metadata)
Lifecycle:
- Created in LoadPlugin before OnLoad hook
- Passed to all plugin hooks via Context parameter
- Cleaned up in UnloadPlugin (jobs removed, storage flushed)
type PluginLogger ¶
type PluginLogger struct {
// contains filtered or unexported fields
}
PluginLogger provides structured JSON logging for plugins.
Each log entry is formatted as JSON with automatic plugin name tagging. This enables centralized log aggregation, filtering, and analysis.
Example log entry:
{
"plugin": "slack",
"level": "INFO",
"message": "Notification sent",
"data": {"user_id": "user123", "channel": "#general"},
"timestamp": "2025-01-15T10:30:00Z"
}
func NewPluginLogger ¶
func NewPluginLogger(pluginName string) *PluginLogger
NewPluginLogger creates a new plugin logger with automatic plugin tagging.
Called by plugin runtime during initialization. Plugins receive this via ctx.Logger, they don't create it directly.
func (*PluginLogger) Debug ¶
func (pl *PluginLogger) Debug(message string, data ...map[string]interface{})
Debug logs a debug-level message.
Use for detailed diagnostic information during development. Typically disabled in production.
func (*PluginLogger) Error ¶
func (pl *PluginLogger) Error(message string, data ...map[string]interface{})
Error logs an error message.
Use for error conditions that are handled gracefully.
func (*PluginLogger) Fatal ¶
func (pl *PluginLogger) Fatal(message string, data ...map[string]interface{})
Fatal logs a fatal error message.
NOTE: Unlike log.Fatal(), this does NOT exit the process. It only logs at FATAL level to indicate critical plugin errors.
func (*PluginLogger) Info ¶
func (pl *PluginLogger) Info(message string, data ...map[string]interface{})
Info logs an informational message.
Use for general operational messages (startup, shutdown, state changes).
func (*PluginLogger) Warn ¶
func (pl *PluginLogger) Warn(message string, data ...map[string]interface{})
Warn logs a warning message.
Use for potentially problematic situations that don't prevent operation.
func (*PluginLogger) WithField ¶
func (pl *PluginLogger) WithField(key string, value interface{}) *PluginLoggerWithFields
WithField returns a logger with a pre-configured field.
All subsequent log calls will include this field automatically.
Example:
userLogger := logger.WithField("user_id", "user123")
userLogger.Info("Login successful") // Includes user_id
userLogger.Info("Session created") // Includes user_id
func (*PluginLogger) WithFields ¶
func (pl *PluginLogger) WithFields(fields map[string]interface{}) *PluginLoggerWithFields
WithFields returns a logger with multiple pre-configured fields.
All subsequent log calls will include these fields automatically.
type PluginLoggerWithFields ¶
type PluginLoggerWithFields struct {
// contains filtered or unexported fields
}
PluginLoggerWithFields is a logger with pre-configured fields.
Created via WithField() or WithFields(). All log calls automatically include the pre-configured fields plus any additional fields provided.
func (*PluginLoggerWithFields) Debug ¶
func (plwf *PluginLoggerWithFields) Debug(message string, data ...map[string]interface{})
Debug logs a debug message with pre-configured fields merged in.
func (*PluginLoggerWithFields) Error ¶
func (plwf *PluginLoggerWithFields) Error(message string, data ...map[string]interface{})
Error logs an error message with pre-configured fields merged in.
func (*PluginLoggerWithFields) Fatal ¶
func (plwf *PluginLoggerWithFields) Fatal(message string, data ...map[string]interface{})
Fatal logs a fatal message with pre-configured fields merged in.
func (*PluginLoggerWithFields) Info ¶
func (plwf *PluginLoggerWithFields) Info(message string, data ...map[string]interface{})
Info logs an info message with pre-configured fields merged in.
func (*PluginLoggerWithFields) Warn ¶
func (plwf *PluginLoggerWithFields) Warn(message string, data ...map[string]interface{})
Warn logs a warning message with pre-configured fields merged in.
type PluginMarketplace ¶
type PluginMarketplace struct {
// contains filtered or unexported fields
}
PluginMarketplace manages plugin discovery, download, and installation.
The marketplace acts as a bridge between external plugin repositories (GitHub) and the StreamSpace platform, handling catalog synchronization, plugin downloads, and installation into the runtime.
**Key Responsibilities**:
- Fetch and cache plugin catalog from remote repository
- Download plugin packages (.tar.gz or individual files)
- Extract plugins to local filesystem (/plugins/ directory)
- Register installed plugins in database
- Track installation status (installed, enabled)
**State Management**:
- In-memory cache: availablePlugins map (15 min TTL)
- Database persistence: catalog_plugins table (searchable)
- Filesystem storage: /plugins/{name}/ directories
Thread safety: Not thread-safe (should be accessed sequentially or with external mutex)
func NewPluginMarketplace ¶
func NewPluginMarketplace(database *db.Database, repositoryURL, pluginDir string) *PluginMarketplace
NewPluginMarketplace creates a new plugin marketplace instance.
This constructor initializes the marketplace with default values for optional parameters, allowing callers to omit repository URL or plugin directory.
**Default Values**:
- repositoryURL: "https://raw.githubusercontent.com/JoshuaAFerguson/streamspace-plugins/main"
- pluginDir: "/plugins"
- cacheTTL: 15 minutes (hardcoded, could be configurable)
**Why default to GitHub raw content?**
- No authentication required (public repos)
- Direct file access (no API rate limits for raw content)
- Simple URL structure: {repo}/main/catalog.json
- Fallback: Could support GitHub API in future for private repos
**Plugin Directory Structure**:
/plugins/ ├── streamspace-analytics/ │ ├── manifest.json │ ├── plugin.go │ └── README.md ├── streamspace-slack/ │ ├── manifest.json │ ├── plugin.go │ └── README.md └── (other plugins)
Parameters:
- database: Database connection for storing installed plugin metadata
- repositoryURL: Base URL of plugin repository (empty = default to streamspace-plugins)
- pluginDir: Local directory for plugin files (empty = default to /plugins)
Returns initialized marketplace ready to sync catalog.
func (*PluginMarketplace) GetPlugin ¶
func (m *PluginMarketplace) GetPlugin(ctx context.Context, name string) (*MarketplacePlugin, error)
GetPlugin retrieves a specific plugin from the marketplace by name.
This method is used before installation to fetch plugin metadata, including download URL, version, manifest, and installation status.
**Lookup Process**:
- Ensure catalog is synced (SyncCatalog)
- Check availablePlugins map for plugin name
- Return plugin if found, error if not
**Why sync before lookup?**
- Plugin might be newly added to catalog
- Ensures we're checking against latest catalog
- Cache prevents unnecessary HTTP requests (15 min TTL)
**Example Usage**:
plugin, err := marketplace.GetPlugin(ctx, "streamspace-analytics")
if err != nil {
return fmt.Errorf("plugin not found: %w", err)
}
fmt.Printf("Installing %s version %s\n", plugin.DisplayName, plugin.Version)
// Download from plugin.DownloadURL
**Error Cases**:
- Plugin not in catalog: Returns "plugin X not found in marketplace"
- Catalog sync fails: Returns sync error
- Plugin name case-sensitive: Must match exactly
Parameters:
- name: Plugin identifier (e.g., "streamspace-analytics")
Returns plugin metadata or error if not found.
func (*PluginMarketplace) InstallPlugin ¶
func (m *PluginMarketplace) InstallPlugin(ctx context.Context, name string, config map[string]interface{}) error
InstallPlugin downloads and installs a plugin from the marketplace.
This is the main installation workflow that combines catalog lookup, file download, extraction, and database registration into a single atomic-ish operation.
**Installation Workflow**:
┌─────────────────────────────────────────────────────────┐
│ 1. GetPlugin(name) │
│ - Fetch plugin metadata from catalog │
│ - Validate plugin exists │
└──────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 2. downloadPlugin(plugin) │
│ - Create /plugins/{name}/ directory │
│ - Download .tar.gz from plugin.DownloadURL │
│ - Extract to /plugins/{name}/ │
│ - Fallback: Download individual files if no .tar.gz│
└──────────────────────┬──────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 3. registerPluginInDatabase(plugin, config) │
│ - INSERT INTO installed_plugins │
│ - Set enabled=true, config=provided config │
│ - ON CONFLICT: Update version and config │
└─────────────────────────────────────────────────────────┘
**Why not atomic?**
- Files written to disk before DB insert (no transaction across filesystem + DB)
- If DB insert fails: Plugin files remain, but not marked as installed
- If download fails: Partial files may exist (cleaned up on retry)
- Future: Add cleanup on error (rollback filesystem changes)
**Configuration Parameter**:
- config: Plugin-specific settings (API keys, webhooks, thresholds)
- Stored as JSONB in database
- Passed to plugin's OnLoad() after installation
- Example: {"slackWebhook": "https://hooks.slack.com/...", "threshold": 100}
**Post-Installation**:
- Plugin is installed but not loaded (requires restart or manual LoadPlugin call)
- Admin must enable plugin in UI or API (set enabled=true)
- Runtime will auto-load enabled plugins on next startup
**Error Handling**:
- Download fails: Return error, no DB entry created
- DB insert fails: Plugin files exist but not marked installed (orphaned)
- Extraction fails: Partial files remain (should cleanup)
Parameters:
- name: Plugin identifier (e.g., "streamspace-analytics")
- config: Plugin configuration map (can be empty)
Returns nil on success, error on failure (with context).
func (*PluginMarketplace) ListAvailable ¶
func (m *PluginMarketplace) ListAvailable(ctx context.Context) ([]*MarketplacePlugin, error)
ListAvailable returns all available plugins in the marketplace.
This method ensures the catalog is synced (fetches if cache expired), then returns all plugins with their installation status (installed/enabled flags).
**Why call SyncCatalog() first?**
- Ensures fresh data (if cache expired)
- No-op if cache still valid (fast return)
- Simplifies caller logic (don't need to manually sync)
**Return Value Structure**:
[
{
"name": "streamspace-analytics",
"version": "1.2.3",
"installed": true, ← From database query
"enabled": true, ← From database query
/* other metadata from catalog */
},
{
"name": "streamspace-slack",
"version": "2.0.0",
"installed": false, ← Not installed locally
"enabled": false,
/* other metadata */
}
]
**Use Cases**:
- Plugin catalog UI: Display all available plugins with install buttons
- Admin panel: See which plugins can be installed
- API endpoint: GET /api/plugins/marketplace
Returns slice of all marketplace plugins, or error if sync fails.
func (*PluginMarketplace) SyncCatalog ¶
func (m *PluginMarketplace) SyncCatalog(ctx context.Context) error
SyncCatalog syncs the plugin catalog from the remote repository.
This method fetches the latest catalog.json from the configured repository (GitHub raw content by default), parses available plugins, and updates both the in-memory cache and database catalog table.
**Caching Strategy** (to avoid GitHub rate limits):
┌─────────────────────────────────────────────────────────┐
│ First Call (cold cache) │
│ 1. Fetch catalog.json from GitHub │
│ 2. Parse JSON to MarketplacePlugin structs │
│ 3. Store in availablePlugins map (memory) │
│ 4. Mark installed plugins (DB query) │
│ 5. Update catalog_plugins table (DB insert/update) │
│ 6. Set lastSync = now │
└─────────────────────────────────────────────────────────┘
Time passes (< 15 minutes)
┌─────────────────────────────────────────────────────────┐
│ Subsequent Call (warm cache) │
│ 1. Check time.Since(lastSync) < cacheTTL │
│ 2. Return immediately (no HTTP request) │
│ - Benefit: 0ms latency, no network calls │
└─────────────────────────────────────────────────────────┘
Time passes (> 15 minutes)
┌─────────────────────────────────────────────────────────┐
│ Next Call (cache expired) │
│ - Repeat full sync process │
└─────────────────────────────────────────────────────────┘
**Why 15-minute cache TTL?**
- GitHub API rate limit: 60 requests/hour (unauthenticated)
- 15 min TTL = max 4 requests/hour (safe margin)
- Plugin updates are infrequent (days/weeks, not minutes)
- Balances freshness vs. reliability
**Catalog Format** (catalog.json):
[
{
"name": "streamspace-analytics",
"version": "1.2.3",
"displayName": "Analytics Dashboard",
"description": "Real-time session analytics",
"author": "StreamSpace Team",
"category": "Analytics",
"tags": ["analytics", "dashboard"],
"iconUrl": "https://.../icon.png",
"downloadUrl": "https://.../releases/download/v1.2.3/plugin.tar.gz",
"manifest": { /* full plugin manifest */ }
}
]
**Error Handling**:
- HTTP errors: Return error (caller handles retry/fallback)
- JSON parse errors: Return error (invalid catalog)
- Database errors: Log warning, continue (catalog still works in memory)
**Thread Safety**: Not thread-safe (caller should synchronize if needed)
Returns error if fetch or parse fails, nil on success.
func (*PluginMarketplace) UninstallPlugin ¶
func (m *PluginMarketplace) UninstallPlugin(ctx context.Context, name string) error
UninstallPlugin removes a plugin from the system.
This method performs cleanup of both database records and filesystem files, effectively reversing the installation process.
**Uninstallation Steps**:
- DELETE FROM installed_plugins WHERE name = $1
- Remove /plugins/{name}/ directory and all contents
- Log success
**Why delete DB first?**
- Database is source of truth for "installed" status
- If DB delete fails: Files remain but plugin still marked installed (safe)
- If file delete fails: Plugin uninstalled in DB but files orphaned (logged)
- Files can be manually cleaned up, DB state is critical
**Orphaned Files Warning**:
- If os.RemoveAll fails (permissions, locks), files remain
- Only logs warning (does not return error)
- Admin should manually remove /plugins/{name}/ if needed
- Future: Track orphaned files in database for cleanup
**Plugin Lifecycle State After Uninstall**:
- Runtime: Plugin remains loaded in memory until restart
- Database: installed_plugins row deleted
- Filesystem: /plugins/{name}/ directory removed
- Catalog: Plugin still visible in marketplace (can reinstall)
**Unload vs. Uninstall**:
- Unload: Stops plugin in runtime, files/DB remain (reversible)
- Uninstall: Removes plugin entirely (requires reinstall to restore)
**Security Consideration**:
- Should verify plugin not in use before uninstalling
- Future: Check for dependent plugins or active features
- Current: No dependency checking (admin responsibility)
Parameters:
- name: Plugin identifier to uninstall
Returns error if database deletion fails, nil otherwise (file errors logged).
type PluginScheduler ¶
type PluginScheduler struct {
// contains filtered or unexported fields
}
PluginScheduler provides cron-based scheduling for plugins.
Each plugin receives its own scheduler instance, which wraps a shared global cron instance but maintains separate job namespace and lifecycle management.
**Fields**:
- cron: Shared global cron instance (one per platform)
- pluginName: Plugin identifier (for logging and namespacing)
- jobIDs: Map of job name to cron entry ID (for removal)
**Why map job names to entry IDs?**
- Cron library identifies jobs by EntryID (sequential integer)
- Plugins use human-readable names ("daily-cleanup", "sync-users")
- Map allows Remove("daily-cleanup") without remembering EntryID
- Prevents duplicate job names within same plugin
**Lifecycle**:
- Created: When plugin is loaded (NewPluginScheduler)
- Used: Plugin calls Schedule(), Remove(), etc.
- Cleanup: RemoveAll() called on plugin unload
**Thread Safety**: Not thread-safe internally (map access), but underlying cron.Cron is thread-safe, so concurrent Schedule() calls are safe.
func NewPluginScheduler ¶
func NewPluginScheduler(cronInstance *cron.Cron, pluginName string) *PluginScheduler
NewPluginScheduler creates a new plugin scheduler instance.
This constructor is called by the runtime when loading a plugin, providing the plugin with its own scheduler that wraps the shared global cron instance.
**Why pass cron instance instead of creating new one?**
- Single background goroutine for all plugins (efficient)
- Shared ticker reduces CPU wakeups (battery-friendly)
- Centralized lifecycle management (one cron.Start/Stop)
- Alternative: Per-plugin cron = N goroutines + N tickers (wasteful)
**Parameter Validation**:
- cronInstance: Must not be nil (panics if nil, caller error)
- pluginName: Used for logging, empty string allowed but not recommended
**Initialization**:
- Empty jobIDs map (no jobs scheduled yet)
- Plugin must call Schedule() to add jobs
**Example Usage** (in runtime):
globalCron := cron.New()
globalCron.Start()
for _, plugin := range plugins {
scheduler := NewPluginScheduler(globalCron, plugin.Name)
plugin.OnLoad(scheduler, ...) // Plugin receives scheduler
}
Parameters:
- cronInstance: Shared global cron instance
- pluginName: Plugin identifier for logging
Returns initialized scheduler ready to schedule jobs.
func (*PluginScheduler) IsScheduled ¶
func (ps *PluginScheduler) IsScheduled(jobName string) bool
IsScheduled checks if a job is currently scheduled.
This method provides a simple way to check job existence without having to search through ListJobs() results.
**Use Cases**:
- Conditional scheduling: Only schedule if not already scheduled
- Validation: Verify job registered successfully
- Testing: Assert job exists after Setup()
- Config reload: Check if job needs rescheduling
**Example** (conditional scheduling):
func (p *MyPlugin) EnsureSyncScheduled() {
if !p.scheduler.IsScheduled("sync") {
p.scheduler.Schedule("sync", "@hourly", p.syncData)
}
}
**Example** (testing):
func TestPluginSchedulesJobs(t *testing.T) {
plugin := NewPlugin()
plugin.OnLoad(scheduler, ...)
assert.True(t, scheduler.IsScheduled("sync"))
assert.True(t, scheduler.IsScheduled("cleanup"))
}
**Why not just try to schedule?**
- Schedule() overwrites existing job (not always desired)
- IsScheduled allows check-then-act logic
- Clearer intent (checking vs. modifying)
**Performance**:
- Time: O(1) map lookup
- Memory: No allocation
- Typical: <100ns
Parameters:
- jobName: Name of job to check
Returns true if job is scheduled, false otherwise.
func (*PluginScheduler) ListJobs ¶
func (ps *PluginScheduler) ListJobs() []string
ListJobs returns all scheduled job names for this plugin.
This method provides visibility into which jobs are currently scheduled, useful for debugging, monitoring, and admin dashboards.
**Return Value**:
- Slice of job names (e.g., ["sync", "cleanup", "report"])
- Empty slice if no jobs scheduled
- Order: Undefined (map iteration order)
**Use Cases**:
- Debugging: Log all scheduled jobs on plugin load
- Admin UI: Display plugin's scheduled jobs
- Testing: Verify jobs registered correctly
- Monitoring: Track number of scheduled jobs
**Example** (debugging):
func (p *MyPlugin) OnLoad(scheduler *PluginScheduler, ...) error {
scheduler.Schedule("sync", "@hourly", p.sync)
scheduler.Schedule("cleanup", "@daily", p.cleanup)
log.Printf("Scheduled jobs: %v", scheduler.ListJobs())
// Output: Scheduled jobs: [sync cleanup]
}
**Example** (admin API):
GET /api/plugins/streamspace-analytics/jobs
Response: {
"plugin": "streamspace-analytics",
"jobs": ["generate-report", "sync-metrics", "cleanup-old-data"],
"count": 3
}
**Why not return more details?**
- Cron library doesn't expose schedule or next run time easily
- Would require additional tracking (complexity)
- Job names sufficient for most debugging
- Future: Could add GetJobDetails(name) for schedule, next run, etc.
**Performance**:
- Time: O(n) where n = number of jobs
- Memory: Allocates new slice (copy of keys)
- Typical: <1µs for 10 jobs
Returns slice of job names (order undefined).
func (*PluginScheduler) Remove ¶
func (ps *PluginScheduler) Remove(jobName string)
Remove removes a scheduled job by name.
This method stops a job from running further, removing it from the cron scheduler. If the job doesn't exist, this is a no-op (safe to call).
**Removal Process**:
- Look up job name in jobIDs map
- If exists: Call cron.Remove(entryID)
- Delete from jobIDs map
- Log removal
**Why no error return?**
- Removing non-existent job is safe (idempotent)
- Plugin doesn't need to track which jobs exist
- Simplifies cleanup code
- Alternative: Return error if not found (adds error handling burden)
**Use Cases**:
- Plugin reconfiguration: Remove old job, schedule new one
- Conditional scheduling: Remove job if feature disabled
- Cleanup: Remove all jobs on plugin unload (see RemoveAll)
**Example** (plugin reconfiguration):
func (p *MyPlugin) UpdateConfig(config Config) {
// Remove old sync job
p.scheduler.Remove("sync")
// Reschedule with new interval
if config.SyncEnabled {
p.scheduler.Schedule("sync", config.SyncInterval, p.syncData)
}
}
**Thread Safety**:
- cron.Remove() is thread-safe
- Map access not protected (assumes sequential calls from plugin)
- Safe to call while job is running (job completes, won't reschedule)
Parameters:
- jobName: Name of job to remove
No return value (idempotent, always succeeds).
func (*PluginScheduler) RemoveAll ¶
func (ps *PluginScheduler) RemoveAll()
RemoveAll removes all scheduled jobs for this plugin.
This method is called during plugin unload to ensure clean shutdown, preventing orphaned jobs from running after plugin is stopped.
**Cleanup Process**:
- Iterate through all job IDs in jobIDs map
- Call cron.Remove(entryID) for each
- Clear jobIDs map (reset to empty)
- Log each removal
**Why clear the map?**
- Prevents memory leaks (stale entry IDs)
- Allows plugin to be reloaded cleanly
- Makes scheduler reusable (though typically not reused)
**When Called**:
- Plugin unload: runtime.UnloadPlugin() calls plugin.OnUnload()
- Plugin disable: Admin disables plugin in UI
- Platform shutdown: Cleanup all plugins
**Example** (in plugin OnUnload):
func (p *MyPlugin) OnUnload() error {
// Stop all scheduled jobs
p.scheduler.RemoveAll()
// Clean up other resources
p.db.Close()
return nil
}
**What if RemoveAll not called?**
- Jobs continue running (access unloaded plugin state)
- Panics likely (plugin resources released)
- Memory leak (plugin can't be garbage collected)
- Critical: Always call RemoveAll in OnUnload
**Thread Safety**:
- Safe to call while jobs are running
- Running jobs complete, won't reschedule
- cron.Remove() thread-safe
**Performance**:
- Time: O(n) where n = number of plugin's jobs
- Typical: <1ms for 10 jobs
- Runs during plugin unload (not performance critical)
No parameters or return value.
func (*PluginScheduler) Schedule ¶
func (ps *PluginScheduler) Schedule(jobName string, cronExpr string, job func()) error
Schedule schedules a job using cron syntax.
This is the main API for plugins to register recurring tasks. The job function is called at times matching the cron expression, wrapped with error recovery.
**Cron Expression Examples**:
- "*/5 * * * *" → Every 5 minutes
- "0 * * * *" → Every hour (at :00)
- "0 0 * * *" → Daily at midnight
- "0 9 * * 1-5" → Weekdays at 9 AM
- "@hourly" → Every hour (shortcut)
- "@daily" → Every day at midnight (shortcut)
**Job Wrapping** (automatic):
- Panic recovery: Panics logged, job continues on next schedule
- Logging: Logs when job starts (helps debugging)
- Plugin context: Logs include plugin name
**Duplicate Job Names** (overwrite behavior):
- If job "sync" already exists: Remove old, add new
- New schedule replaces old schedule
- Allows dynamic rescheduling without manual Remove()
- Example: Change from hourly to daily
**Why allow overwrites?**
- Simplifies plugin code (no need to check if exists)
- Enables dynamic reconfiguration
- Alternative: Return error on duplicate (forces manual Remove)
**Job Function Signature**:
- Must be `func()` (no parameters, no return value)
- Runs in separate goroutine (don't block)
- Can access plugin state via closures
**Example Usage** (in plugin):
func (p *MyPlugin) OnLoad(scheduler *PluginScheduler, ...) error {
// Schedule daily cleanup at 2 AM
scheduler.Schedule("cleanup", "0 2 * * *", func() {
p.cleanupOldData()
})
// Schedule sync every 15 minutes
scheduler.Schedule("sync", "*/15 * * * *", func() {
p.syncWithExternalAPI()
})
return nil
}
**Error Cases**:
- Invalid cron expression: Returns parse error from cron library
- Example: "invalid" → "failed to parse cron expression"
- Job added successfully: Returns nil
**Performance**:
- Schedule() call: O(log n) where n = total scheduled jobs
- Memory per job: ~200 bytes (closure + metadata)
- Scheduling overhead: <1ms
Parameters:
- jobName: Human-readable job identifier (unique within plugin)
- cronExpr: Cron expression or special string (@hourly, @daily, etc.)
- job: Function to execute on schedule
Returns nil on success, error if cron expression is invalid.
func (*PluginScheduler) ScheduleInterval ¶
func (ps *PluginScheduler) ScheduleInterval(jobName string, interval string, job func()) error
ScheduleInterval schedules a job to run at a fixed interval.
This is a convenience method that converts human-readable intervals ("5m", "1h", "daily") to cron expressions, then calls Schedule().
**Why provide this method?**
- Cron syntax confusing for simple intervals
- "*/5 * * * *" vs. "5m" (latter more readable)
- Reduces documentation burden (don't need to teach cron)
- Common case: Most plugins want simple intervals, not complex schedules
**Supported Intervals**:
- Minutes: "1m", "5m", "10m", "15m", "30m"
- Hours: "1h", "2h", "4h", "6h", "12h"
- Days: "1 day", "24h", "daily"
- Weeks: "weekly"
- Months: "monthly"
**Conversion Examples**:
"5m" → "*/5 * * * *" (every 5 minutes) "1h" → "@hourly" (every hour) "daily" → "@daily" (midnight daily) "weekly" → "@weekly" (Sunday midnight) "monthly" → "@monthly" (1st of month)
**Why limited set of intervals?**
- Prevents ambiguity ("1.5h" unclear)
- Covers 95% of use cases
- For complex schedules, use Schedule() with cron expression
- Future: Could parse arbitrary durations (time.ParseDuration)
**Example Usage**:
// Simple intervals
scheduler.ScheduleInterval("sync", "5m", p.syncData)
scheduler.ScheduleInterval("report", "daily", p.generateReport)
scheduler.ScheduleInterval("cleanup", "weekly", p.cleanupOldData)
// Complex schedule (use Schedule instead)
scheduler.Schedule("backup", "0 2 * * 1-5", p.backup) // Weekdays at 2 AM
**Error Handling**:
- Unsupported interval: Returns error "unsupported interval: {interval}"
- Invalid cron expression (shouldn't happen): Returns cron parse error
- Success: Returns nil
**Why not support seconds?**
- Cron standard doesn't include seconds (5-field format)
- Sub-minute scheduling usually wrong solution (use event bus instead)
- Prevents abuse (scheduling job every second)
- Alternative: Use goroutine + time.Ticker for sub-minute tasks
**Thread Safety**: Same as Schedule() (wraps cron.AddFunc)
Parameters:
- jobName: Human-readable job identifier
- interval: Interval string (see supported list above)
- job: Function to execute on schedule
Returns nil on success, error if interval unsupported or cron expression invalid.
type PluginStorage ¶
type PluginStorage struct {
// contains filtered or unexported fields
}
PluginStorage provides key-value storage for plugins.
This struct offers a simpler alternative to PluginDatabase for plugins that only need basic get/set operations without writing SQL.
**Fields**:
- db: Platform database connection (shared)
- pluginName: Plugin identifier (used for row namespacing)
**API Design** (like Redis/localStorage):
- Get(key) → value
- Set(key, value) → store/update
- Delete(key) → remove
- Keys(prefix) → list keys
- Clear() → delete all plugin's data
**Storage Format**:
- Table: plugin_storage (shared across all plugins)
- Namespace: plugin_name column filters data
- Value type: JSONB (flexible, queryable)
**When to Use**:
- Cache: Store API responses, computed values
- Config: Save plugin settings, preferences
- Flags: Boolean state (enabled, initialized)
- Counters: Track metrics, counts
- Last sync time: Timestamps, version numbers
**When NOT to Use** (use PluginDatabase instead):
- Complex queries: JOIN, GROUP BY, aggregations
- Relationships: Foreign keys, references
- Large datasets: Thousands of rows
- Structured schema: Fixed columns, constraints
**Lifecycle**:
- Created: When plugin is loaded (passed to OnLoad)
- Auto-init: First call creates plugin_storage table if needed
- Used: Throughout plugin lifetime
Thread safety: Same as PluginDatabase (connection pool thread-safe).
func NewPluginStorage ¶
func NewPluginStorage(database *db.Database, pluginName string) *PluginStorage
NewPluginStorage creates a new plugin storage instance.
This constructor is called by the runtime when loading a plugin, providing a simple key-value store scoped to that plugin's namespace.
**Why separate from PluginDatabase?**
- Different use cases: SQL vs. key-value
- Simpler API: No SQL required for basic storage
- Clear intent: Get/Set signals simple storage
- Shared table: All plugins use plugin_storage (namespace by plugin_name)
**Auto-Initialization**:
- First method call creates plugin_storage table if needed
- Each method calls initStorage() (idempotent)
- No manual setup required
**Example Usage** (in plugin):
func (p *MyPlugin) OnLoad(..., storage *PluginStorage) error {
// Get last sync time
lastSync, err := storage.Get("last_sync")
if err != nil && err != sql.ErrNoRows {
return err
}
// Do sync...
// Update last sync time
return storage.Set("last_sync", time.Now().Format(time.RFC3339))
}
Parameters:
- database: Platform database connection
- pluginName: Plugin identifier for namespacing
Returns initialized storage wrapper.
func (*PluginStorage) Clear ¶
func (ps *PluginStorage) Clear() error
Clear removes all storage for the plugin.
This method deletes all rows in plugin_storage belonging to this plugin, effectively resetting the plugin's storage to empty state.
**Example Usage**:
// Reset plugin on uninstall
func (p *MyPlugin) OnUnload() error {
return p.storage.Clear()
}
// Reset to defaults
storage.Clear()
storage.Set("config", defaultConfig)
// Clear cache on demand
if userRequestedClearCache {
storage.Clear() // Deletes all plugin data (be careful!)
}
**Deletion Scope**:
- Deletes: All rows WHERE plugin_name = {pluginName}
- Keeps: Other plugins' data (isolated by plugin_name)
- No undo: Permanent deletion (can't recover)
**⚠️ WARNING**:
- Deletes ALL plugin data (config, cache, state, everything)
- No confirmation prompt
- Use with caution (consider deleting specific keys instead)
**Use Cases**:
- Plugin uninstall: Clean up all data
- Factory reset: Restore plugin to initial state
- Testing: Clear data between test runs
- Migration: Clear old format, re-populate new format
**When NOT to use**:
- Clearing cache only: Use Keys("cache_") + Delete() instead
- Resetting single value: Use Set() with new value
- Testing: Consider transaction rollback instead
**Performance**:
- Time: O(n) where n = number of plugin's storage keys
- Typical: <5ms for 100 keys
- DELETE with WHERE clause (indexed on plugin_name)
**Post-Clear State**:
- storage.Keys("") returns empty slice
- storage.Get(any_key) returns nil, nil
- Fresh start (like plugin first load)
**Partial Clear Alternative**:
// Clear only cache keys
cacheKeys, _ := storage.Keys("cache_")
for _, key := range cacheKeys {
storage.Delete(key)
}
**Error Handling**:
- Database error: Returns error (unlikely)
- No data to delete: No error (affects 0 rows, success)
Returns error if database operation fails, nil on success.
func (*PluginStorage) Delete ¶
func (ps *PluginStorage) Delete(key string) error
Delete removes a value from plugin storage.
This method deletes a key from the plugin_storage table, freeing up space and ensuring subsequent Get() returns nil.
**Example Usage**:
// Delete API key
if err := storage.Delete("api_key"); err != nil {
return err
}
// Delete cache after expiration
storage.Delete("cache_" + cacheKey)
**Idempotent**:
- Deleting non-existent key: No error (affects 0 rows)
- Safe to call multiple times
- No need to check if key exists before deleting
**Post-Delete State**:
- storage.Get(key) returns nil, nil
- Key no longer in Keys() results
- Disk space freed (vacuum reclaims space eventually)
**Why no error on missing key?**
- Deletion is idempotent (end state same)
- Caller doesn't care if key existed or not
- Simplifies error handling (no need to handle "not found")
**Use Cases**:
- Clear cache: Delete expired entries
- Reset state: Remove flags, counters
- Cleanup: Remove temporary data
- Logout: Delete session tokens
**Performance**:
- Time: O(1) - indexed DELETE
- Typical latency: 1-2ms
- Disk space: Freed on next VACUUM (not immediate)
**Bulk Delete** (alternative):
// Delete all cache keys
keys, err := storage.Keys("cache_")
for _, key := range keys {
storage.Delete(key)
}
// Or use Clear() to delete all plugin's data
Parameters:
- key: Storage key to delete
Returns error if database operation fails, nil on success (even if key didn't exist).
func (*PluginStorage) Get ¶
func (ps *PluginStorage) Get(key string) (interface{}, error)
Get retrieves a value from plugin storage by key.
This method fetches a JSONB value from the plugin_storage table, returning the value as interface{} (needs type assertion).
**Example Usage**:
// Get string value
value, err := storage.Get("api_key")
if err == sql.ErrNoRows {
// Key doesn't exist
apiKey = ""
} else if err != nil {
return err
}
apiKey := value.(string) // Type assertion
// Get object value
value, err := storage.Get("config")
if err != nil {
return err
}
configMap := value.(map[string]interface{})
**Return Values**:
- Key exists: Returns value (interface{}), nil error
- Key not found: Returns nil value, nil error
- Database error: Returns nil value, error
**Why nil instead of sql.ErrNoRows?**
- Line 131: if err == sql.ErrNoRows { return nil, nil }
- Makes "key not found" a normal case, not an error
- Simpler caller code (just check if value == nil)
**JSONB Value Types**:
- String: value.(string)
- Number: value.(float64) -- JSON numbers are float64
- Boolean: value.(bool)
- Object: value.(map[string]interface{})
- Array: value.([]interface{})
- Null: value == nil
**Type Assertion Safety**:
value, err := storage.Get("count")
if count, ok := value.(float64); ok {
// Safe: value is float64
} else {
// Value is not float64 (wrong type stored)
}
**Performance**:
- Time: O(1) - indexed lookup on (plugin_name, key)
- Typical latency: 1-2ms
- No full table scan
Parameters:
- key: Storage key to retrieve
Returns value (interface{}) or nil if not found, and error if query fails.
func (*PluginStorage) Keys ¶
func (ps *PluginStorage) Keys(prefix string) ([]string, error)
Keys returns all keys for the plugin, optionally filtered by prefix.
This method lists all storage keys belonging to the plugin, useful for iterating over stored data or implementing search/cleanup operations.
**Example Usage**:
// List all keys
keys, err := storage.Keys("")
if err != nil {
return err
}
// Returns: ["api_key", "config", "last_sync", "retry_count"]
// List keys with prefix
cacheKeys, err := storage.Keys("cache_")
// Returns: ["cache_users", "cache_sessions", "cache_metrics"]
// Iterate and process
for _, key := range cacheKeys {
value, _ := storage.Get(key)
// Process value
}
**Prefix Filtering**:
- Empty string: Returns all plugin's keys
- "cache_": Returns keys starting with "cache_"
- SQL LIKE pattern: prefix + "%" (e.g., "cache_%")
- Case-sensitive match
**Why prefix parameter?**
- Common pattern: Namespace keys ("cache_*", "config_*", "temp_*")
- Efficient: Database filters (uses index)
- Avoids fetching all keys then filtering in app
**Use Cases**:
- List all config keys: Keys("config_")
- Delete all cache: Keys("cache_") then Delete each
- Debug: List all storage to see what's stored
- Backup: Export all plugin data
**Return Value**:
- Slice of key names (e.g., ["key1", "key2"])
- Empty slice if no keys match
- Sorted by key (ORDER BY key in SQL)
**Performance Warning**:
- Time: O(n) where n = number of plugin's storage keys
- Full scan of plugin's rows (can't use index for prefix search efficiently)
- Typical: <10ms for 100 keys
- Slow if plugin has thousands of keys (rare)
**Alternative for Many Keys**:
- If storing thousands of keys, use PluginDatabase instead
- Create indexed table: CREATE TABLE ... (key TEXT, PRIMARY KEY (key))
- Query with index: SELECT key FROM table WHERE key LIKE 'prefix%'
**No Pagination**:
- Returns all matching keys (no LIMIT/OFFSET)
- Memory: O(n) for n keys
- Future: Add pagination if needed (offset, limit parameters)
Parameters:
- prefix: Key prefix to filter by (empty string = all keys)
Returns slice of key names matching prefix, or error if query fails.
func (*PluginStorage) Set ¶
func (ps *PluginStorage) Set(key string, value interface{}) error
Set stores a value in plugin storage, creating or updating the key.
This method uses UPSERT (INSERT ... ON CONFLICT ... DO UPDATE) to atomically create or update a storage key without checking existence first.
**Example Usage**:
// Store string
storage.Set("api_key", "sk_live_abc123")
// Store number
storage.Set("retry_count", 3)
// Store object
storage.Set("config", map[string]interface{}{
"webhook": "https://example.com/hook",
"threshold": 100,
"enabled": true,
})
// Store array
storage.Set("allowed_users", []string{"user1", "user2", "user3"})
**UPSERT Behavior**:
First call: Set("count", 1)
→ INSERT INTO plugin_storage (plugin_name, key, value)
VALUES ('my-plugin', 'count', '1')
Second call: Set("count", 2)
→ ON CONFLICT (plugin_name, key)
DO UPDATE SET value = '2', updated_at = NOW()
**Why UPSERT instead of separate INSERT/UPDATE?**
- Atomic: No race condition (check-then-act)
- Simpler: One call instead of "try INSERT, if fail try UPDATE"
- Efficient: Single round-trip to database
- No error on duplicate: Idempotent
**Timestamps**:
- created_at: Set on first insert, preserved on update
- updated_at: Set to NOW() on every insert/update
- Useful for tracking when value last changed
**Value Serialization**:
- Any JSON-serializable value accepted
- Stored as JSONB in PostgreSQL
- json.Marshal() used internally
- Error if value can't be serialized (channels, functions, etc.)
**Error Cases**:
- json.Marshal fails: Non-serializable value
- INSERT fails: Database error (unlikely)
- UPDATE fails: Database error (unlikely)
**Performance**:
- Time: O(1) - indexed UPSERT
- Typical latency: 2-3ms
- JSONB indexing: Supports querying nested fields (future)
Parameters:
- key: Storage key (unique within plugin namespace)
- value: Any JSON-serializable value
Returns error if serialization or database operation fails, nil on success.
type PluginUI ¶
type PluginUI struct {
// contains filtered or unexported fields
}
PluginUI provides UI registration interface for plugins.
This is the plugin-facing API that abstracts the underlying UIRegistry. Each plugin receives a PluginUI instance pre-configured with its name, ensuring automatic registration attribution.
Example Usage in Plugin:
func (p *SlackPlugin) OnLoad(ctx *PluginContext) error {
// Register a widget
ctx.UI.RegisterWidget(WidgetOptions{
ID: "stats", Title: "Slack Stats", Position: "top", Width: "half",
})
// Register a page
ctx.UI.RegisterPage(PageOptions{
ID: "messages", Title: "Messages", Path: "/messages",
})
return nil
}
func NewPluginUI ¶
func NewPluginUI(registry *UIRegistry, pluginName string) *PluginUI
NewPluginUI creates a new plugin UI instance.
Creates a scoped UI interface for a specific plugin. Called by the plugin runtime during initialization, not by plugins directly.
func (*PluginUI) RegisterAdminPage ¶
func (pu *PluginUI) RegisterAdminPage(opts AdminPageOptions) error
RegisterAdminPage registers an admin page at /admin/plugins/{name}/{path}.
func (*PluginUI) RegisterAdminWidget ¶
func (pu *PluginUI) RegisterAdminWidget(opts WidgetOptions) error
RegisterAdminWidget registers an admin dashboard widget.
Similar to RegisterWidget but for admin dashboard.
func (*PluginUI) RegisterMenuItem ¶
func (pu *PluginUI) RegisterMenuItem(opts MenuItemOptions) error
RegisterMenuItem registers a navigation menu item.
func (*PluginUI) RegisterPage ¶
func (pu *PluginUI) RegisterPage(opts PageOptions) error
RegisterPage registers a user-facing page at /plugins/{name}/{path}.
func (*PluginUI) RegisterWidget ¶
func (pu *PluginUI) RegisterWidget(opts WidgetOptions) error
RegisterWidget registers a user dashboard widget.
Registers a widget for display on the user home dashboard.
Returns: error if widget ID conflicts, nil on success
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime manages the lifecycle and execution of plugins.
The Runtime is the central coordinator for all plugin operations. It maintains the registry of loaded plugins, routes events to appropriate handlers, and provides the infrastructure for plugin APIs, UI components, and scheduled jobs.
Key responsibilities:
- Load plugins from database on startup
- Initialize plugin contexts with platform APIs
- Route platform events to plugin handlers
- Manage plugin lifecycle (load/unload/enable/disable)
- Clean up plugin resources on shutdown
Concurrency safety:
- All public methods are thread-safe using pluginsMux
- Events are processed in parallel goroutines (non-blocking)
- Plugin map uses RWMutex for efficient concurrent reads
Resource management:
- Each plugin has isolated context and storage
- API routes, UI components, and cron jobs are namespaced
- Unloading a plugin cleans up all associated resources
Example usage in API server initialization:
runtime := NewRuntime(database)
if err := runtime.Start(ctx); err != nil {
return fmt.Errorf("failed to start plugin runtime: %w", err)
}
defer runtime.Stop(ctx)
// Store runtime in server context for route handlers
server.PluginRuntime = runtime
func NewRuntime ¶
NewRuntime creates a new plugin runtime
func (*Runtime) EmitEvent ¶
EmitEvent emits a platform event to all loaded and enabled plugins.
This is the primary mechanism for notifying plugins about platform events. Events are delivered asynchronously to all plugins that are enabled and implement the corresponding event hook.
Event delivery model:
- **Fire-and-forget**: EmitEvent returns immediately without waiting
- **Parallel processing**: Each plugin handler runs in its own goroutine
- **Isolation**: Plugin errors/panics don't affect other plugins
- **No blocking**: Event emission never blocks the caller
Supported event types:
**Session events** (6 types):
- "session.created": data is *models.Session (before pod created)
- "session.started": data is *models.Session (pod running)
- "session.stopped": data is *models.Session (user stopped)
- "session.hibernated": data is *models.Session (scaled to zero)
- "session.woken": data is *models.Session (resumed from hibernation)
- "session.deleted": data is *models.Session (permanently deleted)
**User events** (5 types):
- "user.created": data is *models.User (new registration)
- "user.updated": data is *models.User (profile changed)
- "user.deleted": data is *models.User (account deleted)
- "user.login": data is *models.User (authenticated)
- "user.logout": data is *models.User (session ended)
Error handling:
- Plugin handler errors are logged but don't affect event delivery
- Plugin panics are recovered with stack trace logged
- One plugin's failure doesn't prevent others from processing event
Performance characteristics:
- Event emission latency: <1ms (just enqueues to goroutines)
- Plugin handler execution: runs in parallel, not serialized
- Memory overhead: ~1 KB per event (goroutine stack)
Example usage in API handlers:
// After creating a session
session, err := createSession(ctx, req)
if err != nil {
return err
}
runtime.EmitEvent("session.created", session)
// After user login
user, err := authenticateUser(ctx, credentials)
if err != nil {
return err
}
runtime.EmitEvent("user.login", user)
Concurrency:
- Thread-safe (uses RLock for reading plugin registry)
- Safe to call from multiple goroutines simultaneously
- Plugin handlers may run concurrently (plugins must handle this)
Order guarantees:
- Events are delivered in the order they are emitted (per plugin)
- No ordering guarantee across different plugins
- No ordering guarantee for different event types
See also:
- EventBus.Emit(): Underlying pub/sub implementation
- PluginHandler: Interface defining event hooks
- EmitSync(): Synchronous version (waits for all handlers)
func (*Runtime) GetAPIRegistry ¶
func (r *Runtime) GetAPIRegistry() *APIRegistry
GetAPIRegistry returns the API registry for direct access
func (*Runtime) GetEventBus ¶
GetEventBus returns the event bus for direct access
func (*Runtime) GetPlugin ¶
func (r *Runtime) GetPlugin(name string) (*LoadedPlugin, error)
GetPlugin retrieves a loaded plugin
func (*Runtime) GetUIRegistry ¶
func (r *Runtime) GetUIRegistry() *UIRegistry
GetUIRegistry returns the UI registry for direct access
func (*Runtime) ListPlugins ¶
func (r *Runtime) ListPlugins() []*LoadedPlugin
ListPlugins returns all loaded plugins
func (*Runtime) LoadPlugin ¶
func (r *Runtime) LoadPlugin(ctx context.Context, name, version string, config map[string]interface{}, manifest models.PluginManifest) error
LoadPlugin loads and initializes a single plugin into the runtime.
This method is used for:
- Loading plugins during runtime startup (called by Start)
- Dynamically loading plugins after installation (hot-load)
- Reloading plugins after configuration changes
Loading process:
- Check if plugin is already loaded (prevent duplicates)
- Create plugin context with isolated resources
- Initialize plugin components (database, events, API, UI, storage, logger, scheduler)
- Load plugin handler code (built-in or dynamic)
- Call plugin's OnLoad hook
- Register plugin in runtime's active plugins map
Resource isolation:
- Each plugin gets its own PluginContext with namespaced resources
- Database tables prefixed with "plugin_{name}_"
- API routes prefixed with "/api/plugins/{name}/"
- Event subscriptions tracked separately for cleanup
Parameters:
- name: Unique plugin identifier (e.g., "streamspace-analytics")
- version: Semantic version string (e.g., "1.2.3")
- config: User-provided configuration (API keys, settings)
- manifest: Plugin metadata and capabilities
Error handling:
- Returns error if plugin is already loaded (check with GetPlugin first)
- Returns error if plugin handler cannot be loaded
- Returns error if OnLoad hook fails (plugin initialization failed)
- On error, plugin is NOT added to registry (atomic operation)
Concurrency:
- Thread-safe (uses pluginsMux for exclusive access)
- Safe to call from multiple goroutines
- Plugin handlers are called synchronously (not in goroutine)
Example usage:
// Load plugin dynamically after installation
config := map[string]interface{}{
"api_key": "sk-1234567890",
"enabled_features": []string{"analytics", "reporting"},
}
err := runtime.LoadPlugin(ctx, "streamspace-analytics", "1.0.0", config, manifest)
if err != nil {
return fmt.Errorf("failed to load plugin: %w", err)
}
Performance:
- Load time: 10-50ms per plugin (varies by plugin complexity)
- Memory allocation: ~100 KB per plugin (context + resources)
State transitions:
- Before: Plugin not in runtime.plugins map
- After: Plugin registered and receiving events
See also:
- UnloadPlugin(): Removes plugin from runtime
- Start(): Loads all enabled plugins from database
func (*Runtime) Start ¶
Start initializes the plugin runtime and loads all enabled plugins from the database.
This method performs the following operations in sequence:
- Start the cron scheduler for plugin scheduled jobs
- Query the database for all enabled plugins
- Load each plugin's manifest from the catalog
- Initialize plugin contexts with platform APIs
- Call OnLoad hook for each plugin
- Register plugin as active in the runtime
Error handling:
- Individual plugin load failures are logged but don't abort startup
- This ensures that one broken plugin doesn't prevent others from loading
- Database query errors are fatal (runtime cannot start)
Performance:
- Plugins are loaded sequentially, not in parallel
- Each plugin load takes 10-50ms (varies by plugin complexity)
- Typical startup time: 100-500ms for 10 plugins
State transitions:
- Before: Runtime is uninitialized (no plugins loaded)
- After: Runtime is running, enabled plugins are active
Concurrency:
- Start should only be called once (not thread-safe for multiple callers)
- After Start completes, the runtime is fully thread-safe
Example usage in API server initialization:
runtime := NewRuntime(database)
if err := runtime.Start(ctx); err != nil {
log.Fatalf("Failed to start plugin runtime: %v", err)
}
log.Printf("Plugin runtime started, %d plugins loaded", len(runtime.ListPlugins()))
Common errors:
- Database connection failures: Check database connectivity
- Plugin manifest not found: Plugin may be uninstalled from catalog
- Plugin OnLoad failures: Check plugin logs for specific errors
See also:
- Stop(): Gracefully shuts down the runtime
- LoadPlugin(): Loads a single plugin dynamically
type RuntimeV2 ¶
type RuntimeV2 struct {
// contains filtered or unexported fields
}
RuntimeV2 manages the lifecycle and execution of plugins with automatic discovery.
This is the central orchestrator for the entire plugin system, responsible for:
- Discovering available plugins (filesystem + built-in)
- Loading enabled plugins from database on startup
- Managing plugin lifecycle (load, enable, disable, unload)
- Broadcasting events to all loaded plugins
- Providing centralized registries (API, UI, Events, Scheduler)
Thread Safety: All methods are thread-safe via sync.RWMutex protection.
func NewRuntimeV2 ¶
NewRuntimeV2 creates a new plugin runtime with automatic discovery.
Parameters:
- database: Database connection for loading installed plugins
- pluginDirs: Optional directories to scan for dynamic plugins (.so files)
Returns a new RuntimeV2 instance with:
- Auto-start enabled by default (loads plugins from database on Start())
- Empty plugin registry (no plugins loaded yet)
- Initialized event bus, scheduler, and registries
Example:
// Create runtime with custom plugin directories runtime := NewRuntimeV2(db, "/opt/plugins", "/usr/local/plugins") // Create runtime without plugin directories (built-in plugins only) runtime := NewRuntimeV2(db)
Thread Safety: Constructor is not thread-safe. Do not call concurrently.
func (*RuntimeV2) EmitEvent ¶
EmitEvent emits an event to all listening plugins.
This is the core event distribution mechanism that broadcasts platform events to all loaded and enabled plugins. Each plugin receives the event via its corresponding lifecycle hook method.
Parameters:
- eventType: Event type identifier (e.g., "session.created", "user.login")
- data: Event payload (typically a session or user struct)
Event Types and Hooks:
Session Events: - "session.created" → OnSessionCreated(ctx, session) - "session.started" → OnSessionStarted(ctx, session) - "session.stopped" → OnSessionStopped(ctx, session) - "session.hibernated" → OnSessionHibernated(ctx, session) - "session.woken" → OnSessionWoken(ctx, session) - "session.deleted" → OnSessionDeleted(ctx, session) User Events: - "user.created" → OnUserCreated(ctx, user) - "user.updated" → OnUserUpdated(ctx, user) - "user.deleted" → OnUserDeleted(ctx, user) - "user.login" → OnUserLogin(ctx, user) - "user.logout" → OnUserLogout(ctx, user)
Behavior:
- Only enabled plugins receive events (plugin.Enabled == true)
- Each plugin runs in a separate goroutine (non-blocking)
- Plugin panics are recovered and logged (don't crash runtime)
- Plugin hook errors are logged but don't stop other plugins
- Events are also emitted to event bus for custom subscriptions
Example:
// In API handler after session creation
session := &models.Session{
ID: 1,
UserID: "user123",
Name: "firefox-session",
}
// Emit event to all plugins
runtime.EmitEvent("session.created", session)
// Each plugin's OnSessionCreated hook is called:
// - Slack plugin sends notification
// - Analytics plugin tracks usage
// - Audit plugin logs creation
Performance:
- O(n) where n = number of loaded, enabled plugins
- Non-blocking: Plugins run in parallel goroutines
- No timeout: Long-running plugin hooks don't block other plugins
Thread Safety: Thread-safe via read lock (allows concurrent event emission).
func (*RuntimeV2) GetAPIRegistry ¶
func (r *RuntimeV2) GetAPIRegistry() *APIRegistry
GetAPIRegistry returns the API registry for direct access.
This allows external code to:
- Enumerate registered endpoints (registry.GetAll())
- Mount endpoints to Gin router (for each endpoint)
Primary Use Case: HTTP server initialization.
Example:
registry := runtime.GetAPIRegistry()
endpoints := registry.GetAll()
for _, endpoint := range endpoints {
log.Printf("Plugin %s registered: %s %s", endpoint.PluginName, endpoint.Method, endpoint.Path)
}
Thread Safety: APIRegistry has internal locking.
func (*RuntimeV2) GetEventBus ¶
GetEventBus returns the event bus for direct access.
This allows external code to:
- Subscribe to events (bus.Subscribe(eventType, handler))
- Emit custom events (bus.Emit(eventType, data))
Use Cases:
- Plugin code subscribing to custom events
- Testing event emission
Example:
bus := runtime.GetEventBus()
// Subscribe to custom event
bus.Subscribe("custom.event", func(data interface{}) {
log.Printf("Received custom event: %v", data)
})
// Emit custom event
bus.Emit("custom.event", map[string]string{"key": "value"})
Thread Safety: EventBus has internal locking.
func (*RuntimeV2) GetPlugin ¶
func (r *RuntimeV2) GetPlugin(name string) (*LoadedPlugin, error)
GetPlugin retrieves a loaded plugin by name.
Returns the LoadedPlugin struct containing:
- Name, Version, Enabled status
- Plugin configuration and manifest
- Plugin handler and instance
- LoadedAt timestamp, IsBuiltin flag
Parameters:
- name: Plugin identifier
Returns:
- Loaded plugin on success
- Error if plugin is not loaded
Example:
plugin, err := runtime.GetPlugin("slack-notifications")
if err != nil {
log.Printf("Plugin not loaded: %v", err)
return
}
log.Printf("Plugin: %s v%s (loaded at %v)", plugin.Name, plugin.Version, plugin.LoadedAt)
log.Printf("Builtin: %v, Enabled: %v", plugin.IsBuiltin, plugin.Enabled)
Thread Safety: Thread-safe via read lock.
func (*RuntimeV2) GetUIRegistry ¶
func (r *RuntimeV2) GetUIRegistry() *UIRegistry
GetUIRegistry returns the UI registry for direct access.
This allows external code to:
- Enumerate registered UI components (registry.GetWidgets(), etc.)
- Serialize component definitions for frontend
Primary Use Case: UI component manifest endpoint.
Example:
registry := runtime.GetUIRegistry()
widgets := registry.GetWidgets()
// Send to frontend
for _, widget := range widgets {
log.Printf("Widget: %s (component: %s)", widget.Title, widget.Component)
}
Thread Safety: UIRegistry has internal locking.
func (*RuntimeV2) ListAvailablePlugins ¶
ListAvailablePlugins returns names of all discoverable plugins.
This includes both loaded and unloaded plugins:
- Built-in plugins (registered via RegisterBuiltinPlugin)
- Dynamic plugins (discovered from plugin directories)
Returns:
- Slice of plugin names (empty if discovery fails)
Example:
available := runtime.ListAvailablePlugins()
loaded := runtime.ListPlugins()
log.Printf("Available: %d, Loaded: %d", len(available), len(loaded))
// Show unloaded plugins
loadedMap := make(map[string]bool)
for _, p := range loaded {
loadedMap[p.Name] = true
}
for _, name := range available {
if !loadedMap[name] {
log.Printf("Available but not loaded: %s", name)
}
}
Use Cases:
- Plugin catalog page (show all installable plugins)
- Discovery endpoint (GET /api/plugins/available)
Thread Safety: Thread-safe (discovery has internal locking).
func (*RuntimeV2) ListPlugins ¶
func (r *RuntimeV2) ListPlugins() []*LoadedPlugin
ListPlugins returns all currently loaded plugins.
Returns a slice of LoadedPlugin structs, one for each loaded plugin. The order is non-deterministic (map iteration order).
Returns:
- Slice of all loaded plugins (empty slice if none loaded)
Example:
plugins := runtime.ListPlugins()
log.Printf("Loaded %d plugins:", len(plugins))
for _, p := range plugins {
status := "disabled"
if p.Enabled {
status = "enabled"
}
log.Printf(" - %s v%s (%s)", p.Name, p.Version, status)
}
Use Cases:
- Admin UI plugin list page
- Status endpoints (GET /api/plugins/loaded)
- Metrics collection (number of loaded plugins)
Thread Safety: Thread-safe via read lock.
func (*RuntimeV2) LoadPluginByName ¶
LoadPluginByName loads a single plugin from the database by its name.
This method is useful for enabling a plugin at runtime after it was previously disabled. It queries the database for the plugin's configuration and manifest, then loads it into the runtime.
Parameters:
- ctx: Context for cancellation
- name: Plugin name to load
Returns:
- nil on success
- error if plugin not found in database or loading fails
Thread Safety: Thread-safe via internal LoadPluginWithConfig locking.
func (*RuntimeV2) LoadPluginWithConfig ¶
func (r *RuntimeV2) LoadPluginWithConfig(ctx context.Context, name, version string, config map[string]interface{}, manifest models.PluginManifest) error
LoadPluginWithConfig loads and initializes a plugin with specific configuration.
This is the core plugin loading method that:
- Checks if plugin is already loaded (prevents duplicates)
- Loads plugin handler via discovery system
- Creates PluginContext with all helper components
- Calls plugin's OnLoad lifecycle hook
- Registers plugin in runtime registry
Parameters:
- ctx: Context for cancellation
- name: Plugin identifier (must be discoverable)
- version: Plugin version string (for tracking/display)
- config: Plugin-specific configuration map
- manifest: Plugin manifest with metadata and permissions
Returns:
- nil on success
- error if plugin already loaded, not found, or OnLoad fails
Plugin Context Components:
Each plugin receives a PluginContext with access to:
- Database: Namespaced table access (plugin_name_*)
- Events: Pub/sub event system
- API: HTTP endpoint registration (/api/plugins/{name}/*)
- UI: Component registration (widgets, pages, menus)
- Storage: Key-value storage
- Logger: Structured JSON logging
- Scheduler: Cron job scheduling
Example:
config := map[string]interface{}{
"api_key": "secret-key",
"webhook_url": "https://hooks.slack.com/...",
}
err := runtime.LoadPluginWithConfig(ctx, "slack-notifications", "1.0.0", config, manifest)
if err != nil {
log.Fatalf("Failed to load plugin: %v", err)
}
// Plugin is now active and can receive events
runtime.EmitEvent("session.created", sessionData)
Thread Safety: Thread-safe via write lock (blocks other loads/unloads).
func (*RuntimeV2) RegisterBuiltinPlugin ¶
func (r *RuntimeV2) RegisterBuiltinPlugin(name string, factory PluginFactory)
RegisterBuiltinPlugin registers a built-in plugin for automatic discovery.
Built-in plugins are compiled into the API binary and don't require external .so files. This is typically called during package init():
func init() {
plugins.RegisterBuiltinPlugin("analytics", NewAnalyticsPlugin)
}
Parameters:
- name: Plugin identifier (must be unique)
- factory: Function that creates new plugin instances
The plugin becomes available for loading but is not automatically loaded. To load the plugin, either:
- Enable it in database (for auto-start mode)
- Call LoadPluginWithConfig manually
Example:
// Define plugin factory
func NewAnalyticsPlugin() PluginHandler {
return &AnalyticsPlugin{}
}
// Register as built-in
runtime.RegisterBuiltinPlugin("analytics", NewAnalyticsPlugin)
// Plugin is now discoverable
available := runtime.ListAvailablePlugins() // Contains "analytics"
Thread Safety: Not thread-safe. Call before Start().
func (*RuntimeV2) ReloadPlugin ¶
ReloadPlugin unloads and reloads a plugin with updated configuration.
This is useful for applying configuration changes without restarting the API.
Parameters:
- ctx: Context for cancellation
- name: Plugin name to reload
Returns:
- nil on success
- error if unload or load fails
Thread Safety: Thread-safe via internal locking.
func (*RuntimeV2) SetAutoStart ¶
SetAutoStart enables/disables automatic plugin loading on Start().
When auto-start is enabled (default):
- Start() queries database for enabled plugins
- Loads each enabled plugin automatically
- Best for production deployments
When auto-start is disabled:
- Start() only initializes the runtime (no plugin loading)
- Plugins must be loaded manually via LoadPlugin API
- Best for development, testing, or dynamic loading scenarios
Parameters:
- enabled: true to enable auto-start, false to disable
Example:
// Disable auto-start for testing runtime := NewRuntimeV2(db) runtime.SetAutoStart(false) runtime.Start(ctx) // No plugins loaded // Manually load specific plugin runtime.LoadPluginWithConfig(ctx, "test-plugin", "1.0.0", config, manifest)
Thread Safety: Not thread-safe. Call before Start().
func (*RuntimeV2) Start ¶
Start initializes the plugin runtime and auto-loads enabled plugins.
Startup sequence:
- Start the cron scheduler (for plugin scheduled jobs)
- Discover all available plugins (filesystem + built-in)
- If auto-start is enabled: Load all enabled plugins from database
Parameters:
- ctx: Context for cancellation and timeout control
Returns:
- nil on success
- error if plugin discovery or loading fails critically
Behavior:
- Plugin discovery errors are logged as warnings but don't fail startup
- Individual plugin loading errors are logged but don't fail startup
- Only critical errors (database connection, etc.) return error
Example:
runtime := NewRuntimeV2(db, "/opt/plugins")
// Start with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := runtime.Start(ctx); err != nil {
log.Fatalf("Failed to start plugin runtime: %v", err)
}
log.Printf("Plugin runtime started, %d plugins loaded", len(runtime.ListPlugins()))
Thread Safety: Safe to call concurrently, but typically called once at startup.
func (*RuntimeV2) Stop ¶
Stop gracefully shuts down the plugin runtime.
Shutdown sequence:
- Unload all loaded plugins (calls OnUnload hooks)
- Remove all scheduled jobs from cron scheduler
- Unregister all API endpoints
- Unregister all UI components
- Remove all event subscriptions
- Stop the cron scheduler (waits for running jobs)
Parameters:
- ctx: Context for cancellation (currently not used, reserved for future)
Returns:
- Always returns nil (errors are logged, not returned)
Behavior:
- Individual plugin OnUnload errors are logged but don't stop shutdown
- All plugins are unloaded even if some fail
- Scheduler waits for all running jobs to complete
Example:
runtime := NewRuntimeV2(db) runtime.Start(ctx) // On shutdown (e.g., SIGTERM handler) defer runtime.Stop(context.Background()) // Or with timeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() runtime.Stop(ctx)
Thread Safety: Thread-safe via write lock.
func (*RuntimeV2) UnloadPlugin ¶
UnloadPlugin unloads a specific plugin.
This removes the plugin from the runtime and cleans up all its resources:
- Calls plugin's OnUnload lifecycle hook
- Removes all scheduled cron jobs
- Unregisters all HTTP API endpoints
- Unregisters all UI components
- Removes all event subscriptions
Parameters:
- ctx: Context for cancellation
- name: Plugin name to unload
Returns:
- nil on success
- error if plugin is not loaded
Example:
// Unload a plugin manually
if err := runtime.UnloadPlugin(ctx, "slack-notifications"); err != nil {
log.Printf("Failed to unload plugin: %v", err)
}
// Plugin is now unloaded and won't receive events
runtime.EmitEvent("session.created", data) // slack-notifications won't see this
Thread Safety: Thread-safe via write lock.
type UIAdminPage ¶
type UIAdminPage struct {
// PluginName identifies which plugin registered this page.
PluginName string
// ID is a unique identifier for this page within the plugin.
ID string
// Title is the page title.
Title string
// Path is the route path relative to /admin/plugins/{pluginName}/.
Path string
// Component is the React component name or bundle URL.
Component string
// Icon is the icon shown in admin menu.
Icon string
// MenuLabel is the text shown in admin navigation menu.
MenuLabel string
// Permissions lists required permissions (typically admin permissions).
Permissions []string
// Order determines position in admin menu (lower = earlier).
// Typical range: 0-1000
Order int
}
UIAdminPage represents an admin panel page.
Admin pages are similar to regular pages but appear in the admin panel and typically require admin permissions.
URL Format:
/admin/plugins/{pluginName}/{path}
Example: /admin/plugins/slack/settings
Menu Ordering:
Admin pages appear in the admin menu sorted by Order field. Lower numbers appear first.
Example:
&UIAdminPage{
ID: "settings",
Title: "Slack Settings",
Path: "/settings",
Component: "SlackAdminSettings",
Icon: "cog",
MenuLabel: "Slack",
Permissions: []string{"admin.plugins.manage"},
Order: 100,
}
type UIMenuItem ¶
type UIMenuItem struct {
// PluginName identifies which plugin registered this item.
PluginName string
// ID is a unique identifier for this item within the plugin.
ID string
// Label is the text displayed in the menu.
Label string
// Path is the URL to navigate to when clicked.
// Can be:
// - Internal: "/plugins/slack/messages"
// - External: "https://slack.com"
Path string
// Icon is the icon shown next to the label.
Icon string
// Component is an optional custom React component for the menu item.
// If empty, standard menu item rendering is used.
Component string
// Order determines position in menu (lower = earlier).
// Recommended: 1000+ for plugin items.
Order int
// Permissions lists required permissions to see this menu item.
Permissions []string
}
UIMenuItem represents a navigation menu item.
Menu items appear in the main navigation menu and can link to:
- Plugin pages
- External URLs
- Custom components
Menu Ordering:
Items are sorted by Order field. Lower numbers appear first. Standard menu items use Order 100-900. Plugin items typically use Order 1000+.
Example:
&UIMenuItem{
ID: "slack-menu",
Label: "Slack",
Path: "/plugins/slack/messages",
Icon: "slack",
Order: 1000,
Permissions: []string{"plugin.slack.read"},
}
type UIPage ¶
type UIPage struct {
// PluginName identifies which plugin registered this page.
// Set automatically by the registry.
PluginName string
// ID is a unique identifier for this page within the plugin.
ID string
// Title is the page title shown in browser tab and header.
Title string
// Path is the route path relative to /plugins/{pluginName}/.
// Example: "/messages" becomes "/plugins/slack/messages"
Path string
// Component is the React component name or bundle URL.
Component string
// Icon is the icon shown in menus and browser tab.
Icon string
// MenuLabel is the text shown in navigation menus.
// If empty, page is not added to menus (direct URL only).
MenuLabel string
// Permissions lists required permissions to access this page.
// Frontend enforces access control before rendering.
Permissions []string
}
UIPage represents a user-facing page.
Pages are full-page components rendered at custom routes. They provide complete plugin-specific interfaces within the main application.
URL Format:
/plugins/{pluginName}/{path}
Example: /plugins/slack/messages
Navigation:
Pages can appear in navigation menus if MenuLabel is set. Otherwise, they're accessible only by direct URL.
Example:
&UIPage{
ID: "messages",
Title: "Slack Messages",
Path: "/messages", // Results in /plugins/slack/messages
Component: "SlackMessagesPage",
Icon: "comment",
MenuLabel: "Messages", // Appears in menu
Permissions: []string{"plugin.slack.read"},
}
type UIRegistry ¶
type UIRegistry struct {
// contains filtered or unexported fields
}
UIRegistry manages plugin UI component registrations.
The registry provides centralized management of all plugin-contributed UI components, enabling dynamic frontend integration without core code changes.
Key responsibilities:
- Store UI component registrations with plugin attribution
- Support multiple component types (widgets, pages, menus)
- Prevent component ID conflicts between plugins
- Provide thread-safe concurrent access
- Support bulk cleanup on plugin unload
Registry Structure:
widgets: map[string]*UIWidget // User dashboard widgets
pages: map[string]*UIPage // User-facing pages
adminPages: map[string]*UIAdminPage // Admin panel pages
menuItems: map[string]*UIMenuItem // Navigation menu items
adminWidgets: map[string]*UIWidget // Admin dashboard widgets
Map key format: "{pluginName}:{componentID}"
Example: "slack:widget-stats"
Concurrency Model:
Register methods: Write lock (exclusive) Get methods: Read lock (shared) UnregisterAll: Write lock (exclusive) Multiple plugins can query concurrently Registration is serialized to prevent conflicts
func NewUIRegistry ¶
func NewUIRegistry() *UIRegistry
NewUIRegistry creates a new UI registry.
Returns an initialized registry ready to accept plugin UI component registrations.
func (*UIRegistry) GetAdminPages ¶
func (r *UIRegistry) GetAdminPages() []*UIAdminPage
GetAdminPages returns all registered admin pages.
Returns a snapshot of all admin pages. Admin UI uses this to register routes and populate admin navigation menu.
Thread Safety: Acquires shared read lock.
func (*UIRegistry) GetAdminWidgets ¶
func (r *UIRegistry) GetAdminWidgets() []*UIWidget
GetAdminWidgets returns all registered admin dashboard widgets.
Returns a snapshot of all widgets for the admin dashboard. Admin UI fetches this to render admin-specific widgets.
Thread Safety: Acquires shared read lock.
func (*UIRegistry) GetMenuItems ¶
func (r *UIRegistry) GetMenuItems() []*UIMenuItem
GetMenuItems returns all registered navigation menu items.
Returns a snapshot of all menu items. Frontend uses this to populate the main navigation menu, sorted by Order field.
Thread Safety: Acquires shared read lock.
func (*UIRegistry) GetPages ¶
func (r *UIRegistry) GetPages() []*UIPage
GetPages returns all registered user-facing pages.
Returns a snapshot of all pages. Frontend uses this to register routes and populate navigation menus.
Thread Safety: Acquires shared read lock.
func (*UIRegistry) GetWidgets ¶
func (r *UIRegistry) GetWidgets() []*UIWidget
GetWidgets returns all registered user dashboard widgets.
Returns a snapshot of all widgets for the user home dashboard. Frontend fetches this to render widgets dynamically.
Thread Safety: Acquires shared read lock.
Returns: Slice of all registered widgets (copy, safe to modify)
func (*UIRegistry) RegisterAdminPage ¶
func (r *UIRegistry) RegisterAdminPage(pluginName string, page *UIAdminPage) error
RegisterAdminPage registers an admin panel page.
Registers an admin page accessible at /admin/plugins/{pluginName}/{path}. Admin pages appear in admin navigation menu sorted by Order field.
Thread Safety: Acquires exclusive write lock.
func (*UIRegistry) RegisterAdminWidget ¶
func (r *UIRegistry) RegisterAdminWidget(pluginName string, widget *UIWidget) error
RegisterAdminWidget registers an admin dashboard widget.
Similar to RegisterWidget but for admin dashboard. Admin widgets typically display platform-wide metrics, plugin health, or administrative quick actions.
Thread Safety: Acquires exclusive write lock.
func (*UIRegistry) RegisterMenuItem ¶
func (r *UIRegistry) RegisterMenuItem(pluginName string, item *UIMenuItem) error
RegisterMenuItem registers a navigation menu item.
Menu items appear in the main navigation menu. They can link to plugin pages, external URLs, or use custom components. Items are sorted by Order field.
Thread Safety: Acquires exclusive write lock.
func (*UIRegistry) RegisterPage ¶
func (r *UIRegistry) RegisterPage(pluginName string, page *UIPage) error
RegisterPage registers a user-facing page.
Registers a full-page component accessible at /plugins/{pluginName}/{path}. Pages can optionally appear in navigation menus if MenuLabel is set.
Thread Safety: Acquires exclusive write lock.
func (*UIRegistry) RegisterWidget ¶
func (r *UIRegistry) RegisterWidget(pluginName string, widget *UIWidget) error
RegisterWidget registers a user dashboard widget.
Stores widget metadata for display on the user's home dashboard. Frontend fetches registered widgets via API and renders them dynamically.
Parameters:
- pluginName: Name of the plugin registering the widget
- widget: Widget configuration (title, component, position, width)
Returns:
- error: Conflict error if widget ID already registered, nil on success
Thread Safety: Acquires exclusive write lock.
Example:
err := registry.RegisterWidget("slack", &UIWidget{
ID: "stats", Title: "Slack Stats", Position: "top", Width: "half",
})
func (*UIRegistry) UnregisterAll ¶
func (r *UIRegistry) UnregisterAll(pluginName string)
UnregisterAll removes all UI components for a plugin.
Called during plugin unload to clean up all widgets, pages, admin pages, menu items, and admin widgets registered by the plugin.
Thread Safety: Acquires exclusive write lock.
type UIWidget ¶
type UIWidget struct {
// PluginName identifies which plugin registered this widget.
// Set automatically by the registry.
PluginName string
// ID is a unique identifier for this widget within the plugin.
// Format: kebab-case (e.g., "session-stats")
ID string
// Title is displayed as the widget header.
// Example: "Session Statistics"
Title string
// Component is the React component name or bundle URL.
// Can be:
// - Component name: "SessionStatsWidget"
// - Bundle URL: "/plugins/slack/widget.js"
Component string
// Position determines vertical placement on the dashboard.
// Values: "top", "sidebar", "bottom"
Position string
// Width determines horizontal size.
// Values: "full" (100%), "half" (50%), "third" (33%)
Width string
// Icon is the icon name from the icon library.
// Example: "chart-line", "bell", "users"
Icon string
// Permissions lists required permissions to view this widget.
// Frontend checks user permissions before rendering.
// Empty = visible to all users.
Permissions []string
}
UIWidget represents a dashboard widget.
Widgets are cards displayed on the user's home dashboard. They can show real-time data, quick actions, or status information.
Layout:
- Position: Where on the dashboard (top, sidebar, bottom)
- Width: How much horizontal space (full=100%, half=50%, third=33%)
Example widgets:
- "Session Activity": Recent session usage
- "Quota Status": Resource usage bars
- "Quick Actions": Buttons to create sessions
Example:
&UIWidget{
ID: "session-stats",
Title: "Session Statistics",
Component: "SessionStatsWidget", // React component name
Position: "top",
Width: "half",
Icon: "chart-line",
Permissions: []string{"sessions.read"},
}
type WidgetOptions ¶
type WidgetOptions struct {
ID string
Title string
Component string
Position string
Width string
Icon string
Permissions []string
}
WidgetOptions contains options for registering a widget.
Fields:
- ID: Unique widget identifier within plugin
- Title: Widget header text
- Component: React component name or bundle URL
- Position: Dashboard placement ("top", "sidebar", "bottom")
- Width: Horizontal size ("full", "half", "third")
- Icon: Icon name
- Permissions: Required permissions to view