Documentation ¶
Overview ¶
Package debug provides interactive workflow debugging with breakpoint support, step-through execution, and state inspection.
Index ¶
- type Breakpoint
- type BreakpointHandler
- type BreakpointInterceptor
- type BreakpointManager
- func (m *BreakpointManager) CheckBreakpoint(pipeline, step string, ctx map[string]any) bool
- func (m *BreakpointManager) ClearAll()
- func (m *BreakpointManager) DisableBreakpoint(pipeline, step string) bool
- func (m *BreakpointManager) EnableBreakpoint(pipeline, step string) bool
- func (m *BreakpointManager) GetPaused(executionID string) (*PausedExecution, bool)
- func (m *BreakpointManager) ListBreakpoints() []*PipelineBreakpoint
- func (m *BreakpointManager) ListPaused() []*PausedExecution
- func (m *BreakpointManager) Pause(executionID, pipeline, step string, stepIndex int, context map[string]any) <-chan ResumeAction
- func (m *BreakpointManager) RemoveBreakpoint(pipeline, step string) bool
- func (m *BreakpointManager) Resume(executionID string, action ResumeAction) error
- func (m *BreakpointManager) SetBreakpoint(pipeline, step string, condition string) *PipelineBreakpoint
- func (m *BreakpointManager) ShouldPause(pipeline, step string, context map[string]any) bool
- func (m *BreakpointManager) WaitForResume(executionID, pipeline, step string, stepIndex int, context map[string]any) (ResumeAction, error)
- type BreakpointType
- type Debugger
- func (d *Debugger) AddBreakpoint(bpType BreakpointType, target string, condition string) string
- func (d *Debugger) CheckBreakpoint(step string, stepType BreakpointType) (shouldPause bool, bpID string)
- func (d *Debugger) Continue() error
- func (d *Debugger) ListBreakpoints() []*Breakpoint
- func (d *Debugger) Pause(ctx context.Context, step string, bpID string, data map[string]any) error
- func (d *Debugger) RecordStep(step, stepType string, duration time.Duration, data map[string]any, err error)
- func (d *Debugger) RemoveBreakpoint(id string) error
- func (d *Debugger) Reset()
- func (d *Debugger) SetIdle()
- func (d *Debugger) SetRunning(workflowType, action string)
- func (d *Debugger) State() ExecutionState
- func (d *Debugger) Step() error
- type ExecutionState
- type Handler
- type PausedExecution
- type PipelineBreakpoint
- type ResumeAction
- type StepRecord
- type WorkflowExecutor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Breakpoint ¶
type Breakpoint struct {
ID string `json:"id"`
Type BreakpointType `json:"type"`
Target string `json:"target"` // module name, workflow type, or trigger name
Condition string `json:"condition"` // optional condition expression
Enabled bool `json:"enabled"`
HitCount int `json:"hit_count"`
}
Breakpoint represents a point where execution should pause.
type BreakpointHandler ¶
type BreakpointHandler struct {
// contains filtered or unexported fields
}
BreakpointHandler provides an HTTP API for managing pipeline breakpoints and controlling paused executions.
func NewBreakpointHandler ¶
func NewBreakpointHandler(manager *BreakpointManager, logger *slog.Logger) *BreakpointHandler
NewBreakpointHandler creates a new BreakpointHandler.
func (*BreakpointHandler) RegisterRoutes ¶
func (h *BreakpointHandler) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers the breakpoint debug API routes on the provided mux.
type BreakpointInterceptor ¶
type BreakpointInterceptor interface {
// ShouldPause checks whether execution should pause before the given
// pipeline step. The context map contains the current pipeline state.
ShouldPause(pipeline, step string, context map[string]any) bool
// WaitForResume blocks until a ResumeAction is received for the given
// execution. It returns the action to take (continue, skip, abort, step_over)
// along with any modified context data.
WaitForResume(executionID, pipeline, step string, stepIndex int, context map[string]any) (ResumeAction, error)
}
BreakpointInterceptor is the interface that Pipeline executors use to check for breakpoints before each step. When a breakpoint fires, WaitForResume blocks the execution goroutine until an external actor (typically the HTTP debug API) sends a ResumeAction.
The Pipeline executor can optionally hold a reference to this interface. If nil, no breakpoint checking occurs and execution proceeds normally.
type BreakpointManager ¶
type BreakpointManager struct {
// contains filtered or unexported fields
}
BreakpointManager manages pipeline execution breakpoints. It tracks breakpoints keyed by "pipeline:step" and maintains a registry of paused executions that can be resumed via the API.
func NewBreakpointManager ¶
func NewBreakpointManager(logger *slog.Logger) *BreakpointManager
NewBreakpointManager creates a new BreakpointManager.
func (*BreakpointManager) CheckBreakpoint ¶
func (m *BreakpointManager) CheckBreakpoint(pipeline, step string, ctx map[string]any) bool
CheckBreakpoint checks whether execution should pause at the given pipeline/step. It evaluates the breakpoint's enabled state and optional condition. Returns true if the execution should pause.
Condition evaluation: if a condition string is set, it is matched against a key in the context map. If the context value for that key is truthy (non-nil, non-false, non-zero, non-empty-string), the breakpoint fires. If no condition is set, the breakpoint always fires when enabled.
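The truthiness rule can be sketched as below. This is an illustration of the documented behavior, not the package's code: isTruthy and conditionFires are hypothetical names, and the set of numeric types checked here is an assumption.

```go
package main

import "fmt"

// isTruthy sketches the documented rule: nil, false, zero, and "" are falsy.
// Which numeric types the real package recognises is an assumption.
func isTruthy(v any) bool {
	switch x := v.(type) {
	case nil:
		return false
	case bool:
		return x
	case string:
		return x != ""
	case int:
		return x != 0
	case int64:
		return x != 0
	case float64:
		return x != 0
	default:
		return true // any other non-nil value counts as truthy in this sketch
	}
}

// conditionFires treats the condition string as a key into the context map.
func conditionFires(condition string, ctx map[string]any) bool {
	if condition == "" {
		return true // no condition: an enabled breakpoint always fires
	}
	// A missing key yields nil, which is falsy.
	return isTruthy(ctx[condition])
}

func main() {
	ctx := map[string]any{"has_errors": true, "retries": 0, "user": ""}
	for _, cond := range []string{"", "has_errors", "retries", "user", "missing"} {
		fmt.Printf("%q -> %v\n", cond, conditionFires(cond, ctx))
	}
}
```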
func (*BreakpointManager) ClearAll ¶
func (m *BreakpointManager) ClearAll()
ClearAll removes all breakpoints and aborts all paused executions.
func (*BreakpointManager) DisableBreakpoint ¶
func (m *BreakpointManager) DisableBreakpoint(pipeline, step string) bool
DisableBreakpoint disables the breakpoint for the given pipeline/step without removing it. Returns true if the breakpoint was found.
func (*BreakpointManager) EnableBreakpoint ¶
func (m *BreakpointManager) EnableBreakpoint(pipeline, step string) bool
EnableBreakpoint enables the breakpoint for the given pipeline/step. Returns true if the breakpoint was found and enabled.
func (*BreakpointManager) GetPaused ¶
func (m *BreakpointManager) GetPaused(executionID string) (*PausedExecution, bool)
GetPaused returns a specific paused execution by ID.
func (*BreakpointManager) ListBreakpoints ¶
func (m *BreakpointManager) ListBreakpoints() []*PipelineBreakpoint
ListBreakpoints returns all registered breakpoints.
func (*BreakpointManager) ListPaused ¶
func (m *BreakpointManager) ListPaused() []*PausedExecution
ListPaused returns all currently paused executions.
func (*BreakpointManager) Pause ¶
func (m *BreakpointManager) Pause(executionID, pipeline, step string, stepIndex int, context map[string]any) <-chan ResumeAction
Pause registers a paused execution and returns a channel that will receive the ResumeAction when Resume is called. The calling goroutine should block on this channel.
func (*BreakpointManager) RemoveBreakpoint ¶
func (m *BreakpointManager) RemoveBreakpoint(pipeline, step string) bool
RemoveBreakpoint removes the breakpoint for the given pipeline/step. Returns true if a breakpoint was removed, false if none existed.
func (*BreakpointManager) Resume ¶
func (m *BreakpointManager) Resume(executionID string, action ResumeAction) error
Resume sends a resume action to a paused execution, unblocking it. Returns an error if the execution ID is not found.
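The Pause/Resume handshake described above can be pictured as a registry of channels keyed by execution ID. The sketch below is a minimal stand-in under that assumption; registry, pause, and resume are hypothetical names, and the buffered channel is a design choice of this example rather than a documented detail.

```go
package main

import (
	"fmt"
	"sync"
)

// ResumeAction mirrors the documented struct (redeclared for a standalone sketch).
type ResumeAction struct {
	Action string
	Data   map[string]any
}

// registry is a hypothetical stand-in for the manager's table of paused executions.
type registry struct {
	mu     sync.Mutex
	paused map[string]chan ResumeAction
}

func newRegistry() *registry {
	return &registry{paused: make(map[string]chan ResumeAction)}
}

// pause registers the execution and returns a receive-only channel.
// Capacity 1 lets resume deliver the action without waiting for the receiver.
func (r *registry) pause(executionID string) <-chan ResumeAction {
	ch := make(chan ResumeAction, 1)
	r.mu.Lock()
	r.paused[executionID] = ch
	r.mu.Unlock()
	return ch
}

// resume delivers the action and forgets the execution; unknown IDs are an error.
func (r *registry) resume(executionID string, action ResumeAction) error {
	r.mu.Lock()
	ch, ok := r.paused[executionID]
	delete(r.paused, executionID)
	r.mu.Unlock()
	if !ok {
		return fmt.Errorf("execution %q is not paused", executionID)
	}
	ch <- action
	return nil
}

func main() {
	r := newRegistry()
	ch := r.pause("exec-1")

	done := make(chan string)
	go func() {
		act := <-ch // the execution goroutine blocks here while paused
		done <- act.Action
	}()

	if err := r.resume("exec-1", ResumeAction{Action: "continue"}); err != nil {
		panic(err)
	}
	fmt.Println(<-done)
	fmt.Println(r.resume("exec-1", ResumeAction{Action: "continue"}) != nil)
}
```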
func (*BreakpointManager) SetBreakpoint ¶
func (m *BreakpointManager) SetBreakpoint(pipeline, step string, condition string) *PipelineBreakpoint
SetBreakpoint adds or updates a breakpoint on the given pipeline step. If a breakpoint already exists for the pipeline/step pair, it is replaced. Returns the created breakpoint.
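The add-or-replace behavior follows naturally from a map keyed by "pipeline:step". The sketch below illustrates that idea only; bp, store, and key are hypothetical, and creating breakpoints in the enabled state is an assumption.

```go
package main

import "fmt"

// bp is a trimmed, hypothetical stand-in for PipelineBreakpoint.
type bp struct {
	Pipeline, Step, Condition string
	Enabled                   bool
}

// key builds the "pipeline:step" lookup key mentioned in the documentation.
func key(pipeline, step string) string { return pipeline + ":" + step }

type store map[string]*bp

// set adds a breakpoint, replacing any existing one for the same pair.
func (s store) set(pipeline, step, condition string) *bp {
	b := &bp{Pipeline: pipeline, Step: step, Condition: condition, Enabled: true} // assumption: enabled on creation
	s[key(pipeline, step)] = b
	return b
}

// remove reports whether a breakpoint existed for the pair.
func (s store) remove(pipeline, step string) bool {
	k := key(pipeline, step)
	_, ok := s[k]
	delete(s, k)
	return ok
}

func main() {
	s := store{}
	s.set("etl", "transform", "")
	s.set("etl", "transform", "has_errors") // replaces the first entry
	fmt.Println(len(s), s[key("etl", "transform")].Condition)
	fmt.Println(s.remove("etl", "transform"), s.remove("etl", "transform"))
}
```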
func (*BreakpointManager) ShouldPause ¶
func (m *BreakpointManager) ShouldPause(pipeline, step string, context map[string]any) bool
ShouldPause implements BreakpointInterceptor.
func (*BreakpointManager) WaitForResume ¶
func (m *BreakpointManager) WaitForResume(executionID, pipeline, step string, stepIndex int, context map[string]any) (ResumeAction, error)
WaitForResume implements BreakpointInterceptor. It pauses the execution and blocks until a ResumeAction is received.
type BreakpointType ¶
type BreakpointType string
BreakpointType identifies what kind of breakpoint is set.
const (
BreakOnModule BreakpointType = "module"
BreakOnWorkflow BreakpointType = "workflow"
BreakOnTrigger BreakpointType = "trigger"
)
type Debugger ¶
type Debugger struct {
// contains filtered or unexported fields
}
Debugger wraps workflow execution with breakpoint and step-through support.
func (*Debugger) AddBreakpoint ¶
func (d *Debugger) AddBreakpoint(bpType BreakpointType, target string, condition string) string
AddBreakpoint registers a new breakpoint and returns its ID.
func (*Debugger) CheckBreakpoint ¶
func (d *Debugger) CheckBreakpoint(step string, stepType BreakpointType) (shouldPause bool, bpID string)
CheckBreakpoint checks whether a step should trigger a pause based on active breakpoints. It is safe for concurrent use and is intended to be called from hooks in the workflow execution path.
func (*Debugger) ListBreakpoints ¶
func (d *Debugger) ListBreakpoints() []*Breakpoint
ListBreakpoints returns all breakpoints.
func (*Debugger) Pause ¶
func (d *Debugger) Pause(ctx context.Context, step string, bpID string, data map[string]any) error
Pause puts the debugger into the paused state and blocks until Step or Continue is called. Cancelling ctx aborts the wait.
func (*Debugger) RecordStep ¶
func (d *Debugger) RecordStep(step, stepType string, duration time.Duration, data map[string]any, err error)
RecordStep adds a step to the execution history.
func (*Debugger) RemoveBreakpoint ¶
func (d *Debugger) RemoveBreakpoint(id string) error
RemoveBreakpoint removes a breakpoint by ID.
func (*Debugger) SetRunning ¶
func (d *Debugger) SetRunning(workflowType, action string)
SetRunning updates the state to indicate active execution.
func (*Debugger) State ¶
func (d *Debugger) State() ExecutionState
State returns the current execution state.
type ExecutionState ¶
type ExecutionState struct {
Status string `json:"status"` // "running", "paused", "stopped", "idle"
CurrentStep string `json:"current_step"`
WorkflowType string `json:"workflow_type"`
Action string `json:"action"`
Data map[string]any `json:"data,omitempty"`
PausedAt *time.Time `json:"paused_at,omitempty"`
BreakpointID string `json:"breakpoint_id,omitempty"`
StepHistory []StepRecord `json:"step_history"`
}
ExecutionState captures the debugger's current execution state: its status, the current step, the step history, and, when paused, the pause time and breakpoint ID.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler provides an HTTP API for controlling the debugger.
func NewHandler ¶
NewHandler creates a new debug HTTP API handler.
func (*Handler) RegisterRoutes ¶
RegisterRoutes registers the debug API routes on the provided mux.
type PausedExecution ¶
type PausedExecution struct {
ID string `json:"id"`
PipelineName string `json:"pipeline_name"`
StepName string `json:"step_name"`
StepIndex int `json:"step_index"`
Context map[string]any `json:"context"`
PausedAt time.Time `json:"paused_at"`
// contains filtered or unexported fields
}
PausedExecution captures the state of a pipeline execution that has been paused at a breakpoint. The resume channel is used to unblock the execution goroutine once a resume action is sent.
type PipelineBreakpoint ¶
type PipelineBreakpoint struct {
ID string `json:"id"`
PipelineName string `json:"pipeline_name"`
StepName string `json:"step_name"`
Condition string `json:"condition,omitempty"` // optional: only break if condition is true
Enabled bool `json:"enabled"`
HitCount int64 `json:"hit_count"`
}
PipelineBreakpoint represents a breakpoint set on a specific pipeline step.
type ResumeAction ¶
type ResumeAction struct {
Action string `json:"action"` // "continue", "skip", "abort", "step_over"
Data map[string]any `json:"data"` // optional: modified context data to inject
}
ResumeAction describes how a paused execution should continue.
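Given the struct tags above, a resume request body would decode as in the sketch below. The struct is redeclared locally so the example runs on its own; decodeAction, encodeAction, and the sample payload are hypothetical, and the actual HTTP routes are not shown because they are not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResumeAction mirrors the documented struct and its JSON tags.
type ResumeAction struct {
	Action string         `json:"action"`
	Data   map[string]any `json:"data"`
}

// decodeAction parses a JSON body into a ResumeAction.
func decodeAction(body []byte) (ResumeAction, error) {
	var act ResumeAction
	err := json.Unmarshal(body, &act)
	return act, err
}

// encodeAction renders a ResumeAction as JSON.
func encodeAction(act ResumeAction) (string, error) {
	out, err := json.Marshal(act)
	return string(out), err
}

func main() {
	act, err := decodeAction([]byte(`{"action":"skip","data":{"retries":3}}`))
	if err != nil {
		panic(err)
	}
	// JSON numbers decode into float64 when the target is map[string]any.
	fmt.Println(act.Action, act.Data["retries"])

	// The data tag has no omitempty, so a nil map is encoded as null.
	out, err := encodeAction(ResumeAction{Action: "continue"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because decoded numbers arrive as float64, code that reads injected context data should not assume integer types.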
type StepRecord ¶
type StepRecord struct {
Step string `json:"step"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Duration time.Duration `json:"duration"`
Data map[string]any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
StepRecord records a single step that was executed.