exec

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: AGPL-3.0 Imports: 27 Imported by: 3

Documentation

Index

Constants

View Source
// Default sizes and timings used by the exec package.
const (
	// DefaultOutputBufferSize is the capacity, in bytes, that
	// NewDefaultOutputBuffer gives to a new OutputBuffer.
	DefaultOutputBufferSize = 1000000
	// DefaultStreamBufferSize is presumably the buffer size used when
	// streaming session data — TODO confirm against its use site.
	DefaultStreamBufferSize = 4096
	// InitialRows and InitialCols are presumably the dimensions applied to a
	// newly created terminal — TODO confirm against the terminal factory.
	InitialRows             = 27
	InitialCols             = 72
	// ResizeTimeout is a 500ms duration; presumably an upper bound on how
	// long a resize may take — TODO confirm against its use site.
	ResizeTimeout           = 500 * time.Millisecond
)

Variables

View Source
// Sentinel errors for errors.Is matching. Each sentinel carries only an
// ErrorCode; SessionID, Message, and Err are left at their zero values.
// ExecSessionError.Is is documented as matching on the error's code, so
// errors produced by the New*Error constructors presumably match these
// sentinels — verify against the Is implementation.
var (
	ErrSessionNotFound = &ExecSessionError{Code: ErrCodeSessionNotFound}
	ErrHandlerNotFound = &ExecSessionError{Code: ErrCodeHandlerNotFound}
	ErrSessionClosed   = &ExecSessionError{Code: ErrCodeSessionClosed}
	ErrTerminalError   = &ExecSessionError{Code: ErrCodeTerminalError}
	ErrSessionExists   = &ExecSessionError{Code: ErrCodeSessionExists}
)

Sentinel errors for errors.Is matching.

Functions

func RegisterPlugin

func RegisterPlugin(
	p *sdk.Plugin,
	opts PluginOpts,
) error

Types

type ActionTargetBuilder

// ActionTargetBuilder builds a dynamic list of targets for an action.
// Every field is serialized to JSON under the snake_case key in its tag.
// It converts to and from commonpb.ActionTargetBuilder via ToProto and
// ActionTargetBuilderFromProto.
type ActionTargetBuilder struct {
	Label         string            `json:"label"`
	LabelSelector string            `json:"label_selector"`
	Paths         []string          `json:"paths"`
	Selectors     map[string]string `json:"selectors"`
}

ActionTargetBuilder builds a dynamic list of targets for an action.

func ActionTargetBuilderFromProto

func ActionTargetBuilderFromProto(p *commonpb.ActionTargetBuilder) ActionTargetBuilder

ActionTargetBuilderFromProto converts a proto ActionTargetBuilder to the domain type.

func (ActionTargetBuilder) ToProto

ToProto converts ActionTargetBuilder to its proto representation.

type ChannelSink

type ChannelSink struct {
	// contains filtered or unexported fields
}

ChannelSink writes StreamOutput to a channel with context-aware sends. This is the production implementation used by Stream(). If the context is cancelled (e.g., gRPC stream closed), sends may be dropped instead of blocking forever on a full channel.

func NewChannelSink

func NewChannelSink(ctx context.Context, out chan<- StreamOutput) *ChannelSink

NewChannelSink creates a ChannelSink wrapping the given output channel. The context controls the lifetime — sends may be dropped after cancellation. If ctx is nil, context.Background() is used. If out is nil, a buffered drop channel is created (output is silently discarded).

func (*ChannelSink) OnOutput

func (s *ChannelSink) OnOutput(output StreamOutput)

type CommandHandler

// CommandHandler is the expected signature for a non-TTY handler.
//
// It receives the plugin context and the session options, and should
// immediately return readers for the command's standard output and standard
// error, or a non-nil error.
type CommandHandler func(
	ctx *types.PluginContext,
	opts SessionOptions,
) (stdout io.Reader, stderr io.Reader, err error)

CommandHandler is the expected signature for a non-TTY handler. It should immediately return its standard output and error readers.

type ErrorCode

type ErrorCode int

ErrorCode classifies exec errors for programmatic handling.

// ErrorCode values. Numbering starts at 1 (iota + 1), so the zero value of
// ErrorCode does not correspond to any defined code.
const (
	// ErrCodeSessionNotFound is the code carried by ErrSessionNotFound.
	ErrCodeSessionNotFound ErrorCode = iota + 1
	// ErrCodeHandlerNotFound is the code carried by ErrHandlerNotFound.
	ErrCodeHandlerNotFound
	// ErrCodeSessionClosed is the code carried by ErrSessionClosed.
	ErrCodeSessionClosed
	// ErrCodeTerminalError is the code carried by ErrTerminalError.
	ErrCodeTerminalError
	// ErrCodeSessionExists is the code carried by ErrSessionExists.
	ErrCodeSessionExists
)

func (ErrorCode) String

func (c ErrorCode) String() string

type ExecError

// ExecError is a structured error type that plugins use to provide classified
// exec errors with user-facing information.
//
// NOTE(review): Title, Message, Suggestion, Retryable, and RetryCommands
// mirror the fields of StreamError; presumably an ExecError is translated
// into a StreamError when reported over a stream — confirm.
type ExecError struct {
	// Err is the underlying error; presumably what Unwrap returns — TODO confirm.
	Err           error
	Title         string
	Message       string
	Suggestion    string
	Retryable     bool
	RetryCommands []string
}

ExecError is a structured error type that plugins use to provide classified exec errors with user-facing information.

func (*ExecError) Error

func (e *ExecError) Error() string

func (*ExecError) Unwrap

func (e *ExecError) Unwrap() error

type ExecSessionError

// ExecSessionError is a structured error for exec operations.
type ExecSessionError struct {
	// Code classifies the error; the Is method is documented as matching
	// targets on this code.
	Code      ErrorCode
	// SessionID is set by the session-related constructors; the package
	// sentinel errors leave it empty.
	SessionID string
	Message   string
	Err       error // optional wrapped error
}

ExecSessionError is a structured error for exec operations.

func NewHandlerNotFoundError

func NewHandlerNotFoundError(handlerKey string, available []string) *ExecSessionError

func NewSessionClosedError

func NewSessionClosedError(sessionID string) *ExecSessionError

func NewSessionExistsError

func NewSessionExistsError(sessionID string) *ExecSessionError

func NewSessionNotFoundError

func NewSessionNotFoundError(sessionID string) *ExecSessionError

func NewTerminalError

func NewTerminalError(sessionID string, err error) *ExecSessionError

func (*ExecSessionError) Error

func (e *ExecSessionError) Error() string

func (*ExecSessionError) Is

func (e *ExecSessionError) Is(target error) bool

Is reports whether target matches this error's code.

func (*ExecSessionError) Unwrap

func (e *ExecSessionError) Unwrap() error

type Handler

// Handler handles running commands and creating sessions for a resource.
//
// Plugin, Resource, TargetBuilder, and DefaultCommand are serialized to JSON;
// the function fields and HandlesResize are tagged `json:"-"` and are never
// serialized.
type Handler struct {
	Plugin         string              `json:"plugin"`
	Resource       string              `json:"resource"`
	TargetBuilder  ActionTargetBuilder `json:"target_builder"`
	DefaultCommand []string            `json:"default_command"`
	// TTYHandler creates a session with a TTY (see TTYHandlerFunc).
	TTYHandler     TTYHandlerFunc      `json:"-"`
	// CommandHandler is the non-TTY handler (see CommandHandler).
	CommandHandler CommandHandler      `json:"-"`
	// HandlesResize: if true, resize events are sent through the channel
	// instead of via terminal.Resize().
	HandlesResize bool `json:"-"`
}

Handler handles running commands and creating sessions for a resource.

func HandlerFromProto

func HandlerFromProto(p *execpb.ExecHandler) Handler

func (Handler) ID

func (h Handler) ID() string

func (Handler) String

func (h Handler) String() string

func (Handler) ToProto

func (h Handler) ToProto() *execpb.ExecHandler

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the lifecycle of terminal sessions within a plugin process.

func NewManager

func NewManager(cfg ManagerConfig) *Manager

NewManager creates a new Manager with the given config.

func (*Manager) AttachSession

func (m *Manager) AttachSession(
	pluginctx *types.PluginContext,
	sessionID string,
) (*Session, []byte, error)

AttachSession marks a session as attached and returns its current output buffer.

func (*Manager) Close

func (m *Manager) Close()

Close cancels all active sessions and waits for their goroutines to finish.

func (*Manager) CloseSession

func (m *Manager) CloseSession(pluginctx *types.PluginContext, sessionID string) error

CloseSession cancels the session's context, triggering cleanup.

func (*Manager) CreateSession

func (m *Manager) CreateSession(
	pluginctx *types.PluginContext,
	opts SessionOptions,
) (*Session, error)

CreateSession creates a new terminal session with a given command.

func (*Manager) DetachSession

func (m *Manager) DetachSession(pluginctx *types.PluginContext, sessionID string) (*Session, error)

DetachSession marks a session as not attached, stopping output broadcast.

func (*Manager) GetSession

func (m *Manager) GetSession(_ *types.PluginContext, sessionID string) (*Session, error)

GetSession returns a session by ID.

func (*Manager) GetSupportedResources

func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler

func (*Manager) ListSessions

func (m *Manager) ListSessions(_ *types.PluginContext) ([]*Session, error)

ListSessions returns a list of details for all active sessions.

func (*Manager) ResizeSession

func (m *Manager) ResizeSession(
	_ *types.PluginContext,
	sessionID string,
	rows, cols int32,
) error

ResizeSession resizes a session.

func (*Manager) Stream

func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)

Stream creates a new stream to multiplex sessions.

type ManagerConfig

// ManagerConfig configures the Manager.
//
// Sink, TerminalFactory, and Clock may be left nil; the comment on each of
// those fields names the default that is used in that case.
type ManagerConfig struct {
	Logger          logging.Logger
	Settings        settings.Provider
	Handlers        map[string]Handler
	Sink            OutputSink      // nil → ChannelSink created by Stream()
	TerminalFactory TerminalFactory // nil → NewRealTerminalFactory()
	Clock           timeutil.Clock  // nil → timeutil.RealClock
}

ManagerConfig configures the Manager.

type OutputBuffer

type OutputBuffer struct {
	// contains filtered or unexported fields
}

OutputBuffer stores terminal output bytes, providing a fixed-size cyclic buffer.

func NewDefaultOutputBuffer

func NewDefaultOutputBuffer() *OutputBuffer

NewDefaultOutputBuffer creates an OutputBuffer with DefaultOutputBufferSize.

func NewOutputBuffer

func NewOutputBuffer(capacity int) *OutputBuffer

NewOutputBuffer initializes an OutputBuffer with the specified capacity. If capacity is <= 0, it is clamped to 0 and every append is discarded.

func (*OutputBuffer) Append

func (b *OutputBuffer) Append(data []byte)

Append adds data to the buffer, evicting the oldest bytes when capacity is exceeded. The backing array never grows beyond capacity to avoid retaining oversized memory.

func (*OutputBuffer) GetAll

func (b *OutputBuffer) GetAll() []byte

GetAll retrieves a copy of all stored bytes in the buffer.

func (*OutputBuffer) Len

func (b *OutputBuffer) Len() int

Len returns the current number of bytes in the buffer.

type OutputSink

// OutputSink receives stream output from the Manager. Tests inject
// RecordingOutput; production uses ChannelSink.
type OutputSink interface {
	// OnOutput is called with a StreamOutput produced by the Manager.
	OnOutput(output StreamOutput)
}

OutputSink receives stream output from the Manager. Tests inject RecordingOutput; production uses ChannelSink.

type Plugin

// Plugin pairs the embedded plugin.Plugin with a Provider implementation.
//
// NOTE(review): the type exposes GRPCServer and GRPCClient methods, which
// looks like the hashicorp/go-plugin gRPC plugin contract — confirm.
type Plugin struct {
	plugin.Plugin
	// Impl is the Provider carried by this plugin.
	Impl Provider
}

func (*Plugin) GRPCClient

func (p *Plugin) GRPCClient(
	_ context.Context,
	_ *plugin.GRPCBroker,
	c *grpc.ClientConn,
) (interface{}, error)

func (*Plugin) GRPCServer

func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error

type PluginClient

type PluginClient struct {
	// contains filtered or unexported fields
}

func (*PluginClient) AttachSession

func (c *PluginClient) AttachSession(
	ctx *types.PluginContext,
	sessionID string,
) (*Session, []byte, error)

func (*PluginClient) Close

func (c *PluginClient) Close()

Close is a no-op on the client side — the server manages session lifecycle.

func (*PluginClient) CloseSession

func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error

func (*PluginClient) CreateSession

func (c *PluginClient) CreateSession(
	ctx *types.PluginContext,
	opts SessionOptions,
) (*Session, error)

func (*PluginClient) DetachSession

func (c *PluginClient) DetachSession(
	ctx *types.PluginContext,
	sessionID string,
) (*Session, error)

func (*PluginClient) GetSession

func (c *PluginClient) GetSession(
	ctx *types.PluginContext,
	sessionID string,
) (*Session, error)

func (*PluginClient) GetSupportedResources

func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler

func (*PluginClient) ListSessions

func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*Session, error)

func (*PluginClient) ResizeSession

func (c *PluginClient) ResizeSession(
	ctx *types.PluginContext,
	sessionID string,
	rows, cols int32,
) error

func (*PluginClient) Stream

func (c *PluginClient) Stream(
	ctx context.Context,
	in chan StreamInput,
) (chan StreamOutput, error)

type PluginOpts

// PluginOpts contains the options for the exec plugin. It is the options
// argument accepted by RegisterPlugin.
type PluginOpts struct {
	// Handlers lists the exec handlers supplied with the plugin.
	Handlers []Handler `json:"handlers"`
}

PluginOpts contains the options for the exec plugin.

type PluginServer

type PluginServer struct {
	execpb.UnimplementedExecPluginServer

	Impl Provider
	// contains filtered or unexported fields
}

func (*PluginServer) AttachSession

func (*PluginServer) CloseSession

func (*PluginServer) CreateSession

func (*PluginServer) DetachSession

func (*PluginServer) GetSession

func (*PluginServer) GetSupportedResources

func (s *PluginServer) GetSupportedResources(
	ctx context.Context,
	_ *emptypb.Empty,
) (*execpb.GetSupportedResourcesResponse, error)

func (*PluginServer) ListSessions

func (s *PluginServer) ListSessions(
	ctx context.Context,
	in *emptypb.Empty,
) (*execpb.ListSessionsResponse, error)

func (*PluginServer) ResizeSession

func (*PluginServer) Stream

type Provider

// Provider is the interface through which the exec functionality is exposed:
// session lookup and lifecycle, resizing, and a multiplexed I/O stream.
type Provider interface {
	// GetSupportedResources returns the handlers for the supported resource types.
	GetSupportedResources(ctx *types.PluginContext) []Handler
	// GetSession returns a session by ID.
	GetSession(ctx *types.PluginContext, sessionID string) (*Session, error)
	// ListSessions returns all of the sessions.
	ListSessions(ctx *types.PluginContext) ([]*Session, error)
	// CreateSession creates a new session from the given options.
	CreateSession(ctx *types.PluginContext, opts SessionOptions) (*Session, error)
	// AttachSession attaches a session. The returned []byte is presumably the
	// session's buffered output, as documented on Manager.AttachSession.
	AttachSession(ctx *types.PluginContext, sessionID string) (*Session, []byte, error)
	// DetachSession detaches a session.
	DetachSession(ctx *types.PluginContext, sessionID string) (*Session, error)
	// CloseSession closes a session.
	CloseSession(ctx *types.PluginContext, sessionID string) error
	// ResizeSession resizes a session to the given rows and columns.
	ResizeSession(ctx *types.PluginContext, sessionID string, rows, cols int32) error
	// Stream starts a new stream to multiplex sessions: StreamInput values are
	// read from the supplied channel and StreamOutput values are delivered on
	// the returned channel.
	Stream(context.Context, chan StreamInput) (chan StreamOutput, error)
	// Close shuts down the provider, releasing all resources.
	Close()
}

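Provider is the interface that provides the exec functionality. In this package, Manager (inside the plugin process) and PluginClient (on the client side) both have method sets that match it; PluginServer does not implement Provider itself but wraps one in its Impl field.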

type Session

// Session is a snapshot value type — safe for concurrent reads, returned by
// the public API. It carries no mutable internal state.
type Session struct {
	CreatedAt time.Time         `json:"created_at"`
	Labels    map[string]string `json:"labels"`
	Params    map[string]string `json:"params"`
	ID        string            `json:"id"`
	Command   []string          `json:"command"`
	// Attached reports whether the session was marked attached at the time
	// of the snapshot (see AttachSession and DetachSession).
	Attached  bool              `json:"attached"`
}

Session is a snapshot value type — safe for concurrent reads, returned by the public API. It carries no mutable internal state.

func NewSessionFromProto

func NewSessionFromProto(s *execpb.Session) *Session

func (*Session) ToProto

func (s *Session) ToProto() *execpb.Session

type SessionHandler

// SessionHandler is the expected signature for a function that creates a new
// session, returning the standard input, output, and error streams which will
// be multiplexed to the client, or a non-nil error.
type SessionHandler func(
	ctx *types.PluginContext,
	opts SessionOptions,
) (stdin io.Writer, stdout io.Reader, stderr io.Reader, err error)

SessionHandler is the expected signature for a function that creates a new session, returning the standard input, output, and error streams which will be multiplexed to the client.

type SessionOptions

// SessionOptions contains options for creating a new terminal session.
// It is the argument passed to CreateSession and to the handler functions.
type SessionOptions struct {
	Params         map[string]string      `json:"params"`
	Labels         map[string]string      `json:"labels"`
	ID             string                 `json:"id"`
	ResourcePlugin string                 `json:"resource_plugin"`
	ResourceKey    string                 `json:"resource_key"`
	ResourceData   map[string]interface{} `json:"resource_data"`
	Command        []string               `json:"command"`
	// TTY presumably selects a TTY session (Handler.TTYHandler) over a
	// non-TTY one (Handler.CommandHandler) — TODO confirm.
	TTY            bool                   `json:"tty"`
}

SessionOptions contains options for creating a new terminal session.

func NewSessionOptionsFromProto

func NewSessionOptionsFromProto(opts *execpb.SessionOptions) *SessionOptions

func (*SessionOptions) ToProto

func (o *SessionOptions) ToProto() (*execpb.SessionOptions, error)

type SessionResizeInput

// SessionResizeInput carries resize dimensions for a session. It is the
// element type of the resize channel handed to a TTYHandlerFunc.
type SessionResizeInput struct {
	Rows int32 `json:"rows"`
	Cols int32 `json:"cols"`
}

SessionResizeInput carries resize dimensions for a session.

type StreamError

// StreamError contains structured error information from a failed exec
// session. It is carried in the Error field of StreamOutput.
type StreamError struct {
	Title         string   `json:"title"`
	Message       string   `json:"message"`
	Suggestion    string   `json:"suggestion"`
	Retryable     bool     `json:"retryable"`
	// RetryCommands is omitted from JSON when empty.
	RetryCommands []string `json:"retry_commands,omitempty"`
}

StreamError contains structured error information from a failed exec session.

func NewStreamErrorFromProto

func NewStreamErrorFromProto(p *execpb.StreamError) *StreamError

func (*StreamError) ToProto

func (e *StreamError) ToProto() *execpb.StreamError

type StreamInput

// StreamInput is one message sent on the input channel passed to Stream.
type StreamInput struct {
	// SessionID identifies the session this input is addressed to; Stream
	// multiplexes several sessions over a single channel.
	SessionID string `json:"session_id"`
	// Data is the raw payload for the session — presumably bytes destined for
	// the session's input; TODO confirm against the Stream implementation.
	Data []byte `json:"data"`
}

func NewStreamInputFromProto

func NewStreamInputFromProto(p *execpb.StreamInput) StreamInput

func (*StreamInput) ToProto

func (i *StreamInput) ToProto() *execpb.StreamInput

type StreamOutput

// StreamOutput is one message delivered on the output channel returned by
// Stream, and the value passed to OutputSink.OnOutput.
type StreamOutput struct {
	// SessionID identifies the session this output belongs to.
	SessionID string       `json:"session_id"`
	Data      []byte       `json:"data"`
	// Target is the StreamTarget (StdOut or StdErr) associated with Data.
	Target    StreamTarget `json:"target"`
	// Signal is StreamSignalNone (the zero value) unless a signal is set.
	Signal    StreamSignal `json:"signal"`
	// Error holds structured error details; it is omitted from JSON when nil.
	Error     *StreamError `json:"error,omitempty"`
}

func NewStreamOutputFromProto

func NewStreamOutputFromProto(p *execpb.StreamOutput) StreamOutput

func (*StreamOutput) ToProto

func (o *StreamOutput) ToProto() *execpb.StreamOutput

type StreamResize

// StreamResize describes a terminal resize for one session, with dimensions
// held as uint16.
//
// NOTE(review): NewStreamResizeFromProto returns an error alongside the
// value, presumably because the proto dimensions can fall outside the uint16
// range — confirm.
type StreamResize struct {
	SessionID string `json:"session_id"`
	Cols      uint16 `json:"cols"`
	Rows      uint16 `json:"rows"`
}

func NewStreamResizeFromProto

func NewStreamResizeFromProto(p *execpb.ResizeSessionRequest) (StreamResize, error)

func (*StreamResize) ToProto

type StreamSignal

// StreamSignal enumerates the signals that can accompany a stream message.
type StreamSignal int

// StreamSignal values, numbered from 0 via iota.
const (
	// StreamSignalNone is the zero value.
	StreamSignalNone StreamSignal = iota
	// StreamSignalClose: while close isn't a signal, it's used to close the
	// stream.
	StreamSignalClose
	StreamSignalSigint
	StreamSignalSigquit
	StreamSignalSigterm
	StreamSignalSigkill
	StreamSignalSighup
	StreamSignalSigusr1
	StreamSignalSigusr2
	StreamSignalSigwinch
	// StreamSignalError presumably marks an output that reports an error (see
	// StreamOutput.Error) — TODO confirm.
	StreamSignalError
)

func NewStreamSignalFromProto

func NewStreamSignalFromProto(p execpb.StreamSignal) StreamSignal

func (StreamSignal) String

func (s StreamSignal) String() string

func (StreamSignal) ToProto

func (s StreamSignal) ToProto() execpb.StreamSignal

ToProto converts a StreamSignal value to its proto representation.

type StreamTarget

// StreamTarget identifies which output stream a StreamOutput belongs to.
type StreamTarget int

// StreamTarget values, numbered from 0 via iota.
const (
	// StreamTargetStdOut is the zero value.
	StreamTargetStdOut StreamTarget = iota
	StreamTargetStdErr
)

func (StreamTarget) String

func (t StreamTarget) String() string

type TTYHandlerFunc

// TTYHandlerFunc is the expected signature for a function that creates a new
// session with a TTY. It is passed the plugin context, the session options,
// the TTY file descriptor, a stop channel for signalling errors, and a
// receive-only resize channel.
//
// Renamed from TTYHandler to avoid confusion with the Handler struct.
type TTYHandlerFunc func(
	ctx *types.PluginContext,
	opts SessionOptions,
	tty *os.File,
	stopCh chan error,
	resize <-chan SessionResizeInput,
) error

TTYHandlerFunc is the expected signature for a function that creates a new session with a TTY. It is passed the TTY file descriptor, a stop channel for signalling errors, and a receive-only resize channel.

Renamed from TTYHandler to avoid confusion with the Handler struct.

type Terminal

// Terminal abstracts a PTY/TTY pair so tests can inject fakes.
type Terminal interface {
	// MasterFd returns the PTY (master) side — the manager reads output from here.
	MasterFd() *os.File
	// SlaveFd returns the TTY (slave) side — passed to the handler.
	SlaveFd() *os.File
	// Resize sets the terminal dimensions.
	Resize(rows, cols uint16) error
	// Close releases both file descriptors.
	Close() error
}

Terminal abstracts a PTY/TTY pair so tests can inject fakes.

type TerminalFactory

// TerminalFactory creates a new Terminal. The default factory opens a real
// PTY via creack/pty. Tests inject a factory that returns FakeTerminal.
type TerminalFactory func() (Terminal, error)

TerminalFactory creates a new Terminal. The default factory opens a real PTY via creack/pty. Tests inject a factory that returns FakeTerminal.

func NewRealTerminalFactory

func NewRealTerminalFactory() TerminalFactory

NewRealTerminalFactory returns a TerminalFactory that creates real PTY terminals. The slave is set to raw mode and the terminal is sized to the initial dimensions (the function takes no arguments, so these are presumably the InitialRows and InitialCols constants).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL