Documentation ¶
Overview ¶
Package agent provides the core orchestration engine for devloop.
The agent package contains the orchestrator implementations that handle:
- File system watching and event processing
- Rule execution and process management
- Gateway communication for distributed setups
- Logging and output management
Key Components ¶
The main orchestrator interface provides methods for:
type Orchestrator interface {
	Start() error
	Stop() error
	GetConfig() *pb.Config
	TriggerRule(ruleName string) error
	GetRuleStatus(ruleName string) (*pb.RuleStatus, bool)
	GetWatchedPaths() []string
	ReadFileContent(path string) ([]byte, error)
	StreamLogs(ruleName string, filter string, stream pb.AgentService_StreamLogsClientServer) error
	SetGlobalDebounceDelay(duration time.Duration)
	SetVerbose(verbose bool)
}
Usage ¶
Create and start an orchestrator:
orchestrator, err := agent.NewOrchestrator("config.yaml")
if err != nil {
	log.Fatal(err)
}
if err := orchestrator.Start(); err != nil {
	log.Fatal(err)
}
defer orchestrator.Stop()
Configuration ¶
The orchestrator is configured via YAML files with rules that define:
- File patterns to watch
- Commands to execute when files change
- Debounce settings and execution options
Example configuration:
settings:
  project_id: "my-project"
  prefix_logs: true
rules:
  - name: "build"
    watch:
      - action: include
        patterns: ["**/*.go"]
    commands:
      - "go build ."
Package gateway provides the central hub for managing multiple devloop agents.
The gateway package implements a distributed architecture where multiple devloop agents can connect to a central gateway service. This enables:
- Centralized monitoring of multiple projects
- Unified API access to all connected agents
- Cross-project coordination and management
Gateway Service ¶
The gateway service provides both gRPC and HTTP endpoints for:
- Agent registration and communication
- Client API access for external tools
- Real-time log streaming
- Project status monitoring
Usage ¶
Start a gateway service:
gateway := gateway.NewGatewayService(orchestrator)
if err := gateway.Start(grpcPort, httpPort); err != nil {
	log.Fatal(err)
}
defer gateway.Stop()
Connect agents to the gateway:
devloop --mode agent --gateway-addr localhost:50051 -c project-a/.devloop.yaml
devloop --mode agent --gateway-addr localhost:50051 -c project-b/.devloop.yaml
Configuration Types ¶
The package defines configuration structures for devloop projects:
- Config: Main configuration structure
- Rule: Individual automation rules
- WatchConfig: File watching patterns
- Settings: Global project settings
Example Gateway Setup ¶
// Start the central gateway
devloop --mode gateway --http-port 8080 --grpc-port 50051
// Connect individual project agents
cd project-a && devloop --mode agent --gateway-addr localhost:50051
cd project-b && devloop --mode agent --gateway-addr localhost:50051
Index ¶
- func LoadConfig(configPath string) (*pb.Config, error)
- func MatcherMatches(m *pb.RuleMatcher, filePath string, rule *pb.Rule, configPath string) bool
- func RuleMatches(r *pb.Rule, filePath string, configPath string) *pb.RuleMatcher
- type AgentService
- func (s *AgentService) GetConfig(ctx context.Context, req *protos.GetConfigRequest) (resp *protos.GetConfigResponse, err error)
- func (s *AgentService) GetRule(ctx context.Context, req *protos.GetRuleRequest) (resp *protos.GetRuleResponse, err error)
- func (s *AgentService) ListWatchedPaths(ctx context.Context, req *protos.ListWatchedPathsRequest) (resp *protos.ListWatchedPathsResponse, err error)
- func (s *AgentService) StreamLogs(req *protos.StreamLogsRequest, ...) error
- func (s *AgentService) TriggerRule(ctx context.Context, req *protos.TriggerRuleRequest) (resp *protos.TriggerRuleResponse, err error)
- type CycleBreaker
- func (cb *CycleBreaker) CleanupOldData()
- func (cb *CycleBreaker) DisableRule(ruleName string, duration time.Duration)
- func (cb *CycleBreaker) GenerateCycleResolutionSuggestions(ruleName string, cycleType string) []string
- func (cb *CycleBreaker) GetDisabledRules() map[string]time.Time
- func (cb *CycleBreaker) GetEmergencyBreakCount(ruleName string) int
- func (cb *CycleBreaker) IsRuleDisabled(ruleName string) bool
- func (cb *CycleBreaker) TriggerEmergencyBreak(ruleName string)
- type DefaultScheduler
- type FileModificationTracker
- func (fmt *FileModificationTracker) CleanupOldModifications(maxAge time.Duration)
- func (fmt *FileModificationTracker) GetModificationCount(filePath string, windowSeconds uint32) int
- func (fmt *FileModificationTracker) GetThrashingFiles(windowSeconds, threshold uint32) []string
- func (fmt *FileModificationTracker) IsFileThrashing(filePath string, windowSeconds, threshold uint32) bool
- func (fmt *FileModificationTracker) RecordModification(filePath string)
- type LROManager
- type LROProcess
- type Orchestrator
- func (o *Orchestrator) GetConfig() *pb.Config
- func (o *Orchestrator) GetRuleRunner(ruleName string) *RuleRunner
- func (o *Orchestrator) GetRuleStatus(ruleName string) (rule *pb.Rule, status *pb.RuleStatus, ok bool)
- func (o *Orchestrator) GetWatchedPaths() []string
- func (o *Orchestrator) ProjectRoot() string
- func (o *Orchestrator) ReadFileContent(path string) ([]byte, error)
- func (o *Orchestrator) SetGlobalDebounceDelay(duration time.Duration)
- func (o *Orchestrator) SetRuleDebounceDelay(ruleName string, duration time.Duration) error
- func (o *Orchestrator) SetRuleVerbose(ruleName string, verbose bool) error
- func (o *Orchestrator) SetVerbose(verbose bool)
- func (o *Orchestrator) Start() error
- func (o *Orchestrator) Stop() error
- func (o *Orchestrator) StreamLogs(ruleName string, filter string, timeoutSeconds int64, ...) error
- func (o *Orchestrator) TriggerRule(ruleName string) error
- func (o *Orchestrator) ValidateConfig() error
- type PrefixWriter
- type ProcessInfo
- type ProcessManager
- type Rule
- type RuleJob
- type RuleRunner
- func (r *RuleRunner) CleanupTriggerHistory()
- func (r *RuleRunner) Execute() error
- func (r *RuleRunner) GetRule() *pb.Rule
- func (r *RuleRunner) GetStatus() *pb.RuleStatus
- func (r *RuleRunner) GetTriggerCount(duration time.Duration) int
- func (r *RuleRunner) GetTriggerRate() float64
- func (r *RuleRunner) IsRunning() bool
- func (r *RuleRunner) Restart() error
- func (r *RuleRunner) SetDebounceDelay(duration time.Duration)
- func (r *RuleRunner) SetVerbose(verbose bool)
- func (r *RuleRunner) Start() error
- func (r *RuleRunner) Stop() error
- func (r *RuleRunner) TerminateProcesses() error
- func (r *RuleRunner) TriggerDebounced()
- func (r *RuleRunner) TriggerDebouncedWithOptions(bypassRateLimit bool)
- func (r *RuleRunner) UpdateStatus(isRunning bool, buildStatus string)
- type RuleStatus
- type Scheduler
- type Settings
- type TestConfig
- func (tc *TestConfig) AddLRORule(name string, commands []string) *TestConfig
- func (tc *TestConfig) AddRule(rule TestRule) *TestConfig
- func (tc *TestConfig) AddShortRunningRule(name string, commands []string) *TestConfig
- func (tc *TestConfig) ToYAML() string
- func (tc *TestConfig) WithMaxWorkers(max int) *TestConfig
- type TestHelper
- func (th *TestHelper) AssertLROProcessCount(lroManager *LROManager, expectedCount int)
- func (th *TestHelper) AssertRuleStatus(orchestrator *Orchestrator, ruleName string, expectedRunning bool, ...)
- func (th *TestHelper) WithOrchestrator(config *TestConfig, testFunc func(*TestOrchestrator))
- func (th *TestHelper) WithRunningOrchestrator(config *TestConfig, testFunc func(*TestOrchestrator))
- type TestOrchestrator
- type TestRule
- type TriggerChain
- type TriggerEvent
- type TriggerTracker
- func (t *TriggerTracker) CleanupOldTriggers(duration time.Duration)
- func (t *TriggerTracker) GetBackoffLevel() int
- func (t *TriggerTracker) GetLastTrigger() *time.Time
- func (t *TriggerTracker) GetTriggerCount(duration time.Duration) int
- func (t *TriggerTracker) GetTriggerRate() float64
- func (t *TriggerTracker) IsInBackoff() bool
- func (t *TriggerTracker) RecordTrigger()
- func (t *TriggerTracker) ResetBackoff()
- func (t *TriggerTracker) SetBackoff()
- type Watcher
- type Worker
- type WorkerPool
- func (wp *WorkerPool) CompleteJob(job *RuleJob, worker *Worker)
- func (wp *WorkerPool) EnqueueJob(job *RuleJob)
- func (wp *WorkerPool) GetExecutingRules() map[string]int
- func (wp *WorkerPool) GetPendingRules() []string
- func (wp *WorkerPool) GetStatus() (active int, capacity int, pending int, executing int)
- func (wp *WorkerPool) Start() error
- func (wp *WorkerPool) Stop() error
Functions ¶
func LoadConfig ¶
LoadConfig reads and unmarshals the .devloop.yaml configuration file, resolving all relative watch paths to be absolute from the config file's location.
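The path-resolution step described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `resolvePattern` is a hypothetical helper, and the sketch assumes Unix-style paths and that relative patterns are simply joined onto the directory that contains the config file.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// resolvePattern is a hypothetical helper: it makes a watch pattern absolute
// by anchoring it at the directory that holds the config file. Patterns that
// are already absolute are only cleaned.
func resolvePattern(configPath, pattern string) string {
	if filepath.IsAbs(pattern) {
		return filepath.Clean(pattern)
	}
	return filepath.Join(filepath.Dir(configPath), pattern)
}

func main() {
	cfg := "/work/project-a/.devloop.yaml"
	for _, p := range []string{"cmd/**/*.go", "/opt/shared/*.proto"} {
		fmt.Println(p, "=>", resolvePattern(cfg, p))
	}
}
```

Resolving patterns once at load time means later matching does not depend on the working directory the process was started from.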
func MatcherMatches ¶ added in v0.0.42
func RuleMatches ¶ added in v0.0.42
RuleMatches checks whether the given file path matches the rule's watch criteria.
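To make the idea of matching a path against a rule's include and exclude matchers concrete, here is a small sketch. It assumes first-match-wins ordering, which the documentation above does not confirm, and it uses the standard library's `path.Match`, which does not understand `**`. The `matcher` type and `firstMatch` function are hypothetical names, not part of this package.

```go
package main

import (
	"fmt"
	"path"
)

// matcher is a hypothetical stand-in for a rule's watch entry.
type matcher struct {
	Action   string   // "include" or "exclude"
	Patterns []string // glob patterns in path.Match syntax
}

// firstMatch returns the first matcher with a pattern that matches file,
// or nil when nothing matches. Malformed patterns are skipped.
func firstMatch(matchers []matcher, file string) *matcher {
	for i := range matchers {
		for _, pattern := range matchers[i].Patterns {
			if ok, err := path.Match(pattern, file); err == nil && ok {
				return &matchers[i]
			}
		}
	}
	return nil
}

func main() {
	matchers := []matcher{
		{Action: "exclude", Patterns: []string{"*_test.go"}},
		{Action: "include", Patterns: []string{"*.go"}},
	}
	for _, f := range []string{"main.go", "main_test.go", "README.md"} {
		if m := firstMatch(matchers, f); m != nil {
			fmt.Println(f, "->", m.Action)
		} else {
			fmt.Println(f, "-> no match")
		}
	}
}
```

Returning the matcher rather than a bare boolean lets the caller tell an explicit exclude apart from no match at all.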
Types ¶
type AgentService ¶ added in v0.0.42
type AgentService struct {
protos.UnimplementedAgentServiceServer
// contains filtered or unexported fields
}
func NewAgentService ¶ added in v0.0.42
func NewAgentService(orch *Orchestrator) *AgentService
func (*AgentService) GetConfig ¶ added in v0.0.42
func (s *AgentService) GetConfig(ctx context.Context, req *protos.GetConfigRequest) (resp *protos.GetConfigResponse, err error)
GetConfig returns the configuration of this agent.
func (*AgentService) GetRule ¶ added in v0.0.42
func (s *AgentService) GetRule(ctx context.Context, req *protos.GetRuleRequest) (resp *protos.GetRuleResponse, err error)
func (*AgentService) ListWatchedPaths ¶ added in v0.0.42
func (s *AgentService) ListWatchedPaths(ctx context.Context, req *protos.ListWatchedPathsRequest) (resp *protos.ListWatchedPathsResponse, err error)
func (*AgentService) StreamLogs ¶ added in v0.0.42
func (s *AgentService) StreamLogs(req *protos.StreamLogsRequest, stream grpc.ServerStreamingServer[protos.StreamLogsResponse]) error
func (*AgentService) TriggerRule ¶ added in v0.0.42
func (s *AgentService) TriggerRule(ctx context.Context, req *protos.TriggerRuleRequest) (resp *protos.TriggerRuleResponse, err error)
type CycleBreaker ¶ added in v0.0.51
type CycleBreaker struct {
// contains filtered or unexported fields
}
CycleBreaker manages advanced cycle breaking and resolution mechanisms
func NewCycleBreaker ¶ added in v0.0.51
func NewCycleBreaker() *CycleBreaker
NewCycleBreaker creates a new cycle breaker
func (*CycleBreaker) CleanupOldData ¶ added in v0.0.51
func (cb *CycleBreaker) CleanupOldData()
CleanupOldData removes old emergency break and disable records
func (*CycleBreaker) DisableRule ¶ added in v0.0.51
func (cb *CycleBreaker) DisableRule(ruleName string, duration time.Duration)
DisableRule temporarily disables a rule for cycle breaking
func (*CycleBreaker) GenerateCycleResolutionSuggestions ¶ added in v0.0.51
func (cb *CycleBreaker) GenerateCycleResolutionSuggestions(ruleName string, cycleType string) []string
GenerateCycleResolutionSuggestions generates suggestions for resolving detected cycles
func (*CycleBreaker) GetDisabledRules ¶ added in v0.0.51
func (cb *CycleBreaker) GetDisabledRules() map[string]time.Time
GetDisabledRules returns a map of currently disabled rules and their disable times
func (*CycleBreaker) GetEmergencyBreakCount ¶ added in v0.0.51
func (cb *CycleBreaker) GetEmergencyBreakCount(ruleName string) int
GetEmergencyBreakCount returns the number of emergency breaks for a rule
func (*CycleBreaker) IsRuleDisabled ¶ added in v0.0.51
func (cb *CycleBreaker) IsRuleDisabled(ruleName string) bool
IsRuleDisabled checks if a rule is currently disabled
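Temporarily disabling a rule and later checking whether the disable window has expired can be sketched like this. The `breaker` type, its fields, and the injected clock are assumptions made for illustration; the real CycleBreaker keeps its fields unexported and may work differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// breaker is a hypothetical stand-in for CycleBreaker's disable bookkeeping.
type breaker struct {
	mu            sync.Mutex
	disabledUntil map[string]time.Time
	now           func() time.Time // injectable clock, so expiry can be tested
}

func newBreaker(now func() time.Time) *breaker {
	return &breaker{disabledUntil: make(map[string]time.Time), now: now}
}

// disable marks the rule as disabled until now+d.
func (b *breaker) disable(rule string, d time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.disabledUntil[rule] = b.now().Add(d)
}

// isDisabled reports whether the rule is still inside its disable window,
// dropping the record once the window has passed.
func (b *breaker) isDisabled(rule string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	until, ok := b.disabledUntil[rule]
	if !ok {
		return false
	}
	if !b.now().Before(until) {
		delete(b.disabledUntil, rule)
		return false
	}
	return true
}

// demoDisable walks a fake clock through a 30-second disable window.
func demoDisable() (before, during, after bool) {
	clock := time.Unix(0, 0)
	b := newBreaker(func() time.Time { return clock })
	before = b.isDisabled("build")
	b.disable("build", 30*time.Second)
	clock = clock.Add(10 * time.Second)
	during = b.isDisabled("build")
	clock = clock.Add(25 * time.Second)
	after = b.isDisabled("build")
	return before, during, after
}

func main() {
	fmt.Println(demoDisable())
}
```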
func (*CycleBreaker) TriggerEmergencyBreak ¶ added in v0.0.51
func (cb *CycleBreaker) TriggerEmergencyBreak(ruleName string)
TriggerEmergencyBreak triggers an emergency cycle break for a rule
type DefaultScheduler ¶ added in v0.0.71
type DefaultScheduler struct {
// contains filtered or unexported fields
}
DefaultScheduler routes rules to appropriate execution engines
func NewDefaultScheduler ¶ added in v0.0.71
func NewDefaultScheduler(orchestrator *Orchestrator, workerPool *WorkerPool, lroManager *LROManager) *DefaultScheduler
NewDefaultScheduler creates a new scheduler
func (*DefaultScheduler) ScheduleRule ¶ added in v0.0.71
func (s *DefaultScheduler) ScheduleRule(event *TriggerEvent) error
ScheduleRule routes the rule to the appropriate execution engine
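As an illustration of routing, the sketch below sends long-running rules to one engine and everything else to another. It assumes the decision is made on an LRO flag (TestRule exposes such a field, but the actual routing criterion is not documented here). The `rule`, `scheduler`, and `routeAll` names are hypothetical.

```go
package main

import "fmt"

// rule is a hypothetical, trimmed-down rule description.
type rule struct {
	Name string
	LRO  bool // long-running operation, e.g. a dev server
}

// scheduler holds one callback per execution engine.
type scheduler struct {
	submitShort func(rule) error // e.g. hand off to a worker pool
	restartLRO  func(rule) error // e.g. restart a long-running process
}

// scheduleRule picks the engine based on the rule's LRO flag.
func (s *scheduler) scheduleRule(r rule) error {
	if r.LRO {
		return s.restartLRO(r)
	}
	return s.submitShort(r)
}

// routeAll records which engine each rule was sent to.
func routeAll(rules []rule) (short, lro []string) {
	s := &scheduler{
		submitShort: func(r rule) error { short = append(short, r.Name); return nil },
		restartLRO:  func(r rule) error { lro = append(lro, r.Name); return nil },
	}
	for _, r := range rules {
		if err := s.scheduleRule(r); err != nil {
			fmt.Println("schedule failed:", err)
		}
	}
	return short, lro
}

func main() {
	short, lro := routeAll([]rule{{Name: "build"}, {Name: "server", LRO: true}})
	fmt.Println("worker pool:", short, "LRO manager:", lro)
}
```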
func (*DefaultScheduler) Start ¶ added in v0.0.71
func (s *DefaultScheduler) Start() error
Start starts the scheduler (currently no background processes needed)
func (*DefaultScheduler) Stop ¶ added in v0.0.71
func (s *DefaultScheduler) Stop() error
Stop stops the scheduler
type FileModificationTracker ¶ added in v0.0.51
type FileModificationTracker struct {
// contains filtered or unexported fields
}
FileModificationTracker tracks file modification frequencies for thrashing detection
func NewFileModificationTracker ¶ added in v0.0.51
func NewFileModificationTracker() *FileModificationTracker
NewFileModificationTracker creates a new file modification tracker
func (*FileModificationTracker) CleanupOldModifications ¶ added in v0.0.51
func (fmt *FileModificationTracker) CleanupOldModifications(maxAge time.Duration)
CleanupOldModifications removes old modification records
func (*FileModificationTracker) GetModificationCount ¶ added in v0.0.51
func (fmt *FileModificationTracker) GetModificationCount(filePath string, windowSeconds uint32) int
GetModificationCount returns the number of modifications within the time window
func (*FileModificationTracker) GetThrashingFiles ¶ added in v0.0.51
func (fmt *FileModificationTracker) GetThrashingFiles(windowSeconds, threshold uint32) []string
GetThrashingFiles returns a list of files that are currently thrashing
func (*FileModificationTracker) IsFileThrashing ¶ added in v0.0.51
func (fmt *FileModificationTracker) IsFileThrashing(filePath string, windowSeconds, threshold uint32) bool
IsFileThrashing checks if a file is being modified too frequently
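Thrashing detection of this kind is typically a sliding-window count. The sketch below is one way to do it, assuming a file counts as thrashing when the number of modifications inside the window reaches the threshold (whether the real check uses "at least" or "more than" is not stated). `modTracker` and its methods are hypothetical and take the current time explicitly so the example is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// modTracker is a hypothetical per-file modification log.
type modTracker struct {
	mu   sync.Mutex
	mods map[string][]time.Time
}

func newModTracker() *modTracker {
	return &modTracker{mods: make(map[string][]time.Time)}
}

// record appends one modification timestamp for path.
func (m *modTracker) record(path string, at time.Time) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.mods[path] = append(m.mods[path], at)
}

// count returns how many modifications of path fall inside the last
// windowSeconds seconds, measured back from now.
func (m *modTracker) count(path string, now time.Time, windowSeconds uint32) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	cutoff := now.Add(-time.Duration(windowSeconds) * time.Second)
	n := 0
	for _, t := range m.mods[path] {
		if t.After(cutoff) {
			n++
		}
	}
	return n
}

// isThrashing reports whether the windowed count reaches the threshold.
func (m *modTracker) isThrashing(path string, now time.Time, windowSeconds, threshold uint32) bool {
	return uint32(m.count(path, now, windowSeconds)) >= threshold
}

// demoThrashing records edits at +0s, +12s, +13s and +14s, then asks about
// the last 10 seconds as seen from +15s.
func demoThrashing() (int, bool) {
	base := time.Unix(1000, 0)
	m := newModTracker()
	for _, offset := range []int{0, 12, 13, 14} {
		m.record("main.go", base.Add(time.Duration(offset)*time.Second))
	}
	now := base.Add(15 * time.Second)
	return m.count("main.go", now, 10), m.isThrashing("main.go", now, 10, 3)
}

func main() {
	fmt.Println(demoThrashing())
}
```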
func (*FileModificationTracker) RecordModification ¶ added in v0.0.51
func (fmt *FileModificationTracker) RecordModification(filePath string)
RecordModification records a file modification
type LROManager ¶ added in v0.0.71
type LROManager struct {
// contains filtered or unexported fields
}
LROManager manages long-running operation processes
func NewLROManager ¶ added in v0.0.71
func NewLROManager(orchestrator *Orchestrator) *LROManager
NewLROManager creates a new LRO manager
func (*LROManager) GetRunningProcesses ¶ added in v0.0.71
func (lro *LROManager) GetRunningProcesses() map[string]int
GetRunningProcesses returns a map of currently running LRO processes
func (*LROManager) IsProcessRunning ¶ added in v0.0.71
func (lro *LROManager) IsProcessRunning(ruleName string) bool
IsProcessRunning checks if a process is running for the given rule
func (*LROManager) RestartProcess ¶ added in v0.0.71
func (lro *LROManager) RestartProcess(rule *pb.Rule, triggerType string) error
RestartProcess kills any existing process for the rule and starts a new one
func (*LROManager) Stop ¶ added in v0.0.71
func (lro *LROManager) Stop() error
Stop gracefully stops all LRO processes
type LROProcess ¶ added in v0.0.71
type LROProcess struct {
// contains filtered or unexported fields
}
LROProcess represents a running long-running operation
type Orchestrator ¶
type Orchestrator struct {
ConfigPath string
Config *pb.Config
Verbose bool
LogManager *utils.LogManager
ColorManager *utils.ColorManager
// contains filtered or unexported fields
}
Orchestrator manages file watching and delegates execution to RuleRunners
func NewOrchestrator ¶
func NewOrchestrator(configPath string) (*Orchestrator, error)
NewOrchestrator creates a new orchestrator instance for managing file watching and rule execution.
The orchestrator handles:
- Loading and validating configuration from configPath
- Setting up file system watching for specified patterns
- Managing rule execution and process lifecycle
- Optional gateway communication if gatewayAddr is provided
Parameters:
- configPath: Path to the .devloop.yaml configuration file
Returns an orchestrator instance ready to be started, or an error if configuration loading or initialization fails.
Example:
orchestrator, err := NewOrchestrator(".devloop.yaml")
if err != nil {
	log.Fatal(err)
}
func NewOrchestratorForTesting ¶
func NewOrchestratorForTesting(configPath string) (*Orchestrator, error)
NewOrchestratorForTesting creates a new V2 orchestrator for testing
func (*Orchestrator) GetConfig ¶
func (o *Orchestrator) GetConfig() *pb.Config
GetConfig returns the orchestrator's configuration.
func (*Orchestrator) GetRuleRunner ¶ added in v0.0.71
func (o *Orchestrator) GetRuleRunner(ruleName string) *RuleRunner
GetRuleRunner returns the RuleRunner for a given rule name. This is used by execution engines to update rule status.
func (*Orchestrator) GetRuleStatus ¶
func (o *Orchestrator) GetRuleStatus(ruleName string) (rule *pb.Rule, status *pb.RuleStatus, ok bool)
GetRuleStatus returns the status of a specific rule
func (*Orchestrator) GetWatchedPaths ¶
func (o *Orchestrator) GetWatchedPaths() []string
GetWatchedPaths returns a unique list of all paths being watched by any rule
func (*Orchestrator) ProjectRoot ¶
func (o *Orchestrator) ProjectRoot() string
ProjectRoot returns the project root directory
func (*Orchestrator) ReadFileContent ¶
func (o *Orchestrator) ReadFileContent(path string) ([]byte, error)
ReadFileContent reads and returns the content of a specified file
func (*Orchestrator) SetGlobalDebounceDelay ¶
func (o *Orchestrator) SetGlobalDebounceDelay(duration time.Duration)
SetGlobalDebounceDelay sets the default debounce delay for all rules
func (*Orchestrator) SetRuleDebounceDelay ¶
func (o *Orchestrator) SetRuleDebounceDelay(ruleName string, duration time.Duration) error
SetRuleDebounceDelay sets the debounce delay for a specific rule
func (*Orchestrator) SetRuleVerbose ¶
func (o *Orchestrator) SetRuleVerbose(ruleName string, verbose bool) error
SetRuleVerbose sets the verbose flag for a specific rule
func (*Orchestrator) SetVerbose ¶
func (o *Orchestrator) SetVerbose(verbose bool)
SetVerbose sets the global verbose flag
func (*Orchestrator) Start ¶
func (o *Orchestrator) Start() error
Start begins file watching and initializes all RuleRunners
func (*Orchestrator) Stop ¶
func (o *Orchestrator) Stop() error
Stop gracefully shuts down the orchestrator
func (*Orchestrator) StreamLogs ¶
func (o *Orchestrator) StreamLogs(ruleName string, filter string, timeoutSeconds int64, writer *gocurrent.Writer[*pb.StreamLogsResponse]) error
StreamLogs streams the logs for a given rule to the provided Writer
func (*Orchestrator) TriggerRule ¶
func (o *Orchestrator) TriggerRule(ruleName string) error
TriggerRule manually triggers the execution of a specific rule
func (*Orchestrator) ValidateConfig ¶ added in v0.0.51
func (o *Orchestrator) ValidateConfig() error
ValidateConfig performs static cycle detection and validation on the loaded configuration
type PrefixWriter ¶
type PrefixWriter struct {
// contains filtered or unexported fields
}
PrefixWriter is an io.Writer that adds a prefix to each line of output.
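A line-prefixing writer can be sketched as below. This is an illustrative implementation of the general technique, not the package's PrefixWriter: the `prefixWriter` type, its fields, and the `prefixLines` helper are hypothetical. The key detail is remembering whether the previous write ended on a newline, so a line split across two writes is prefixed only once.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// prefixWriter is a hypothetical io.Writer that prepends prefix to every line.
type prefixWriter struct {
	w           io.Writer
	prefix      string
	atLineStart bool // true when the next byte begins a new line
}

// Write forwards p line by line, emitting the prefix at each line start.
// It returns the number of bytes of p that were consumed.
func (pw *prefixWriter) Write(p []byte) (int, error) {
	written := 0
	for len(p) > 0 {
		if pw.atLineStart {
			if _, err := io.WriteString(pw.w, pw.prefix); err != nil {
				return written, err
			}
			pw.atLineStart = false
		}
		chunk := p
		if i := bytes.IndexByte(p, '\n'); i >= 0 {
			chunk = p[:i+1]
		}
		n, err := pw.w.Write(chunk)
		written += n
		if err != nil {
			return written, err
		}
		pw.atLineStart = chunk[len(chunk)-1] == '\n'
		p = p[len(chunk):]
	}
	return written, nil
}

// prefixLines feeds the chunks through a prefixWriter and returns the result.
// Write errors are ignored because strings.Builder never returns one.
func prefixLines(prefix string, chunks ...string) string {
	var sb strings.Builder
	pw := &prefixWriter{w: &sb, prefix: prefix, atLineStart: true}
	for _, c := range chunks {
		io.WriteString(pw, c)
	}
	return sb.String()
}

func main() {
	fmt.Print(prefixLines("[build] ", "compiling\nlin", "king\n"))
}
```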
type ProcessInfo ¶ added in v0.0.71
type ProcessInfo struct {
// contains filtered or unexported fields
}
ProcessInfo contains information about a running process
type ProcessManager ¶ added in v0.0.71
type ProcessManager struct {
// contains filtered or unexported fields
}
ProcessManager handles reliable process termination
func NewProcessManager ¶ added in v0.0.71
func NewProcessManager(verbose bool) *ProcessManager
NewProcessManager creates a new process manager
func (*ProcessManager) GetProcessInfo ¶ added in v0.0.71
func (pm *ProcessManager) GetProcessInfo(cmd *exec.Cmd, ruleName string) *ProcessInfo
GetProcessInfo extracts process information from an exec.Cmd
func (*ProcessManager) TerminateProcess ¶ added in v0.0.71
func (pm *ProcessManager) TerminateProcess(processInfo *ProcessInfo) error
TerminateProcess reliably terminates a process and all its children
type Rule ¶ added in v0.0.42
Rule defines a single watch-and-run rule.
func (*Rule) GetColor ¶ added in v0.0.42
GetColor returns the rule color (implements ColorRule interface)
type RuleJob ¶ added in v0.0.64
type RuleJob struct {
Rule *pb.Rule
TriggerType string // "file_change", "manual", "startup"
Context context.Context
JobID string // Unique job identifier
CreatedAt time.Time
}
RuleJob represents a job to execute a rule
type RuleRunner ¶
type RuleRunner struct {
// contains filtered or unexported fields
}
RuleRunner manages the execution lifecycle of a single rule
func NewRuleRunner ¶
func NewRuleRunner(rule *pb.Rule, orchestrator *Orchestrator) *RuleRunner
NewRuleRunner creates a new RuleRunner for the given rule
func (*RuleRunner) CleanupTriggerHistory ¶ added in v0.0.51
func (r *RuleRunner) CleanupTriggerHistory()
CleanupTriggerHistory removes old trigger records to prevent memory growth
func (*RuleRunner) Execute ¶
func (r *RuleRunner) Execute() error
Execute sends a trigger event to the scheduler for execution
func (*RuleRunner) GetRule ¶
func (r *RuleRunner) GetRule() *pb.Rule
GetRule returns the rule configuration
func (*RuleRunner) GetStatus ¶
func (r *RuleRunner) GetStatus() *pb.RuleStatus
GetStatus returns a copy of the current status
func (*RuleRunner) GetTriggerCount ¶ added in v0.0.51
func (r *RuleRunner) GetTriggerCount(duration time.Duration) int
GetTriggerCount returns the trigger count within the specified duration
func (*RuleRunner) GetTriggerRate ¶ added in v0.0.51
func (r *RuleRunner) GetTriggerRate() float64
GetTriggerRate returns the current trigger rate for this rule
func (*RuleRunner) IsRunning ¶
func (r *RuleRunner) IsRunning() bool
IsRunning returns true if any commands are currently running
func (*RuleRunner) SetDebounceDelay ¶
func (r *RuleRunner) SetDebounceDelay(duration time.Duration)
SetDebounceDelay sets the debounce delay for this rule
func (*RuleRunner) SetVerbose ¶
func (r *RuleRunner) SetVerbose(verbose bool)
SetVerbose sets the verbose flag for this rule
func (*RuleRunner) Start ¶
func (r *RuleRunner) Start() error
Start begins monitoring for this rule (non-blocking)
func (*RuleRunner) Stop ¶
func (r *RuleRunner) Stop() error
Stop terminates all processes and cleans up
func (*RuleRunner) TerminateProcesses ¶
func (r *RuleRunner) TerminateProcesses() error
TerminateProcesses terminates all running processes for this rule
func (*RuleRunner) TriggerDebounced ¶
func (r *RuleRunner) TriggerDebounced()
TriggerDebounced triggers execution after debounce period
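Debouncing can be sketched with a resettable timer: each trigger cancels the pending timer and starts a new one, so a burst of triggers results in a single execution once things go quiet. The `debouncer` type and the delays used here are assumptions for illustration and say nothing about how RuleRunner implements it.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer is a hypothetical helper that runs fn once per burst of triggers.
type debouncer struct {
	mu    sync.Mutex
	delay time.Duration
	timer *time.Timer
	fn    func()
}

// trigger restarts the countdown; fn runs only after delay passes
// without another trigger.
func (d *debouncer) trigger() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer != nil {
		d.timer.Stop()
	}
	d.timer = time.AfterFunc(d.delay, d.fn)
}

// demoDebounce fires five triggers 5ms apart against a 50ms delay and
// returns how many times the callback actually ran.
func demoDebounce() int {
	var mu sync.Mutex
	runs := 0
	d := &debouncer{
		delay: 50 * time.Millisecond,
		fn: func() {
			mu.Lock()
			runs++
			mu.Unlock()
		},
	}
	for i := 0; i < 5; i++ {
		d.trigger()
		time.Sleep(5 * time.Millisecond)
	}
	time.Sleep(250 * time.Millisecond) // let the final timer fire
	mu.Lock()
	defer mu.Unlock()
	return runs
}

func main() {
	fmt.Println("executions:", demoDebounce())
}
```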
func (*RuleRunner) TriggerDebouncedWithOptions ¶ added in v0.0.51
func (r *RuleRunner) TriggerDebouncedWithOptions(bypassRateLimit bool)
TriggerDebouncedWithOptions triggers execution after debounce period with options
func (*RuleRunner) UpdateStatus ¶ added in v0.0.71
func (r *RuleRunner) UpdateStatus(isRunning bool, buildStatus string)
UpdateStatus allows external systems to update this rule's execution status. This is called by execution engines (WorkerPool, LROManager) to keep RuleRunner in sync.
type RuleStatus ¶ added in v0.0.42
type RuleStatus struct {
*pb.RuleStatus
}
RuleStatus defines the status of a rule.
type Scheduler ¶ added in v0.0.71
type Scheduler interface {
ScheduleRule(event *TriggerEvent) error
Start() error
Stop() error
}
Scheduler interface for routing rule execution
type Settings ¶ added in v0.0.42
func (*Settings) GetColorLogs ¶ added in v0.0.42
GetColorLogs returns whether color logs are enabled (implements ColorSettings interface)
func (*Settings) GetColorScheme ¶ added in v0.0.42
GetColorScheme returns the color scheme (implements ColorSettings interface)
func (*Settings) GetCustomColors ¶ added in v0.0.42
GetCustomColors returns the custom color mappings (implements ColorSettings interface)
type TestConfig ¶ added in v0.0.71
type TestConfig struct {
ProjectID string
Verbose bool
Rules []TestRule
Settings map[string]interface{}
}
TestConfig represents a test configuration builder
func NewTestConfig ¶ added in v0.0.71
func NewTestConfig(projectID string) *TestConfig
NewTestConfig creates a new test configuration builder
func (*TestConfig) AddLRORule ¶ added in v0.0.71
func (tc *TestConfig) AddLRORule(name string, commands []string) *TestConfig
AddLRORule adds a typical long-running rule
func (*TestConfig) AddRule ¶ added in v0.0.71
func (tc *TestConfig) AddRule(rule TestRule) *TestConfig
AddRule adds a rule to the test configuration
func (*TestConfig) AddShortRunningRule ¶ added in v0.0.71
func (tc *TestConfig) AddShortRunningRule(name string, commands []string) *TestConfig
AddShortRunningRule adds a typical short-running rule
func (*TestConfig) ToYAML ¶ added in v0.0.71
func (tc *TestConfig) ToYAML() string
ToYAML converts the test config to YAML string
func (*TestConfig) WithMaxWorkers ¶ added in v0.0.71
func (tc *TestConfig) WithMaxWorkers(max int) *TestConfig
WithMaxWorkers sets the max parallel workers setting
type TestHelper ¶ added in v0.0.71
type TestHelper struct {
// contains filtered or unexported fields
}
TestHelper provides utilities for orchestrator testing
func NewTestHelper ¶ added in v0.0.71
func NewTestHelper(t *testing.T, timeout time.Duration) *TestHelper
NewTestHelper creates a new test helper
func (*TestHelper) AssertLROProcessCount ¶ added in v0.0.71
func (th *TestHelper) AssertLROProcessCount(lroManager *LROManager, expectedCount int)
AssertLROProcessCount asserts the expected number of LRO processes
func (*TestHelper) AssertRuleStatus ¶ added in v0.0.71
func (th *TestHelper) AssertRuleStatus(orchestrator *Orchestrator, ruleName string, expectedRunning bool, expectedStatus string)
AssertRuleStatus asserts a rule has the expected status
func (*TestHelper) WithOrchestrator ¶ added in v0.0.71
func (th *TestHelper) WithOrchestrator(config *TestConfig, testFunc func(*TestOrchestrator))
WithOrchestrator creates an orchestrator from config and runs test function
func (*TestHelper) WithRunningOrchestrator ¶ added in v0.0.71
func (th *TestHelper) WithRunningOrchestrator(config *TestConfig, testFunc func(*TestOrchestrator))
WithRunningOrchestrator creates and starts an orchestrator for integration testing
type TestOrchestrator ¶ added in v0.0.71
type TestOrchestrator struct {
*Orchestrator
// contains filtered or unexported fields
}
TestOrchestrator represents a test orchestrator with cleanup
func (*TestOrchestrator) CreateRule ¶ added in v0.0.71
func (to *TestOrchestrator) CreateRule(name string, lro bool, commands []string)
CreateRule creates a new rule for testing
func (*TestOrchestrator) Stop ¶ added in v0.0.71
func (to *TestOrchestrator) Stop()
Stop cleans up the test orchestrator and temporary directory
func (*TestOrchestrator) TriggerRule ¶ added in v0.0.71
func (to *TestOrchestrator) TriggerRule(ruleName string) error
TriggerRule manually triggers a rule for testing
func (*TestOrchestrator) WaitForRuleCompletion ¶ added in v0.0.71
func (to *TestOrchestrator) WaitForRuleCompletion(ruleName string, timeout time.Duration) error
WaitForRuleCompletion waits for a rule to complete with timeout
type TestRule ¶ added in v0.0.71
type TestRule struct {
Name string
LRO bool
SkipRunOnInit bool
Commands []string
WatchPatterns []string
ExcludePatterns []string
Env map[string]string
}
TestRule represents a rule configuration for testing
type TriggerChain ¶ added in v0.0.51
type TriggerChain struct {
// contains filtered or unexported fields
}
TriggerChain tracks cross-rule trigger relationships for cycle detection
func NewTriggerChain ¶ added in v0.0.51
func NewTriggerChain() *TriggerChain
NewTriggerChain creates a new trigger chain tracker
func (*TriggerChain) CleanupOldChains ¶ added in v0.0.51
func (tc *TriggerChain) CleanupOldChains()
CleanupOldChains removes old trigger chains to prevent memory growth
func (*TriggerChain) DetectCycle ¶ added in v0.0.51
func (tc *TriggerChain) DetectCycle(sourceRule, targetRule string, maxDepth uint32) bool
DetectCycle detects if adding a new trigger would create a cycle
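One way to answer "would adding this trigger create a cycle" is a depth-limited search: the new edge sourceRule -> targetRule closes a loop if targetRule can already reach sourceRule. The sketch below assumes that interpretation and treats maxDepth as the number of hops explored. `triggerGraph` and `wouldCycle` are hypothetical names.

```go
package main

import "fmt"

// triggerGraph is a hypothetical record of which rule triggered which.
type triggerGraph struct {
	edges map[string][]string
}

func newTriggerGraph() *triggerGraph {
	return &triggerGraph{edges: make(map[string][]string)}
}

// record notes that source triggered target.
func (g *triggerGraph) record(source, target string) {
	g.edges[source] = append(g.edges[source], target)
}

// wouldCycle reports whether adding source -> target would close a loop,
// exploring at most maxDepth hops outward from target.
func (g *triggerGraph) wouldCycle(source, target string, maxDepth uint32) bool {
	if source == target {
		return true
	}
	visited := map[string]bool{target: true}
	frontier := []string{target}
	for depth := uint32(0); depth < maxDepth && len(frontier) > 0; depth++ {
		var next []string
		for _, node := range frontier {
			for _, succ := range g.edges[node] {
				if succ == source {
					return true
				}
				if !visited[succ] {
					visited[succ] = true
					next = append(next, succ)
				}
			}
		}
		frontier = next
	}
	return false
}

func main() {
	g := newTriggerGraph()
	g.record("generate", "build")
	g.record("build", "test")
	fmt.Println(g.wouldCycle("test", "generate", 10)) // test -> generate closes the loop
	fmt.Println(g.wouldCycle("generate", "test", 10)) // no path from test back to generate
}
```

The depth bound keeps the check cheap on long chains, at the cost of missing cycles longer than maxDepth hops.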
func (*TriggerChain) GetChainLength ¶ added in v0.0.51
func (tc *TriggerChain) GetChainLength(ruleName string) int
GetChainLength returns the length of the trigger chain starting from a rule
func (*TriggerChain) RecordTrigger ¶ added in v0.0.51
func (tc *TriggerChain) RecordTrigger(sourceRule, targetRule string)
RecordTrigger records that sourceRule triggered targetRule
type TriggerEvent ¶ added in v0.0.71
type TriggerEvent struct {
Rule *pb.Rule
TriggerType string // "file_change", "manual", "startup"
Context context.Context
}
TriggerEvent represents a request to execute a rule
type TriggerTracker ¶ added in v0.0.51
type TriggerTracker struct {
// contains filtered or unexported fields
}
TriggerTracker tracks rule execution frequency for rate limiting
func NewTriggerTracker ¶ added in v0.0.51
func NewTriggerTracker() *TriggerTracker
NewTriggerTracker creates a new trigger tracker
func (*TriggerTracker) CleanupOldTriggers ¶ added in v0.0.51
func (t *TriggerTracker) CleanupOldTriggers(duration time.Duration)
CleanupOldTriggers removes trigger records older than the specified duration
func (*TriggerTracker) GetBackoffLevel ¶ added in v0.0.51
func (t *TriggerTracker) GetBackoffLevel() int
GetBackoffLevel returns the current backoff level
func (*TriggerTracker) GetLastTrigger ¶ added in v0.0.51
func (t *TriggerTracker) GetLastTrigger() *time.Time
GetLastTrigger returns the timestamp of the most recent trigger
func (*TriggerTracker) GetTriggerCount ¶ added in v0.0.51
func (t *TriggerTracker) GetTriggerCount(duration time.Duration) int
GetTriggerCount returns the number of triggers within the specified duration
func (*TriggerTracker) GetTriggerRate ¶ added in v0.0.51
func (t *TriggerTracker) GetTriggerRate() float64
GetTriggerRate returns the current trigger rate (triggers per minute)
func (*TriggerTracker) IsInBackoff ¶ added in v0.0.51
func (t *TriggerTracker) IsInBackoff() bool
IsInBackoff checks if the rule is currently in backoff period
func (*TriggerTracker) RecordTrigger ¶ added in v0.0.51
func (t *TriggerTracker) RecordTrigger()
RecordTrigger records a new trigger event
func (*TriggerTracker) ResetBackoff ¶ added in v0.0.51
func (t *TriggerTracker) ResetBackoff()
ResetBackoff resets the backoff level if enough time has passed
func (*TriggerTracker) SetBackoff ¶ added in v0.0.51
func (t *TriggerTracker) SetBackoff()
SetBackoff sets the backoff period based on the current level
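A backoff period derived from a level is often exponential. The sketch below assumes doubling from a base delay up to a cap; the actual formula and constants used by TriggerTracker are not documented here, so `baseBackoff`, `maxBackoff`, and `backoffFor` are purely illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical tuning constants for the sketch.
const (
	baseBackoff = 1 * time.Second
	maxBackoff  = 60 * time.Second
)

// backoffFor returns baseBackoff doubled once per level, capped at maxBackoff.
// Negative levels are treated as level 0.
func backoffFor(level int) time.Duration {
	d := baseBackoff
	for i := 0; i < level; i++ {
		d *= 2
		if d >= maxBackoff {
			return maxBackoff
		}
	}
	return d
}

func main() {
	for level := 0; level <= 7; level++ {
		fmt.Printf("level %d: %v\n", level, backoffFor(level))
	}
}
```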
type Watcher ¶ added in v0.0.64
type Watcher struct {
// contains filtered or unexported fields
}
Watcher manages file system watching for a single rule
func NewWatcher ¶ added in v0.0.64
NewWatcher creates a new Watcher for the given rule
func (*Watcher) GetWatchedDirectories ¶ added in v0.0.64
GetWatchedDirectories returns a copy of currently watched directories
func (*Watcher) SetEventHandler ¶ added in v0.0.64
SetEventHandler sets the callback function for file change events
type Worker ¶ added in v0.0.64
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a single worker that executes rule jobs
func NewWorker ¶ added in v0.0.64
func NewWorker(id int, orchestrator *Orchestrator) *Worker
NewWorker creates a new worker instance
func (*Worker) Start ¶ added in v0.0.64
func (w *Worker) Start(jobQueue <-chan *RuleJob, workerPool *WorkerPool)
Start begins the worker's job processing loop
func (*Worker) TerminateProcesses ¶ added in v0.0.64
TerminateProcesses terminates all running processes for this worker
type WorkerPool ¶ added in v0.0.64
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of workers and job distribution with deduplication
func NewWorkerPool ¶ added in v0.0.64
func NewWorkerPool(orchestrator *Orchestrator, maxWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool with on-demand worker creation
func (*WorkerPool) CompleteJob ¶ added in v0.0.64
func (wp *WorkerPool) CompleteJob(job *RuleJob, worker *Worker)
CompleteJob handles job completion and processes any pending jobs for the same rule
func (*WorkerPool) EnqueueJob ¶ added in v0.0.64
func (wp *WorkerPool) EnqueueJob(job *RuleJob)
EnqueueJob adds a job to the queue with deduplication logic
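Deduplication in a job queue can be sketched as a small state machine per rule: at most one job executing and at most one waiting. The sketch assumes that extra requests for a rule that already has a pending job are simply dropped, and that completing a job promotes the pending one. The `pool` type and the string results are hypothetical and exist only to make the behavior visible.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical tracker of per-rule job state.
type pool struct {
	mu        sync.Mutex
	executing map[string]bool
	pending   map[string]bool
}

func newPool() *pool {
	return &pool{executing: make(map[string]bool), pending: make(map[string]bool)}
}

// enqueue returns "started" if the rule was idle, "queued" if it is running
// and had no pending job, and "dropped" if a pending job already exists.
func (p *pool) enqueue(rule string) string {
	p.mu.Lock()
	defer p.mu.Unlock()
	switch {
	case !p.executing[rule]:
		p.executing[rule] = true
		return "started"
	case !p.pending[rule]:
		p.pending[rule] = true
		return "queued"
	default:
		return "dropped"
	}
}

// complete finishes the running job. If a pending job exists it is promoted
// (the rule stays in the executing set) and complete returns true.
func (p *pool) complete(rule string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.pending[rule] {
		delete(p.pending, rule)
		return true
	}
	delete(p.executing, rule)
	return false
}

func main() {
	p := newPool()
	fmt.Println(p.enqueue("build"), p.enqueue("build"), p.enqueue("build"))
	fmt.Println("rerun after completion:", p.complete("build"))
	fmt.Println("rerun after completion:", p.complete("build"))
}
```

Collapsing a burst into "one running, one waiting" guarantees the latest change is eventually built without queuing a run per file event.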
func (*WorkerPool) GetExecutingRules ¶ added in v0.0.64
func (wp *WorkerPool) GetExecutingRules() map[string]int
GetExecutingRules returns a copy of currently executing rules
func (*WorkerPool) GetPendingRules ¶ added in v0.0.64
func (wp *WorkerPool) GetPendingRules() []string
GetPendingRules returns a list of rules with pending jobs
func (*WorkerPool) GetStatus ¶ added in v0.0.64
func (wp *WorkerPool) GetStatus() (active int, capacity int, pending int, executing int)
GetStatus returns the current status of the worker pool
func (*WorkerPool) Start ¶ added in v0.0.64
func (wp *WorkerPool) Start() error
Start initializes the worker pool (workers created on-demand)
func (*WorkerPool) Stop ¶ added in v0.0.64
func (wp *WorkerPool) Stop() error
Stop gracefully stops all workers