tenancy

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

README

StreamBus Multi-Tenancy

StreamBus provides a comprehensive multi-tenancy system that enables resource isolation, quota enforcement, and usage tracking for multiple tenants sharing the same broker infrastructure.

Table of Contents

Architecture Overview

The multi-tenancy system consists of three main components:

┌─────────────┐
│   Manager   │  ← Orchestration layer (thread-safe)
└─────┬───────┘
      │
      ├─────────────┬─────────────┐
      │             │             │
┌─────▼──────┐ ┌───▼──────┐ ┌───▼──────┐
│TenantStore │ │ Tracker1 │ │ Tracker2 │  ← Per-tenant tracking
└────────────┘ └──────────┘ └──────────┘
Components
  1. TenantStore: Manages tenant metadata, configuration, and lifecycle
  2. QuotaTracker: Tracks real-time resource usage and enforces quotas per tenant
  3. Manager: Coordinates tenant management and quota enforcement across all tenants

Key Concepts

Tenants

A tenant represents an isolated namespace with its own:

  • Unique identifier and metadata
  • Resource quotas
  • Usage statistics
  • Lifecycle state (Active, Suspended, Deleted)
Quotas

Quotas define resource limits for each tenant:

Quota Type Description Unit
MaxBytesPerSecond Maximum data throughput bytes/sec
MaxMessagesPerSecond Maximum message rate messages/sec
MaxStorageBytes Maximum storage usage bytes
MaxTopics Maximum number of topics count
MaxPartitions Maximum total partitions across all topics count
MaxConnections Maximum concurrent connections count
MaxProducers Maximum concurrent producers count
MaxConsumers Maximum concurrent consumers count
MaxConsumerGroups Maximum consumer groups count
MaxRequestsPerSecond Maximum request rate requests/sec
MaxRetentionHours Maximum message retention hours

Note: Set any quota to -1 for unlimited resources.

Rate Limiting

StreamBus uses a sliding window algorithm for rate limiting:

Time Window (1 second)
┌────────────────────────────────┐
│ [100ms] [100ms] [100ms] ... │  ← 10 buckets
│   50      75      60     ... │  ← Bytes tracked
└────────────────────────────────┘
  • Window size: 1 second (configurable)
  • Bucket size: 100ms (10 buckets per window)
  • Rate calculation: Sum of valid buckets / window duration

This provides accurate per-second rate tracking with minimal memory overhead.

Quick Start

Creating a Tenant
package main

import (
    "github.com/shawntherrien/streambus/pkg/tenancy"
)

// main walks through the minimal setup: build a manager, describe the
// tenant's resource limits, and register the tenant with those limits.
func main() {
    mgr := tenancy.NewManager()

    // Resource limits applied to the new tenant.
    limits := &tenancy.Quotas{
        MaxBytesPerSecond:    10 * 1024 * 1024,         // 10 MB/s
        MaxMessagesPerSecond: 10000,                    // 10k msg/s
        MaxStorageBytes:      100 * 1024 * 1024 * 1024, // 100 GB
        MaxTopics:            100,
        MaxPartitions:        1000,
        MaxConnections:       500,
        MaxProducers:         100,
        MaxConsumers:         200,
        MaxConsumerGroups:    50,
        MaxRequestsPerSecond: 50000,
        MaxRetentionHours:    168, // 7 days
    }

    // Register the tenant; the example simply aborts if that fails.
    created, err := mgr.CreateTenant("acme-corp", "ACME Corporation", limits)
    if err != nil {
        panic(err)
    }

    println("Created tenant:", created.ID)
}
Enforcing Quotas
// Enforce produce quota
err := manager.EnforceProduceQuota("acme-corp", 1024, 1) // 1KB, 1 message
if err != nil {
    // Handle quota exceeded error
    if quotaErr, ok := err.(*tenancy.QuotaError); ok {
        log.Printf("Quota exceeded: %s (current: %d, limit: %d)",
            quotaErr.QuotaType, quotaErr.Current, quotaErr.Limit)
    }
    return err
}

// Enforce connection quota
err = manager.EnforceConnectionQuota("acme-corp")
if err != nil {
    return err
}
defer manager.ReleaseConnection("acme-corp")

// Enforce topic creation quota
err = manager.EnforceTopicQuota("acme-corp", 3) // 3 partitions
if err != nil {
    return err
}
Monitoring Usage
// Get current usage
usage, err := manager.GetUsage("acme-corp")
if err != nil {
    return err
}

