package scale

Documentation

Constants

This section is empty.

Variables

var ErrTenantLimitExceeded = fmt.Errorf("tenant concurrency limit exceeded")

ErrTenantLimitExceeded is returned when a tenant has reached its concurrency limit.

Functions

This section is empty.

Types

type Bulkhead

type Bulkhead struct {
	// contains filtered or unexported fields
}

Bulkhead provides per-tenant resource isolation to prevent noisy-neighbor effects. Each tenant has a configurable concurrency limit enforced via semaphores.

func NewBulkhead

func NewBulkhead(cfg BulkheadConfig) *Bulkhead

NewBulkhead creates a new bulkhead with the given configuration.

func (*Bulkhead) Acquire

func (b *Bulkhead) Acquire(ctx context.Context, tenantID string) (func(), error)

Acquire attempts to acquire a slot for the given tenant. Returns ErrTenantLimitExceeded if the tenant has reached its concurrency limit. On success, returns a release function that must be called when the operation completes.
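
A minimal usage sketch: acquire a slot before tenant-scoped work and always release it. The import path "example.com/yourmodule/scale" is a placeholder, and the tenant ID and limit values are illustrative.

package main

import (
	"context"
	"errors"
	"log"

	"example.com/yourmodule/scale" // assumed import path
)

func handleRequest(ctx context.Context, b *scale.Bulkhead, tenantID string) error {
	release, err := b.Acquire(ctx, tenantID)
	if errors.Is(err, scale.ErrTenantLimitExceeded) {
		return err // fail fast: the tenant is already at its limit
	}
	if err != nil {
		return err
	}
	defer release() // release the slot when the operation completes

	// ... do the tenant-scoped work here ...
	return nil
}

func main() {
	b := scale.NewBulkhead(scale.DefaultBulkheadConfig())
	// Give a heavy tenant its own, tighter limit.
	b.SetLimit("tenant-a", scale.TenantLimitConfig{MaxConcurrent: 4})

	if err := handleRequest(context.Background(), b, "tenant-a"); err != nil {
		log.Println("request rejected:", err)
	}
}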

func (*Bulkhead) AcquireWait

func (b *Bulkhead) AcquireWait(ctx context.Context, tenantID string) (func(), error)

AcquireWait attempts to acquire a slot for the given tenant, blocking until a slot is available or the context is cancelled.
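
A sketch of bounding the wait with a context deadline rather than failing fast. It assumes the same placeholder import path as the Acquire example plus the standard context and time packages; runWithWait and its arguments are hypothetical names.

// runWithWait blocks for at most maxWait for a free slot, then runs work.
func runWithWait(b *scale.Bulkhead, tenantID string, maxWait time.Duration, work func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), maxWait)
	defer cancel()

	release, err := b.AcquireWait(ctx, tenantID)
	if err != nil {
		return err // no slot freed up before the deadline
	}
	defer release()
	return work(ctx)
}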

func (*Bulkhead) RemoveLimit

func (b *Bulkhead) RemoveLimit(tenantID string)

RemoveLimit removes the specific limit for a tenant, reverting to the default.

func (*Bulkhead) SetLimit

func (b *Bulkhead) SetLimit(tenantID string, cfg TenantLimitConfig)

SetLimit configures limits for a specific tenant. If the tenant already has a limit, it replaces the semaphore while preserving rejection stats.

func (*Bulkhead) Stats

func (b *Bulkhead) Stats() map[string]BulkheadStats

Stats returns current usage statistics per tenant. Includes all tenants with specific limits plus the default bucket.

type BulkheadConfig

type BulkheadConfig struct {
	// DefaultMaxConcurrent is the default concurrency limit for tenants without specific limits.
	DefaultMaxConcurrent int
	// DefaultRateLimit is the default rate limit (requests per second).
	DefaultRateLimit float64
}

BulkheadConfig configures the bulkhead defaults.

func DefaultBulkheadConfig

func DefaultBulkheadConfig() BulkheadConfig

DefaultBulkheadConfig returns sensible defaults.

type BulkheadStats

type BulkheadStats struct {
	TenantID      string `json:"tenant_id"`
	Active        int    `json:"active"`
	MaxConcurrent int    `json:"max_concurrent"`
	Rejected      int64  `json:"rejected"`
}

BulkheadStats holds current usage statistics for a tenant.

type ConsistentHash

type ConsistentHash struct {
	// contains filtered or unexported fields
}

ConsistentHash implements a consistent hashing ring for partitioning work by tenant or conversation ID across a set of nodes.

func NewConsistentHash

func NewConsistentHash(replicas int) *ConsistentHash

NewConsistentHash creates a new consistent hash ring. replicas controls the number of virtual nodes per physical node (higher = more even distribution).

func (*ConsistentHash) AddNode

func (h *ConsistentHash) AddNode(node string)

AddNode adds a node to the hash ring.

func (*ConsistentHash) GetNode

func (h *ConsistentHash) GetNode(key string) (string, error)

GetNode returns the node responsible for the given key.

func (*ConsistentHash) GetNodes

func (h *ConsistentHash) GetNodes(key string, count int) ([]string, error)

GetNodes returns the N distinct nodes responsible for the given key, in ring order. Useful for replication.
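
A small sketch of ring usage; the node names, keys, and import path are illustrative assumptions.

package main

import (
	"fmt"

	"example.com/yourmodule/scale" // assumed import path
)

func main() {
	ring := scale.NewConsistentHash(100) // 100 virtual nodes per physical node
	ring.AddNode("worker-1")
	ring.AddNode("worker-2")
	ring.AddNode("worker-3")

	// Primary owner of this conversation.
	owner, _ := ring.GetNode("conversation-42")
	fmt.Println("owner:", owner)

	// Primary plus one successor, in ring order, for replication.
	replicaSet, _ := ring.GetNodes("conversation-42", 2)
	fmt.Println("replica set:", replicaSet)
}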

func (*ConsistentHash) Members

func (h *ConsistentHash) Members() []string

Members returns the set of nodes in the ring.

func (*ConsistentHash) RemoveNode

func (h *ConsistentHash) RemoveNode(node string)

RemoveNode removes a node from the hash ring.

func (*ConsistentHash) Size

func (h *ConsistentHash) Size() int

Size returns the number of nodes in the ring.

type DistributedLock

type DistributedLock interface {
	// Acquire obtains a lock for the given key. Returns a release function.
	// Blocks until the lock is acquired or context is cancelled.
	Acquire(ctx context.Context, key string, ttl time.Duration) (release func(), err error)
	// TryAcquire attempts to acquire a lock without blocking.
	// Returns false if the lock is already held.
	TryAcquire(ctx context.Context, key string, ttl time.Duration) (release func(), acquired bool, err error)
}

DistributedLock provides distributed locking for state machine transitions and other coordination needs across multiple Workflow instances.
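
A sketch of serializing a state-machine transition behind the lock, here with InMemoryLock; the lock key format, TTL, workflow ID, and import path are illustrative assumptions.

package main

import (
	"context"
	"time"

	"example.com/yourmodule/scale" // assumed import path
)

// transition runs one state-machine step while holding the lock. Any
// DistributedLock implementation can be substituted for InMemoryLock.
func transition(ctx context.Context, locks scale.DistributedLock, workflowID string) error {
	release, err := locks.Acquire(ctx, "workflow:"+workflowID, 30*time.Second)
	if err != nil {
		return err
	}
	defer release()

	// ... load state, validate the transition, persist the new state ...
	return nil
}

func main() {
	locks := scale.NewInMemoryLock()
	_ = transition(context.Background(), locks, "wf-123")
}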

type InMemoryLock

type InMemoryLock struct {
	// contains filtered or unexported fields
}

InMemoryLock implements DistributedLock for testing and single-server deployments. It keeps a per-key sync.Mutex in a map.

func NewInMemoryLock

func NewInMemoryLock() *InMemoryLock

NewInMemoryLock creates a new in-memory distributed lock.

func (*InMemoryLock) Acquire

func (l *InMemoryLock) Acquire(ctx context.Context, key string, ttl time.Duration) (func(), error)

Acquire obtains a lock for the given key, blocking until acquired or context cancelled.

func (*InMemoryLock) TryAcquire

func (l *InMemoryLock) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool, error)

TryAcquire attempts to acquire a lock without blocking. Returns false if the lock is already held.

type PGAdvisoryLock

type PGAdvisoryLock struct {
	// contains filtered or unexported fields
}

PGAdvisoryLock implements DistributedLock using PostgreSQL advisory locks (pg_advisory_lock / pg_advisory_unlock). The key string is hashed to int64 for use as the lock ID.

func NewPGAdvisoryLock

func NewPGAdvisoryLock(db *sql.DB) *PGAdvisoryLock

NewPGAdvisoryLock creates a new PostgreSQL advisory lock implementation.

func (*PGAdvisoryLock) Acquire

func (l *PGAdvisoryLock) Acquire(ctx context.Context, key string, ttl time.Duration) (func(), error)

Acquire obtains a PostgreSQL advisory lock for the given key. Blocks until the lock is acquired or context is cancelled. Note: ttl is not natively supported by pg_advisory_lock; the lock is held until explicitly released or the session ends.

func (*PGAdvisoryLock) TryAcquire

func (l *PGAdvisoryLock) TryAcquire(ctx context.Context, key string, ttl time.Duration) (func(), bool, error)

TryAcquire attempts to acquire a PostgreSQL advisory lock without blocking. Returns false if the lock is already held.
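
A sketch of non-blocking coordination across instances via TryAcquire; the lib/pq driver, DSN, lock key, and import path are all placeholder assumptions.

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	_ "github.com/lib/pq" // assumed driver; any database/sql Postgres driver works

	"example.com/yourmodule/scale" // assumed import path
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost/app?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	locks := scale.NewPGAdvisoryLock(db)

	// TryAcquire does not block. The ttl argument has no native effect here:
	// the advisory lock is held until release() or the session ends.
	release, ok, err := locks.TryAcquire(context.Background(), "migration:2024-06", time.Minute)
	if err != nil {
		log.Fatal(err)
	}
	if !ok {
		log.Println("another instance holds the lock; skipping")
		return
	}
	defer release()

	// ... run the exclusive work ...
}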

type RedisLock

type RedisLock struct {
	// contains filtered or unexported fields
}

RedisLock implements DistributedLock using Redis SET NX with TTL. Requires a Redis client connection. This is a stub implementation; the full implementation will be provided when the Redis client is integrated as a direct dependency.

func NewRedisLock

func NewRedisLock(addr string) *RedisLock

NewRedisLock creates a new Redis distributed lock stub. Full implementation requires a Redis client (e.g., go-redis).

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(_ context.Context, key string, _ time.Duration) (func(), error)

Acquire obtains a lock using Redis SET NX with TTL. This is a stub that returns an error indicating Redis is not yet configured.

func (*RedisLock) TryAcquire

func (l *RedisLock) TryAcquire(_ context.Context, key string, _ time.Duration) (func(), bool, error)

TryAcquire attempts to acquire a Redis lock without blocking. This is a stub that returns an error indicating Redis is not yet configured.

type Shard

type Shard struct {
	ID          string
	State       ShardState
	Pool        *WorkerPool
	ActiveTasks int64
	CreatedAt   time.Time
}

Shard represents a processing partition.

type ShardInfo

type ShardInfo struct {
	ID            string
	State         string
	ActiveWorkers int
	PendingTasks  int
	CompletedOK   int64
	CompletedErr  int64
	CreatedAt     time.Time
}

ShardInfo holds read-only information about a shard.

type ShardManager

type ShardManager struct {
	// contains filtered or unexported fields
}

ShardManager manages a set of shards and routes tasks to them using consistent hashing on the tenant/conversation key.

func NewShardManager

func NewShardManager(cfg ShardManagerConfig) *ShardManager

NewShardManager creates a new shard manager.

func (*ShardManager) AddShard

func (m *ShardManager) AddShard(id string) error

AddShard dynamically adds a new shard.

func (*ShardManager) RemoveShard

func (m *ShardManager) RemoveShard(id string) error

RemoveShard drains and removes a shard.

func (*ShardManager) RouteTask

func (m *ShardManager) RouteTask(partitionKey string, task Task) error

RouteTask routes a task to the appropriate shard based on the partition key.
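
A sketch of the overall flow: start the manager, route a tenant-keyed task, and shut down. The config defaults, IDs, and import path are placeholder assumptions.

package main

import (
	"context"
	"log"

	"example.com/yourmodule/scale" // assumed import path
)

func main() {
	mgr := scale.NewShardManager(scale.DefaultShardManagerConfig())
	if err := mgr.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer mgr.Stop()

	task := scale.Task{
		ID:       "task-1",
		TenantID: "tenant-a",
		Execute: func(ctx context.Context) error {
			// ... tenant-scoped work ...
			return nil
		},
	}

	// Tasks with the same partition key always land on the same shard.
	if err := mgr.RouteTask("tenant-a", task); err != nil {
		log.Println("route failed:", err)
	}
}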

func (*ShardManager) ShardCount

func (m *ShardManager) ShardCount() int

ShardCount returns the number of active shards.

func (*ShardManager) ShardStats

func (m *ShardManager) ShardStats() map[string]ShardInfo

ShardStats returns statistics for all shards.

func (*ShardManager) Start

func (m *ShardManager) Start(ctx context.Context) error

Start initializes all shards and their worker pools.

func (*ShardManager) Stop

func (m *ShardManager) Stop() error

Stop gracefully shuts down all shards.

type ShardManagerConfig

type ShardManagerConfig struct {
	// ShardCount is the initial number of shards.
	ShardCount int
	// Replicas is the number of virtual nodes per shard in the hash ring.
	Replicas int
	// PoolConfig is the worker pool config for each shard.
	PoolConfig WorkerPoolConfig
}

ShardManagerConfig configures the shard manager.

func DefaultShardManagerConfig

func DefaultShardManagerConfig() ShardManagerConfig

DefaultShardManagerConfig returns sensible defaults.

type ShardState

type ShardState int

ShardState represents the lifecycle state of a shard.

const (
	ShardActive ShardState = iota
	ShardDraining
	ShardInactive
)

func (ShardState) String

func (s ShardState) String() string

type Task

type Task struct {
	ID       string
	TenantID string
	Execute  func(ctx context.Context) error
}

Task represents a unit of work to be executed by the worker pool.

type TaskResult

type TaskResult struct {
	TaskID   string
	TenantID string
	Err      error
	Duration time.Duration
}

TaskResult holds the outcome of a task execution.

type TenantLimitConfig

type TenantLimitConfig struct {
	// MaxConcurrent is the maximum number of concurrent operations allowed.
	MaxConcurrent int
	// RateLimit is the maximum requests per second (reserved for future use).
	RateLimit float64
}

TenantLimitConfig defines the configuration for a tenant's concurrency constraints. Use this when calling SetLimit to configure tenant-specific limits.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of goroutines for executing workflow tasks.

func NewWorkerPool

func NewWorkerPool(cfg WorkerPoolConfig) *WorkerPool

NewWorkerPool creates a new worker pool.

func (*WorkerPool) Results

func (p *WorkerPool) Results() <-chan TaskResult

Results returns the results channel for reading task outcomes.

func (*WorkerPool) Start

func (p *WorkerPool) Start(ctx context.Context) error

Start launches the minimum number of workers.

func (*WorkerPool) Stats

func (p *WorkerPool) Stats() WorkerPoolStats

Stats returns current pool statistics.

func (*WorkerPool) Stop

func (p *WorkerPool) Stop() error

Stop gracefully shuts down the worker pool.

func (*WorkerPool) Submit

func (p *WorkerPool) Submit(task Task) error

Submit adds a task to the queue. If the queue is nearing capacity and the pool has not yet reached MaxWorkers, it spawns an additional worker.
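
A sketch of running a pool end to end: start, submit a task, read its outcome from Results, and stop. The config values, task fields, and import path are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"example.com/yourmodule/scale" // assumed import path
)

func main() {
	pool := scale.NewWorkerPool(scale.WorkerPoolConfig{
		MinWorkers:  2,
		MaxWorkers:  8,
		QueueSize:   64,
		IdleTimeout: 30 * time.Second,
	})
	if err := pool.Start(context.Background()); err != nil {
		log.Fatal(err)
	}
	defer pool.Stop()

	err := pool.Submit(scale.Task{
		ID:       "task-1",
		TenantID: "tenant-a",
		Execute: func(ctx context.Context) error {
			return nil // ... work ...
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	res := <-pool.Results()
	fmt.Printf("task %s finished in %s (err=%v)\n", res.TaskID, res.Duration, res.Err)
}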

type WorkerPoolConfig

type WorkerPoolConfig struct {
	// MinWorkers is the minimum number of goroutines kept alive.
	MinWorkers int
	// MaxWorkers is the maximum number of goroutines allowed.
	MaxWorkers int
	// QueueSize is the capacity of the task queue.
	QueueSize int
	// IdleTimeout is how long an idle worker waits before exiting (above MinWorkers).
	IdleTimeout time.Duration
}

WorkerPoolConfig configures the worker pool.

func DefaultWorkerPoolConfig

func DefaultWorkerPoolConfig() WorkerPoolConfig

DefaultWorkerPoolConfig returns sensible defaults.

type WorkerPoolStats

type WorkerPoolStats struct {
	ActiveWorkers  int
	PendingTasks   int
	TotalSubmitted int64
	CompletedOK    int64
	CompletedErr   int64
}

WorkerPoolStats holds pool statistics.
