Documentation
¶
Index ¶
- Variables
- type Bulkhead
- func (b *Bulkhead) Acquire(ctx context.Context, tenantID string) (func(), error)
- func (b *Bulkhead) AcquireWait(ctx context.Context, tenantID string) (func(), error)
- func (b *Bulkhead) RemoveLimit(tenantID string)
- func (b *Bulkhead) SetLimit(tenantID string, cfg TenantLimitConfig)
- func (b *Bulkhead) Stats() map[string]BulkheadStats
- type BulkheadConfig
- type BulkheadStats
- type ConsistentHash
- func (h *ConsistentHash) AddNode(node string)
- func (h *ConsistentHash) GetNode(key string) (string, error)
- func (h *ConsistentHash) GetNodes(key string, count int) ([]string, error)
- func (h *ConsistentHash) Members() []string
- func (h *ConsistentHash) RemoveNode(node string)
- func (h *ConsistentHash) Size() int
- type DistributedLock
- type InMemoryLock
- type PGAdvisoryLock
- type RedisLock
- type Shard
- type ShardInfo
- type ShardManager
- func (m *ShardManager) AddShard(id string) error
- func (m *ShardManager) RemoveShard(id string) error
- func (m *ShardManager) RouteTask(partitionKey string, task Task) error
- func (m *ShardManager) ShardCount() int
- func (m *ShardManager) ShardStats() map[string]ShardInfo
- func (m *ShardManager) Start(ctx context.Context) error
- func (m *ShardManager) Stop() error
- type ShardManagerConfig
- type ShardState
- type Task
- type TaskResult
- type TenantLimitConfig
- type WorkerPool
- type WorkerPoolConfig
- type WorkerPoolStats
Constants ¶
This section is empty.
Variables ¶
var ErrTenantLimitExceeded = fmt.Errorf("tenant concurrency limit exceeded")
ErrTenantLimitExceeded is returned when a tenant has reached its concurrency limit.
Functions ¶
This section is empty.
Types ¶
type Bulkhead ¶
type Bulkhead struct {
// contains filtered or unexported fields
}
Bulkhead provides per-tenant resource isolation to prevent noisy neighbors. Each tenant has configurable concurrency limits enforced via semaphores.
func NewBulkhead ¶
func NewBulkhead(cfg BulkheadConfig) *Bulkhead
NewBulkhead creates a new bulkhead with the given configuration.
func (*Bulkhead) Acquire ¶
Acquire attempts to acquire a slot for the given tenant. Returns ErrTenantLimitExceeded if the tenant has reached its concurrency limit. On success, returns a release function that must be called when the operation completes.
func (*Bulkhead) AcquireWait ¶
AcquireWait attempts to acquire a slot for the given tenant, blocking until a slot is available or the context is cancelled.
func (*Bulkhead) RemoveLimit ¶
RemoveLimit removes the specific limit for a tenant, reverting to the default.
func (*Bulkhead) SetLimit ¶
func (b *Bulkhead) SetLimit(tenantID string, cfg TenantLimitConfig)
SetLimit configures limits for a specific tenant. If the tenant already has a limit, it replaces the semaphore while preserving rejection stats.
func (*Bulkhead) Stats ¶
func (b *Bulkhead) Stats() map[string]BulkheadStats
Stats returns current usage statistics per tenant. Includes all tenants with specific limits plus the default bucket.
type BulkheadConfig ¶
type BulkheadConfig struct {
// DefaultMaxConcurrent is the default concurrency limit for tenants without specific limits.
DefaultMaxConcurrent int
// DefaultRateLimit is the default rate limit (requests per second).
DefaultRateLimit float64
}
BulkheadConfig configures the bulkhead defaults.
func DefaultBulkheadConfig ¶
func DefaultBulkheadConfig() BulkheadConfig
DefaultBulkheadConfig returns sensible defaults.
type BulkheadStats ¶
type BulkheadStats struct {
TenantID string `json:"tenant_id"`
Active int `json:"active"`
MaxConcurrent int `json:"max_concurrent"`
Rejected int64 `json:"rejected"`
}
BulkheadStats holds current usage statistics for a tenant.
type ConsistentHash ¶
type ConsistentHash struct {
// contains filtered or unexported fields
}
ConsistentHash implements a consistent hashing ring for partitioning work by tenant or conversation ID across a set of nodes.
func NewConsistentHash ¶
func NewConsistentHash(replicas int) *ConsistentHash
NewConsistentHash creates a new consistent hash ring. replicas controls the number of virtual nodes per physical node (higher = more even distribution).
func (*ConsistentHash) AddNode ¶
func (h *ConsistentHash) AddNode(node string)
AddNode adds a node to the hash ring.
func (*ConsistentHash) GetNode ¶
func (h *ConsistentHash) GetNode(key string) (string, error)
GetNode returns the node responsible for the given key.
func (*ConsistentHash) GetNodes ¶
func (h *ConsistentHash) GetNodes(key string, count int) ([]string, error)
GetNodes returns the N distinct nodes responsible for the given key, in ring order. Useful for replication.
func (*ConsistentHash) Members ¶
func (h *ConsistentHash) Members() []string
Members returns the set of nodes in the ring.
func (*ConsistentHash) RemoveNode ¶
func (h *ConsistentHash) RemoveNode(node string)
RemoveNode removes a node from the hash ring.
func (*ConsistentHash) Size ¶
func (h *ConsistentHash) Size() int
Size returns the number of nodes in the ring.
type DistributedLock ¶
type DistributedLock interface {
// Acquire obtains a lock for the given key. Returns a release function.
// Blocks until the lock is acquired or context is cancelled.
Acquire(ctx context.Context, key string, ttl time.Duration) (release func(), err error)
// TryAcquire attempts to acquire a lock without blocking.
// Returns false if the lock is already held.
TryAcquire(ctx context.Context, key string, ttl time.Duration) (release func(), acquired bool, err error)
}
DistributedLock provides distributed locking for state machine transitions and other coordination needs across multiple Workflow instances.
type InMemoryLock ¶
type InMemoryLock struct {
// contains filtered or unexported fields
}
InMemoryLock implements DistributedLock for testing and single-server deployments. Uses sync.Mutex per key with a map.
func NewInMemoryLock ¶
func NewInMemoryLock() *InMemoryLock
NewInMemoryLock creates a new in-memory distributed lock.
func (*InMemoryLock) Acquire ¶
Acquire obtains a lock for the given key, blocking until acquired or context cancelled.
func (*InMemoryLock) TryAcquire ¶
func (l *InMemoryLock) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool, error)
TryAcquire attempts to acquire a lock without blocking. Returns false if the lock is already held.
type PGAdvisoryLock ¶
type PGAdvisoryLock struct {
// contains filtered or unexported fields
}
PGAdvisoryLock implements DistributedLock using PostgreSQL advisory locks (pg_advisory_lock / pg_advisory_unlock). The key string is hashed to int64 for use as the lock ID.
func NewPGAdvisoryLock ¶
func NewPGAdvisoryLock(db *sql.DB) *PGAdvisoryLock
NewPGAdvisoryLock creates a new PostgreSQL advisory lock implementation.
func (*PGAdvisoryLock) Acquire ¶
func (l *PGAdvisoryLock) Acquire(ctx context.Context, key string, ttl time.Duration) (func(), error)
Acquire obtains a PostgreSQL advisory lock for the given key. Blocks until the lock is acquired or context is cancelled. Note: ttl is not natively supported by pg_advisory_lock; the lock is held until explicitly released or the session ends.
func (*PGAdvisoryLock) TryAcquire ¶
func (l *PGAdvisoryLock) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool, error)
TryAcquire attempts to acquire a PostgreSQL advisory lock without blocking. Returns false if the lock is already held.
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
RedisLock implements DistributedLock using Redis SET NX with TTL. Requires a Redis client connection. This is a stub implementation; the full implementation will be provided when the Redis client is integrated as a direct dependency.
func NewRedisLock ¶
NewRedisLock creates a new Redis distributed lock stub. Full implementation requires a Redis client (e.g., go-redis).
type Shard ¶
type Shard struct {
ID string
State ShardState
Pool *WorkerPool
ActiveTasks int64
CreatedAt time.Time
}
Shard represents a processing partition.
type ShardInfo ¶
type ShardInfo struct {
ID string
State string
ActiveWorkers int
PendingTasks int
CompletedOK int64
CompletedErr int64
CreatedAt time.Time
}
ShardInfo holds read-only information about a shard.
type ShardManager ¶
type ShardManager struct {
// contains filtered or unexported fields
}
ShardManager manages a set of shards and routes tasks to them using consistent hashing on the tenant/conversation key.
func NewShardManager ¶
func NewShardManager(cfg ShardManagerConfig) *ShardManager
NewShardManager creates a new shard manager.
func (*ShardManager) AddShard ¶
func (m *ShardManager) AddShard(id string) error
AddShard dynamically adds a new shard.
func (*ShardManager) RemoveShard ¶
func (m *ShardManager) RemoveShard(id string) error
RemoveShard drains and removes a shard.
func (*ShardManager) RouteTask ¶
func (m *ShardManager) RouteTask(partitionKey string, task Task) error
RouteTask routes a task to the appropriate shard based on the partition key.
func (*ShardManager) ShardCount ¶
func (m *ShardManager) ShardCount() int
ShardCount returns the number of active shards.
func (*ShardManager) ShardStats ¶
func (m *ShardManager) ShardStats() map[string]ShardInfo
ShardStats returns statistics for all shards.
func (*ShardManager) Start ¶
func (m *ShardManager) Start(ctx context.Context) error
Start initializes all shards and their worker pools.
func (*ShardManager) Stop ¶
func (m *ShardManager) Stop() error
Stop gracefully shuts down all shards.
type ShardManagerConfig ¶
type ShardManagerConfig struct {
// ShardCount is the initial number of shards.
ShardCount int
// Replicas is the number of virtual nodes per shard in the hash ring.
Replicas int
// PoolConfig is the worker pool config for each shard.
PoolConfig WorkerPoolConfig
}
ShardManagerConfig configures the shard manager.
func DefaultShardManagerConfig ¶
func DefaultShardManagerConfig() ShardManagerConfig
DefaultShardManagerConfig returns sensible defaults.
type ShardState ¶
type ShardState int
ShardState represents the lifecycle state of a shard.
const ( ShardActive ShardState = iota ShardDraining ShardInactive )
func (ShardState) String ¶
func (s ShardState) String() string
type TaskResult ¶
TaskResult holds the outcome of a task execution.
type TenantLimitConfig ¶
type TenantLimitConfig struct {
// MaxConcurrent is the maximum number of concurrent operations allowed.
MaxConcurrent int
// RateLimit is the maximum requests per second (reserved for future use).
RateLimit float64
}
TenantLimitConfig defines the configuration for a tenant's concurrency constraints. Use this when calling SetLimit to configure tenant-specific limits.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of goroutines for executing workflow tasks.
func NewWorkerPool ¶
func NewWorkerPool(cfg WorkerPoolConfig) *WorkerPool
NewWorkerPool creates a new worker pool.
func (*WorkerPool) Results ¶
func (p *WorkerPool) Results() <-chan TaskResult
Results returns the results channel for reading task outcomes.
func (*WorkerPool) Start ¶
func (p *WorkerPool) Start(ctx context.Context) error
Start launches the minimum number of workers.
func (*WorkerPool) Stats ¶
func (p *WorkerPool) Stats() WorkerPoolStats
Stats returns current pool statistics.
func (*WorkerPool) Stop ¶
func (p *WorkerPool) Stop() error
Stop gracefully shuts down the worker pool.
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(task Task) error
Submit adds a task to the queue. It spawns an extra worker if the queue is getting full and the pool hasn't reached MaxWorkers.
type WorkerPoolConfig ¶
type WorkerPoolConfig struct {
// MinWorkers is the minimum number of goroutines kept alive.
MinWorkers int
// MaxWorkers is the maximum number of goroutines allowed.
MaxWorkers int
// QueueSize is the capacity of the task queue.
QueueSize int
// IdleTimeout is how long an idle worker waits before exiting (above MinWorkers).
IdleTimeout time.Duration
}
WorkerPoolConfig configures the worker pool.
func DefaultWorkerPoolConfig ¶
func DefaultWorkerPoolConfig() WorkerPoolConfig
DefaultWorkerPoolConfig returns sensible defaults.