log.Printf("Tenant Usage:")
log.Printf("  Connections: %d", usage.Connections)
log.Printf("  Topics: %d", usage.Topics)
log.Printf("  Partitions: %d", usage.Partitions)
log.Printf("  Storage: %d bytes", usage.StorageBytes)
log.Printf("  Throughput: %d bytes/sec", usage.BytesPerSecond)

// Get utilization percentages
utilization, err := manager.GetUtilization("acme-corp")
if err != nil {
    return err
}

log.Printf("Utilization:")
log.Printf("  Connections: %.1f%%", utilization["connections"])
log.Printf("  Storage: %.1f%%", utilization["storage"])
log.Printf("  Topics: %.1f%%", utilization["topics"])

API Reference

Manager
Tenant Management
// CreateTenant creates a new tenant with specified quotas
func (m *Manager) CreateTenant(id TenantID, name string, quotas *Quotas) (*Tenant, error)

// GetTenant retrieves a tenant by ID
func (m *Manager) GetTenant(id TenantID) (*Tenant, error)

// UpdateTenant updates tenant configuration
func (m *Manager) UpdateTenant(id TenantID, name string, quotas *Quotas) error

// DeleteTenant deletes a tenant
func (m *Manager) DeleteTenant(id TenantID) error

// ListTenants returns all tenants
func (m *Manager) ListTenants() []*Tenant

// SuspendTenant suspends a tenant (blocks all operations)
func (m *Manager) SuspendTenant(id TenantID) error

// ActivateTenant activates a suspended tenant
func (m *Manager) ActivateTenant(id TenantID) error
Quota Enforcement
// EnforceProduceQuota checks and records produce operation
func (m *Manager) EnforceProduceQuota(id TenantID, bytes, messages int64) error

// EnforceConsumeQuota checks and records consume operation
func (m *Manager) EnforceConsumeQuota(id TenantID, bytes, messages int64) error

// EnforceConnectionQuota checks and increments connection count
func (m *Manager) EnforceConnectionQuota(id TenantID) error

// ReleaseConnection decrements connection count
func (m *Manager) ReleaseConnection(id TenantID)

// EnforceTopicQuota checks and increments topic/partition count
func (m *Manager) EnforceTopicQuota(id TenantID, partitions int) error

// ReleaseTopic decrements topic/partition count
func (m *Manager) ReleaseTopic(id TenantID, partitions int)

// EnforceStorageQuota checks if storage quota allows operation
func (m *Manager) EnforceStorageQuota(id TenantID, bytes int64) error

// UpdateStorageUsage updates current storage usage
func (m *Manager) UpdateStorageUsage(id TenantID, bytes int64) error
Monitoring
// GetUsage returns current usage for a tenant
func (m *Manager) GetUsage(id TenantID) (*Usage, error)

// GetAllUsage returns usage for all tenants
func (m *Manager) GetAllUsage() map[TenantID]*Usage

// GetUtilization returns utilization percentages
func (m *Manager) GetUtilization(id TenantID) (map[string]float64, error)

// GetTenantStats returns comprehensive statistics
func (m *Manager) GetTenantStats(id TenantID) (*TenantStats, error)
QuotaTracker

The QuotaTracker is used internally by the Manager but can be accessed directly:

// GetTracker gets the quota tracker for a tenant
func (m *Manager) GetTracker(id TenantID) (*QuotaTracker, error)

Available methods on QuotaTracker:

// CheckThroughput checks if throughput is within quota
func (qt *QuotaTracker) CheckThroughput(bytes, messages int64) error

// RecordThroughput records usage
func (qt *QuotaTracker) RecordThroughput(bytes, messages int64)

// AddConnection increments connection count
func (qt *QuotaTracker) AddConnection() error

// RemoveConnection decrements connection count
func (qt *QuotaTracker) RemoveConnection()

// AddProducer increments producer count
func (qt *QuotaTracker) AddProducer() error

// RemoveProducer decrements producer count
func (qt *QuotaTracker) RemoveProducer()

// AddTopic increments topic/partition count
func (qt *QuotaTracker) AddTopic(partitions int) error

// RemoveTopic decrements topic/partition count
func (qt *QuotaTracker) RemoveTopic(partitions int)

// CheckStorage checks storage quota
func (qt *QuotaTracker) CheckStorage(additionalBytes int64) error

// UpdateStorage updates storage usage
func (qt *QuotaTracker) UpdateStorage(bytes int64)

