agent

package
v3.115.2 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 18, 2025 License: MIT Imports: 54 Imported by: 2

Documentation

Overview

Package agent provides the key agent components - workers, worker pool, job runner, log streamer, artifact up/downloaders, etc.

It is intended for internal use of buildkite-agent only.

Constants

const (
	// BuildkiteMessageMax is the maximum length of "BUILDKITE_MESSAGE=...\0"
	// environment entry passed to bootstrap, beyond which it will be truncated
	// to avoid exceeding the system limit. Note that it includes the variable
	// name, equals sign, and null terminator.
	//
	// The true limit varies by system and may be shared with other env/argv
	// data. We'll settle on an arbitrary generous but reasonable value, and
	// adjust it if issues arise.
	//
	// macOS 10.15:    256 KiB shared by environment & argv
	// Linux 4.19:     128 KiB per k=v env
	// Windows 10:  16,384 KiB shared
	// POSIX:            4 KiB minimum shared
	BuildkiteMessageMax = 64 * 1024

	// BuildkiteMessageName is the env var name of the build/commit message.
	BuildkiteMessageName = "BUILDKITE_MESSAGE"

	VerificationBehaviourWarn  = "warn"
	VerificationBehaviourBlock = "block"
)
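The byte budget described for BuildkiteMessageMax can be sketched as below. This is a minimal illustration, not the agent's actual truncation code: `truncateEnvValue`, `messageMax`, and `messageName` are hypothetical names, and backing up to a rune boundary is an assumption added here so a multi-byte character is never split.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// Hypothetical stand-ins mirroring the documented constants.
const (
	messageMax  = 64 * 1024
	messageName = "BUILDKITE_MESSAGE"
)

// truncateEnvValue shortens value so that "name=value\0" fits within limit bytes.
// Illustrative only; the agent's real truncation logic may differ.
func truncateEnvValue(name, value string, limit int) string {
	// The value's budget excludes the name, the equals sign, and the NUL terminator.
	budget := limit - len(name) - 2
	if budget < 0 {
		budget = 0
	}
	if len(value) <= budget {
		return value
	}
	// Back up to a rune boundary so a multi-byte character is not split.
	for budget > 0 && !utf8.RuneStart(value[budget]) {
		budget--
	}
	return value[:budget]
}

func main() {
	v := truncateEnvValue(messageName, strings.Repeat("x", 100*1024), messageMax)
	// Print the size of the whole "NAME=value\0" entry after truncation.
	fmt.Println(len(messageName) + 1 + len(v) + 1)
}
```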
const (

	// SignalReasonAgentRefused is used when the agent refused to run the job, e.g. due to a failed `pre-bootstrap` hook,
	// or a failed allowlist validation.
	SignalReasonAgentRefused = "agent_refused"

	// SignalReasonAgentStop is used when the agent is stopping, e.g. due to a pending host shutdown or EC2 spot instance termination.
	SignalReasonAgentStop = "agent_stop"

	// SignalReasonCancel is used when the job was cancelled via the Buildkite web UI.
	SignalReasonCancel = "cancel"

	// SignalReasonSignatureRejected is used when the job was signed with a signature that could not be verified, either
	// because the signature is invalid, or because the agent does not have the verification key.
	SignalReasonSignatureRejected = "signature_rejected"

	// SignalReasonProcessRunError is used when the process to run the bootstrap script failed to run, e.g. due to a
	// missing executable, or a permission error.
	SignalReasonProcessRunError = "process_run_error"

	// SignalReasonStackError is used when the job was stopped due to a stack error, e.g. because in Kubernetes the pod
	// running the job could not be launched. This signal reason is not used directly by the agent, but is used by
	// agent-stack-kubernetes to signal that the job was not run due to a stack error.
	SignalReasonStackError = "stack_error"
)

Variables

var (
	ErrNoSignature        = errors.New("job had no signature to verify")
	ErrVerificationFailed = errors.New("signature verification failed")
	ErrInvalidJob         = errors.New("job does not match signed step")
)
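Sentinel errors like these are usually matched with `errors.Is`, which also sees through wrapping. The sketch below uses local copies of the three errors so that it is self-contained; the `describe` helper and its labels are hypothetical and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, so the sketch is self-contained.
var (
	errNoSignature        = errors.New("job had no signature to verify")
	errVerificationFailed = errors.New("signature verification failed")
	errInvalidJob         = errors.New("job does not match signed step")
)

// describe maps a verification error to a short label (labels are made up).
// errors.Is also matches errors that were wrapped with %w.
func describe(err error) string {
	switch {
	case err == nil:
		return "verified"
	case errors.Is(err, errNoSignature):
		return "unsigned"
	case errors.Is(err, errVerificationFailed):
		return "bad signature"
	case errors.Is(err, errInvalidJob):
		return "job mismatch"
	default:
		return "other error"
	}
}

func main() {
	wrapped := fmt.Errorf("verifying job: %w", errInvalidJob)
	fmt.Println(describe(wrapped))
}
```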

Functions

func FetchTags

func FetchTags(ctx context.Context, l logger.Logger, conf FetchTagsConfig) []string

FetchTags loads tags from a variety of sources

func K8sTagsFromEnv added in v3.45.0

func K8sTagsFromEnv(envn []string) (map[string]string, error)

Types

type AgentConfiguration

type AgentConfiguration struct {
	ConfigPath                  string
	BootstrapScript             string
	BuildPath                   string
	HooksPath                   string
	AdditionalHooksPaths        []string
	SocketsPath                 string
	GitMirrorsPath              string
	GitMirrorsLockTimeout       int
	GitMirrorsSkipUpdate        bool
	PluginsPath                 string
	GitCheckoutFlags            string
	GitCloneFlags               string
	GitCloneMirrorFlags         string
	GitCleanFlags               string
	GitFetchFlags               string
	GitSubmodules               bool
	AllowedRepositories         []*regexp.Regexp
	AllowedPlugins              []*regexp.Regexp
	AllowedEnvironmentVariables []*regexp.Regexp
	SSHKeyscan                  bool
	CommandEval                 bool
	PluginsEnabled              bool
	PluginValidation            bool
	PluginsAlwaysCloneFresh     bool
	LocalHooksEnabled           bool
	StrictSingleHooks           bool
	RunInPty                    bool
	KubernetesExec              bool

	SigningJWKSFile  string // Where to find the key to sign pipeline uploads with (passed through to jobs, they might be uploading pipelines)
	SigningJWKSKeyID string // The key ID to sign pipeline uploads with
	SigningAWSKMSKey string // The KMS key ID to sign pipeline uploads with
	DebugSigning     bool   // Whether to print step payloads when signing them

	VerificationJWKS             any    // The set of keys to verify jobs with
	VerificationFailureBehaviour string // What to do if job verification fails (one of `block` or `warn`)

	ANSITimestamps               bool
	TimestampLines               bool
	HealthCheckAddr              string
	DisconnectAfterJob           bool
	DisconnectAfterIdleTimeout   time.Duration
	DisconnectAfterUptime        time.Duration
	CancelGracePeriod            int
	SignalGracePeriod            time.Duration
	EnableJobLogTmpfile          bool
	JobLogPath                   string
	WriteJobLogsToStdout         bool
	LogFormat                    string
	Shell                        string
	Profile                      string
	RedactedVars                 []string
	AcquireJob                   string
	TracingBackend               string
	TracingServiceName           string
	TracingPropagateTraceparent  bool
	TraceContextEncoding         string
	DisableWarningsFor           []string
	AllowMultipartArtifactUpload bool
}

AgentConfiguration is the run-time configuration for an agent that has been loaded from the config file and command-line params

type AgentPool

type AgentPool struct {
	// contains filtered or unexported fields
}

AgentPool manages multiple parallel AgentWorkers.

func NewAgentPool

func NewAgentPool(workers []*AgentWorker) *AgentPool

NewAgentPool returns a new AgentPool.

func (*AgentPool) Start

func (r *AgentPool) Start(ctx context.Context) error

Start kicks off the parallel AgentWorkers and waits for them to finish.
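A rough sketch of the "start in parallel, then wait" pattern that Start describes, assuming one goroutine per worker and joined errors. The `worker` interface, `pool` type, and `runPool` helper are hypothetical stand-ins, and the real AgentPool may collect errors differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for AgentWorker.
type worker interface {
	Start(ctx context.Context) error
}

type pool struct{ workers []worker }

// Start runs every worker in its own goroutine, blocks until all of them
// have returned, and joins any errors they reported.
func (p *pool) Start(ctx context.Context) error {
	var wg sync.WaitGroup
	errs := make([]error, len(p.workers))
	for i, w := range p.workers {
		wg.Add(1)
		go func(i int, w worker) {
			defer wg.Done()
			errs[i] = w.Start(ctx) // each goroutine writes only its own slot
		}(i, w)
	}
	wg.Wait()
	return errors.Join(errs...) // nil when every worker returned nil
}

// fakeWorker returns a fixed result, for demonstration only.
type fakeWorker struct{ err error }

func (f fakeWorker) Start(context.Context) error { return f.err }

var errBoom = errors.New("boom")

// runPool builds a pool of fake workers with the given results and starts it.
func runPool(results ...error) error {
	p := &pool{}
	for _, err := range results {
		p.workers = append(p.workers, fakeWorker{err: err})
	}
	return p.Start(context.Background())
}

func main() {
	fmt.Println(runPool(nil, errBoom, nil))
}
```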

func (*AgentPool) StartStatusServer added in v3.72.0

func (ap *AgentPool) StartStatusServer(ctx context.Context, l logger.Logger, addr string)

func (*AgentPool) StopGracefully added in v3.110.0

func (r *AgentPool) StopGracefully()

StopGracefully stops all workers in the pool gracefully.

func (*AgentPool) StopUngracefully added in v3.110.0

func (r *AgentPool) StopUngracefully()

StopUngracefully stops all workers in the pool ungracefully. It blocks until all workers have returned from stopping, which means waiting for job cancellation to finish.

type AgentWorker

type AgentWorker struct {
	// contains filtered or unexported fields
}

func NewAgentWorker

func NewAgentWorker(l logger.Logger, reg *api.AgentRegisterResponse, m *metrics.Collector, apiClient *api.Client, c AgentWorkerConfig) *AgentWorker

Creates the agent worker and initializes its API Client

func (*AgentWorker) AcceptAndRunJob

func (a *AgentWorker) AcceptAndRunJob(ctx context.Context, job *api.Job, idleMon *idleMonitor) error

Accepts a job and runs it. It only returns an error if something goes wrong.

func (*AgentWorker) AcquireAndRunJob

func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error

AcquireAndRunJob attempts to acquire a job and run it. If the job is in the waiting state, it will retry after the server-determined interval (from the Retry-After response header). If the job is in an unassignable state, it will return an error immediately. Otherwise, it will retry every 3s for 30s. The whole operation will time out after 5 min.
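The retry policy described above can be sketched as a small loop. Everything here is illustrative: the `outcome` values, `acquire`, and `simulate` are hypothetical names, the sleep function is injected so the example runs instantly, and reading "every 3s for 30s" as a 30-second budget of 3-second sleeps is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// outcome is a hypothetical classification of one acquire attempt.
type outcome int

const (
	acquired     outcome = iota
	waiting              // server asked us to come back later (Retry-After)
	unassignable         // job can never be acquired
	failed               // any other failure
)

type result struct {
	outcome    outcome
	retryAfter time.Duration
}

var (
	errUnassignable = errors.New("job is in an unassignable state")
	errGaveUp       = errors.New("gave up acquiring job")
)

// acquire retries try() following the documented policy, under a 5 min deadline.
func acquire(ctx context.Context, try func() result, sleep func(time.Duration)) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()
	const (
		retryInterval = 3 * time.Second
		retryWindow   = 30 * time.Second
	)
	var spentRetrying time.Duration
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		switch r := try(); r.outcome {
		case acquired:
			return nil
		case unassignable:
			return errUnassignable
		case waiting:
			sleep(r.retryAfter) // server-determined interval
		default:
			if spentRetrying >= retryWindow {
				return errGaveUp
			}
			sleep(retryInterval)
			spentRetrying += retryInterval
		}
	}
}

// simulate replays scripted outcomes with a fake sleep and reports seconds slept.
func simulate(script ...outcome) (int, error) {
	var slept time.Duration
	i := 0
	try := func() result {
		if i >= len(script) {
			return result{outcome: acquired}
		}
		o := script[i]
		i++
		return result{outcome: o, retryAfter: 7 * time.Second}
	}
	err := acquire(context.Background(), try, func(d time.Duration) { slept += d })
	return int(slept / time.Second), err
}

func main() {
	secs, err := simulate(waiting, failed, acquired)
	fmt.Println(secs, err)
}
```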

func (*AgentWorker) Connect

func (a *AgentWorker) Connect(ctx context.Context) error

Connects the agent to the Buildkite Agent API, retrying up to 10 times if it fails.

func (*AgentWorker) Disconnect

func (a *AgentWorker) Disconnect(ctx context.Context) error

Disconnect notifies the Buildkite API that this agent worker/session is permanently disconnecting. Don't spend long retrying, because we want to disconnect as fast as possible.

func (*AgentWorker) Heartbeat

func (a *AgentWorker) Heartbeat(ctx context.Context) error

Performs a heartbeat.

func (*AgentWorker) Ping

func (a *AgentWorker) Ping(ctx context.Context) (job *api.Job, action string, err error)

Performs a ping that checks Buildkite for a job or action to take. Returns a job, or nil if none is found.

func (*AgentWorker) RunJob

func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignoreAgentInDispatches *bool) error

func (*AgentWorker) Start

func (a *AgentWorker) Start(ctx context.Context, idleMon *idleMonitor) (startErr error)

Starts the agent worker

func (*AgentWorker) StopGracefully added in v3.110.0

func (a *AgentWorker) StopGracefully()

StopGracefully stops the agent from accepting new work. It allows the current job to finish without interruption. Does not block.

func (*AgentWorker) StopUngracefully added in v3.110.0

func (a *AgentWorker) StopUngracefully()

StopUngracefully stops the agent from accepting new work and cancels any existing job. It blocks until the job is cancelled, if there is one.

type AgentWorkerConfig

type AgentWorkerConfig struct {
	// Whether to set debug in the job
	Debug bool

	// Whether to set debugHTTP in the job
	DebugHTTP bool

	// What signal to use for worker cancellation
	CancelSignal process.Signal

	// Time to wait between sending the CancelSignal and SIGKILL to the process
	// groups that the executor starts
	SignalGracePeriod time.Duration

	// The index of this agent worker
	SpawnIndex int

	// The configuration of the agent from the CLI
	AgentConfiguration AgentConfiguration

	// Stdout of the parent agent process. Used to write job logs to stdout, for simpler containerized log collection.
	AgentStdout io.Writer
}

type CancelReason added in v3.110.0

type CancelReason int

CancelReason captures the reason why Cancel is called.

const (
	CancelReasonJobState CancelReason = iota
	CancelReasonAgentStopping
	CancelReasonInvalidToken
)

func (CancelReason) String added in v3.110.0

func (r CancelReason) String() string
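As an illustration of how an iota-based enum like this typically implements fmt.Stringer, here is a minimal sketch. The lowercase type and the returned strings are hypothetical; the strings the real CancelReason.String returns are not shown in this documentation.

```go
package main

import "fmt"

// cancelReason mirrors the shape of CancelReason for illustration only.
type cancelReason int

const (
	reasonJobState cancelReason = iota
	reasonAgentStopping
	reasonInvalidToken
)

// String makes cancelReason satisfy fmt.Stringer. The labels are made up.
func (r cancelReason) String() string {
	switch r {
	case reasonJobState:
		return "job state changed"
	case reasonAgentStopping:
		return "agent stopping"
	case reasonInvalidToken:
		return "invalid token"
	default:
		// Fall back to the numeric value for unknown reasons.
		return fmt.Sprintf("cancelReason(%d)", int(r))
	}
}

func main() {
	// fmt uses the String method automatically for %v and %s.
	fmt.Printf("cancelling: %v\n", reasonAgentStopping)
}
```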

type EC2MetaData

type EC2MetaData struct{}

func (EC2MetaData) Get

func (e EC2MetaData) Get(ctx context.Context) (map[string]string, error)

func (EC2MetaData) GetPaths

func (e EC2MetaData) GetPaths(ctx context.Context, paths map[string]string) (map[string]string, error)

Takes a map of tags and meta-data paths to get, returns a map of tags and fetched values.

type EC2Tags

type EC2Tags struct{}

func (EC2Tags) Get

func (e EC2Tags) Get(ctx context.Context) (map[string]string, error)

type ECSMetadata added in v3.43.0

type ECSMetadata struct {
	DisableHTTP2 bool
}

func (ECSMetadata) Get added in v3.43.0

func (e ECSMetadata) Get(ctx context.Context) (map[string]string, error)

type FetchTagsConfig

type FetchTagsConfig struct {
	Tags []string

	TagsFromK8s               bool
	TagsFromEC2MetaData       bool
	TagsFromEC2MetaDataPaths  []string
	TagsFromEC2Tags           bool
	TagsFromECSMetaData       bool
	TagsFromGCPMetaData       bool
	TagsFromGCPMetaDataPaths  []string
	TagsFromGCPLabels         bool
	TagsFromHost              bool
	WaitForEC2TagsTimeout     time.Duration
	WaitForEC2MetaDataTimeout time.Duration
	WaitForECSMetaDataTimeout time.Duration
	WaitForGCPLabelsTimeout   time.Duration
}

type GCPLabels

type GCPLabels struct{}

func (GCPLabels) Get

func (e GCPLabels) Get(ctx context.Context) (map[string]string, error)

type GCPMetaData

type GCPMetaData struct{}

func (GCPMetaData) Get

func (e GCPMetaData) Get(ctx context.Context) (map[string]string, error)

func (GCPMetaData) GetPaths

func (e GCPMetaData) GetPaths(ctx context.Context, paths map[string]string) (map[string]string, error)

Takes a map of tags and meta-data paths to get, returns a map of tags and fetched values.

type JobRunner

type JobRunner struct {

	// How the JobRunner should respond when job verification fails (one of `block` or `warn`)
	VerificationFailureBehavior string
	// contains filtered or unexported fields
}

func NewJobRunner

func NewJobRunner(ctx context.Context, l logger.Logger, apiClient *api.Client, conf JobRunnerConfig) (*JobRunner, error)

Initializes the job runner

func (*JobRunner) Cancel

func (r *JobRunner) Cancel(reason CancelReason) error

Cancel cancels the job. It can be summarised as:

  • Send the process an Interrupt. When run via a subprocess, this translates into SIGTERM. When run via the k8s socket, this transitions the connected client to RunStateInterrupt.
  • Wait for the signal grace period.
  • If the job hasn't exited, send the process a Terminate. This is either SIGKILL or closing the k8s socket server.

Cancel blocks until this process is complete. The `reason` arg mainly affects logged messages.
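The interrupt, grace period, terminate sequence above can be sketched against a small interface. This is an assumption-laden illustration: `proc`, `cancelProcess`, `fakeProc`, and `cancelFake` are hypothetical names, and the real JobRunner works with its own process and Kubernetes runner types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// proc is a hypothetical abstraction over "something running the job".
type proc interface {
	Interrupt()            // e.g. SIGTERM for a subprocess
	Terminate()            // e.g. SIGKILL
	Done() <-chan struct{} // closed once the process has exited
}

// cancelProcess interrupts p, waits up to grace for it to exit, and
// terminates it if it has not. It reports whether Terminate was needed.
func cancelProcess(p proc, grace time.Duration) (terminated bool) {
	p.Interrupt()
	timer := time.NewTimer(grace)
	defer timer.Stop()
	select {
	case <-p.Done():
		return false
	case <-timer.C:
		p.Terminate()
		<-p.Done() // block until the process is really gone
		return true
	}
}

// fakeProc simulates a process that may ignore interrupts.
type fakeProc struct {
	stubborn bool
	done     chan struct{}
	once     sync.Once
}

func newFakeProc(stubborn bool) *fakeProc {
	return &fakeProc{stubborn: stubborn, done: make(chan struct{})}
}

func (f *fakeProc) exit() { f.once.Do(func() { close(f.done) }) }

func (f *fakeProc) Interrupt() {
	if !f.stubborn {
		f.exit()
	}
}

func (f *fakeProc) Terminate()            { f.exit() }
func (f *fakeProc) Done() <-chan struct{} { return f.done }

// cancelFake cancels a fake process with a short grace period.
func cancelFake(stubborn bool) bool {
	return cancelProcess(newFakeProc(stubborn), 10*time.Millisecond)
}

func main() {
	fmt.Println(cancelFake(false), cancelFake(true))
}
```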

func (*JobRunner) Run

func (r *JobRunner) Run(ctx context.Context, ignoreAgentInDispatches *bool) (err error)

Run runs the job.

type JobRunnerConfig

type JobRunnerConfig struct {
	// The configuration of the agent from the CLI
	AgentConfiguration AgentConfiguration

	// How often to check if the job has been cancelled
	JobStatusInterval time.Duration

	// The JSON Web Keyset for verifying the job
	JWKS any

	// A scope for metrics within a job
	MetricsScope *metrics.Scope

	// The job to run
	Job *api.Job

	// What signal to use for worker cancellation
	CancelSignal process.Signal

	// Whether to set debug in the job
	Debug bool

	// Whether to set debug HTTP Requests in the job
	DebugHTTP bool

	// KubernetesExec enables Kubernetes execution mode. When true, the job runner
	// creates a kubernetes.Runner that listens on a UNIX socket for other agent containers
	// to connect, rather than spawning a local bootstrap subprocess. The other agent
	// containers run `kubernetes-bootstrap` which connects to this socket, receives
	// environment variables, and executes the bootstrap phases.
	KubernetesExec bool

	// Stdout of the parent agent process. Used to write job logs to stdout, for simpler containerized log collection.
	AgentStdout io.Writer
}

type LogStreamer

type LogStreamer struct {
	// contains filtered or unexported fields
}

LogStreamer divides job log output into chunks (Process), and log streamer workers (goroutines created by Start) receive and upload those chunks. The actual uploading is performed by the callback.
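The chunk-and-upload design described here can be sketched roughly as follows. The `chunk` type, `splitChunks`, and `streamChunks` are hypothetical stand-ins (the real streamer uses `*api.Chunk` and its own queueing), and counting failures with an atomic counter is an assumption about how something like FailedChunks could be tracked.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// chunk is a hypothetical stand-in for api.Chunk.
type chunk struct {
	Sequence int
	Offset   int
	Data     []byte
}

// splitChunks divides output into pieces of at most maxSize bytes.
func splitChunks(output []byte, maxSize int) []chunk {
	if maxSize < 1 {
		maxSize = 1 // guard against a zero or negative chunk size
	}
	var chunks []chunk
	for offset := 0; offset < len(output); offset += maxSize {
		end := offset + maxSize
		if end > len(output) {
			end = len(output)
		}
		chunks = append(chunks, chunk{
			Sequence: len(chunks) + 1,
			Offset:   offset,
			Data:     output[offset:end],
		})
	}
	return chunks
}

// streamChunks fans chunks out to `concurrency` workers, each of which calls
// callback to do the upload. It returns how many chunks failed.
func streamChunks(ctx context.Context, chunks []chunk, concurrency int,
	callback func(context.Context, chunk) error) int {
	if concurrency < 1 {
		concurrency = 1
	}
	queue := make(chan chunk)
	var failed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := range queue {
				if err := callback(ctx, c); err != nil {
					failed.Add(1)
				}
			}
		}()
	}
	for _, c := range chunks {
		queue <- c
	}
	close(queue)
	wg.Wait()
	return int(failed.Load())
}

func main() {
	chunks := splitChunks([]byte("hello, log streamer"), 8)
	failed := streamChunks(context.Background(), chunks, 2,
		func(_ context.Context, c chunk) error {
			if c.Sequence == 2 {
				return errors.New("simulated upload failure")
			}
			return nil
		})
	fmt.Println(len(chunks), failed)
}
```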

func NewLogStreamer

func NewLogStreamer(
	agentLogger logger.Logger,
	callback func(context.Context, *api.Chunk) error,
	conf LogStreamerConfig,
) *LogStreamer

NewLogStreamer creates a new instance of the log streamer.

func (*LogStreamer) FailedChunks

func (ls *LogStreamer) FailedChunks() int

func (*LogStreamer) Process

func (ls *LogStreamer) Process(ctx context.Context, output []byte) error

Process streams the output. It returns an error if the output data cannot be processed at all (e.g. the streamer was stopped or a hard limit was reached). Transient failures to upload logs are instead handled in the callback.

func (*LogStreamer) Start

func (ls *LogStreamer) Start(ctx context.Context) error

Start spins up a number of log streamer workers.

func (*LogStreamer) Stop

func (ls *LogStreamer) Stop()

Stop stops the streamer.

type LogStreamerConfig

type LogStreamerConfig struct {
	// How many log streamer workers are running at any one time
	Concurrency int

	// The maximum size of each chunk
	MaxChunkSizeBytes uint64

	// The maximum size of the log
	MaxSizeBytes uint64
}

LogStreamerConfig contains configuration options for the log streamer.

type LogWriter added in v3.32.0

type LogWriter struct {
	// contains filtered or unexported fields
}

func (LogWriter) Write added in v3.32.0

func (w LogWriter) Write(bytes []byte) (int, error)

type PipelineUploader added in v3.44.0

type PipelineUploader struct {
	Client         *api.Client
	Change         *api.PipelineChange
	JobID          string
	RetrySleepFunc func(time.Duration)
}

PipelineUploader contains the data needed to upload a pipeline to Buildkite

func (*PipelineUploader) Upload added in v3.45.0

func (u *PipelineUploader) Upload(ctx context.Context, l logger.Logger) error

Upload will first attempt to perform an async pipeline upload and, depending on the API's response, it will poll for the upload's status.

There are 3 "routes" that are relevant:

  1. Async Route: /jobs/:job_uuid/pipelines?async=true
  2. Sync Route: /jobs/:job_uuid/pipelines
  3. Status Route: /jobs/:job_uuid/pipelines/:upload_uuid

In this method, the agent will first upload the pipeline to the Async Route. Then, depending on the response, it will behave differently:

  1. The Async Route responds 202: poll the Status Route until the upload has been "applied".
  2. The Async Route responds with other 2xx: exit, the upload succeeded synchronously (possibly after retry).
  3. The Async Route responds with other xxx: retry uploading the pipeline to the Async Route.

Note that the Sync Route is not used by this version of the agent at all. Typically, the Async Route will return 202 whether or not the pipeline upload has been processed.

However, the API has the option to treat the Async Route as if it were the Sync Route by returning a 2xx that's not a 202. This will trigger option 2. While the API currently does not do this, we want to maintain the flexibility to do so in the future. If that is implemented, the Status Route will not be polled, and either the Async Route will be retried until a (non-202) 2xx is returned from the API, or the method will exit early with no error. This reiterates option 2.

If, during a retry loop in option 3, the API returns a 2xx that is a 202, then we assume the API changed to supporting Async Uploads between retries and option 1 will be taken.
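The three options above amount to a small decision on the Async Route's status code, sketched here. The `action` type, `nextAction`, and `resolve` are hypothetical names used only for illustration; the real Upload method also handles retry timing and status polling, which this sketch leaves out.

```go
package main

import (
	"fmt"
	"net/http"
)

// action is a hypothetical label for what the uploader does next.
type action int

const (
	actionPollStatus action = iota // option 1: 202, poll the Status Route
	actionDone                     // option 2: other 2xx, upload succeeded
	actionRetry                    // option 3: anything else, retry the Async Route
)

// nextAction maps an Async Route status code to one of the three options.
func nextAction(status int) action {
	switch {
	case status == http.StatusAccepted:
		return actionPollStatus
	case status >= 200 && status < 300:
		return actionDone
	default:
		return actionRetry
	}
}

// resolve walks a sequence of Async Route responses, retrying on option 3,
// and returns the first non-retry action. If every response asks for a
// retry, it returns actionRetry.
func resolve(statuses ...int) action {
	for _, s := range statuses {
		if a := nextAction(s); a != actionRetry {
			return a
		}
	}
	return actionRetry
}

func main() {
	// Two failures followed by a 202: the uploader switches to polling.
	fmt.Println(resolve(500, 503, 202) == actionPollStatus)
}
```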

Directories

Path Synopsis
Package plugin provides types for managing agent plugins.