probe

v0.6.1
Published: Jul 23, 2025 License: MIT Imports: 25 Imported by: 0

README






PROBE







Probe is a YAML-based workflow automation tool. It uses plugin-based actions to execute workflows, making it highly flexible and extensible.

Example using REST API:

name: Example http workflow
jobs:
- name: Request REST API
  defaults:
    http:
      url: http://localhost:9000
      headers:
        authorization: Bearer {env.TOKEN}
        accept: application/json
  steps:
  - name: Get a user information
    uses: http
    with:
      get: /api/v1/me
    test: res.status == 200 && res.body.uname == "foobar"
  - name: Update user
    uses: http
    with:
      put: /api/v1/users/{steps[0].res.body.uid}
      body:
        profile: "I'm a software engineer living in Fukuoka."
    test: res.status == 201

Example of sending repeated emails:

name: Send queue congestion experiment
jobs:
- name: Normal sender
  id: normal-sender
  repeat:
    count: 60
    interval: 10
  steps:
  - uses: smtp
    with:
      addr: localhost:5871
      from: alice@msa1.local
      to: bob@mx1.local
      my-hostname: msa1-local
      subject: Experiment A
- name: Throttled sender
  id: throttled-sender
  repeat:
    count: 60
    interval: 10
  steps:
  - uses: smtp
    with:
      addr: localhost:5872
      from: carol@msa2.local
      to: bob@mx2.local
      my-hostname: msa2-local
      subject: Experiment B
- name: Export latency as CSV
  needs:
  - normal-sender
  - throttled-sender
  waitif: sh(postqueue -p 2> /dev/null | grep -c '^[A-F0-9]') != "0"
  steps:
  - uses: mail-latency
    with:
      spath: /home/vmail
      dpath: ./mail-latency.csv

Features

A probe workflow consists of jobs, and each job contains steps. Jobs run asynchronously, while the steps within a job run in sequence. The result of each step is logged and can be referenced from later YAML values using curly braces, for example {steps[0].res.body.uid}.

  • Workflows can be automated using built-in http, mail, and shell actions
  • Custom actions that meet your use cases can be created using protocol buffers
  • Protocol-based YAML definitions keep the learning cost low and make workflows easy to read
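
The curly-brace expansion described above can be sketched as follows. This is a minimal illustration using only the standard library, not probe's implementation: the expand function, the placeholder pattern, and the flat lookup table keyed by the full expression are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches a single {...} expression with no nested braces.
// The pattern is an assumption for this sketch.
var placeholder = regexp.MustCompile(`\{([^{}]+)\}`)

// expand is a hypothetical helper that replaces each {key} in tmpl with
// values[key]. Unknown keys are left untouched so that mistakes stay visible.
func expand(tmpl string, values map[string]string) string {
	return placeholder.ReplaceAllStringFunc(tmpl, func(m string) string {
		key := m[1 : len(m)-1] // strip the surrounding braces
		if v, ok := values[key]; ok {
			return v
		}
		return m
	})
}

func main() {
	// A flat table standing in for the environment and logged step results.
	values := map[string]string{
		"env.TOKEN":             "secret",
		"steps[0].res.body.uid": "42",
	}
	fmt.Println(expand("Bearer {env.TOKEN}", values))
	fmt.Println(expand("/api/v1/users/{steps[0].res.body.uid}", values))
}
```

A real evaluator would resolve the expression against structured data rather than a flat table; the sketch only shows where substitution happens in a YAML value.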

Install

Installation via package managers is not supported yet, but support is planned. For now, install with go install:

go install github.com/linyows/probe/cmd/probe@latest

Usage

Run a workflow by passing the path to its YAML definition file to the --workflow option.

probe --workflow ./workflow.yml

To-Do

Here are some additional features I'm considering:

  • Support waitif and needs params in job
  • Support rich output
  • Support multipart/form-data in http actions
  • Support some actions:
    • grpc actions
    • graphql actions
    • ssh actions
    • amqp actions
    • imap actions
    • udp actions
  • Support post-actions
  • Support pre-job and post-job

Author

linyows

Documentation

Constants

const (
	IconSuccess = "✔︎ "
	IconError   = "✘ "
	IconWarning = "▲ "
	IconCircle  = "⏺ "
)

Icon constants

Variables

var (
	BuiltinCmd = "builtin-actions"
	Handshake  = plugin.HandshakeConfig{ProtocolVersion: 1, MagicCookieKey: "probe", MagicCookieValue: "actions"}
	PluginMap  = map[string]plugin.Plugin{"actions": &ActionsPlugin{}}
)

Functions

func AnyToString added in v0.4.0

func AnyToString(value any) (string, bool)

AnyToString attempts to convert any type to a string.

func AssignStruct

func AssignStruct(pa ActionsParams, st any) error

func DiffJSON added in v0.4.0

func DiffJSON(src, target map[string]any) string

DiffJSON compares two `map[string]any` objects strictly and collects differences.

func DisableSecurityExit added in v0.5.0

func DisableSecurityExit(disabled bool)

DisableSecurityExit disables process termination on security violations. When enabled, security violations return errors instead of calling os.Exit(2).

func EnvMap added in v0.3.0

func EnvMap() map[string]string

func FlattenInterface

func FlattenInterface(i any) map[string]string

func MapToStructByTags

func MapToStructByTags(params map[string]any, dest any) error

MapToStructByTags converts a map[string]any to a struct.

func MatchJSON added in v0.4.0

func MatchJSON(src, target map[string]any) bool

MatchJSON compares two `map[string]any` objects strictly. All fields in `src` and `target` must match, including structure and values.
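
A strict comparison of this kind can be approximated with the standard library. The sketch below is an assumption about what "strictly" means here, not the package's source: strictMatch is a hypothetical stand-in built on reflect.DeepEqual, which requires identical keys, nesting, values, and dynamic types.

```go
package main

import (
	"fmt"
	"reflect"
)

// strictMatch is a hypothetical stand-in for a strict map comparison.
// It reports true only when both maps have the same keys, structure,
// values, and dynamic types.
func strictMatch(src, target map[string]any) bool {
	return reflect.DeepEqual(src, target)
}

func main() {
	a := map[string]any{"user": map[string]any{"id": 1, "name": "alice"}}
	b := map[string]any{"user": map[string]any{"id": 1, "name": "alice"}}
	c := map[string]any{"user": map[string]any{"id": 1}}

	fmt.Println(strictMatch(a, b)) // same structure and values
	fmt.Println(strictMatch(a, c)) // "name" is missing in c
}
```

Note that with this approach an int 1 and a float64 1 are different values, which matters when one side was decoded from JSON.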

func MergeMaps added in v0.4.0

func MergeMaps(base, over map[string]any) map[string]any

MergeMaps merges two maps of type map[string]any. If keys conflict, the values from over override those in base. Nested maps are merged recursively.
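
The merge rule described above (override on conflict, recurse into nested maps) can be sketched as follows. The mergeMaps function here is a local illustration written for this page, and it assumes the inputs are left unmodified and a new map is returned; the package's actual function may behave differently in that respect.

```go
package main

import "fmt"

// mergeMaps is a hypothetical re-implementation of the documented rule:
// values from over win, except that two nested maps are merged recursively.
func mergeMaps(base, over map[string]any) map[string]any {
	out := make(map[string]any, len(base))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range over {
		baseMap, baseIsMap := out[k].(map[string]any)
		overMap, overIsMap := v.(map[string]any)
		if baseIsMap && overIsMap {
			out[k] = mergeMaps(baseMap, overMap)
			continue
		}
		out[k] = v // scalar conflict or new key: over wins
	}
	return out
}

func main() {
	base := map[string]any{
		"url":     "http://localhost:9000",
		"headers": map[string]any{"accept": "application/json"},
	}
	over := map[string]any{
		"url":     "http://localhost:8080",
		"headers": map[string]any{"authorization": "Bearer token"},
	}
	merged := mergeMaps(base, over)
	fmt.Println(merged["url"])
	fmt.Println(merged["headers"])
}
```

This is the behavior a job-level defaults block would need when combined with a step's own with parameters: the step overrides individual keys while keeping the rest.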

func MergeStringMaps

func MergeStringMaps(base map[string]string, over map[string]any) map[string]string

MergeStringMaps merges over into base and returns the result as a string map.

func RunActions

func RunActions(name string, args []string, with map[string]any, verbose bool) (map[string]any, error)

func StrmapToAnymap added in v0.3.0

func StrmapToAnymap(strmap map[string]string) map[string]any

func StructToMapByTags

func StructToMapByTags(src any) (map[string]any, error)

StructToMapByTags converts a struct to a map[string]any.

func TitleCase

func TitleCase(st string, char string) string

func UnflattenInterface

func UnflattenInterface(flatMap map[string]string) map[string]any

UnflattenInterface recursively converts a map[string]string to a map[string]any.
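
To illustrate the idea of turning a flat string map back into a nested one, here is a small sketch. The key format is not documented on this page, so the separator is passed in explicitly and the "." used in main is purely an assumption; unflatten is a hypothetical helper, not the package's function.

```go
package main

import (
	"fmt"
	"strings"
)

// unflatten is a hypothetical helper that rebuilds nested maps from flat
// keys split on sep. Conflicting keys (for example "a" and "a.b") are not
// handled deterministically in this sketch.
func unflatten(flat map[string]string, sep string) map[string]any {
	root := map[string]any{}
	for key, value := range flat {
		parts := strings.Split(key, sep)
		node := root
		// Walk or create the intermediate maps for every part but the last.
		for _, p := range parts[:len(parts)-1] {
			child, ok := node[p].(map[string]any)
			if !ok {
				child = map[string]any{}
				node[p] = child
			}
			node = child
		}
		node[parts[len(parts)-1]] = value
	}
	return root
}

func main() {
	flat := map[string]string{
		"res.status":   "200",
		"res.body.uid": "42",
	}
	fmt.Println(unflatten(flat, "."))
}
```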

Types

type Actions

type Actions interface {
	Run(args []string, with map[string]string) (map[string]string, error)
}
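
As a rough illustration of what a custom action looks like, the sketch below implements the Run method with the signature shown above. The interface is copied locally so the example compiles on its own; upperAction and its "message" and "result" keys are hypothetical, and serving the action as a plugin is not shown.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Actions mirrors the interface documented above. It is redeclared here
// only so that this sketch is self-contained.
type Actions interface {
	Run(args []string, with map[string]string) (map[string]string, error)
}

// upperAction is a hypothetical action that upper-cases its "message"
// parameter and returns it under "result".
type upperAction struct{}

func (upperAction) Run(args []string, with map[string]string) (map[string]string, error) {
	msg, ok := with["message"]
	if !ok {
		return nil, errors.New("missing required parameter: message")
	}
	return map[string]string{"result": strings.ToUpper(msg)}, nil
}

// Compile-time check that upperAction satisfies the interface.
var _ Actions = upperAction{}

func main() {
	var a Actions = upperAction{}
	out, err := a.Run(nil, map[string]string{"message": "hello"})
	fmt.Println(out["result"], err)
}
```

Because both the parameters and the result are flat string maps, structured values have to be flattened before the call and rebuilt afterwards.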

type ActionsArgs

type ActionsArgs []string

type ActionsClient

type ActionsClient struct {
	// contains filtered or unexported fields
}

func (*ActionsClient) Run

func (m *ActionsClient) Run(args []string, with map[string]string) (map[string]string, error)

type ActionsParams

type ActionsParams map[string]string

type ActionsPlugin

type ActionsPlugin struct {
	plugin.Plugin
	Impl Actions
}

func (*ActionsPlugin) GRPCClient

func (p *ActionsPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (any, error)

func (*ActionsPlugin) GRPCServer

func (p *ActionsPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error

type ActionsServer

type ActionsServer struct {
	Impl Actions
}

func (*ActionsServer) Run

func (m *ActionsServer) Run(ctx context.Context, req *pb.RunRequest) (*pb.RunResponse, error)

type BufferedJobExecutor added in v0.6.0

type BufferedJobExecutor struct {
	// contains filtered or unexported fields
}

BufferedJobExecutor handles job execution with buffered output

func NewBufferedJobExecutor added in v0.6.0

func NewBufferedJobExecutor(w *Workflow) *BufferedJobExecutor

NewBufferedJobExecutor creates a new buffered job executor

func (*BufferedJobExecutor) Execute added in v0.6.0

func (e *BufferedJobExecutor) Execute(job *Job, jobID string, ctx JobContext, config ExecutionConfig) ExecutionResult

Execute runs a job with buffered output

type Config

type Config struct {
	Log     io.Writer
	Verbose bool
	RT      bool
}

type ExecutionConfig added in v0.6.0

type ExecutionConfig struct {
	UseBuffering    bool
	UseParallel     bool
	HasDependencies bool
	WorkflowOutput  *WorkflowOutput
	JobScheduler    *JobScheduler
}

ExecutionConfig contains configuration for job execution

type ExecutionResult added in v0.6.0

type ExecutionResult struct {
	Success  bool
	Duration time.Duration
	Output   string
	Error    error
}

ExecutionResult represents the result of a job execution

type Expr added in v0.2.0

type Expr struct{}

func (*Expr) Eval added in v0.3.0

func (e *Expr) Eval(input string, env any) (any, error)

func (*Expr) EvalOrEvalTemplate added in v0.4.0

func (e *Expr) EvalOrEvalTemplate(input string, env any) (string, error)

func (*Expr) EvalTemplate added in v0.2.0

func (e *Expr) EvalTemplate(input string, env any) (string, error)

func (*Expr) EvalTemplateMap added in v0.4.0

func (e *Expr) EvalTemplateMap(input map[string]any, env any) map[string]any

func (*Expr) Options added in v0.4.0

func (e *Expr) Options(env any) []ex.Option

type Interval added in v0.6.0

type Interval struct {
	time.Duration
}

Interval represents a time interval that can be specified as a number (seconds) or duration string

func (Interval) MarshalYAML added in v0.6.0

func (i Interval) MarshalYAML() (interface{}, error)

MarshalYAML implements custom YAML marshaling for Interval

func (*Interval) UnmarshalYAML added in v0.6.0

func (i *Interval) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements custom YAML unmarshaling for Interval
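
The "number (seconds) or duration string" rule can be sketched with the standard library alone. parseInterval below is a hypothetical helper operating on a plain string, not the package's YAML unmarshaler, and it assumes that a bare integer means whole seconds.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseInterval is a hypothetical helper: a bare integer is read as
// seconds, anything else is handed to time.ParseDuration.
func parseInterval(s string) (time.Duration, error) {
	if n, err := strconv.Atoi(s); err == nil {
		return time.Duration(n) * time.Second, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, in := range []string{"10", "500ms", "1m30s"} {
		d, err := parseInterval(in)
		fmt.Println(in, d, err)
	}
}
```

Under this reading, interval: 10 in the README example means ten seconds between repetitions.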

type Job

type Job struct {
	Name     string   `yaml:"name",validate:"required"`
	ID       string   `yaml:"id,omitempty"`
	Needs    []string `yaml:"needs,omitempty"`
	Steps    []*Step  `yaml:"steps",validate:"required"`
	Repeat   *Repeat  `yaml:"repeat"`
	Defaults any      `yaml:"defaults"`
	// contains filtered or unexported fields
}

func (*Job) Start

func (j *Job) Start(ctx JobContext) bool

type JobContext

type JobContext struct {
	Vars map[string]any   `expr:"vars"`
	Logs []map[string]any `expr:"steps"`
	Config
	Failed bool
	// Repeat tracking
	IsRepeating   bool
	RepeatCurrent int
	RepeatTotal   int
	StepCounters  map[int]StepRepeatCounter // step index -> counter
	// Output buffering
	UseBuffering bool
	// Output writer
	Output OutputWriter
}

func (*JobContext) SetFailed added in v0.2.0

func (j *JobContext) SetFailed()

type JobExecutor added in v0.6.0

type JobExecutor interface {
	Execute(job *Job, jobID string, ctx JobContext, config ExecutionConfig) ExecutionResult
}

JobExecutor defines the interface for executing jobs

type JobOutput added in v0.6.0

type JobOutput struct {
	JobName   string
	JobID     string
	Buffer    strings.Builder
	Status    string
	StartTime time.Time
	EndTime   time.Time
	Success   bool
	// contains filtered or unexported fields
}

JobOutput stores buffered output for a job

type JobScheduler added in v0.5.0

type JobScheduler struct {
	// contains filtered or unexported fields
}

func NewJobScheduler added in v0.5.0

func NewJobScheduler() *JobScheduler

func (*JobScheduler) AddJob added in v0.5.0

func (js *JobScheduler) AddJob(job *Job) error

func (*JobScheduler) AllJobsCompleted added in v0.5.0

func (js *JobScheduler) AllJobsCompleted() bool

func (*JobScheduler) CanRunJob added in v0.5.0

func (js *JobScheduler) CanRunJob(jobID string) bool

func (*JobScheduler) GetRepeatInfo added in v0.5.0

func (js *JobScheduler) GetRepeatInfo(jobID string) (current, target int)

GetRepeatInfo returns current repeat counter and target for a job

func (*JobScheduler) GetRunnableJobs added in v0.5.0

func (js *JobScheduler) GetRunnableJobs() []string

func (*JobScheduler) IncrementRepeatCounter added in v0.5.0

func (js *JobScheduler) IncrementRepeatCounter(jobID string)

IncrementRepeatCounter increments the repeat counter for a job

func (*JobScheduler) MarkJobsWithFailedDependencies added in v0.6.0

func (js *JobScheduler) MarkJobsWithFailedDependencies() []string

MarkJobsWithFailedDependencies marks jobs as failed if their dependencies have failed

func (*JobScheduler) SetJobStatus added in v0.5.0

func (js *JobScheduler) SetJobStatus(jobID string, status JobStatus, success bool)

func (*JobScheduler) ShouldRepeatJob added in v0.5.0

func (js *JobScheduler) ShouldRepeatJob(jobID string) bool

ShouldRepeatJob checks if a job should be repeated

func (*JobScheduler) ValidateDependencies added in v0.5.0

func (js *JobScheduler) ValidateDependencies() error

type JobStatus added in v0.5.0

type JobStatus int

const (
	JobPending JobStatus = iota
	JobRunning
	JobCompleted
	JobFailed
)
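
The scheduler methods and status values above suggest a dependency gate along the following lines. This is a simplified model written for illustration, with lowercase local types and hypothetical job IDs; it assumes a job becomes runnable once it is pending and every job it needs has completed, which may not match the package's exact rules.

```go
package main

import "fmt"

type jobStatus int

const (
	jobPending jobStatus = iota
	jobRunning
	jobCompleted
	jobFailed
)

// scheduler is a hypothetical, minimal model of dependency tracking.
type scheduler struct {
	needs  map[string][]string
	status map[string]jobStatus
	order  []string // insertion order, for stable output
}

func newScheduler() *scheduler {
	return &scheduler{needs: map[string][]string{}, status: map[string]jobStatus{}}
}

func (s *scheduler) addJob(id string, needs ...string) {
	s.needs[id] = needs
	s.status[id] = jobPending
	s.order = append(s.order, id)
}

// canRun reports whether id is still pending and all of its needs completed.
func (s *scheduler) canRun(id string) bool {
	if s.status[id] != jobPending {
		return false
	}
	for _, dep := range s.needs[id] {
		if s.status[dep] != jobCompleted {
			return false
		}
	}
	return true
}

// runnable lists every job that can start right now.
func (s *scheduler) runnable() []string {
	var ids []string
	for _, id := range s.order {
		if s.canRun(id) {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	s := newScheduler()
	s.addJob("sender-a")
	s.addJob("sender-b")
	s.addJob("export-csv", "sender-a", "sender-b")

	fmt.Println(s.runnable()) // both senders, export-csv is blocked
	s.status["sender-a"] = jobCompleted
	s.status["sender-b"] = jobCompleted
	fmt.Println(s.runnable()) // only export-csv remains
}
```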

type Output added in v0.6.0

type Output struct {
	// contains filtered or unexported fields
}

Output implements OutputWriter for console output

func NewOutput added in v0.6.0

func NewOutput(verbose bool) *Output

NewOutput creates a new console output writer

func (*Output) PrintError added in v0.6.0

func (o *Output) PrintError(format string, args ...interface{})

PrintError prints an error message

func (*Output) PrintJobName added in v0.6.0

func (o *Output) PrintJobName(name string)

PrintJobName prints the job name

func (*Output) PrintJobOutput added in v0.6.0

func (o *Output) PrintJobOutput(output string)

PrintJobOutput prints buffered job output

func (*Output) PrintJobResult added in v0.6.0

func (o *Output) PrintJobResult(jobName string, status StatusType, duration float64)

PrintJobResult prints the result of a job execution

func (*Output) PrintSeparator added in v0.6.0

func (o *Output) PrintSeparator()

PrintSeparator prints a separator line for verbose output

func (*Output) PrintStepRepeatResult added in v0.6.0

func (o *Output) PrintStepRepeatResult(stepIdx int, counter StepRepeatCounter, hasTest bool)

PrintStepRepeatResult prints the final result of a repeated step execution

func (*Output) PrintStepRepeatStart added in v0.6.0

func (o *Output) PrintStepRepeatStart(stepIdx int, stepName string, repeatCount int)

PrintStepRepeatStart prints the start of a repeated step execution

func (*Output) PrintStepResult added in v0.6.0

func (o *Output) PrintStepResult(step StepResult)

PrintStepResult prints the result of a single step execution

func (*Output) PrintVerbose added in v0.6.0

func (o *Output) PrintVerbose(format string, args ...interface{})

PrintVerbose prints verbose output (only if verbose mode is enabled)

func (*Output) PrintWorkflowHeader added in v0.6.0

func (o *Output) PrintWorkflowHeader(name, description string)

PrintWorkflowHeader prints the workflow name and description

func (*Output) PrintWorkflowSummary added in v0.6.0

func (o *Output) PrintWorkflowSummary(totalTime float64, successCount, totalJobs int)

PrintWorkflowSummary prints the workflow execution summary

type OutputWriter added in v0.6.0

type OutputWriter interface {
	// Workflow level output
	PrintWorkflowHeader(name, description string)
	PrintJobName(name string)

	// Step level output
	PrintStepResult(step StepResult)
	PrintStepRepeatStart(stepIdx int, stepName string, repeatCount int)
	PrintStepRepeatResult(stepIdx int, counter StepRepeatCounter, hasTest bool)

	// Job level output
	PrintJobResult(jobName string, status StatusType, duration float64)
	PrintJobOutput(output string)

	// Workflow summary
	PrintWorkflowSummary(totalTime float64, successCount, totalJobs int)

	// Error output
	PrintError(format string, args ...interface{})

	// Verbose output
	PrintVerbose(format string, args ...interface{})
	PrintSeparator()
}

OutputWriter defines the interface for different output implementations

type ParallelJobExecutor added in v0.6.0

type ParallelJobExecutor struct {
	// contains filtered or unexported fields
}

ParallelJobExecutor handles parallel job execution without dependencies

func NewParallelJobExecutor added in v0.6.0

func NewParallelJobExecutor(w *Workflow) *ParallelJobExecutor

NewParallelJobExecutor creates a new parallel job executor

func (*ParallelJobExecutor) Execute added in v0.6.0

func (e *ParallelJobExecutor) Execute(job *Job, jobID string, ctx JobContext, config ExecutionConfig) ExecutionResult

Execute runs a job in parallel mode

type Probe

type Probe struct {
	FilePath string

	Config Config
	// contains filtered or unexported fields
}

func New

func New(path string, v bool) *Probe

func (*Probe) Do

func (p *Probe) Do() error

func (*Probe) ExitStatus added in v0.2.0

func (p *Probe) ExitStatus() int

func (*Probe) Load

func (p *Probe) Load() error

type Repeat

type Repeat struct {
	Count    int      `yaml:"count",validate:"required,gte=0,lt=100"`
	Interval Interval `yaml:"interval"`
}

Repeat defines the repeat configuration for jobs

type SequentialJobExecutor added in v0.6.0

type SequentialJobExecutor struct {
	// contains filtered or unexported fields
}

SequentialJobExecutor handles sequential job execution with dependencies

func NewSequentialJobExecutor added in v0.6.0

func NewSequentialJobExecutor(w *Workflow) *SequentialJobExecutor

NewSequentialJobExecutor creates a new sequential job executor

func (*SequentialJobExecutor) Execute added in v0.6.0

func (e *SequentialJobExecutor) Execute(job *Job, jobID string, ctx JobContext, config ExecutionConfig) ExecutionResult

Execute runs a job in sequential mode with dependency management

type StatusType added in v0.6.0

type StatusType int

StatusType represents the status of execution

const (
	StatusSuccess StatusType = iota
	StatusError
	StatusWarning
)

type Step

type Step struct {
	Name string           `yaml:"name"`
	Uses string           `yaml:"uses" validate:"required"`
	With map[string]any   `yaml:"with"`
	Test string           `yaml:"test"`
	Echo string           `yaml:"echo"`
	Vars map[string]any   `yaml:"vars"`
	Iter []map[string]any `yaml:"iter"`
	// contains filtered or unexported fields
}

func (*Step) Do added in v0.5.0

func (st *Step) Do(jCtx *JobContext)

func (*Step) DoEcho added in v0.5.0

func (st *Step) DoEcho()

func (*Step) DoEchoWithSequentialPrint added in v0.5.0

func (st *Step) DoEchoWithSequentialPrint()

func (*Step) DoTest added in v0.5.0

func (st *Step) DoTest() (string, bool)

func (*Step) DoTestWithSequentialPrint added in v0.5.0

func (st *Step) DoTestWithSequentialPrint() bool

func (*Step) SetCtx added in v0.4.0

func (st *Step) SetCtx(j JobContext, override map[string]any)

func (*Step) ShowRequestResponse added in v0.5.0

func (st *Step) ShowRequestResponse(name string)

type StepContext added in v0.5.0

type StepContext struct {
	Vars map[string]any   `expr:"vars"`
	Logs []map[string]any `expr:"steps"`
	Res  map[string]any   `expr:"res"`
	Req  map[string]any   `expr:"req"`
	RT   string           `expr:"rt"`
}

type StepRepeatCounter added in v0.6.0

type StepRepeatCounter struct {
	SuccessCount int
	FailureCount int
	Name         string
	LastResult   bool
	Output       strings.Builder
}

StepRepeatCounter tracks the execution results of repeated steps

type StepResult added in v0.6.0

type StepResult struct {
	Index      int
	Name       string
	Status     StatusType
	RT         string
	TestOutput string
	EchoOutput string
	HasTest    bool
}

StepResult represents the result of a step execution

type ValidationError

type ValidationError struct {
	// contains filtered or unexported fields
}

func (*ValidationError) AddMessage

func (e *ValidationError) AddMessage(s string)

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) HasError

func (e *ValidationError) HasError() bool

type Workflow

type Workflow struct {
	Name        string         `yaml:"name",validate:"required"`
	Description string         `yaml:"description,omitempty"`
	Jobs        []Job          `yaml:"jobs",validate:"required"`
	Vars        map[string]any `yaml:"vars"`
	// contains filtered or unexported fields
}

func (*Workflow) Env added in v0.3.0

func (w *Workflow) Env() map[string]string

func (*Workflow) SetExitStatus added in v0.2.0

func (w *Workflow) SetExitStatus(isErr bool)

func (*Workflow) Start

func (w *Workflow) Start(c Config) error

Start executes the workflow with the given configuration

type WorkflowOutput added in v0.6.0

type WorkflowOutput struct {
	Jobs map[string]*JobOutput
	// contains filtered or unexported fields
}

WorkflowOutput manages output for multiple jobs

func NewWorkflowOutput added in v0.6.0

func NewWorkflowOutput() *WorkflowOutput

NewWorkflowOutput creates a new WorkflowOutput instance

Directories

Path Synopsis
actions
cmd
  probe (command)