// GetUsage returns current usage
func (qt *QuotaTracker) GetUsage() *Usage

// UtilizationPercent calculates quota utilization
func (qt *QuotaTracker) UtilizationPercent(quotaType string) float64

Integration Guide

Broker Integration

Integrate tenancy into your broker's request handlers:

// Broker is the example broker type. Its request handlers consult
// tenancyManager to enforce per-tenant quotas.
type Broker struct {
    tenancyManager *tenancy.Manager // used by the broker's handlers for quota checks
    // ... other fields
}

// HandleProduceRequest charges the request (payload size in bytes, one
// message) against the tenant's produce quota before processing it. When the
// quota check fails, the failure is returned as a *QuotaExceededError that
// carries the tenant ID and the underlying error.
func (b *Broker) HandleProduceRequest(req *ProduceRequest) error {
    // The tenant ID travels with the request.
    tenantID := req.TenantID
    payloadSize := int64(len(req.Message.Value))

    if err := b.tenancyManager.EnforceProduceQuota(tenantID, payloadSize, 1); err != nil {
        return &QuotaExceededError{Tenant: tenantID, Err: err}
    }

    // Quota accepted: continue with normal processing.
    return b.processProduceRequest(req)
}

// HandleConnection admits a connection only if the tenant's connection quota
// allows it. A rejected connection is closed and the quota error returned;
// an admitted one is released again when handling finishes.
func (b *Broker) HandleConnection(conn net.Conn) error {
    tenantID := extractTenantFromAuth(conn)

    if err := b.tenancyManager.EnforceConnectionQuota(tenantID); err != nil {
        conn.Close()
        return err
    }
    // Pair the successful enforcement with a release on every return path.
    defer b.tenancyManager.ReleaseConnection(tenantID)

    return b.handleConnection(conn)
}
Storage Integration

Update storage usage periodically:

// UpdateTenantStorage reports the storage currently used by every tenant to
// the tenancy manager.
func (b *Broker) UpdateTenantStorage() {
    tenants := b.tenancyManager.ListTenants()
    for i := range tenants {
        id := tenants[i].ID

        // The error returned by UpdateStorageUsage is discarded: this
        // function has no error return to surface it through.
        _ = b.tenancyManager.UpdateStorageUsage(id, b.storage.GetTenantUsage(id))
    }
}

// Refresh tenant storage usage once a minute in a background goroutine.
go func() {
    tick := time.NewTicker(time.Minute)
    for {
        <-tick.C
        b.UpdateTenantStorage()
    }
}()
Admin API Integration

Expose tenant management via API:

// POST /admin/tenants
//
// CreateTenant decodes a JSON tenant definition from the request body,
// creates the tenant through the manager, and writes the created tenant back
// as JSON. A body that cannot be decoded, or a tenant the manager rejects,
// results in 400 Bad Request.
func (api *AdminAPI) CreateTenant(w http.ResponseWriter, r *http.Request) {
    var req struct {
        // ID is decoded straight into TenantID (a string type) so it can be
        // passed to the manager without a conversion.
        ID     tenancy.TenantID `json:"id"`
        Name   string           `json:"name"`
        Quotas *tenancy.Quotas  `json:"quotas"`
    }

    // Reject malformed input instead of creating a tenant from zero values.
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    tenant, err := api.manager.CreateTenant(req.ID, req.Name, req.Quotas)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // An encode failure here cannot be reported to the client reliably,
    // because part of the response may already have been written.
    json.NewEncoder(w).Encode(tenant)
}

// GET /admin/tenants/:id/stats
//
// GetTenantStats writes the statistics of the tenant named by the "id" route
// variable as JSON, or 404 Not Found when the manager returns an error.
func (api *AdminAPI) GetTenantStats(w http.ResponseWriter, r *http.Request) {
    // Route variables are plain strings; the manager expects a TenantID, so
    // convert explicitly.
    tenantID := tenancy.TenantID(mux.Vars(r)["id"])

    stats, err := api.manager.GetTenantStats(tenantID)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }

    json.NewEncoder(w).Encode(stats)
}

Best Practices

1. Quota Configuration

Start Conservative, Adjust Based on Usage

// Begin with a moderate profile.
initialQuotas := &tenancy.Quotas{
    MaxBytesPerSecond:    5 * 1024 * 1024, // 5 MB/s
    MaxMessagesPerSecond: 5000,
    MaxStorageBytes:      50 * 1024 * 1024 * 1024, // 50 GB
    MaxTopics:            50,
    MaxPartitions:        500,
    MaxConnections:       100,
}

// Watch utilization and raise limits where tenants run close to them.
usagePct, _ := manager.GetUtilization(tenantID)
if storagePct := usagePct["storage"]; storagePct > 80.0 {
    // Consider increasing storage quota
}
2. Monitoring and Alerting

Set Up Proactive Monitoring

// MonitorTenants logs a WARNING for every tenant resource whose utilization
// exceeds 80% and a CRITICAL message when it exceeds 95%. Tenants whose
// statistics cannot be fetched are logged and skipped.
func MonitorTenants(manager *tenancy.Manager) {
    for _, tenant := range manager.ListTenants() {
        stats, err := manager.GetTenantStats(tenant.ID)
        if err != nil {
            // stats must not be dereferenced when an error is returned.
            log.Printf("Skipping tenant %s: %v", tenant.ID, err)
            continue
        }

        // Alert if utilization is high
        for resource, utilization := range stats.Utilization {
            if utilization > 80.0 {
                log.Printf("WARNING: Tenant %s at %.1f%% %s utilization",
                    tenant.ID, utilization, resource)
            }
            if utilization > 95.0 {
                log.Printf("CRITICAL: Tenant %s at %.1f%% %s utilization",
                    tenant.ID, utilization, resource)
                // Send alert
            }
        }
    }
}
3. Graceful Degradation

Handle Quota Errors Gracefully

// HandleProduceWithRetry tries up to three times to get msg accepted by the
// tenant's produce quota and produces it on the first success. Only a
// "bytes_per_second" quota error is retried, after a 100ms pause; any other
// error is returned immediately. If every attempt is rate limited, a generic
// error is returned.
func HandleProduceWithRetry(manager *tenancy.Manager, tenantID string, msg []byte) error {
    // The manager API takes a TenantID, not a plain string.
    id := tenancy.TenantID(tenantID)

    for i := 0; i < 3; i++ {
        err := manager.EnforceProduceQuota(id, int64(len(msg)), 1)
        if err == nil {
            return produceMessage(msg)
        }

        // errors.As also matches a QuotaError that has been wrapped.
        var quotaErr *tenancy.QuotaError
        if errors.As(err, &quotaErr) && quotaErr.QuotaType == "bytes_per_second" {
            // Wait for rate limit window to refresh
            time.Sleep(100 * time.Millisecond)
            continue
        }

        return err
    }
    return errors.New("quota enforcement failed after retries")
}
4. Tenant Lifecycle Management

Suspend Before Deleting

// SafeDeleteTenant suspends the tenant, waits five seconds for in-flight
// operations, and deletes the tenant only if it has no active connections
// left. It returns the first error encountered, or an error when connections
// are still open.
func SafeDeleteTenant(manager *tenancy.Manager, tenantID string) error {
    // The manager API takes a TenantID, not a plain string.
    id := tenancy.TenantID(tenantID)

    // First suspend to prevent new operations
    if err := manager.SuspendTenant(id); err != nil {
        return err
    }

    // Wait for in-flight operations to complete
    time.Sleep(5 * time.Second)

    // Verify no active connections. The error must be checked before usage
    // is dereferenced.
    usage, err := manager.GetUsage(id)
    if err != nil {
        return err
    }
    if usage.Connections > 0 {
        return errors.New("tenant still has active connections")
    }

    // Now safe to delete
    return manager.DeleteTenant(id)
}
5. Default Quotas

Use Tiered Quota Profiles

// FreeTierQuotas returns the quota profile for the free tier. Fields that
// are not assigned here keep their zero value.
// NOTE(review): confirm how the package treats zero-valued quotas.
func FreeTierQuotas() *tenancy.Quotas {
    const (
        mib = 1024 * 1024
        gib = 1024 * mib
    )

    q := new(tenancy.Quotas)
    q.MaxBytesPerSecond = 1 * mib // 1 MB/s
    q.MaxMessagesPerSecond = 1000
    q.MaxStorageBytes = 10 * gib // 10 GB
    q.MaxTopics = 10
    q.MaxPartitions = 50
    q.MaxConnections = 10
    q.MaxProducers = 5
    q.MaxConsumers = 10
    return q
}

