services

package
v0.0.0-...-6a3e998
Published: Jul 23, 2025 License: MIT Imports: 44 Imported by: 0

README

Services Package

Overview

The services package implements the business logic layer for the DevOps MCP platform. It provides production-ready service implementations with built-in resilience, security, observability, and distributed systems support. All services build on a common BaseService, so each one gets rate limiting, quotas, authorization, encryption, tracing, metrics, and event publishing without extra wiring.

Architecture

services/
├── base_service.go              # Foundation for all services
├── interfaces.go                # Service interfaces
├── task_service.go             # Task management
├── workflow_service.go         # Workflow orchestration
├── workspace_service.go        # Workspace management
├── document_service.go         # Document handling
├── agent_service_impl.go       # Agent management
├── assignment_engine.go        # Task assignment logic
├── document_lock_service.go    # Distributed locking
├── notification_service_impl.go # Event notifications
└── service_helpers.go          # Utility implementations

Base Service Foundation

All services build on BaseService, which provides:

Core Features
type BaseService struct {
    // Dependencies
    logger      Logger
    tracer      Tracer
    metrics     MetricsClient
    eventBus    EventBus
    
    // Security
    authorizer  Authorizer
    encryptor   EncryptionService
    sanitizer   Sanitizer
    
    // Resilience
    rateLimiter RateLimiter
    quotaMgr    QuotaManager
    circuitBreaker CircuitBreaker
    
    // State
    stateStore  StateStore
    cache       Cache
}
Built-in Capabilities
  1. Distributed Transactions
// Run operations inside a transaction (with saga support)
err := service.WithTransaction(ctx, func(ctx context.Context, tx Transaction) error {
    // Transactional operations
    return nil
})
if err != nil {
    return err
}

// For explicit control, StartDistributedTransaction(ctx, opts...) returns
// a derived context and two func() values
  2. Rate Limiting & Quotas
// Check rate limits (enforced per tenant/agent)
if err := service.CheckRateLimit(ctx, "api_calls"); err != nil {
    return ErrRateLimitExceeded
}

// Check quotas: resource name and requested amount
if err := service.CheckQuota(ctx, "storage", 1024*1024); err != nil {
    return ErrQuotaExceeded
}
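  3. Event Publishing
// Publish domain events
// task must implement AggregateRoot; the "task.assigned" event type is illustrative
err := service.PublishEvent(ctx, "task.assigned", task, &TaskAssignedEvent{
    TaskID:  task.ID,
    AgentID: agent.ID,
    Time:    time.Now(),
})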
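
The rate-limit check above can be pictured as a per-resource counter that resets after a time window. The following is a minimal sketch under that assumption; windowLimiter, newWindowLimiter, and Allow are hypothetical names, and the package's own RateLimiter may use a different algorithm.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrRateLimitExceeded is a local stand-in for the package's sentinel error.
var ErrRateLimitExceeded = errors.New("rate limit exceeded")

// windowLimiter is a hypothetical fixed-window limiter keyed by resource name.
type windowLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	counts map[string]int
	starts map[string]time.Time
}

func newWindowLimiter(limit int, window time.Duration) *windowLimiter {
	return &windowLimiter{
		limit:  limit,
		window: window,
		counts: make(map[string]int),
		starts: make(map[string]time.Time),
	}
}

// Allow records one call for resource and rejects it once the
// current window already holds limit calls.
func (l *windowLimiter) Allow(resource string) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	start, seen := l.starts[resource]
	if !seen || now.Sub(start) >= l.window {
		// Start a fresh window for this resource.
		l.starts[resource] = now
		l.counts[resource] = 0
	}
	if l.counts[resource] >= l.limit {
		return ErrRateLimitExceeded
	}
	l.counts[resource]++
	return nil
}

func main() {
	limiter := newWindowLimiter(2, time.Minute)
	for i := 0; i < 3; i++ {
		fmt.Println(limiter.Allow("api_calls"))
	}
}
```

A fixed window is the simplest variant; a token bucket or sliding window smooths bursts at window boundaries at the cost of more state.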

Service Implementations

Task Service

Manages task lifecycle with state machine validation:

// Initialize service
taskService := NewTaskServiceImpl(
    repo,
    assignmentEngine,
    cache,
    eventBus,
    logger,
    tracer,
)

// Create task
task, err := taskService.CreateTask(ctx, &CreateTaskRequest{
    Type:     "code_review",
    Priority: PriorityHigh,
    Parameters: map[string]interface{}{
        "repository": "devops-mcp",
        "pr_number": 123,
    },
})

// Assign to agent
agent, err := taskService.AssignTask(ctx, task.ID)

// Update progress
err = taskService.UpdateTaskProgress(ctx, task.ID, &ProgressUpdate{
    PercentComplete: 50,
    Status: "Analyzing code",
})

// Complete task
err = taskService.CompleteTask(ctx, task.ID, &TaskResult{
    Success: true,
    Output: analysisResults,
})

Key Features:

  • State machine enforcement
  • Automatic assignment via assignment engine
  • Progress tracking
  • Retry management
  • Background rebalancing
  • Performance metrics
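
State machine enforcement usually means rejecting any status change that is not in an allowed-transition table. The sketch below illustrates the idea; the status names and the table are assumptions made for the example, not the package's actual task states.

```go
package main

import "fmt"

// TaskStatus and the values below are hypothetical; the real
// task model may define different states.
type TaskStatus string

const (
	StatusPending    TaskStatus = "pending"
	StatusAssigned   TaskStatus = "assigned"
	StatusInProgress TaskStatus = "in_progress"
	StatusCompleted  TaskStatus = "completed"
	StatusFailed     TaskStatus = "failed"
)

// allowed lists the legal next states for each state.
// States without an entry (completed) are terminal.
var allowed = map[TaskStatus][]TaskStatus{
	StatusPending:    {StatusAssigned},
	StatusAssigned:   {StatusInProgress, StatusPending},
	StatusInProgress: {StatusCompleted, StatusFailed},
	StatusFailed:     {StatusPending}, // retry path
}

// Transition returns nil when moving from one state to another is legal.
func Transition(from, to TaskStatus) error {
	for _, next := range allowed[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("invalid transition %s -> %s", from, to)
}

func main() {
	fmt.Println(Transition(StatusPending, StatusAssigned))
	fmt.Println(Transition(StatusCompleted, StatusPending))
}
```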

Workflow Service

Orchestrates multi-step workflows with dependencies:

// Create workflow
workflow, err := workflowService.CreateWorkflow(ctx, &WorkflowDefinition{
    Name: "deployment-pipeline",
    Type: WorkflowTypeDAG,
    Steps: []WorkflowStep{
        {
            ID:   "build",
            Type: "build_code",
            Dependencies: []string{},
        },
        {
            ID:   "test",
            Type: "run_tests",
            Dependencies: []string{"build"},
        },
        {
            ID:   "deploy",
            Type: "deploy_app",
            Dependencies: []string{"test"},
        },
    },
})

// Execute workflow
execution, err := workflowService.ExecuteWorkflow(ctx, workflow.ID, &ExecutionParams{
    Variables: map[string]interface{}{
        "environment": "staging",
        "version": "1.2.3",
    },
})

// Monitor execution
status, err := workflowService.GetExecutionStatus(ctx, execution.ID)

// Handle step completion
err = workflowService.CompleteStep(ctx, execution.ID, "build", &StepResult{
    Success: true,
    Outputs: map[string]interface{}{
        "artifact": "app-1.2.3.tar.gz",
    },
})

Workflow Types:

  • Sequential: Steps run in order
  • Parallel: Steps run simultaneously
  • DAG: Directed acyclic graph
  • Saga: Distributed transactions
  • State Machine: State-based execution
  • Event-Driven: Triggered by events
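
For the DAG type, a step can run only after all of its dependencies have finished. One common way to derive a valid order is Kahn's algorithm; the sketch below applies it to a simplified Step type (a stand-in for WorkflowStep) and is not taken from the package itself.

```go
package main

import (
	"errors"
	"fmt"
)

// Step is a simplified stand-in for a workflow step.
type Step struct {
	ID           string
	Dependencies []string
}

// ExecutionOrder returns step IDs in an order that respects
// dependencies, or an error for unknown steps or cycles.
func ExecutionOrder(steps []Step) ([]string, error) {
	indegree := make(map[string]int, len(steps))
	dependents := make(map[string][]string)
	for _, s := range steps {
		indegree[s.ID] = 0
	}
	for _, s := range steps {
		for _, dep := range s.Dependencies {
			if _, ok := indegree[dep]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", s.ID, dep)
			}
			indegree[s.ID]++
			dependents[dep] = append(dependents[dep], s.ID)
		}
	}

	// Seed the queue in declaration order so the result is deterministic.
	var queue []string
	for _, s := range steps {
		if indegree[s.ID] == 0 {
			queue = append(queue, s.ID)
		}
	}

	var order []string
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, next := range dependents[id] {
			indegree[next]--
			if indegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	if len(order) != len(steps) {
		return nil, errors.New("workflow contains a dependency cycle")
	}
	return order, nil
}

func main() {
	order, err := ExecutionOrder([]Step{
		{ID: "deploy", Dependencies: []string{"test"}},
		{ID: "build"},
		{ID: "test", Dependencies: []string{"build"}},
	})
	fmt.Println(order, err)
}
```

Steps whose in-degree reaches zero at the same time have no ordering constraint between them, so a scheduler could run them in parallel.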

Workspace Service

Manages collaborative workspaces:

// Create workspace
workspace, err := workspaceService.CreateWorkspace(ctx, &CreateWorkspaceRequest{
    Name: "frontend-team",
    Description: "Frontend development workspace",
    Settings: &WorkspaceSettings{
        IsPublic: false,
        Features: []string{"code_review", "ci_cd"},
        Quotas: map[string]int64{
            "storage_gb": 100,
            "members": 50,
        },
    },
})

// Add members
err = workspaceService.AddMember(ctx, workspace.ID, &Member{
    UserID: userID,
    Role:   RoleAdmin,
})

// Update resource usage
err = workspaceService.UpdateResourceUsage(ctx, workspace.ID, &ResourceUpdate{
    StorageBytes: 1024 * 1024 * 50, // 50MB
    CPUSeconds: 3600,
})

// Check limits
withinLimits, err := workspaceService.CheckResourceLimits(ctx, workspace.ID)

Features:

  • Member management with roles
  • Resource quotas and tracking
  • Distributed state synchronization
  • Activity monitoring
  • Access control

Document Service

Handles collaborative document editing:

// Create document
doc, err := documentService.CreateDocument(ctx, &CreateDocumentRequest{
    WorkspaceID: workspace.ID,
    Name: "architecture.md",
    Type: DocumentTypeMarkdown,
    Content: "# System Architecture\n\n...",
})

// Lock for editing
lock, err := documentService.LockDocument(ctx, doc.ID, userID, &LockOptions{
    Duration: 5 * time.Minute,
    AutoRefresh: true,
})
defer documentService.UnlockDocument(ctx, lock.ID)

// Update with conflict detection
updated, err := documentService.UpdateDocument(ctx, &UpdateDocumentRequest{
    ID: doc.ID,
    Content: updatedContent,
    Version: doc.Version, // Optimistic locking
})

// Handle conflicts
if err == ErrDocumentConflict {
    conflicts, err := documentService.GetConflicts(ctx, doc.ID)
    resolved, err := documentService.ResolveConflicts(ctx, doc.ID, resolution)
}

Features:

  • Version control
  • Distributed locking
  • Conflict detection and resolution
  • Section-level operations
  • Change tracking
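
The optimistic locking mentioned in the update example comes down to a version comparison at write time. Here is a minimal in-memory sketch; the store and document types are hypothetical, and ErrConcurrentModification is a local stand-in for the package's sentinel error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrConcurrentModification is a local stand-in for the package's sentinel error.
var ErrConcurrentModification = errors.New("concurrent modification")

type document struct {
	Content string
	Version int
}

// store is a hypothetical in-memory document store.
type store struct {
	mu   sync.Mutex
	docs map[string]document
}

// Update writes content only if the caller saw the latest version,
// and returns the version now stored.
func (s *store) Update(id, content string, expectedVersion int) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	doc, ok := s.docs[id]
	if !ok {
		return 0, fmt.Errorf("document %q not found", id)
	}
	if doc.Version != expectedVersion {
		// Someone else wrote in between; the caller must reload and retry.
		return doc.Version, ErrConcurrentModification
	}
	doc.Content = content
	doc.Version++
	s.docs[id] = doc
	return doc.Version, nil
}

func main() {
	s := &store{docs: map[string]document{"doc-1": {Content: "v1", Version: 1}}}
	fmt.Println(s.Update("doc-1", "first writer", 1))
	fmt.Println(s.Update("doc-1", "stale writer", 1))
}
```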

Agent Service

Manages AI agent lifecycle:

// Register agent
agent, err := agentService.RegisterAgent(ctx, &RegisterAgentRequest{
    Name: "code-analyzer-1",
    Type: "analyzer",
    Capabilities: []Capability{
        CapabilityCodeAnalysis,
        CapabilitySecurityScan,
    },
    Endpoint: "ws://agent1:8080",
})

// Update status
err = agentService.UpdateAgentStatus(ctx, agent.ID, AgentStatusActive)

// Get workload
workload, err := agentService.GetAgentWorkload(ctx, agent.ID)

// Find available agents
agents, err := agentService.GetAvailableAgents(ctx, &AgentFilter{
    Capabilities: []Capability{CapabilityCodeAnalysis},
    MaxWorkload: 10,
})

Assignment Engine

Intelligent task routing with multiple strategies:

// Initialize engine
engine := NewAssignmentEngine(ruleEngine, agentService, logger, metrics)

// Register a strategy under a name (the name here is illustrative)
engine.RegisterStrategy("least_loaded", AssignmentStrategyLeastLoad)

// Find the best agent for a task using rules and strategies
agent, err := engine.FindBestAgent(ctx, task)

// Custom rules are evaluated by the rules.Engine passed to NewAssignmentEngine

Built-in Strategies:

  • Round Robin: Even distribution
  • Least Loaded: Based on workload
  • Capability Match: Based on requirements
  • Random: Random selection
  • Rule Based: Custom rules
  • Cost Optimized: Minimize cost
  • Performance: Fastest agents
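
As an illustration of the Least Loaded strategy, the selection step can be as small as a linear scan for the available agent with the fewest active tasks. The Agent fields and pickLeastLoaded below are assumptions for the sketch; the documented AssignmentStrategy interface also takes a context and a task, which are left out here.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoEligibleAgents is a local stand-in for the package's sentinel error.
var ErrNoEligibleAgents = errors.New("no eligible agents")

// Agent is a simplified, hypothetical view of an agent.
type Agent struct {
	ID          string
	ActiveTasks int
	Available   bool
}

// pickLeastLoaded returns the available agent with the fewest active tasks.
// Ties go to the agent that appears first in the slice.
func pickLeastLoaded(agents []*Agent) (*Agent, error) {
	var best *Agent
	for _, a := range agents {
		if a == nil || !a.Available {
			continue
		}
		if best == nil || a.ActiveTasks < best.ActiveTasks {
			best = a
		}
	}
	if best == nil {
		return nil, ErrNoEligibleAgents
	}
	return best, nil
}

func main() {
	agents := []*Agent{
		{ID: "agent-1", ActiveTasks: 4, Available: true},
		{ID: "agent-2", ActiveTasks: 1, Available: true},
		{ID: "agent-3", ActiveTasks: 0, Available: false},
	}
	best, err := pickLeastLoaded(agents)
	fmt.Println(best.ID, err)
}
```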

Distributed Lock Service

Redis-based locking for distributed systems:

// Initialize service
lockService := NewDocumentLockService(config, redisClient)

// Acquire lock
lock, err := lockService.LockDocument(ctx, docID, agentID, 5*time.Minute)
if err != nil {
    return err
}
defer lockService.UnlockDocument(ctx, docID, agentID)

// Extend the lock before lock.ExpiresAt passes
err = lockService.ExtendLock(ctx, docID, agentID, 30*time.Second)

// Check lock
isLocked, current, err := lockService.IsDocumentLocked(ctx, docID)

// Release every lock held by an agent
err = lockService.ReleaseAllLocks(ctx, agentID)

Features:

  • Distributed locking with TTL
  • Auto-refresh mechanism
  • Deadlock detection
  • Lock queuing
  • Metrics and monitoring

Notification Service

Event-driven notifications:

// Task notifications
err = notificationService.NotifyTaskAssigned(ctx, agentID, task)

// Broadcast to agents
err = notificationService.BroadcastToAgents(ctx, agentIDs, &Notification{
    Type: "system_update",
    Message: "Maintenance in 5 minutes",
    Priority: PriorityHigh,
})

// Workspace notifications
err = notificationService.NotifyWorkspaceEvent(ctx, workspaceID, &WorkspaceEvent{
    Type: "member_joined",
    Actor: userID,
    Timestamp: time.Now(),
})

Error Handling

Sentinel errors for common failures, plus typed errors that carry context:

// Sentinel errors (compare with errors.Is)
var (
    ErrRateLimitExceeded = errors.New("rate limit exceeded")
    ErrQuotaExceeded     = errors.New("quota exceeded")
    ErrUnauthorized      = errors.New("unauthorized")
    ErrNotFound          = errors.New("resource not found")
)

// Typed errors (extract with errors.As)
type DocumentLockedError struct {
    DocumentID uuid.UUID
    LockedBy   string
}

type ConcurrentModificationError struct {
    Resource string
    ID       uuid.UUID
    Version  int
}
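
Callers typically tell these failures apart with errors.Is for sentinel values and errors.As for typed errors. The sketch below uses local stand-ins for ErrQuotaExceeded and DocumentLockedError (the exported type uses a uuid.UUID document ID; a string is used here to stay dependency-free), and classify and wrap are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for errors exported by the package.
var ErrQuotaExceeded = errors.New("quota exceeded")

type DocumentLockedError struct {
	DocumentID string
	LockedBy   string
}

func (e DocumentLockedError) Error() string {
	return fmt.Sprintf("document %s is locked by %s", e.DocumentID, e.LockedBy)
}

// wrap adds context while keeping the original error in the chain.
func wrap(err error) error {
	return fmt.Errorf("update failed: %w", err)
}

// classify shows how a caller can branch on wrapped errors.
func classify(err error) string {
	var locked DocumentLockedError
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrQuotaExceeded):
		return "quota"
	case errors.As(err, &locked):
		return "locked by " + locked.LockedBy
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify(wrap(ErrQuotaExceeded)))
	fmt.Println(classify(wrap(DocumentLockedError{DocumentID: "doc-123", LockedBy: "agent-1"})))
}
```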

Resilience Patterns

Circuit Breaker
// Wrap external calls
result, err := service.WithCircuitBreaker("external-api", func() (interface{}, error) {
    return externalAPI.Call()
})
Retry with Backoff
// Retry failed operations
err := service.RetryWithBackoff(ctx, func() error {
    return unstableOperation()
}, RetryOptions{
    MaxAttempts: 3,
    InitialDelay: 100 * time.Millisecond,
    MaxDelay: 5 * time.Second,
})
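
A retry loop of this kind generally waits between attempts, doubles the wait up to a cap, and stops early when the context is cancelled. The following sketch assumes that behaviour; retryWithBackoff and nextDelay are illustrative names, and RetryOptions here only mirrors the three fields used in the example above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// RetryOptions mirrors the fields used in the README example.
type RetryOptions struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
}

// nextDelay doubles the current delay and caps it at max.
func nextDelay(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

// retryWithBackoff calls op until it succeeds, attempts run out,
// or ctx is cancelled while waiting.
func retryWithBackoff(ctx context.Context, op func() error, opts RetryOptions) error {
	delay := opts.InitialDelay
	var lastErr error
	for attempt := 1; attempt <= opts.MaxAttempts; attempt++ {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		if attempt == opts.MaxAttempts {
			break // no point waiting after the final attempt
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		delay = nextDelay(delay, opts.MaxDelay)
	}
	return fmt.Errorf("gave up after %d attempts: %w", opts.MaxAttempts, lastErr)
}

func main() {
	calls := 0
	err := retryWithBackoff(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	}, RetryOptions{MaxAttempts: 3, InitialDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond})
	fmt.Println(calls, err)
}
```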
Timeout Control
// Set operation timeout
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

result, err := service.PerformOperation(ctx)

Observability

Metrics
// Automatic metrics collection
- service_operations_total
- service_operation_duration_seconds
- service_errors_total
- service_rate_limit_hits_total
- service_quota_usage
- service_circuit_breaker_state
Tracing
// Automatic span creation
ctx, span := service.tracer.Start(ctx, "ServiceName.OperationName")
defer span.End()

// Add attributes
span.SetAttributes(
    attribute.String("task.id", taskID),
    attribute.Int("retry.count", retryCount),
)
Logging
// Structured logging
service.logger.Info("Task assigned",
    "task_id", task.ID,
    "agent_id", agent.ID,
    "duration", time.Since(start),
)

Testing

Unit Tests
// Mock dependencies
func TestTaskService(t *testing.T) {
    mockRepo := mocks.NewMockRepository()
    mockCache := mocks.NewMockCache()
    
    service := NewTaskServiceImpl(
        mockRepo,
        mockEngine,
        mockCache,
        mockEventBus,
        logger,
        tracer,
    )
    
    // Test operations
    task, err := service.CreateTask(ctx, request)
    assert.NoError(t, err)
}
Integration Tests
// Test with real dependencies
func TestTaskServiceIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping integration test")
    }
    
    // Setup test database
    db := setupTestDB(t)
    defer cleanupDB(db)
    
    // Test full workflow
}

Best Practices

  1. Always use context: Pass context for cancellation and tracing
  2. Handle errors explicitly: Check all error returns
  3. Use transactions: Wrap multi-step operations
  4. Set timeouts: Prevent hanging operations
  5. Monitor metrics: Track service health
  6. Test resilience: Test failure scenarios
  7. Document APIs: Keep interfaces well-documented

Performance Considerations

  • Caching: Use multi-level caching for read-heavy operations
  • Batch Operations: Use bulk methods when available
  • Connection Pooling: Reuse database connections
  • Async Processing: Use background workers for long operations
  • Circuit Breakers: Prevent cascade failures

Future Enhancements

  • GraphQL API support
  • Event sourcing for audit trail
  • CQRS pattern implementation
  • Saga orchestration improvements
  • Advanced scheduling algorithms
  • Machine learning for assignment

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrRateLimitExceeded       = errors.New("rate limit exceeded")
	ErrQuotaExceeded           = errors.New("quota exceeded")
	ErrUnauthorized            = errors.New("unauthorized")
	ErrInsufficientPermissions = errors.New("insufficient permissions")
	ErrConcurrentModification  = errors.New("concurrent modification")
	ErrNoEligibleAgents        = errors.New("no eligible agents")
	ErrNoCapableAgent          = errors.New("no capable agent")
	ErrDelegationDenied        = errors.New("delegation denied")
	ErrNotFound                = errors.New("resource not found")
	ErrWorkflowNotActive       = errors.New("workflow is not active")
	ErrDocumentLocked          = errors.New("document is locked")
	ErrMergeConflict           = errors.New("merge conflict detected")

	// Additional errors
	ErrInvalidID        = errors.New("invalid ID format")
	ErrResourceNotFound = errors.New("resource not found")
)

Service errors

var AssignmentStrategyLeastLoad = &LeastLoadedStrategy{}

AssignmentStrategyLeastLoad represents the least load assignment strategy

Functions

func CreateDefaultCircuitBreakerSettings

func CreateDefaultCircuitBreakerSettings() *gobreaker.Settings

CreateDefaultCircuitBreakerSettings creates default circuit breaker settings

func ExponentialBackoff

func ExponentialBackoff(base time.Duration, factor float64) func(attempt int) time.Duration

ExponentialBackoff creates an exponential backoff function
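
One plausible implementation with this signature multiplies the base delay by factor raised to the attempt number. This is a sketch, not the package source; in particular, treating attempt 0 as the first attempt (returning base unchanged) is an assumption.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// ExponentialBackoff returns a function computing base * factor^attempt.
// Assumption: attempts are zero-based, so attempt 0 yields base.
func ExponentialBackoff(base time.Duration, factor float64) func(attempt int) time.Duration {
	return func(attempt int) time.Duration {
		if attempt < 0 {
			attempt = 0
		}
		return time.Duration(float64(base) * math.Pow(factor, float64(attempt)))
	}
}

func main() {
	delay := ExponentialBackoff(100*time.Millisecond, 2)
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Println(attempt, delay(attempt))
	}
}
```

Production retry code often adds random jitter and an upper bound on top of this so that many clients do not retry in lockstep.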

Types

type AESEncryptionService

type AESEncryptionService struct {
	// contains filtered or unexported fields
}

AESEncryptionService provides AES-GCM encryption for production use

func (*AESEncryptionService) Decrypt

func (s *AESEncryptionService) Decrypt(ctx context.Context, ciphertext []byte) ([]byte, error)

Decrypt decrypts data using AES-GCM

func (*AESEncryptionService) DecryptString

func (s *AESEncryptionService) DecryptString(ctx context.Context, ciphertext string) (string, error)

DecryptString decrypts a base64 encoded string

func (*AESEncryptionService) Encrypt

func (s *AESEncryptionService) Encrypt(ctx context.Context, plaintext []byte) ([]byte, error)

Encrypt encrypts data using AES-GCM

func (*AESEncryptionService) EncryptString

func (s *AESEncryptionService) EncryptString(ctx context.Context, plaintext string) (string, error)

EncryptString encrypts a string and returns base64 encoded result
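
For readers unfamiliar with AES-GCM in Go, the sketch below shows a typical round trip with the standard library. The layout used here (random nonce prepended to the ciphertext) is a common convention and an assumption; this page does not state how AESEncryptionService stores its nonce.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
)

// encrypt seals plaintext with AES-GCM and returns nonce || ciphertext || tag.
func encrypt(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key) // a 32-byte key selects AES-256
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, err
	}
	// Passing nonce as dst makes Seal append the ciphertext to it.
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt splits off the nonce and authenticates and opens the rest.
func decrypt(key, data []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(data) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := data[:gcm.NonceSize()], data[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32)
	if _, err := io.ReadFull(rand.Reader, key); err != nil {
		panic(err)
	}
	sealed, err := encrypt(key, []byte("api-token"))
	if err != nil {
		panic(err)
	}
	opened, err := decrypt(key, sealed)
	fmt.Println(string(opened), err)
}
```

Because GCM is authenticated, any change to the stored bytes makes decrypt return an error instead of corrupted plaintext.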

type AgentNotInWorkspaceError

type AgentNotInWorkspaceError struct {
	AgentID     string
	WorkspaceID uuid.UUID
}

AgentNotInWorkspaceError represents agent not in workspace

func (AgentNotInWorkspaceError) Error

func (e AgentNotInWorkspaceError) Error() string

type AgentService

type AgentService interface {
	GetAgent(ctx context.Context, agentID string) (*models.Agent, error)
	GetAvailableAgents(ctx context.Context) ([]*models.Agent, error)
	GetAgentCapabilities(ctx context.Context, agentID string) ([]string, error)
	UpdateAgentStatus(ctx context.Context, agentID string, status string) error
	GetAgentWorkload(ctx context.Context, agentID string) (*models.AgentWorkload, error)
}

AgentService provides agent management functionality

func NewAgentService

func NewAgentService(config ServiceConfig, repo agent.Repository) AgentService

NewAgentService creates a new agent service

type AggregateRoot

type AggregateRoot interface {
	GetID() uuid.UUID
	GetType() string
	GetVersion() int
}

AggregateRoot represents an aggregate root for event sourcing

type AssignmentEngine

type AssignmentEngine struct {
	// contains filtered or unexported fields
}

AssignmentEngine handles intelligent task assignment

func NewAssignmentEngine

func NewAssignmentEngine(ruleEngine rules.Engine, agentService AgentService, logger observability.Logger, metrics observability.MetricsClient) *AssignmentEngine

NewAssignmentEngine creates a new assignment engine

func (*AssignmentEngine) FindBestAgent

func (e *AssignmentEngine) FindBestAgent(ctx context.Context, task *models.Task) (*models.Agent, error)

FindBestAgent finds the best agent for a task using rules and strategies

func (*AssignmentEngine) RegisterStrategy

func (e *AssignmentEngine) RegisterStrategy(name string, strategy AssignmentStrategy)

RegisterStrategy registers an assignment strategy

type AssignmentStrategy

type AssignmentStrategy interface {
	Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)
	GetName() string
}

AssignmentStrategy defines how tasks are assigned to agents

type BaseService

type BaseService struct {
	// contains filtered or unexported fields
}

BaseService provides common functionality for all services

func NewBaseService

func NewBaseService(config ServiceConfig) BaseService

NewBaseService creates a new base service

func (*BaseService) CheckQuota

func (s *BaseService) CheckQuota(ctx context.Context, resource string, amount int64) error

CheckQuota verifies resource quota

func (*BaseService) CheckRateLimit

func (s *BaseService) CheckRateLimit(ctx context.Context, resource string) error

CheckRateLimit enforces rate limiting per tenant/agent

func (*BaseService) PublishEvent

func (s *BaseService) PublishEvent(ctx context.Context, eventType string, aggregate AggregateRoot, data interface{}) error

PublishEvent publishes an event with versioning and metadata

func (*BaseService) SetEventPublisher

func (s *BaseService) SetEventPublisher(publisher events.EventPublisher)

SetEventPublisher sets the event publisher

func (*BaseService) SetEventStore

func (s *BaseService) SetEventStore(store events.EventStore)

SetEventStore sets the event store

func (*BaseService) SetHealthChecker

func (s *BaseService) SetHealthChecker(checker HealthChecker)

SetHealthChecker sets the health checker

func (*BaseService) StartDistributedTransaction

func (s *BaseService) StartDistributedTransaction(ctx context.Context, opts ...TxOption) (context.Context, func(), func())

StartDistributedTransaction starts a new distributed transaction with options

func (*BaseService) WithTransaction

func (s *BaseService) WithTransaction(ctx context.Context, fn func(ctx context.Context, tx Transaction) error) error

WithTransaction executes a function within a transaction with saga support

type CapabilityMatchStrategy

type CapabilityMatchStrategy struct {
	// contains filtered or unexported fields
}

CapabilityMatchStrategy assigns tasks based on agent capabilities

func (*CapabilityMatchStrategy) Assign

func (s *CapabilityMatchStrategy) Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)

func (*CapabilityMatchStrategy) GetName

func (s *CapabilityMatchStrategy) GetName() string

type CollaboratorAddedEvent

type CollaboratorAddedEvent struct {
	WorkspaceID uuid.UUID
	AgentID     string
	Role        string
	Timestamp   time.Time
}

type Compensator

type Compensator struct {
	// contains filtered or unexported fields
}

Compensator manages saga compensations

func NewCompensator

func NewCompensator(logger observability.Logger) *Compensator

NewCompensator creates a new compensator

func (*Compensator) AddCompensation

func (c *Compensator) AddCompensation(fn func() error)

AddCompensation adds a compensation function

func (*Compensator) Compensate

func (c *Compensator) Compensate(ctx context.Context) error

Compensate runs all compensations
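
In the saga pattern, compensations are usually run in reverse order of registration so that the most recent step is undone first. The sketch below assumes that ordering and keeps going when one compensation fails; the compensator type here is a simplified stand-in, and the real Compensator may behave differently on both points.

```go
package main

import (
	"errors"
	"fmt"
)

var errUndo = errors.New("undo failed")

// compensator is a simplified stand-in for the package's Compensator.
type compensator struct {
	steps []func() error
}

// Add registers an undo function for a step that has just succeeded.
func (c *compensator) Add(fn func() error) {
	c.steps = append(c.steps, fn)
}

// Compensate runs undo functions newest-first and joins any errors.
// It returns nil when every compensation succeeds.
func (c *compensator) Compensate() error {
	var errs []error
	for i := len(c.steps) - 1; i >= 0; i-- {
		if err := c.steps[i](); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

func main() {
	c := &compensator{}
	c.Add(func() error { fmt.Println("release reserved agent"); return nil })
	c.Add(func() error { fmt.Println("delete created subtask"); return errUndo })
	fmt.Println(c.Compensate())
}
```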

type ConcurrentModificationError

type ConcurrentModificationError struct {
	Resource string
	ID       uuid.UUID
	Version  int
}

ConcurrentModificationError represents a concurrent modification conflict

func (ConcurrentModificationError) Error

func (e ConcurrentModificationError) Error() string

type ConflictResolutionService

type ConflictResolutionService interface {
	// Document conflict resolution
	ResolveDocumentConflict(ctx context.Context, documentID uuid.UUID, operations []collaboration.DocumentOperation) (*models.Document, error)
	GetDocumentCRDT(ctx context.Context, documentID uuid.UUID) (*collaboration.DocumentCRDT, error)
	ApplyDocumentOperation(ctx context.Context, documentID uuid.UUID, operation *collaboration.CRDTOperation) error
	SyncDocument(ctx context.Context, documentID uuid.UUID, remoteCRDT *collaboration.DocumentCRDT) error

	// Workspace state conflict resolution
	ResolveWorkspaceStateConflict(ctx context.Context, workspaceID uuid.UUID, operations []models.StateOperation) (*models.WorkspaceState, error)
	GetWorkspaceStateCRDT(ctx context.Context, workspaceID uuid.UUID) (*collaboration.StateCRDT, error)
	ApplyStateOperation(ctx context.Context, workspaceID uuid.UUID, operation *collaboration.StateOperation) error
	SyncWorkspaceState(ctx context.Context, workspaceID uuid.UUID, remoteState *collaboration.StateCRDT) error

	// Task conflict resolution
	ResolveTaskConflict(ctx context.Context, taskID uuid.UUID, conflicts []models.TaskConflict) (*models.Task, error)

	// General conflict detection
	DetectConflicts(ctx context.Context, entityType string, entityID uuid.UUID) ([]*models.ConflictInfo, error)
	GetConflictHistory(ctx context.Context, entityType string, entityID uuid.UUID) ([]*models.ConflictResolution, error)

	// Vector clock management
	GetVectorClock(ctx context.Context, nodeID string) (crdt.VectorClock, error)
	UpdateVectorClock(ctx context.Context, nodeID string, clock crdt.VectorClock) error
}

ConflictResolutionService manages conflict resolution for collaborative entities
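
Vector clocks are what let a service decide whether two updates are ordered or concurrent (and therefore in conflict). The sketch below shows the comparison and merge rules on a plain map; it is independent of the crdt.VectorClock type, whose actual representation is not shown on this page.

```go
package main

import "fmt"

// VectorClock maps a node ID to that node's logical counter.
// This is a hypothetical representation for illustration.
type VectorClock map[string]uint64

// Ordering describes how clock a relates to clock b.
type Ordering string

const (
	Equal      Ordering = "equal"
	Before     Ordering = "before"
	After      Ordering = "after"
	Concurrent Ordering = "concurrent"
)

// Compare reports whether a happened before b, after b, is equal to b,
// or is concurrent with it. Missing entries count as zero.
func Compare(a, b VectorClock) Ordering {
	aBehind, bBehind := false, false
	for node, av := range a {
		bv := b[node]
		if av < bv {
			aBehind = true
		} else if av > bv {
			bBehind = true
		}
	}
	for node, bv := range b {
		if _, ok := a[node]; !ok && bv > 0 {
			aBehind = true
		}
	}
	switch {
	case aBehind && bBehind:
		return Concurrent
	case aBehind:
		return Before
	case bBehind:
		return After
	default:
		return Equal
	}
}

// Merge returns the element-wise maximum of two clocks.
func Merge(a, b VectorClock) VectorClock {
	out := make(VectorClock, len(a)+len(b))
	for node, v := range a {
		out[node] = v
	}
	for node, v := range b {
		if v > out[node] {
			out[node] = v
		}
	}
	return out
}

func main() {
	a := VectorClock{"node-1": 2, "node-2": 0}
	b := VectorClock{"node-1": 1, "node-2": 1}
	fmt.Println(Compare(a, b), Merge(a, b))
}
```

Only the Concurrent case needs conflict resolution; Before and After can be settled by keeping the later state.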

func NewConflictResolutionService

func NewConflictResolutionService(
	config ServiceConfig,
	documentRepo interfaces.DocumentRepository,
	workspaceRepo interfaces.WorkspaceRepository,
	taskRepo interfaces.TaskRepository,
	nodeID string,
) ConflictResolutionService

NewConflictResolutionService creates a new conflict resolution service

type CostOptimizedStrategy

type CostOptimizedStrategy struct {
	// contains filtered or unexported fields
}

CostOptimizedStrategy assigns tasks based on cost optimization

func (*CostOptimizedStrategy) Assign

func (s *CostOptimizedStrategy) Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)

func (*CostOptimizedStrategy) GetName

func (s *CostOptimizedStrategy) GetName() string

type DeadlockInfo

type DeadlockInfo struct {
	ID            string    `json:"id"`
	DetectedAt    time.Time `json:"detected_at"`
	InvolvedLocks []string  `json:"involved_locks"`
	Agents        []string  `json:"agents"`
	CycleInfo     string    `json:"cycle_info"`
}

DeadlockInfo contains information about detected deadlocks

type DefaultSanitizer

type DefaultSanitizer struct{}

DefaultSanitizer implements production-grade input sanitization

func (*DefaultSanitizer) SanitizeHTML

func (s *DefaultSanitizer) SanitizeHTML(input string) string

func (*DefaultSanitizer) SanitizeJSON

func (s *DefaultSanitizer) SanitizeJSON(input interface{}) (interface{}, error)

func (*DefaultSanitizer) SanitizeString

func (s *DefaultSanitizer) SanitizeString(input string) string

type DelegationError

type DelegationError struct {
	Reason string
}

DelegationError provides delegation denial details

func (DelegationError) Error

func (e DelegationError) Error() string

type DocumentLock

type DocumentLock struct {
	ID           string                 `json:"id"`
	DocumentID   uuid.UUID              `json:"document_id"`
	AgentID      string                 `json:"agent_id"`
	Type         LockType               `json:"type"`
	AcquiredAt   time.Time              `json:"acquired_at"`
	ExpiresAt    time.Time              `json:"expires_at"`
	RefreshCount int                    `json:"refresh_count"`
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
}

DocumentLock represents a lock on a document

type DocumentLockService

type DocumentLockService interface {
	// Document-level locking
	LockDocument(ctx context.Context, documentID uuid.UUID, agentID string, duration time.Duration) (*DocumentLock, error)
	UnlockDocument(ctx context.Context, documentID uuid.UUID, agentID string) error
	ExtendLock(ctx context.Context, documentID uuid.UUID, agentID string, extension time.Duration) error
	IsDocumentLocked(ctx context.Context, documentID uuid.UUID) (bool, *DocumentLock, error)

	// Section-level locking
	LockSection(ctx context.Context, documentID uuid.UUID, sectionID string, agentID string, duration time.Duration) (*SectionLock, error)
	UnlockSection(ctx context.Context, documentID uuid.UUID, sectionID string, agentID string) error
	GetSectionLocks(ctx context.Context, documentID uuid.UUID) ([]*SectionLock, error)

	// Lock management
	GetActiveLocks(ctx context.Context, agentID string) ([]*DocumentLock, error)
	ReleaseAllLocks(ctx context.Context, agentID string) error

	// Deadlock detection
	DetectDeadlocks(ctx context.Context) ([]*DeadlockInfo, error)
	ResolveDeadlock(ctx context.Context, deadlockID string) error
}

DocumentLockService provides distributed locking for documents

func NewDocumentLockService

func NewDocumentLockService(
	config ServiceConfig,
	redisClient *redis.Client,
) DocumentLockService

NewDocumentLockService creates a new document lock service

type DocumentLockedError

type DocumentLockedError struct {
	DocumentID uuid.UUID
	LockedBy   string
}

DocumentLockedError represents a locked document

func (DocumentLockedError) Error

func (e DocumentLockedError) Error() string

type DocumentService

type DocumentService interface {
	// Document lifecycle
	Create(ctx context.Context, doc *models.SharedDocument) error
	Get(ctx context.Context, id uuid.UUID) (*models.SharedDocument, error)
	Update(ctx context.Context, doc *models.SharedDocument) error
	Delete(ctx context.Context, id uuid.UUID) error

	// Collaborative editing
	ApplyOperation(ctx context.Context, docID uuid.UUID, operation *collaboration.DocumentOperation) error
	GetOperations(ctx context.Context, docID uuid.UUID, since time.Time) ([]*collaboration.DocumentOperation, error)
	GetOperationsBySequence(ctx context.Context, docID uuid.UUID, fromSeq, toSeq int64) ([]*collaboration.DocumentOperation, error)

	// Locking
	AcquireLock(ctx context.Context, docID uuid.UUID, agentID string, duration time.Duration) error
	ReleaseLock(ctx context.Context, docID uuid.UUID, agentID string) error
	ExtendLock(ctx context.Context, docID uuid.UUID, agentID string, duration time.Duration) error
	GetLockInfo(ctx context.Context, docID uuid.UUID) (*models.DocumentLock, error)

	// Conflict resolution
	DetectConflicts(ctx context.Context, docID uuid.UUID) ([]*models.ConflictInfo, error)
	ResolveConflict(ctx context.Context, conflictID uuid.UUID, resolution interface{}) error
	GetConflictHistory(ctx context.Context, docID uuid.UUID) ([]*models.ConflictResolution, error)

	// Version management
	CreateSnapshot(ctx context.Context, docID uuid.UUID) (*models.DocumentSnapshot, error)
	GetSnapshot(ctx context.Context, docID uuid.UUID, version int64) (*models.DocumentSnapshot, error)
	ListSnapshots(ctx context.Context, docID uuid.UUID) ([]*models.DocumentSnapshot, error)
	RestoreSnapshot(ctx context.Context, docID uuid.UUID, version int64) error

	// Search and query
	SearchDocuments(ctx context.Context, query string, filters interfaces.DocumentFilters) ([]*models.SharedDocument, error)
	GetDocumentsByWorkspace(ctx context.Context, workspaceID uuid.UUID) ([]*models.SharedDocument, error)
	GetDocumentsByCreator(ctx context.Context, createdBy string) ([]*models.SharedDocument, error)

	// Analytics
	GetDocumentStats(ctx context.Context, docID uuid.UUID) (*interfaces.DocumentStats, error)
	GetCollaborationMetrics(ctx context.Context, docID uuid.UUID, period time.Duration) (*models.CollaborationMetrics, error)

	// Real-time updates
	SubscribeToChanges(ctx context.Context, docID uuid.UUID, handler func(operation *collaboration.DocumentOperation)) (unsubscribe func())
	BroadcastChange(ctx context.Context, docID uuid.UUID, operation *collaboration.DocumentOperation) error
}

DocumentService manages collaborative documents with conflict resolution

func NewDocumentService

func NewDocumentService(
	config ServiceConfig,
	documentRepo interfaces.DocumentRepository,
	cache cache.Cache,
) DocumentService

NewDocumentService creates a production-grade document service

func NewDocumentServiceWithLocks

func NewDocumentServiceWithLocks(
	config ServiceConfig,
	documentRepo interfaces.DocumentRepository,
	cache cache.Cache,
	lockService DocumentLockService,
) DocumentService

NewDocumentServiceWithLocks creates a document service with distributed locking

type EncryptionService

type EncryptionService interface {
	Encrypt(ctx context.Context, data []byte) ([]byte, error)
	Decrypt(ctx context.Context, data []byte) ([]byte, error)
	EncryptString(ctx context.Context, data string) (string, error)
	DecryptString(ctx context.Context, data string) (string, error)
}

EncryptionService provides encryption functionality

func NewAESEncryptionService

func NewAESEncryptionService(key []byte) (EncryptionService, error)

NewAESEncryptionService creates a production encryption service with AES-256-GCM

func NewNoOpEncryptionService

func NewNoOpEncryptionService() EncryptionService

NewNoOpEncryptionService creates a new no-op encryption service

type EnhancedTaskService

type EnhancedTaskService struct {
	// contains filtered or unexported fields
}

EnhancedTaskService implements TaskService with production features

func (*EnhancedTaskService) AcceptTask

func (s *EnhancedTaskService) AcceptTask(ctx context.Context, taskID uuid.UUID, agentID string) error

AcceptTask accepts a delegated task with validation

func (EnhancedTaskService) ArchiveCompletedTasks

func (s EnhancedTaskService) ArchiveCompletedTasks(ctx context.Context, before time.Time) (int64, error)

func (EnhancedTaskService) AssignTask

func (s EnhancedTaskService) AssignTask(ctx context.Context, taskID uuid.UUID, agentID string) error

func (EnhancedTaskService) AutoAssignTask

func (s EnhancedTaskService) AutoAssignTask(ctx context.Context, taskID uuid.UUID, strategy AssignmentStrategy) error

func (EnhancedTaskService) CancelTaskTree

func (s EnhancedTaskService) CancelTaskTree(ctx context.Context, rootTaskID uuid.UUID, reason string) error

func (*EnhancedTaskService) CompleteTask

func (s *EnhancedTaskService) CompleteTask(ctx context.Context, taskID uuid.UUID, agentID string, result interface{}) error

CompleteTask completes a task with result storage

func (EnhancedTaskService) CompleteWorkflowTask

func (s EnhancedTaskService) CompleteWorkflowTask(ctx context.Context, taskID uuid.UUID, output interface{}) error

func (*EnhancedTaskService) Create

func (s *EnhancedTaskService) Create(ctx context.Context, task *models.Task, idempotencyKey string) error

Create creates a task with idempotency support

func (EnhancedTaskService) CreateBatch

func (s EnhancedTaskService) CreateBatch(ctx context.Context, tasks []*models.Task) error

CreateBatch creates multiple tasks in a single operation

func (EnhancedTaskService) CreateDistributedTask

func (s EnhancedTaskService) CreateDistributedTask(ctx context.Context, dt *models.DistributedTask) error

CreateDistributedTask creates a task with subtasks using saga pattern

func (EnhancedTaskService) CreateWorkflowTask

func (s EnhancedTaskService) CreateWorkflowTask(ctx context.Context, workflowID, stepID uuid.UUID, params map[string]interface{}) (*models.Task, error)

func (*EnhancedTaskService) DelegateTask

func (s *EnhancedTaskService) DelegateTask(ctx context.Context, delegation *models.TaskDelegation) error

DelegateTask delegates a task with history tracking

func (EnhancedTaskService) Delete

func (s EnhancedTaskService) Delete(ctx context.Context, id uuid.UUID) error

func (EnhancedTaskService) FailTask

func (s EnhancedTaskService) FailTask(ctx context.Context, taskID uuid.UUID, agentID string, errorMsg string) error

func (EnhancedTaskService) GenerateTaskReport

func (s EnhancedTaskService) GenerateTaskReport(ctx context.Context, filters interfaces.TaskFilters, format string) ([]byte, error)

func (EnhancedTaskService) Get

func (s EnhancedTaskService) Get(ctx context.Context, id uuid.UUID) (*models.Task, error)

Get retrieves a task by ID

func (EnhancedTaskService) GetAgentPerformance

func (s EnhancedTaskService) GetAgentPerformance(ctx context.Context, agentID string, period time.Duration) (*models.AgentPerformance, error)

func (EnhancedTaskService) GetAgentTasks

func (s EnhancedTaskService) GetAgentTasks(ctx context.Context, agentID string, filters interfaces.TaskFilters) ([]*models.Task, error)

func (EnhancedTaskService) GetAvailableTasks

func (s EnhancedTaskService) GetAvailableTasks(ctx context.Context, agentID string, capabilities []string) ([]*models.Task, error)

func (EnhancedTaskService) GetBatch

func (s EnhancedTaskService) GetBatch(ctx context.Context, ids []uuid.UUID) ([]*models.Task, error)

func (EnhancedTaskService) GetTaskStats

func (s EnhancedTaskService) GetTaskStats(ctx context.Context, filters interfaces.TaskFilters) (*models.TaskStats, error)

func (EnhancedTaskService) GetTaskTimeline

func (s EnhancedTaskService) GetTaskTimeline(ctx context.Context, taskID uuid.UUID) ([]*models.TaskEvent, error)

func (EnhancedTaskService) GetTaskTree

func (s EnhancedTaskService) GetTaskTree(ctx context.Context, rootTaskID uuid.UUID) (*models.TaskTree, error)

func (EnhancedTaskService) RebalanceTasks

func (s EnhancedTaskService) RebalanceTasks(ctx context.Context) error

func (EnhancedTaskService) RejectTask

func (s EnhancedTaskService) RejectTask(ctx context.Context, taskID uuid.UUID, agentID string, reason string) error

func (EnhancedTaskService) RetryTask

func (s EnhancedTaskService) RetryTask(ctx context.Context, taskID uuid.UUID) error

func (EnhancedTaskService) SearchTasks

func (s EnhancedTaskService) SearchTasks(ctx context.Context, query string, filters interfaces.TaskFilters) ([]*models.Task, error)

func (EnhancedTaskService) StartTask

func (s EnhancedTaskService) StartTask(ctx context.Context, taskID uuid.UUID, agentID string) error

func (EnhancedTaskService) SubmitSubtaskResult

func (s EnhancedTaskService) SubmitSubtaskResult(ctx context.Context, parentTaskID, subtaskID uuid.UUID, result interface{}) error

func (EnhancedTaskService) Update

func (s EnhancedTaskService) Update(ctx context.Context, task *models.Task) error

func (EnhancedTaskService) UpdateProgress

func (s EnhancedTaskService) UpdateProgress(ctx context.Context, taskID uuid.UUID, progress int, message string) error

func (*EnhancedTaskService) UpdateStatus

func (s *EnhancedTaskService) UpdateStatus(ctx context.Context, taskID uuid.UUID, newStatus models.TaskStatus, reason string) error

UpdateStatus updates task status with state machine validation

type EventBus

type EventBus interface {
	Subscribe(topic string, handler func(event interface{})) (unsubscribe func())
	Publish(topic string, event interface{})
	Close()
}

EventBus provides event broadcasting
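The interface above can be satisfied by a small in-memory type. The sketch below is one possible shape, not the package's implementation: `memoryBus` and `newMemoryBus` are hypothetical names, and synchronous delivery on the publisher's goroutine is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryBus is a hypothetical in-memory type with the same method set as EventBus.
type memoryBus struct {
	mu     sync.RWMutex
	nextID int
	closed bool
	subs   map[string]map[int]func(event interface{})
}

func newMemoryBus() *memoryBus {
	return &memoryBus{subs: make(map[string]map[int]func(event interface{}))}
}

// Subscribe registers handler for topic and returns a function that removes it.
func (b *memoryBus) Subscribe(topic string, handler func(event interface{})) (unsubscribe func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return func() {}
	}
	id := b.nextID
	b.nextID++
	if b.subs[topic] == nil {
		b.subs[topic] = make(map[int]func(event interface{}))
	}
	b.subs[topic][id] = handler
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.subs[topic], id) // deleting from a nil map is a no-op
	}
}

// Publish calls every handler for topic synchronously (an assumption of this sketch).
func (b *memoryBus) Publish(topic string, event interface{}) {
	b.mu.RLock()
	handlers := make([]func(event interface{}), 0, len(b.subs[topic]))
	for _, h := range b.subs[topic] {
		handlers = append(handlers, h)
	}
	b.mu.RUnlock()
	// Handlers run outside the lock so they may subscribe or unsubscribe safely.
	for _, h := range handlers {
		h(event)
	}
}

// Close drops all subscriptions; later Publish calls deliver nothing.
func (b *memoryBus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.subs = nil
}

func main() {
	bus := newMemoryBus()
	defer bus.Close()
	unsubscribe := bus.Subscribe("task.completed", func(event interface{}) {
		fmt.Println("received:", event)
	})
	bus.Publish("task.completed", "task-42")
	unsubscribe()
	bus.Publish("task.completed", "task-43") // no subscriber left
}
```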

type ExecutionMonitor

type ExecutionMonitor struct {
	// contains filtered or unexported fields
}

ExecutionMonitor monitors running workflow executions

func NewExecutionMonitor

func NewExecutionMonitor(service *workflowService) *ExecutionMonitor

func (*ExecutionMonitor) Start

func (m *ExecutionMonitor) Start()

func (*ExecutionMonitor) Stop

func (m *ExecutionMonitor) Stop()

type HealthChecker

type HealthChecker interface {
	Check(ctx context.Context) error
	RegisterCheck(name string, check func(context.Context) error)
}

HealthChecker provides health check functionality

type InMemoryQuotaManager

type InMemoryQuotaManager struct {
	// contains filtered or unexported fields
}

InMemoryQuotaManager implements a production-grade in-memory quota manager

func (*InMemoryQuotaManager) GetQuota

func (qm *InMemoryQuotaManager) GetQuota(ctx context.Context, tenantID uuid.UUID, resource string) (int64, error)

func (*InMemoryQuotaManager) GetQuotaStatus

func (qm *InMemoryQuotaManager) GetQuotaStatus(ctx context.Context, tenantID uuid.UUID) (*QuotaStatus, error)

func (*InMemoryQuotaManager) GetUsage

func (qm *InMemoryQuotaManager) GetUsage(ctx context.Context, tenantID uuid.UUID, resource string) (int64, error)

func (*InMemoryQuotaManager) IncrementUsage

func (qm *InMemoryQuotaManager) IncrementUsage(ctx context.Context, tenantID uuid.UUID, resource string, amount int64) error

func (*InMemoryQuotaManager) SetQuota

func (qm *InMemoryQuotaManager) SetQuota(ctx context.Context, tenantID uuid.UUID, resource string, limit int64) error

type InMemoryRateLimiter

type InMemoryRateLimiter struct {
	// contains filtered or unexported fields
}

InMemoryRateLimiter implements a production-grade in-memory rate limiter

func (*InMemoryRateLimiter) Check

func (rl *InMemoryRateLimiter) Check(ctx context.Context, key string) error

func (*InMemoryRateLimiter) CheckWithLimit

func (rl *InMemoryRateLimiter) CheckWithLimit(ctx context.Context, key string, limit int, window time.Duration) error

func (*InMemoryRateLimiter) GetRemaining

func (rl *InMemoryRateLimiter) GetRemaining(ctx context.Context, key string) (int, error)

func (*InMemoryRateLimiter) Reset

func (rl *InMemoryRateLimiter) Reset(ctx context.Context, key string) error

type LeastLoadedStrategy

type LeastLoadedStrategy struct {
	// contains filtered or unexported fields
}

LeastLoadedStrategy assigns tasks to the least loaded agent

func (*LeastLoadedStrategy) Assign

func (s *LeastLoadedStrategy) Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)

func (*LeastLoadedStrategy) GetName

func (s *LeastLoadedStrategy) GetName() string
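The idea behind a least-loaded strategy can be sketched as a single pass over the candidates. The `agent` struct and its `ActiveTasks` field are hypothetical stand-ins for `models.Agent`, whose fields are not shown here, and the tie-breaking rule is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// agent is a hypothetical stand-in for models.Agent; ActiveTasks is an assumed load metric.
type agent struct {
	ID          string
	ActiveTasks int
}

var errNoAgents = errors.New("no eligible agents")

// pickLeastLoaded returns the agent with the fewest active tasks.
// Ties keep the earliest agent in the slice.
func pickLeastLoaded(agents []*agent) (*agent, error) {
	var best *agent
	for _, a := range agents {
		if a == nil {
			continue
		}
		if best == nil || a.ActiveTasks < best.ActiveTasks {
			best = a
		}
	}
	if best == nil {
		return nil, errNoAgents
	}
	return best, nil
}

func main() {
	agents := []*agent{
		{ID: "a", ActiveTasks: 3},
		{ID: "b", ActiveTasks: 1},
		{ID: "c", ActiveTasks: 1},
	}
	chosen, err := pickLeastLoaded(agents)
	if err != nil {
		panic(err)
	}
	fmt.Println(chosen.ID) // prints "b"
}
```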

type LockConflictError

type LockConflictError struct {
	DocumentID    uuid.UUID
	CurrentHolder string
	ExpiresAt     time.Time
}

func (*LockConflictError) Error

func (e *LockConflictError) Error() string

type LockNotFoundError

type LockNotFoundError struct {
	DocumentID uuid.UUID
}

func (*LockNotFoundError) Error

func (e *LockNotFoundError) Error() string

type LockRefreshLimitError

type LockRefreshLimitError struct {
	DocumentID   uuid.UUID
	RefreshCount int
	MaxRefresh   int
}

func (*LockRefreshLimitError) Error

func (e *LockRefreshLimitError) Error() string

type LockType

type LockType string

LockType represents the type of lock

const (
	LockTypeExclusive LockType = "exclusive"
	LockTypeShared    LockType = "shared"
	LockTypeSection   LockType = "section"
)

type MergeConflictError

type MergeConflictError struct {
	DocumentID uuid.UUID
	Conflicts  []string
}

MergeConflictError represents a document merge conflict

func (MergeConflictError) Error

func (e MergeConflictError) Error() string

type NoEligibleAgentsError

type NoEligibleAgentsError struct {
	TaskType     string
	Requirements []string
}

NoEligibleAgentsError represents no eligible agents found

func (NoEligibleAgentsError) Error

func (e NoEligibleAgentsError) Error() string

type NoOpEncryptionService

type NoOpEncryptionService struct{}

NoOpEncryptionService implements a no-op encryption service for development. In production, this would use AES-GCM or similar.

func (*NoOpEncryptionService) Decrypt

func (s *NoOpEncryptionService) Decrypt(ctx context.Context, data []byte) ([]byte, error)

func (*NoOpEncryptionService) DecryptString

func (s *NoOpEncryptionService) DecryptString(ctx context.Context, data string) (string, error)

func (*NoOpEncryptionService) Encrypt

func (s *NoOpEncryptionService) Encrypt(ctx context.Context, data []byte) ([]byte, error)

func (*NoOpEncryptionService) EncryptString

func (s *NoOpEncryptionService) EncryptString(ctx context.Context, data string) (string, error)

type NoOpSpan

type NoOpSpan struct{}

NoOpSpan implements a no-op span for tracing

func (NoOpSpan) AddEvent

func (s NoOpSpan) AddEvent(name string, attributes map[string]interface{})

func (NoOpSpan) End

func (s NoOpSpan) End()

func (NoOpSpan) RecordError

func (s NoOpSpan) RecordError(err error)

func (NoOpSpan) SetAttribute

func (s NoOpSpan) SetAttribute(key string, value interface{})

func (NoOpSpan) SetStatus

func (s NoOpSpan) SetStatus(code int, message string)

type NotificationService

type NotificationService interface {
	NotifyTaskAssigned(ctx context.Context, agentID string, task interface{}) error
	NotifyTaskCompleted(ctx context.Context, agentID string, task interface{}) error
	NotifyTaskFailed(ctx context.Context, taskID uuid.UUID, agentID string, reason string) error
	NotifyWorkflowStarted(ctx context.Context, workflow interface{}) error
	NotifyWorkflowCompleted(ctx context.Context, workflow interface{}) error
	NotifyWorkflowFailed(ctx context.Context, workflowID uuid.UUID, reason string) error
	NotifyWorkflowUpdated(ctx context.Context, workflow interface{}) error
	NotifyStepStarted(ctx context.Context, executionID uuid.UUID, stepID string) error
	NotifyStepCompleted(ctx context.Context, executionID uuid.UUID, stepID string, output interface{}) error
	NotifyResourceDeleted(ctx context.Context, resourceType string, resourceID uuid.UUID) error
	BroadcastToAgents(ctx context.Context, agentIDs []string, message interface{}) error
	BroadcastToWorkspace(ctx context.Context, workspaceID uuid.UUID, message interface{}) error
}

NotificationService handles notifications

func NewNotificationService

func NewNotificationService(config ServiceConfig) NotificationService

NewNotificationService creates a new notification service

type PerformanceBasedStrategy

type PerformanceBasedStrategy struct {
	// contains filtered or unexported fields
}

PerformanceBasedStrategy assigns tasks based on agent performance metrics

func (*PerformanceBasedStrategy) Assign

func (s *PerformanceBasedStrategy) Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)

func (*PerformanceBasedStrategy) GetName

func (s *PerformanceBasedStrategy) GetName() string

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker tracks task progress

func NewProgressTracker

func NewProgressTracker(service *taskService) *ProgressTracker

NewProgressTracker creates a new progress tracker

func (*ProgressTracker) Start

func (t *ProgressTracker) Start()

Start starts the progress tracker

func (*ProgressTracker) Stop

func (t *ProgressTracker) Stop()

Stop stops the progress tracker

func (*ProgressTracker) Track

func (t *ProgressTracker) Track(taskID uuid.UUID)

Track adds a task to track

func (*ProgressTracker) Untrack

func (t *ProgressTracker) Untrack(taskID uuid.UUID)

Untrack removes a task from tracking

type QuotaError

type QuotaError struct {
	TenantID uuid.UUID
	Resource string
	Limit    int64
	Current  int64
}

QuotaError provides quota details

func (QuotaError) Error

func (e QuotaError) Error() string

type QuotaExceededError

type QuotaExceededError struct {
	Resource string
	Used     int64
	Limit    int64
}

QuotaExceededError represents a quota-exceeded error. It is an alternative to QuotaError that carries no tenant ID.

func (QuotaExceededError) Error

func (e QuotaExceededError) Error() string

type QuotaInfo

type QuotaInfo struct {
	Resource  string
	Limit     int64
	Used      int64
	Available int64
	Period    string
}

QuotaInfo contains quota information for a resource

type QuotaManager

type QuotaManager interface {
	GetQuota(ctx context.Context, tenantID uuid.UUID, resource string) (int64, error)
	GetUsage(ctx context.Context, tenantID uuid.UUID, resource string) (int64, error)
	IncrementUsage(ctx context.Context, tenantID uuid.UUID, resource string, amount int64) error
	SetQuota(ctx context.Context, tenantID uuid.UUID, resource string, limit int64) error
	GetQuotaStatus(ctx context.Context, tenantID uuid.UUID) (*QuotaStatus, error)
}

QuotaManager manages resource quotas

func NewInMemoryQuotaManager

func NewInMemoryQuotaManager() QuotaManager

NewInMemoryQuotaManager creates a new in-memory quota manager

type QuotaStatus

type QuotaStatus struct {
	TenantID uuid.UUID
	Quotas   map[string]QuotaInfo
}

QuotaStatus represents the quota status for a tenant

type RateLimitError

type RateLimitError struct {
	Key        string
	Limit      int
	Window     time.Duration
	RetryAfter time.Duration
}

RateLimitError provides rate limit details

func (RateLimitError) Error

func (e RateLimitError) Error() string

type RateLimiter

type RateLimiter interface {
	Check(ctx context.Context, key string) error
	CheckWithLimit(ctx context.Context, key string, limit int, window time.Duration) error
	GetRemaining(ctx context.Context, key string) (int, error)
	Reset(ctx context.Context, key string) error
}

RateLimiter provides rate limiting functionality

func NewInMemoryRateLimiter

func NewInMemoryRateLimiter(rate int, window time.Duration) RateLimiter

NewInMemoryRateLimiter creates a new in-memory rate limiter

type ResultAggregator

type ResultAggregator struct {
	// contains filtered or unexported fields
}

ResultAggregator aggregates results from distributed tasks

func NewResultAggregator

func NewResultAggregator() *ResultAggregator

NewResultAggregator creates a new result aggregator

func (*ResultAggregator) AddResult

func (a *ResultAggregator) AddResult(parentTaskID, subtaskID uuid.UUID, result interface{})

AddResult adds a subtask result

func (*ResultAggregator) AggregateResults

func (a *ResultAggregator) AggregateResults(parentTaskID uuid.UUID, config models.AggregationConfig) (interface{}, error)

AggregateResults aggregates results based on configuration

func (*ResultAggregator) Clear

func (a *ResultAggregator) Clear(parentTaskID uuid.UUID)

Clear clears results for a parent task

func (*ResultAggregator) GetResults

func (a *ResultAggregator) GetResults(parentTaskID uuid.UUID) map[uuid.UUID]interface{}

GetResults gets all results for a parent task

type RoundRobinStrategy

type RoundRobinStrategy struct {
	// contains filtered or unexported fields
}

RoundRobinStrategy assigns tasks in round-robin fashion

func (*RoundRobinStrategy) Assign

func (s *RoundRobinStrategy) Assign(ctx context.Context, task *models.Task, agents []*models.Agent) (*models.Agent, error)

func (*RoundRobinStrategy) GetName

func (s *RoundRobinStrategy) GetName() string
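Round-robin selection reduces to an atomically incremented counter taken modulo the number of candidates. The sketch below works on plain agent IDs. `roundRobin` and `pick` are hypothetical names, and the real strategy operates on `[]*models.Agent`.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical selector; the zero value is ready to use.
type roundRobin struct {
	next atomic.Uint64 // requires Go 1.19 or later
}

// pick returns the next ID in rotation and is safe for concurrent callers.
func (r *roundRobin) pick(agentIDs []string) (string, error) {
	if len(agentIDs) == 0 {
		return "", errors.New("no eligible agents")
	}
	n := r.next.Add(1) - 1 // value of the counter before this call
	return agentIDs[n%uint64(len(agentIDs))], nil
}

func main() {
	rr := &roundRobin{}
	ids := []string{"a", "b", "c"}
	for i := 0; i < 4; i++ {
		id, err := rr.pick(ids)
		if err != nil {
			panic(err)
		}
		fmt.Print(id, " ")
	}
	fmt.Println() // prints "a b c a "
}
```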

type Sanitizer

type Sanitizer interface {
	SanitizeString(input string) string
	SanitizeJSON(input interface{}) (interface{}, error)
	SanitizeHTML(input string) string
}

Sanitizer provides input sanitization

func NewDefaultSanitizer

func NewDefaultSanitizer() Sanitizer

NewDefaultSanitizer creates a new default sanitizer

type SectionLock

type SectionLock struct {
	DocumentLock
	SectionID   string `json:"section_id"`
	StartOffset int    `json:"start_offset"`
	EndOffset   int    `json:"end_offset"`
}

SectionLock represents a lock on a document section

type SectionLockConflictError

type SectionLockConflictError struct {
	DocumentID    uuid.UUID
	SectionID     string
	CurrentHolder string
}

func (*SectionLockConflictError) Error

func (e *SectionLockConflictError) Error() string

type ServiceConfig

type ServiceConfig struct {
	// Resilience
	CircuitBreaker *gobreaker.Settings
	RetryPolicy    resilience.RetryPolicy
	TimeoutPolicy  resilience.TimeoutPolicy
	BulkheadPolicy resilience.BulkheadPolicy

	// Rate Limiting
	RateLimiter  RateLimiter
	QuotaManager QuotaManager

	// Security
	Authorizer        auth.Authorizer
	Sanitizer         Sanitizer
	EncryptionService EncryptionService

	// Observability
	Logger  observability.Logger
	Metrics observability.MetricsClient
	Tracer  observability.StartSpanFunc

	// Business Rules
	RuleEngine    rules.Engine
	PolicyManager rules.PolicyManager
}

ServiceConfig provides common configuration for all services

type StateStore

type StateStore interface {
	GetState(ctx context.Context, key string) (interface{}, error)
	SetState(ctx context.Context, key string, value interface{}) error
	GetStateForUpdate(ctx context.Context, key string) (interface{}, error)
	SaveState(ctx context.Context, key string, state interface{}) error
}

StateStore provides state persistence

type StepDependencyError

type StepDependencyError struct {
	StepID       string
	DependencyID string
	Status       string
}

StepDependencyError represents a step dependency failure

func (StepDependencyError) Error

func (e StepDependencyError) Error() string

type StepNotFoundError

type StepNotFoundError struct {
	WorkflowID uuid.UUID
	StepID     string
}

StepNotFoundError represents a missing workflow step

func (StepNotFoundError) Error

func (e StepNotFoundError) Error() string

type TaskCreationSaga

type TaskCreationSaga struct {
	// contains filtered or unexported fields
}

TaskCreationSaga handles distributed task creation

func NewTaskCreationSaga

func NewTaskCreationSaga(service *taskService, dt *models.DistributedTask) *TaskCreationSaga

NewTaskCreationSaga creates a new task creation saga

func (*TaskCreationSaga) CreateMainTask

func (s *TaskCreationSaga) CreateMainTask(ctx context.Context, tx Transaction) (*models.Task, error)

CreateMainTask creates the main task

func (*TaskCreationSaga) CreateSubtask

func (s *TaskCreationSaga) CreateSubtask(ctx context.Context, tx Transaction, mainTask *models.Task, subtaskDef models.Subtask, agent *models.Agent) (*models.Task, error)

CreateSubtask creates a subtask

func (*TaskCreationSaga) DeleteMainTask

func (s *TaskCreationSaga) DeleteMainTask(ctx context.Context, taskID uuid.UUID) error

DeleteMainTask deletes the main task (compensation)

func (*TaskCreationSaga) DeleteSubtask

func (s *TaskCreationSaga) DeleteSubtask(ctx context.Context, subtaskID uuid.UUID) error

DeleteSubtask deletes a subtask (compensation)

func (*TaskCreationSaga) PublishEvents

func (s *TaskCreationSaga) PublishEvents(ctx context.Context) error

PublishEvents publishes all saga events

func (*TaskCreationSaga) ValidateAgents

func (s *TaskCreationSaga) ValidateAgents(ctx context.Context, subtasks []models.Subtask) (map[string]*models.Agent, error)

ValidateAgents validates agent availability

type TaskRebalancer

type TaskRebalancer struct {
	// contains filtered or unexported fields
}

TaskRebalancer handles task rebalancing

func NewTaskRebalancer

func NewTaskRebalancer(service *taskService, ruleEngine rules.Engine) *TaskRebalancer

NewTaskRebalancer creates a new task rebalancer

func (*TaskRebalancer) Start

func (r *TaskRebalancer) Start()

Start starts the rebalancer

func (*TaskRebalancer) Stop

func (r *TaskRebalancer) Stop()

Stop stops the rebalancer

type TaskService

type TaskService interface {
	// Task lifecycle with idempotency
	Create(ctx context.Context, task *models.Task, idempotencyKey string) error
	CreateBatch(ctx context.Context, tasks []*models.Task) error
	Get(ctx context.Context, id uuid.UUID) (*models.Task, error)
	GetBatch(ctx context.Context, ids []uuid.UUID) ([]*models.Task, error)
	Update(ctx context.Context, task *models.Task) error
	Delete(ctx context.Context, id uuid.UUID) error

	// Task assignment with load balancing
	AssignTask(ctx context.Context, taskID uuid.UUID, agentID string) error
	AutoAssignTask(ctx context.Context, taskID uuid.UUID, strategy AssignmentStrategy) error
	DelegateTask(ctx context.Context, delegation *models.TaskDelegation) error
	AcceptTask(ctx context.Context, taskID uuid.UUID, agentID string) error
	RejectTask(ctx context.Context, taskID uuid.UUID, agentID string, reason string) error

	// Task execution with monitoring
	StartTask(ctx context.Context, taskID uuid.UUID, agentID string) error
	UpdateProgress(ctx context.Context, taskID uuid.UUID, progress int, message string) error
	CompleteTask(ctx context.Context, taskID uuid.UUID, agentID string, result interface{}) error
	FailTask(ctx context.Context, taskID uuid.UUID, agentID string, errorMsg string) error
	RetryTask(ctx context.Context, taskID uuid.UUID) error

	// Advanced querying with caching
	GetAgentTasks(ctx context.Context, agentID string, filters interfaces.TaskFilters) ([]*models.Task, error)
	GetAvailableTasks(ctx context.Context, agentID string, capabilities []string) ([]*models.Task, error)
	SearchTasks(ctx context.Context, query string, filters interfaces.TaskFilters) ([]*models.Task, error)
	GetTaskTimeline(ctx context.Context, taskID uuid.UUID) ([]*models.TaskEvent, error)

	// Distributed task management
	CreateDistributedTask(ctx context.Context, task *models.DistributedTask) error
	SubmitSubtaskResult(ctx context.Context, parentTaskID, subtaskID uuid.UUID, result interface{}) error
	GetTaskTree(ctx context.Context, rootTaskID uuid.UUID) (*models.TaskTree, error)
	CancelTaskTree(ctx context.Context, rootTaskID uuid.UUID, reason string) error

	// Workflow integration
	CreateWorkflowTask(ctx context.Context, workflowID, stepID uuid.UUID, params map[string]interface{}) (*models.Task, error)
	CompleteWorkflowTask(ctx context.Context, taskID uuid.UUID, output interface{}) error

	// Analytics and reporting
	GetTaskStats(ctx context.Context, filters interfaces.TaskFilters) (*models.TaskStats, error)
	GetAgentPerformance(ctx context.Context, agentID string, period time.Duration) (*models.AgentPerformance, error)
	GenerateTaskReport(ctx context.Context, filters interfaces.TaskFilters, format string) ([]byte, error)

	// Maintenance
	ArchiveCompletedTasks(ctx context.Context, before time.Time) (int64, error)
	RebalanceTasks(ctx context.Context) error
}

TaskService handles task lifecycle with production features

func NewEnhancedTaskService

func NewEnhancedTaskService(
	baseService *taskService,
	txManager repository.TransactionManager,
	uow database.UnitOfWork,
	eventPublisher events.Publisher,
) TaskService

NewEnhancedTaskService creates a production-ready task service

func NewTaskService

func NewTaskService(
	config ServiceConfig,
	repo interfaces.TaskRepository,
	agentService AgentService,
	notifier NotificationService,
) TaskService

NewTaskService creates a production-ready task service

type TaskStateMachine

type TaskStateMachine struct {
	// contains filtered or unexported fields
}

TaskStateMachine defines valid state transitions

func NewTaskStateMachine

func NewTaskStateMachine() *TaskStateMachine

NewTaskStateMachine creates a new state machine for task status transitions

func (*TaskStateMachine) CanTransition

func (sm *TaskStateMachine) CanTransition(from, to models.TaskStatus) bool

CanTransition checks if a status transition is valid
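A transition check of this kind is commonly backed by a lookup table of allowed moves. The sketch below illustrates the technique only: the status names and the transition table are hypothetical, since the actual `models.TaskStatus` values and rules are not listed here.

```go
package main

import "fmt"

// taskStatus and the constants below are hypothetical stand-ins for models.TaskStatus.
type taskStatus string

const (
	statusPending    taskStatus = "pending"
	statusAssigned   taskStatus = "assigned"
	statusInProgress taskStatus = "in_progress"
	statusCompleted  taskStatus = "completed"
	statusFailed     taskStatus = "failed"
)

// stateMachine stores the allowed transitions as from -> set of to.
type stateMachine struct {
	transitions map[taskStatus]map[taskStatus]bool
}

func newStateMachine() *stateMachine {
	return &stateMachine{transitions: map[taskStatus]map[taskStatus]bool{
		statusPending:    {statusAssigned: true},
		statusAssigned:   {statusInProgress: true, statusPending: true},
		statusInProgress: {statusCompleted: true, statusFailed: true},
		statusFailed:     {statusPending: true}, // retry path
	}}
}

// CanTransition reports whether moving from one status to another is allowed.
// Unknown or terminal statuses have no entry, so the lookup yields false.
func (sm *stateMachine) CanTransition(from, to taskStatus) bool {
	return sm.transitions[from][to]
}

func main() {
	sm := newStateMachine()
	fmt.Println(sm.CanTransition(statusPending, statusAssigned))  // true
	fmt.Println(sm.CanTransition(statusCompleted, statusPending)) // false
}
```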

type TenantAwareAuthService

type TenantAwareAuthService struct {
	*auth.Service
	// contains filtered or unexported fields
}

TenantAwareAuthService extends the auth service with tenant configuration support

func NewTenantAwareAuthService

func NewTenantAwareAuthService(authService *auth.Service, tenantConfigService TenantConfigService, logger observability.Logger) *TenantAwareAuthService

NewTenantAwareAuthService creates a new tenant-aware auth service

func (*TenantAwareAuthService) CheckFeatureEnabled

func (s *TenantAwareAuthService) CheckFeatureEnabled(ctx context.Context, tenantID, feature string) (bool, error)

CheckFeatureEnabled checks if a feature is enabled for a tenant

func (*TenantAwareAuthService) GetAllowedOrigins

func (s *TenantAwareAuthService) GetAllowedOrigins(ctx context.Context, tenantID string) ([]string, error)

GetAllowedOrigins returns the allowed CORS origins for a tenant

func (*TenantAwareAuthService) GetServiceToken

func (s *TenantAwareAuthService) GetServiceToken(ctx context.Context, tenantID, provider string) (string, error)

GetServiceToken retrieves a decrypted service token for a tenant and provider

func (*TenantAwareAuthService) ValidateAPIKeyWithTenantConfig

func (s *TenantAwareAuthService) ValidateAPIKeyWithTenantConfig(ctx context.Context, apiKey string) (*auth.User, *models.TenantConfig, error)

ValidateAPIKeyWithTenantConfig validates an API key and returns both user and tenant configuration

func (*TenantAwareAuthService) ValidateWithEndpointRateLimit

func (s *TenantAwareAuthService) ValidateWithEndpointRateLimit(ctx context.Context, apiKey, endpoint string) (*auth.User, *models.EndpointRateLimit, error)

ValidateWithEndpointRateLimit validates an API key and checks endpoint-specific rate limits

type TenantConfigService

type TenantConfigService interface {
	GetConfig(ctx context.Context, tenantID string) (*models.TenantConfig, error)
	CreateConfig(ctx context.Context, config *models.TenantConfig) error
	UpdateConfig(ctx context.Context, config *models.TenantConfig) error
	DeleteConfig(ctx context.Context, tenantID string) error

	// Service token management
	SetServiceToken(ctx context.Context, tenantID, provider, token string) error
	RemoveServiceToken(ctx context.Context, tenantID, provider string) error

	// Feature flag management
	SetFeature(ctx context.Context, tenantID, feature string, value interface{}) error
	IsFeatureEnabled(ctx context.Context, tenantID, feature string) (bool, error)

	// Rate limit management
	SetRateLimitForKeyType(ctx context.Context, tenantID, keyType string, limit models.KeyTypeRateLimit) error
	SetRateLimitForEndpoint(ctx context.Context, tenantID, endpoint string, limit models.EndpointRateLimit) error
}

TenantConfigService handles tenant configuration management

func NewTenantConfigService

func NewTenantConfigService(
	repo repository.TenantConfigRepository,
	cache cache.Cache,
	encryption EncryptionService,
	logger observability.Logger,
) TenantConfigService

NewTenantConfigService creates a new tenant configuration service

type Transaction

type Transaction interface {
	Commit() error
	Rollback() error
	GetID() uuid.UUID
}

Transaction represents a distributed transaction

type TransactionParticipant

type TransactionParticipant interface {
	Prepare(ctx context.Context) error
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
	GetID() string
}

TransactionParticipant represents a participant in a distributed transaction

type TransactionState

type TransactionState int

Transaction states

const (
	TxStateActive TransactionState = iota
	TxStatePreparing
	TxStateCommitting
	TxStateAborting
	TxStateCommitted
	TxStateAborted
)

type TransactionalWorkflowService

type TransactionalWorkflowService struct {
	// contains filtered or unexported fields
}

TransactionalWorkflowService extends the workflow service with proper transaction management

func (TransactionalWorkflowService) ArchiveCompletedExecutions

func (s TransactionalWorkflowService) ArchiveCompletedExecutions(ctx context.Context, before time.Time) (int64, error)

func (TransactionalWorkflowService) CancelExecution

func (s TransactionalWorkflowService) CancelExecution(ctx context.Context, executionID uuid.UUID, reason string) error

func (TransactionalWorkflowService) CompleteStep

func (s TransactionalWorkflowService) CompleteStep(ctx context.Context, executionID uuid.UUID, stepID string, output map[string]interface{}) error

CompleteStep marks a workflow step as completed

func (TransactionalWorkflowService) CreateBranchingPath

func (s TransactionalWorkflowService) CreateBranchingPath(ctx context.Context, executionID uuid.UUID, branchPoint string, conditions map[string]interface{}) error

func (TransactionalWorkflowService) CreateCompensation

func (s TransactionalWorkflowService) CreateCompensation(ctx context.Context, executionID uuid.UUID, failedStep string, compensation *models.CompensationAction) error

func (TransactionalWorkflowService) CreateExecution

func (s TransactionalWorkflowService) CreateExecution(ctx context.Context, execution *models.WorkflowExecution) error

CreateExecution creates a new workflow execution record

func (TransactionalWorkflowService) CreateFromTemplate

func (s TransactionalWorkflowService) CreateFromTemplate(ctx context.Context, templateID uuid.UUID, params map[string]interface{}) (*models.Workflow, error)

func (TransactionalWorkflowService) CreateWorkflow

func (s TransactionalWorkflowService) CreateWorkflow(ctx context.Context, workflow *models.Workflow) error

CreateWorkflow creates a new workflow with validation

func (TransactionalWorkflowService) CreateWorkflowTemplate

func (s TransactionalWorkflowService) CreateWorkflowTemplate(ctx context.Context, template *models.WorkflowTemplate) error

func (TransactionalWorkflowService) DeleteWorkflow

func (s TransactionalWorkflowService) DeleteWorkflow(ctx context.Context, id uuid.UUID) error

func (TransactionalWorkflowService) ExecuteCompensation

func (s TransactionalWorkflowService) ExecuteCompensation(ctx context.Context, executionID uuid.UUID) error

func (TransactionalWorkflowService) ExecuteWithCircuitBreaker

func (s TransactionalWorkflowService) ExecuteWithCircuitBreaker(ctx context.Context, operation string, fn func() (interface{}, error)) (interface{}, error)

ExecuteWithCircuitBreaker executes a function with circuit breaker protection

func (*TransactionalWorkflowService) ExecuteWorkflow

func (s *TransactionalWorkflowService) ExecuteWorkflow(ctx context.Context, workflowID uuid.UUID, input map[string]interface{}, idempotencyKey string) (*models.WorkflowExecution, error)

ExecuteWorkflow executes a workflow with full transaction support

func (*TransactionalWorkflowService) ExecuteWorkflowStep

func (s *TransactionalWorkflowService) ExecuteWorkflowStep(ctx context.Context, executionID uuid.UUID, stepID string) error

ExecuteWorkflowStep executes a single workflow step with transaction and savepoint support

func (TransactionalWorkflowService) FailStep

func (s TransactionalWorkflowService) FailStep(ctx context.Context, executionID uuid.UUID, stepID string, reason string, details map[string]interface{}) error

FailStep marks a workflow step as failed

func (TransactionalWorkflowService) GenerateWorkflowReport

func (s TransactionalWorkflowService) GenerateWorkflowReport(ctx context.Context, filters interfaces.WorkflowFilters, format string) ([]byte, error)

func (TransactionalWorkflowService) GetCurrentStep

func (s TransactionalWorkflowService) GetCurrentStep(ctx context.Context, executionID uuid.UUID) (*models.StepExecution, error)

GetCurrentStep returns the currently executing step

func (TransactionalWorkflowService) GetExecution

func (s TransactionalWorkflowService) GetExecution(ctx context.Context, executionID uuid.UUID) (*models.WorkflowExecution, error)

func (TransactionalWorkflowService) GetExecutionHistory

func (s TransactionalWorkflowService) GetExecutionHistory(ctx context.Context, workflowID uuid.UUID) ([]*models.WorkflowExecution, error)

GetExecutionHistory returns the execution history (same as GetWorkflowHistory for compatibility)

func (TransactionalWorkflowService) GetExecutionStatus

func (s TransactionalWorkflowService) GetExecutionStatus(ctx context.Context, executionID uuid.UUID) (*models.ExecutionStatus, error)

func (TransactionalWorkflowService) GetExecutionTimeline

func (s TransactionalWorkflowService) GetExecutionTimeline(ctx context.Context, executionID uuid.UUID) ([]*models.ExecutionEvent, error)

func (TransactionalWorkflowService) GetPendingApprovals

func (s TransactionalWorkflowService) GetPendingApprovals(ctx context.Context, approverID string) ([]*models.PendingApproval, error)

func (TransactionalWorkflowService) GetPendingSteps

func (s TransactionalWorkflowService) GetPendingSteps(ctx context.Context, executionID uuid.UUID) ([]*models.StepExecution, error)

GetPendingSteps returns all pending steps for parallel execution

func (TransactionalWorkflowService) GetStepExecution

func (s TransactionalWorkflowService) GetStepExecution(ctx context.Context, executionID uuid.UUID, stepID string) (*models.StepExecution, error)

GetStepExecution returns a specific step execution

func (TransactionalWorkflowService) GetWorkflow

func (s TransactionalWorkflowService) GetWorkflow(ctx context.Context, id uuid.UUID) (*models.Workflow, error)

func (TransactionalWorkflowService) GetWorkflowHistory

func (s TransactionalWorkflowService) GetWorkflowHistory(ctx context.Context, workflowID uuid.UUID, limit int, offset int) ([]*models.WorkflowExecution, error)

GetWorkflowHistory returns execution history for a workflow
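The limit and offset parameters suggest offset-based paging. The following is a minimal sketch of how a caller might collect every page, not the package's own code. The names historyFunc, fetchAll, and fakeHistory are hypothetical, and the stand-in drops ctx and workflowID and returns plain strings instead of *models.WorkflowExecution so the example stays dependency-free.

```go
package main

import "fmt"

// historyFunc is a hypothetical stand-in for GetWorkflowHistory with the
// ctx and workflowID arguments dropped; it returns execution IDs as strings.
type historyFunc func(limit, offset int) ([]string, error)

// fetchAll requests pages of pageSize items until a short page signals the end.
func fetchAll(fetch historyFunc, pageSize int) ([]string, error) {
	if pageSize <= 0 {
		return nil, fmt.Errorf("pageSize must be positive, got %d", pageSize)
	}
	var all []string
	for offset := 0; ; offset += pageSize {
		page, err := fetch(pageSize, offset)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}

// fakeHistory serves windows of an in-memory list, for illustration only.
func fakeHistory(items []string) historyFunc {
	return func(limit, offset int) ([]string, error) {
		if offset >= len(items) {
			return nil, nil
		}
		end := offset + limit
		if end > len(items) {
			end = len(items)
		}
		return items[offset:end], nil
	}
}

func main() {
	all, err := fetchAll(fakeHistory([]string{"e1", "e2", "e3", "e4", "e5"}), 2)
	fmt.Println(all, err)
}
```

Offset paging can skip or repeat rows if executions are inserted while the loop runs, so a caller that needs a consistent snapshot may prefer a single bounded query.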

func (TransactionalWorkflowService) GetWorkflowMetrics

func (s TransactionalWorkflowService) GetWorkflowMetrics(ctx context.Context, workflowID uuid.UUID) (*models.WorkflowMetrics, error)

GetWorkflowMetrics returns metrics for a workflow

func (TransactionalWorkflowService) GetWorkflowStats

func (s TransactionalWorkflowService) GetWorkflowStats(ctx context.Context, workflowID uuid.UUID, period time.Duration) (*interfaces.WorkflowStats, error)

func (TransactionalWorkflowService) GetWorkflowTemplate

func (s TransactionalWorkflowService) GetWorkflowTemplate(ctx context.Context, templateID uuid.UUID) (*models.WorkflowTemplate, error)

func (TransactionalWorkflowService) ListExecutions

func (s TransactionalWorkflowService) ListExecutions(ctx context.Context, workflowID uuid.UUID, filters interfaces.ExecutionFilters) ([]*models.WorkflowExecution, error)

func (TransactionalWorkflowService) ListWorkflowTemplates

func (s TransactionalWorkflowService) ListWorkflowTemplates(ctx context.Context) ([]*models.WorkflowTemplate, error)

func (TransactionalWorkflowService) ListWorkflows

func (s TransactionalWorkflowService) ListWorkflows(ctx context.Context, filters interfaces.WorkflowFilters) ([]*models.Workflow, error)

func (TransactionalWorkflowService) MergeBranchingPaths

func (s TransactionalWorkflowService) MergeBranchingPaths(ctx context.Context, executionID uuid.UUID, branchIDs []string) error

func (TransactionalWorkflowService) PauseExecution

func (s TransactionalWorkflowService) PauseExecution(ctx context.Context, executionID uuid.UUID, reason string) error

func (TransactionalWorkflowService) ResumeExecution

func (s TransactionalWorkflowService) ResumeExecution(ctx context.Context, executionID uuid.UUID) error

func (TransactionalWorkflowService) RetryExecution

func (s TransactionalWorkflowService) RetryExecution(ctx context.Context, executionID uuid.UUID, fromStep string) error

func (TransactionalWorkflowService) RetryStep

func (s TransactionalWorkflowService) RetryStep(ctx context.Context, executionID uuid.UUID, stepID string) error

RetryStep retries a failed workflow step

func (TransactionalWorkflowService) SearchWorkflows

func (s TransactionalWorkflowService) SearchWorkflows(ctx context.Context, query string) ([]*models.Workflow, error)

func (TransactionalWorkflowService) SimulateWorkflow

func (s TransactionalWorkflowService) SimulateWorkflow(ctx context.Context, workflow *models.Workflow, input map[string]interface{}) (*models.SimulationResult, error)

func (TransactionalWorkflowService) StartWorkflow

func (s TransactionalWorkflowService) StartWorkflow(ctx context.Context, workflowID uuid.UUID, initiatorID string, input map[string]interface{}) (*models.WorkflowExecution, error)

StartWorkflow starts a new workflow execution

func (TransactionalWorkflowService) SubmitApproval

func (s TransactionalWorkflowService) SubmitApproval(ctx context.Context, executionID uuid.UUID, stepID string, approval *models.ApprovalDecision) error

func (TransactionalWorkflowService) UpdateExecution

func (s TransactionalWorkflowService) UpdateExecution(ctx context.Context, execution *models.WorkflowExecution) error

func (TransactionalWorkflowService) UpdateWorkflow

func (s TransactionalWorkflowService) UpdateWorkflow(ctx context.Context, workflow *models.Workflow) error

func (TransactionalWorkflowService) ValidateWorkflow

func (s TransactionalWorkflowService) ValidateWorkflow(ctx context.Context, workflow *models.Workflow) error

type TxConfig

type TxConfig struct {
	Timeout        time.Duration
	IsolationLevel string
	RetryPolicy    resilience.RetryPolicy
}

TxConfig holds transaction configuration

type TxOption

type TxOption func(*TxConfig)

TxOption configures a distributed transaction

func WithTxIsolationLevel

func WithTxIsolationLevel(level string) TxOption

WithTxIsolationLevel sets the transaction isolation level

func WithTxRetryPolicy

func WithTxRetryPolicy(policy resilience.RetryPolicy) TxOption

WithTxRetryPolicy sets the transaction retry policy

func WithTxTimeout

func WithTxTimeout(timeout time.Duration) TxOption

WithTxTimeout sets the transaction timeout

type UnauthorizedError

type UnauthorizedError struct {
	Action string
	Reason string
}

UnauthorizedError provides authorization failure details

func (UnauthorizedError) Error

func (e UnauthorizedError) Error() string

type UnauthorizedLockError

type UnauthorizedLockError struct {
	DocumentID uuid.UUID
	AgentID    string
	OwnerID    string
}

func (*UnauthorizedLockError) Error

func (e *UnauthorizedLockError) Error() string

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a validation failure

func (ValidationError) Error

func (e ValidationError) Error() string

type VersionMismatchError

type VersionMismatchError struct {
	DocumentID      uuid.UUID
	ExpectedVersion int
	ActualVersion   int
}

VersionMismatchError represents a document version mismatch

func (VersionMismatchError) Error

func (e VersionMismatchError) Error() string

type WorkflowNotActiveError

type WorkflowNotActiveError struct {
	WorkflowID uuid.UUID
}

WorkflowNotActiveError represents an inactive workflow

func (WorkflowNotActiveError) Error

func (e WorkflowNotActiveError) Error() string

type WorkflowService

type WorkflowService interface {
	// Workflow management
	CreateWorkflow(ctx context.Context, workflow *models.Workflow) error
	GetWorkflow(ctx context.Context, id uuid.UUID) (*models.Workflow, error)
	UpdateWorkflow(ctx context.Context, workflow *models.Workflow) error
	DeleteWorkflow(ctx context.Context, id uuid.UUID) error
	ListWorkflows(ctx context.Context, filters interfaces.WorkflowFilters) ([]*models.Workflow, error)
	SearchWorkflows(ctx context.Context, query string) ([]*models.Workflow, error)

	// Execution management
	CreateExecution(ctx context.Context, execution *models.WorkflowExecution) error
	ExecuteWorkflow(ctx context.Context, workflowID uuid.UUID, input map[string]interface{}, idempotencyKey string) (*models.WorkflowExecution, error)
	ExecuteWorkflowStep(ctx context.Context, executionID uuid.UUID, stepID string) error
	StartWorkflow(ctx context.Context, workflowID uuid.UUID, initiatorID string, input map[string]interface{}) (*models.WorkflowExecution, error)
	GetExecution(ctx context.Context, executionID uuid.UUID) (*models.WorkflowExecution, error)
	ListExecutions(ctx context.Context, workflowID uuid.UUID, filters interfaces.ExecutionFilters) ([]*models.WorkflowExecution, error)
	GetExecutionStatus(ctx context.Context, executionID uuid.UUID) (*models.ExecutionStatus, error)
	GetExecutionTimeline(ctx context.Context, executionID uuid.UUID) ([]*models.ExecutionEvent, error)
	GetExecutionHistory(ctx context.Context, workflowID uuid.UUID) ([]*models.WorkflowExecution, error)

	// Execution control
	UpdateExecution(ctx context.Context, execution *models.WorkflowExecution) error
	PauseExecution(ctx context.Context, executionID uuid.UUID, reason string) error
	ResumeExecution(ctx context.Context, executionID uuid.UUID) error
	CancelExecution(ctx context.Context, executionID uuid.UUID, reason string) error
	RetryExecution(ctx context.Context, executionID uuid.UUID, fromStep string) error

	// Step management
	CompleteStep(ctx context.Context, executionID uuid.UUID, stepID string, output map[string]interface{}) error
	FailStep(ctx context.Context, executionID uuid.UUID, stepID string, reason string, details map[string]interface{}) error
	RetryStep(ctx context.Context, executionID uuid.UUID, stepID string) error
	GetCurrentStep(ctx context.Context, executionID uuid.UUID) (*models.StepExecution, error)
	GetPendingSteps(ctx context.Context, executionID uuid.UUID) ([]*models.StepExecution, error)
	GetStepExecution(ctx context.Context, executionID uuid.UUID, stepID string) (*models.StepExecution, error)

	// Approval management
	SubmitApproval(ctx context.Context, executionID uuid.UUID, stepID string, approval *models.ApprovalDecision) error
	GetPendingApprovals(ctx context.Context, approverID string) ([]*models.PendingApproval, error)

	// Template management
	CreateWorkflowTemplate(ctx context.Context, template *models.WorkflowTemplate) error
	GetWorkflowTemplate(ctx context.Context, templateID uuid.UUID) (*models.WorkflowTemplate, error)
	ListWorkflowTemplates(ctx context.Context) ([]*models.WorkflowTemplate, error)
	CreateFromTemplate(ctx context.Context, templateID uuid.UUID, params map[string]interface{}) (*models.Workflow, error)

	// Validation and simulation
	ValidateWorkflow(ctx context.Context, workflow *models.Workflow) error
	SimulateWorkflow(ctx context.Context, workflow *models.Workflow, input map[string]interface{}) (*models.SimulationResult, error)

	// Analytics and reporting
	GetWorkflowStats(ctx context.Context, workflowID uuid.UUID, period time.Duration) (*interfaces.WorkflowStats, error)
	GetWorkflowHistory(ctx context.Context, workflowID uuid.UUID, limit int, offset int) ([]*models.WorkflowExecution, error)
	GetWorkflowMetrics(ctx context.Context, workflowID uuid.UUID) (*models.WorkflowMetrics, error)
	GenerateWorkflowReport(ctx context.Context, filters interfaces.WorkflowFilters, format string) ([]byte, error)

	// Maintenance
	ArchiveCompletedExecutions(ctx context.Context, before time.Time) (int64, error)

	// Advanced features
	CreateBranchingPath(ctx context.Context, executionID uuid.UUID, branchPoint string, conditions map[string]interface{}) error
	MergeBranchingPaths(ctx context.Context, executionID uuid.UUID, branchIDs []string) error
	CreateCompensation(ctx context.Context, executionID uuid.UUID, failedStep string, compensation *models.CompensationAction) error
	ExecuteCompensation(ctx context.Context, executionID uuid.UUID) error
}

WorkflowService orchestrates multi-agent workflows using the saga pattern

func NewTransactionalWorkflowService

func NewTransactionalWorkflowService(
	baseService *workflowService,
	uow database.UnitOfWork,
	txManager repository.TransactionManager,
	compensationMgr repository.CompensationManager,
) WorkflowService

NewTransactionalWorkflowService creates a workflow service with transaction support

func NewWorkflowService

func NewWorkflowService(
	config ServiceConfig,
	repo interfaces.WorkflowRepository,
	taskService TaskService,
	agentService AgentService,
	notifier NotificationService,
) WorkflowService

NewWorkflowService creates a production-ready workflow service

type WorkspaceActivity

type WorkspaceActivity struct {
	ID           uuid.UUID              `json:"id" db:"id"`
	WorkspaceID  uuid.UUID              `json:"workspace_id" db:"workspace_id"`
	AgentID      string                 `json:"agent_id" db:"agent_id"`
	ActivityType string                 `json:"activity_type" db:"activity_type"`
	ResourceType string                 `json:"resource_type,omitempty" db:"resource_type"`
	ResourceID   string                 `json:"resource_id,omitempty" db:"resource_id"`
	Action       string                 `json:"action" db:"action"`
	Metadata     map[string]interface{} `json:"metadata" db:"metadata"`
	IPAddress    string                 `json:"ip_address,omitempty" db:"ip_address"`
	UserAgent    string                 `json:"user_agent,omitempty" db:"user_agent"`
	CreatedAt    time.Time              `json:"created_at" db:"created_at"`
}

WorkspaceActivity represents an activity log entry

type WorkspaceCreatedEvent

type WorkspaceCreatedEvent struct {
	WorkspaceID uuid.UUID
	TenantID    uuid.UUID
	CreatedBy   string
	Timestamp   time.Time
}

Event types for workspace operations

type WorkspaceDeletedEvent

type WorkspaceDeletedEvent struct {
	WorkspaceID uuid.UUID
	TenantID    uuid.UUID
	DeletedBy   string
	Timestamp   time.Time
}

type WorkspaceFullError

type WorkspaceFullError struct {
	WorkspaceID uuid.UUID
	MaxAgents   int
}

WorkspaceFullError indicates that a workspace has reached its agent capacity (MaxAgents)

func (WorkspaceFullError) Error

func (e WorkspaceFullError) Error() string

type WorkspaceMember

type WorkspaceMember struct {
	WorkspaceID uuid.UUID              `json:"workspace_id" db:"workspace_id"`
	AgentID     string                 `json:"agent_id" db:"agent_id"`
	Role        WorkspaceMemberRole    `json:"role" db:"role"`
	Permissions map[string]interface{} `json:"permissions" db:"permissions"`
	JoinedAt    time.Time              `json:"joined_at" db:"joined_at"`
	JoinedBy    string                 `json:"joined_by" db:"joined_by"`
	LastActive  time.Time              `json:"last_active" db:"last_active"`
	Metadata    map[string]interface{} `json:"metadata" db:"metadata"`
}

WorkspaceMember represents a member of a workspace with their role

type WorkspaceMemberRole

type WorkspaceMemberRole string

WorkspaceMemberRole represents the role of a member in a workspace

const (
	WorkspaceMemberRoleOwner  WorkspaceMemberRole = "owner"
	WorkspaceMemberRoleAdmin  WorkspaceMemberRole = "admin"
	WorkspaceMemberRoleMember WorkspaceMemberRole = "member"
	WorkspaceMemberRoleViewer WorkspaceMemberRole = "viewer"
)

type WorkspaceMemberService

type WorkspaceMemberService interface {
	// Member management
	AddMember(ctx context.Context, workspaceID uuid.UUID, agentID string, role WorkspaceMemberRole) error
	UpdateMemberRole(ctx context.Context, workspaceID uuid.UUID, agentID string, newRole WorkspaceMemberRole) error
	RemoveMember(ctx context.Context, workspaceID uuid.UUID, agentID string) error
	GetMember(ctx context.Context, workspaceID uuid.UUID, agentID string) (*WorkspaceMember, error)
	ListMembers(ctx context.Context, workspaceID uuid.UUID) ([]*WorkspaceMember, error)

	// Role-based access control
	CheckPermission(ctx context.Context, workspaceID uuid.UUID, agentID string, permission string) (bool, error)
	GetMemberRole(ctx context.Context, workspaceID uuid.UUID, agentID string) (WorkspaceMemberRole, error)

	// Activity tracking
	LogActivity(ctx context.Context, activity *WorkspaceActivity) error
	GetActivities(ctx context.Context, workspaceID uuid.UUID, limit int) ([]*WorkspaceActivity, error)

	// Workspace quotas
	CheckMemberQuota(ctx context.Context, workspaceID uuid.UUID) error
	UpdateLastActive(ctx context.Context, workspaceID uuid.UUID, agentID string) error
}

WorkspaceMemberService handles workspace member management
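The documentation does not say how CheckPermission maps roles to permissions. As a rough sketch only, a role-based check could look up the permission in a per-role table. The rolePermissions table, the permission strings such as "document:write", and the allowed function are hypothetical and are not taken from the package.

```go
package main

import "fmt"

// WorkspaceMemberRole and its constants mirror the documented declarations.
type WorkspaceMemberRole string

const (
	WorkspaceMemberRoleOwner  WorkspaceMemberRole = "owner"
	WorkspaceMemberRoleAdmin  WorkspaceMemberRole = "admin"
	WorkspaceMemberRoleMember WorkspaceMemberRole = "member"
	WorkspaceMemberRoleViewer WorkspaceMemberRole = "viewer"
)

// rolePermissions is a hypothetical table; the permission names are
// illustrative and do not come from the package.
var rolePermissions = map[WorkspaceMemberRole]map[string]bool{
	WorkspaceMemberRoleOwner: {
		"workspace:delete": true, "member:manage": true,
		"document:write": true, "document:read": true,
	},
	WorkspaceMemberRoleAdmin: {
		"member:manage": true, "document:write": true, "document:read": true,
	},
	WorkspaceMemberRoleMember: {"document:write": true, "document:read": true},
	WorkspaceMemberRoleViewer: {"document:read": true},
}

// allowed reports whether role grants permission. Unknown roles and unknown
// permissions yield false because lookups on a missing map entry return the
// zero value.
func allowed(role WorkspaceMemberRole, permission string) bool {
	return rolePermissions[role][permission]
}

func main() {
	fmt.Println(allowed(WorkspaceMemberRoleViewer, "document:write"))
	fmt.Println(allowed(WorkspaceMemberRoleAdmin, "member:manage"))
}
```

A deny-by-default lookup like this keeps a misspelled role or permission from granting access by accident.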

func NewWorkspaceMemberService

func NewWorkspaceMemberService(
	config ServiceConfig,
	txManager repository.TransactionManager,
	eventPublisher events.Publisher,
	authorizer auth.Authorizer,
) WorkspaceMemberService

NewWorkspaceMemberService creates a new workspace member service

type WorkspaceService

type WorkspaceService interface {
	// Workspace lifecycle
	Create(ctx context.Context, workspace *models.Workspace) error
	Get(ctx context.Context, id uuid.UUID) (*models.Workspace, error)
	Update(ctx context.Context, workspace *models.Workspace) error
	Delete(ctx context.Context, id uuid.UUID) error
	Archive(ctx context.Context, id uuid.UUID) error

	// Member management with permissions
	AddMember(ctx context.Context, member *models.WorkspaceMember) error
	RemoveMember(ctx context.Context, workspaceID uuid.UUID, agentID string) error
	UpdateMemberRole(ctx context.Context, workspaceID uuid.UUID, agentID string, role string) error
	UpdateMemberPermissions(ctx context.Context, workspaceID uuid.UUID, agentID string, permissions []string) error
	ListMembers(ctx context.Context, workspaceID uuid.UUID) ([]*models.WorkspaceMember, error)
	GetMemberActivity(ctx context.Context, workspaceID uuid.UUID) ([]*models.MemberActivity, error)

	// State management with CRDT
	GetState(ctx context.Context, workspaceID uuid.UUID) (*models.WorkspaceState, error)
	UpdateState(ctx context.Context, workspaceID uuid.UUID, operation *models.StateOperation) error
	MergeState(ctx context.Context, workspaceID uuid.UUID, remoteState *models.WorkspaceState) error
	GetStateHistory(ctx context.Context, workspaceID uuid.UUID, limit int) ([]*models.StateSnapshot, error)
	RestoreState(ctx context.Context, workspaceID uuid.UUID, snapshotID uuid.UUID) error

	// Document management
	CreateDocument(ctx context.Context, doc *models.SharedDocument) error
	GetDocument(ctx context.Context, docID uuid.UUID) (*models.SharedDocument, error)
	UpdateDocument(ctx context.Context, docID uuid.UUID, operation *collaboration.DocumentOperation) error
	ListDocuments(ctx context.Context, workspaceID uuid.UUID) ([]*models.SharedDocument, error)

	// Real-time collaboration
	BroadcastToMembers(ctx context.Context, workspaceID uuid.UUID, message interface{}) error
	SendToMember(ctx context.Context, workspaceID uuid.UUID, agentID string, message interface{}) error
	GetPresence(ctx context.Context, workspaceID uuid.UUID) ([]*models.MemberPresence, error)
	UpdatePresence(ctx context.Context, workspaceID uuid.UUID, agentID string, status string) error

	// Search and discovery
	ListByAgent(ctx context.Context, agentID string) ([]*models.Workspace, error)
	SearchWorkspaces(ctx context.Context, query string, filters interfaces.WorkspaceFilters) ([]*models.Workspace, error)
	GetRecommendedWorkspaces(ctx context.Context, agentID string) ([]*models.Workspace, error)

	// Analytics
	GetWorkspaceStats(ctx context.Context, workspaceID uuid.UUID) (*models.WorkspaceStats, error)
	GetCollaborationMetrics(ctx context.Context, workspaceID uuid.UUID, period time.Duration) (*models.CollaborationMetrics, error)
}

WorkspaceService manages shared workspaces with distributed state
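The interface groups GetState, UpdateState, and MergeState under "State management with CRDT" but does not say which CRDT is used. To illustrate the property MergeState would rely on, here is a sketch of one of the simplest CRDTs, a grow-only counter, whose merge is commutative and idempotent. GCounter and Merge are illustrative names and are unrelated to models.WorkspaceState.

```go
package main

import "fmt"

// GCounter is a grow-only counter with one monotonically increasing slot per
// replica ID.
type GCounter map[string]uint64

// Increment bumps the slot owned by the given replica.
func (c GCounter) Increment(replica string) { c[replica]++ }

// Value is the sum of all replica slots.
func (c GCounter) Value() uint64 {
	var sum uint64
	for _, v := range c {
		sum += v
	}
	return sum
}

// Merge returns the element-wise maximum of a and b without modifying either.
// Taking the maximum makes the operation commutative, associative, and
// idempotent, so replicas converge regardless of delivery order or duplicates.
func Merge(a, b GCounter) GCounter {
	out := make(GCounter, len(a)+len(b))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		if v > out[k] {
			out[k] = v
		}
	}
	return out
}

func main() {
	local, remote := GCounter{}, GCounter{}
	local.Increment("agent-a")
	local.Increment("agent-a")
	remote.Increment("agent-b")
	merged := Merge(local, remote)
	fmt.Println(merged.Value(), Merge(merged, remote).Value())
}
```

Richer workspace state would need a richer CRDT (for example a map or sequence type), but the convergence argument has the same shape.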

func NewWorkspaceService

func NewWorkspaceService(
	config ServiceConfig,
	workspaceRepo interfaces.WorkspaceRepository,
	documentRepo interfaces.DocumentRepository,
	cache cache.Cache,
) WorkspaceService

NewWorkspaceService creates a production-grade workspace service

type WorkspaceUpdatedEvent

type WorkspaceUpdatedEvent struct {
	WorkspaceID uuid.UUID
	TenantID    uuid.UUID
	UpdatedBy   string
	Version     int
	Timestamp   time.Time
}