// ProTierQuotas returns the quota profile for the pro tier. Fields that are
// not assigned here keep their zero value.
// NOTE(review): confirm how the package treats zero-valued quotas.
func ProTierQuotas() *tenancy.Quotas {
    const (
        mib = 1024 * 1024
        gib = 1024 * mib
    )

    q := new(tenancy.Quotas)
    q.MaxBytesPerSecond = 50 * mib // 50 MB/s
    q.MaxMessagesPerSecond = 50000
    q.MaxStorageBytes = 500 * gib // 500 GB
    q.MaxTopics = 500
    q.MaxPartitions = 5000
    q.MaxConnections = 1000
    q.MaxProducers = 200
    q.MaxConsumers = 500
    return q
}

// EnterpriseTierQuotas returns the quota profile for the enterprise tier,
// which is the package's unlimited profile.
func EnterpriseTierQuotas() *tenancy.Quotas {
    unlimited := tenancy.UnlimitedQuotas() // No limits
    return unlimited
}

Examples

Complete Integration Example
package main

import (
    "log"
    "time"

    "github.com/shawntherrien/streambus/pkg/tenancy"
)

// main wires the example together: one manager, a few tenants, a background
// usage monitor, and a short simulated produce workload in the foreground.
func main() {
    mgr := tenancy.NewManager()

    // Register the example tenants before anything uses them.
    setupTenants(mgr)

    // Usage reporting runs concurrently with the workload.
    go monitorTenants(mgr)

    // The program ends when the simulated workload returns.
    simulateOperations(mgr)
}

// setupTenants registers the example tenants with the manager and terminates
// the program if any of them cannot be created.
func setupTenants(manager *tenancy.Manager) {
    tenants := []struct {
        // id is a TenantID so it can be passed to CreateTenant directly; a
        // string-typed field would not be assignable to that parameter.
        id     tenancy.TenantID
        name   string
        quotas *tenancy.Quotas
    }{
        {"startup-a", "Startup A", tenancy.DefaultQuotas()},
        {"enterprise-b", "Enterprise B", enterpriseQuotas()},
        {"free-tier", "Free User", freeTierQuotas()},
    }

    for _, t := range tenants {
        if _, err := manager.CreateTenant(t.id, t.name, t.quotas); err != nil {
            log.Fatalf("Failed to create tenant %s: %v", t.id, err)
        }
        log.Printf("Created tenant: %s", t.id)
    }
}

// monitorTenants logs connections, topics and byte rate for every tenant
// once every ten seconds. It never returns.
func monitorTenants(manager *tenancy.Manager) {
    tick := time.NewTicker(10 * time.Second)
    for {
        <-tick.C

        for id, u := range manager.GetAllUsage() {
            log.Printf("Tenant %s: %d connections, %d topics, %d bytes/sec",
                id, u.Connections, u.Topics, u.BytesPerSecond)
        }
    }
}

// simulateOperations issues 100 produce-quota checks of 1024 bytes and one
// message each for tenant "startup-a", 100ms apart, logging every outcome.
func simulateOperations(manager *tenancy.Manager) {
    const attempts = 100

    for n := 0; n < attempts; n++ {
        switch err := manager.EnforceProduceQuota("startup-a", 1024, 1); {
        case err != nil:
            log.Printf("Produce failed: %v", err)
        default:
            log.Printf("Produced message %d", n)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

// freeTierQuotas returns the limits used for the example's free-tier tenant.
// Fields that are not assigned here keep their zero value.
func freeTierQuotas() *tenancy.Quotas {
    const (
        mib = 1024 * 1024
        gib = 1024 * mib
    )

    q := new(tenancy.Quotas)
    q.MaxBytesPerSecond = 1 * mib
    q.MaxMessagesPerSecond = 1000
    q.MaxStorageBytes = 10 * gib
    q.MaxTopics = 10
    q.MaxPartitions = 50
    q.MaxConnections = 10
    return q
}

// enterpriseQuotas returns the limits used for the example's enterprise
// tenant: the package's unlimited profile.
func enterpriseQuotas() *tenancy.Quotas {
    unlimited := tenancy.UnlimitedQuotas()
    return unlimited
}
Testing Example
// TestTenantQuotaEnforcement checks that a produce within the throughput
// quota is accepted and that a second produce, which pushes the tenant past
// 1000 bytes/sec and 10 messages/sec, is rejected with a *tenancy.QuotaError.
func TestTenantQuotaEnforcement(t *testing.T) {
    manager := tenancy.NewManager()

    quotas := &tenancy.Quotas{
        MaxBytesPerSecond:    1000,
        MaxMessagesPerSecond: 10,
    }

    _, err := manager.CreateTenant("test-tenant", "Test", quotas)
    if err != nil {
        t.Fatal(err)
    }

    // Should succeed under quota
    err = manager.EnforceProduceQuota("test-tenant", 500, 5)
    if err != nil {
        t.Errorf("Expected success under quota, got: %v", err)
    }

    // Should fail when exceeding quota. Stop here on a nil error: the type
    // check below would otherwise report a second, misleading failure.
    err = manager.EnforceProduceQuota("test-tenant", 600, 10)
    if err == nil {
        t.Fatal("Expected quota error")
    }

    // Verify it's a QuotaError
    if _, ok := err.(*tenancy.QuotaError); !ok {
        t.Errorf("Expected QuotaError, got: %T", err)
    }
}

Performance Considerations

Memory Usage

Each QuotaTracker uses approximately:

  • 200 bytes for counter variables
  • About 720 bytes for rate windows (3 windows × 10 buckets × 24 bytes)
  • Total: ~1 KB per tenant

For 10,000 tenants: ~10 MB memory overhead.

CPU Usage
  • Rate window updates: O(1) per operation
  • Quota checks: O(1) per operation
  • All operations use read locks where possible for maximum concurrency
Thread Safety

All operations are thread-safe:

  • Manager uses RWMutex for tracker map
  • QuotaTracker uses RWMutex for counters
  • RateWindow uses Mutex for bucket updates

Troubleshooting

Common Issues

1. Quota Errors Despite Low Usage

Check rate window timing:

// Check the error before reading usage; usage must not be dereferenced when
// GetUsage fails.
usage, err := manager.GetUsage(tenantID)
if err != nil {
    return err
}
log.Printf("Current rate: %d bytes/sec", usage.BytesPerSecond)

Rate limits are evaluated over a sliding window (1 second by default), so a short burst of traffic can trigger temporary quota errors even when average usage is low.

2. Storage Quota Not Working

Storage must be updated manually:

// Update storage usage periodically
manager.UpdateStorageUsage(tenantID, currentStorageBytes)

3. Connection Count Drift

Ensure connections are properly released:

manager.EnforceConnectionQuota(tenantID)
defer manager.ReleaseConnection(tenantID) // Always use defer

License

Part of the StreamBus project - See main LICENSE file.

Contributing

See the main CONTRIBUTING.md in the repository root.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages all tenants and their quota tracking

func NewManager

func NewManager() *Manager

NewManager creates a new tenant manager

func (*Manager) ActivateTenant

func (m *Manager) ActivateTenant(id TenantID) error

ActivateTenant activates a tenant

func (*Manager) CheckTenantActive

func (m *Manager) CheckTenantActive(id TenantID) error

CheckTenantActive checks if a tenant exists and is active

func (*Manager) CreateTenant

func (m *Manager) CreateTenant(id TenantID, name string, quotas *Quotas) (*Tenant, error)

CreateTenant creates a new tenant with specified quotas

func (*Manager) DeleteTenant

func (m *Manager) DeleteTenant(id TenantID) error

DeleteTenant deletes a tenant

func (*Manager) EnforceConnectionQuota

func (m *Manager) EnforceConnectionQuota(id TenantID) error

EnforceConnectionQuota checks if a new connection can be created

func (*Manager) EnforceConsumeQuota

func (m *Manager) EnforceConsumeQuota(id TenantID, bytes, messages int64) error

EnforceConsumeQuota checks and records consume operation

func (*Manager) EnforceProduceQuota

func (m *Manager) EnforceProduceQuota(id TenantID, bytes, messages int64) error

EnforceProduceQuota checks and records produce operation

func (*Manager) EnforceStorageQuota

func (m *Manager) EnforceStorageQuota(id TenantID, bytes int64) error

EnforceStorageQuota checks if storage quota allows the operation

func (*Manager) EnforceTopicQuota

func (m *Manager) EnforceTopicQuota(id TenantID, partitions int) error

EnforceTopicQuota checks if a new topic can be created

func (*Manager) GetAllUsage

func (m *Manager) GetAllUsage() map[TenantID]*Usage

GetAllUsage returns usage for all tenants

func (*Manager) GetTenant

func (m *Manager) GetTenant(id TenantID) (*Tenant, error)

GetTenant retrieves a tenant

func (*Manager) GetTenantStats

func (m *Manager) GetTenantStats(id TenantID) (*TenantStats, error)

GetTenantStats returns comprehensive stats for a tenant

func (*Manager) GetTracker

func (m *Manager) GetTracker(id TenantID) (*QuotaTracker, error)

GetTracker gets the quota tracker for a tenant

func (*Manager) GetUsage

func (m *Manager) GetUsage(id TenantID) (*Usage, error)

GetUsage returns current usage for a tenant

func (*Manager) GetUtilization

func (m *Manager) GetUtilization(id TenantID) (map[string]float64, error)

GetUtilization returns utilization percentages for a tenant

func (*Manager) ListTenants

func (m *Manager) ListTenants() []*Tenant

ListTenants returns all tenants

func (*Manager) ReleaseConnection

func (m *Manager) ReleaseConnection(id TenantID)

ReleaseConnection releases a connection quota

func (*Manager) ReleaseTopic

func (m *Manager) ReleaseTopic(id TenantID, partitions int)

ReleaseTopic releases a topic quota

func (*Manager) SuspendTenant

func (m *Manager) SuspendTenant(id TenantID) error

SuspendTenant suspends a tenant

func (*Manager) UpdateStorageUsage

func (m *Manager) UpdateStorageUsage(id TenantID, bytes int64) error

UpdateStorageUsage updates the current storage usage for a tenant

func (*Manager) UpdateTenant

func (m *Manager) UpdateTenant(id TenantID, name string, quotas *Quotas) error

UpdateTenant updates a tenant's configuration

type QuotaError

type QuotaError struct {
	TenantID  TenantID
	QuotaType string
	Current   int64
	Limit     int64
	Message   string
}

QuotaError represents a quota exceeded error

func (*QuotaError) Error

func (e *QuotaError) Error() string

type QuotaTracker

type QuotaTracker struct {
	// contains filtered or unexported fields
}

QuotaTracker tracks resource usage for a tenant and enforces quotas

func NewQuotaTracker

func NewQuotaTracker(tenantID TenantID, quotas *Quotas) *QuotaTracker

NewQuotaTracker creates a new quota tracker for a tenant

func (*QuotaTracker) AddConnection

func (qt *QuotaTracker) AddConnection() error

AddConnection increments the connection count

func (*QuotaTracker) AddProducer

func (qt *QuotaTracker) AddProducer() error

AddProducer increments the producer count

func (*QuotaTracker) AddTopic

func (qt *QuotaTracker) AddTopic(partitions int) error

AddTopic increments the topic count

func (*QuotaTracker) CheckConnection

func (qt *QuotaTracker) CheckConnection() error

CheckConnection checks if a new connection can be added

func (*QuotaTracker) CheckProducer

func (qt *QuotaTracker) CheckProducer() error

CheckProducer checks if a new producer can be added

func (*QuotaTracker) CheckStorage

func (qt *QuotaTracker) CheckStorage(additionalBytes int64) error

CheckStorage checks if storage quota allows the specified bytes

func (*QuotaTracker) CheckThroughput

func (qt *QuotaTracker) CheckThroughput(bytes, messages int64) error

CheckThroughput checks if the tenant can send/receive the specified bytes and messages

func (*QuotaTracker) CheckTopic

func (qt *QuotaTracker) CheckTopic() error

CheckTopic checks if a new topic can be created

func (*QuotaTracker) GetUsage

func (qt *QuotaTracker) GetUsage() *Usage

GetUsage returns current usage statistics

func (*QuotaTracker) RecordThroughput

func (qt *QuotaTracker) RecordThroughput(bytes, messages int64)

RecordThroughput records bytes and messages used

func (*QuotaTracker) RemoveConnection

func (qt *QuotaTracker) RemoveConnection()

RemoveConnection decrements the connection count

func (*QuotaTracker) RemoveProducer

func (qt *QuotaTracker) RemoveProducer()

RemoveProducer decrements the producer count

func (*QuotaTracker) RemoveTopic

func (qt *QuotaTracker) RemoveTopic(partitions int)

RemoveTopic decrements the topic count

func (*QuotaTracker) UpdateStorage

func (qt *QuotaTracker) UpdateStorage(bytes int64)

UpdateStorage updates the current storage usage

func (*QuotaTracker) UtilizationPercent

func (qt *QuotaTracker) UtilizationPercent(quotaType string) float64

UtilizationPercent calculates quota utilization percentage

type Quotas

type Quotas struct {
	// Throughput quotas (per second)
	MaxBytesPerSecond    int64 // Maximum bytes/sec for produce + consume
	MaxMessagesPerSecond int64 // Maximum messages/sec for produce + consume

	// Storage quotas
	MaxStorageBytes int64 // Maximum total storage in bytes
	MaxTopics       int   // Maximum number of topics
	MaxPartitions   int   // Maximum total partitions across all topics

	// Connection quotas
	MaxConnections    int // Maximum concurrent connections
	MaxProducers      int // Maximum concurrent producers
	MaxConsumers      int // Maximum concurrent consumers
	MaxConsumerGroups int // Maximum consumer groups

	// Request quotas
	MaxRequestsPerSecond int64 // Maximum requests/sec

	// Advanced quotas
	MaxRetentionHours int64 // Maximum message retention in hours
}

Quotas define resource limits for a tenant

func DefaultQuotas

func DefaultQuotas() *Quotas

DefaultQuotas returns sensible default quotas for a new tenant

func UnlimitedQuotas

func UnlimitedQuotas() *Quotas

UnlimitedQuotas returns quotas with no limits (for testing or admin tenants)

type RateWindow

type RateWindow struct {
	// contains filtered or unexported fields
}

RateWindow tracks usage over a sliding time window for rate limiting

func NewRateWindow

func NewRateWindow(windowSize, bucketSize time.Duration) *RateWindow

NewRateWindow creates a new rate tracking window

func (*RateWindow) Add

func (w *RateWindow) Add(value int64)

Add adds a value to the current bucket

func (*RateWindow) Rate

func (w *RateWindow) Rate() float64

Rate returns the current rate (total in window / window duration in seconds)

type Tenant

type Tenant struct {
	ID          TenantID
	Name        string
	Description string
	CreatedAt   time.Time
	UpdatedAt   time.Time

	// Quotas define resource limits for this tenant
	Quotas *Quotas

	// Metadata for custom tenant properties
	Metadata map[string]string

	// Status indicates if tenant is active, suspended, etc.
	Status TenantStatus
}

Tenant represents a multi-tenant namespace in StreamBus

func (*Tenant) IsActive

func (t *Tenant) IsActive() bool

IsActive checks if a tenant is active

func (*Tenant) Validate

func (t *Tenant) Validate() error

Validate validates the tenant configuration

type TenantID

type TenantID string

TenantID is a unique identifier for a tenant

type TenantStats

type TenantStats struct {
	Tenant      *Tenant
	Usage       *Usage
	Utilization map[string]float64
}

TenantStats holds summary statistics for a tenant

type TenantStatus

type TenantStatus string

TenantStatus represents the operational status of a tenant

const (
	TenantStatusActive    TenantStatus = "ACTIVE"
	TenantStatusSuspended TenantStatus = "SUSPENDED"
	TenantStatusDeleted   TenantStatus = "DELETED"
)

type TenantStore

type TenantStore struct {
	// contains filtered or unexported fields
}

TenantStore manages tenants in the system

func NewTenantStore

func NewTenantStore() *TenantStore

NewTenantStore creates a new tenant store

func (*TenantStore) ActivateTenant

func (s *TenantStore) ActivateTenant(id TenantID) error

ActivateTenant activates a suspended tenant

func (*TenantStore) CreateTenant

func (s *TenantStore) CreateTenant(id TenantID, name string, quotas *Quotas) (*Tenant, error)

CreateTenant creates a new tenant

func (*TenantStore) DeleteTenant

func (s *TenantStore) DeleteTenant(id TenantID) error

DeleteTenant deletes a tenant

func (*TenantStore) GetTenant

func (s *TenantStore) GetTenant(id TenantID) (*Tenant, error)

GetTenant retrieves a tenant by ID

func (*TenantStore) ListTenants

func (s *TenantStore) ListTenants() []*Tenant

ListTenants returns all tenants

func (*TenantStore) SuspendTenant

func (s *TenantStore) SuspendTenant(id TenantID) error

SuspendTenant suspends a tenant

func (*TenantStore) UpdateTenant

func (s *TenantStore) UpdateTenant(id TenantID, name string, quotas *Quotas) error

UpdateTenant updates a tenant's configuration

type Usage

type Usage struct {
	TenantID          TenantID
	Connections       int
	Producers         int
	Consumers         int
	Topics            int
	Partitions        int
	ConsumerGroups    int
	StorageBytes      int64
	BytesPerSecond    int64
	MessagesPerSecond int64
	RequestsPerSecond int64
}

Usage represents current resource usage for a tenant

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL