Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterPlugin(p *sdk.Plugin, opts PluginOpts) error
- type ActionTargetBuilder
- type ChannelSink
- type CommandHandler
- type ErrorCode
- type ExecError
- type ExecSessionError
- func NewHandlerNotFoundError(handlerKey string, available []string) *ExecSessionError
- func NewSessionClosedError(sessionID string) *ExecSessionError
- func NewSessionExistsError(sessionID string) *ExecSessionError
- func NewSessionNotFoundError(sessionID string) *ExecSessionError
- func NewTerminalError(sessionID string, err error) *ExecSessionError
- type Handler
- type Manager
- func (m *Manager) AttachSession(pluginctx *types.PluginContext, sessionID string) (*Session, []byte, error)
- func (m *Manager) Close()
- func (m *Manager) CloseSession(pluginctx *types.PluginContext, sessionID string) error
- func (m *Manager) CreateSession(pluginctx *types.PluginContext, opts SessionOptions) (*Session, error)
- func (m *Manager) DetachSession(pluginctx *types.PluginContext, sessionID string) (*Session, error)
- func (m *Manager) GetSession(_ *types.PluginContext, sessionID string) (*Session, error)
- func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler
- func (m *Manager) ListSessions(_ *types.PluginContext) ([]*Session, error)
- func (m *Manager) ResizeSession(_ *types.PluginContext, sessionID string, rows, cols int32) error
- func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
- type ManagerConfig
- type OutputBuffer
- type OutputSink
- type Plugin
- type PluginClient
- func (c *PluginClient) AttachSession(ctx *types.PluginContext, sessionID string) (*Session, []byte, error)
- func (c *PluginClient) Close()
- func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error
- func (c *PluginClient) CreateSession(ctx *types.PluginContext, opts SessionOptions) (*Session, error)
- func (c *PluginClient) DetachSession(ctx *types.PluginContext, sessionID string) (*Session, error)
- func (c *PluginClient) GetSession(ctx *types.PluginContext, sessionID string) (*Session, error)
- func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler
- func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*Session, error)
- func (c *PluginClient) ResizeSession(ctx *types.PluginContext, sessionID string, rows, cols int32) error
- func (c *PluginClient) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
- type PluginOpts
- type PluginServer
- func (s *PluginServer) AttachSession(ctx context.Context, in *execpb.AttachSessionRequest) (*execpb.AttachSessionResponse, error)
- func (s *PluginServer) CloseSession(ctx context.Context, in *execpb.CloseSessionRequest) (*execpb.CloseSessionResponse, error)
- func (s *PluginServer) CreateSession(ctx context.Context, in *execpb.SessionOptions) (*execpb.CreateSessionResponse, error)
- func (s *PluginServer) DetachSession(ctx context.Context, in *execpb.AttachSessionRequest) (*execpb.AttachSessionResponse, error)
- func (s *PluginServer) GetSession(ctx context.Context, in *execpb.GetSessionRequest) (*execpb.GetSessionResponse, error)
- func (s *PluginServer) GetSupportedResources(ctx context.Context, _ *emptypb.Empty) (*execpb.GetSupportedResourcesResponse, error)
- func (s *PluginServer) ListSessions(ctx context.Context, in *emptypb.Empty) (*execpb.ListSessionsResponse, error)
- func (s *PluginServer) ResizeSession(ctx context.Context, in *execpb.ResizeSessionRequest) (*execpb.ResizeSessionResponse, error)
- func (s *PluginServer) Stream(stream execpb.ExecPlugin_StreamServer) error
- type Provider
- type Session
- type SessionHandler
- type SessionOptions
- type SessionResizeInput
- type StreamError
- type StreamInput
- type StreamOutput
- type StreamResize
- type StreamSignal
- type StreamTarget
- type TTYHandlerFunc
- type Terminal
- type TerminalFactory
Constants ¶
const ( DefaultOutputBufferSize = 1000000 DefaultStreamBufferSize = 4096 InitialRows = 27 InitialCols = 72 ResizeTimeout = 500 * time.Millisecond )
Variables ¶
var ( ErrSessionNotFound = &ExecSessionError{Code: ErrCodeSessionNotFound} ErrHandlerNotFound = &ExecSessionError{Code: ErrCodeHandlerNotFound} ErrSessionClosed = &ExecSessionError{Code: ErrCodeSessionClosed} ErrTerminalError = &ExecSessionError{Code: ErrCodeTerminalError} ErrSessionExists = &ExecSessionError{Code: ErrCodeSessionExists} )
Sentinel errors for errors.Is matching.
Functions ¶
func RegisterPlugin ¶
func RegisterPlugin( p *sdk.Plugin, opts PluginOpts, ) error
Types ¶
type ActionTargetBuilder ¶
type ActionTargetBuilder struct {
Label string `json:"label"`
LabelSelector string `json:"label_selector"`
Paths []string `json:"paths"`
Selectors map[string]string `json:"selectors"`
}
ActionTargetBuilder builds a dynamic list of targets for an action.
func ActionTargetBuilderFromProto ¶
func ActionTargetBuilderFromProto(p *commonpb.ActionTargetBuilder) ActionTargetBuilder
ActionTargetBuilderFromProto converts a proto ActionTargetBuilder to the domain type.
func (ActionTargetBuilder) ToProto ¶
func (a ActionTargetBuilder) ToProto() *commonpb.ActionTargetBuilder
ToProto converts ActionTargetBuilder to its proto representation.
type ChannelSink ¶
type ChannelSink struct {
// contains filtered or unexported fields
}
ChannelSink writes StreamOutput to a channel with context-aware sends. This is the production implementation used by Stream(). If the context is cancelled (e.g., gRPC stream closed), sends may be dropped instead of blocking forever on a full channel.
func NewChannelSink ¶
func NewChannelSink(ctx context.Context, out chan<- StreamOutput) *ChannelSink
NewChannelSink creates a ChannelSink wrapping the given output channel. The context controls the lifetime — sends may be dropped after cancellation. If ctx is nil, context.Background() is used. If out is nil, a buffered drop channel is created (output is silently discarded).
func (*ChannelSink) OnOutput ¶
func (s *ChannelSink) OnOutput(output StreamOutput)
type CommandHandler ¶
type CommandHandler func( ctx *types.PluginContext, opts SessionOptions, ) (stdout io.Reader, stderr io.Reader, err error)
CommandHandler is the expected signature for a non-TTY handler. It should immediately return its standard output and error readers.
type ExecError ¶
type ExecError struct {
Err error
Title string
Message string
Suggestion string
Retryable bool
RetryCommands []string
}
ExecError is a structured error type that plugins use to provide classified exec errors with user-facing information.
type ExecSessionError ¶
type ExecSessionError struct {
Code ErrorCode
SessionID string
Message string
Err error // optional wrapped error
}
ExecSessionError is a structured error for exec operations.
func NewHandlerNotFoundError ¶
func NewHandlerNotFoundError(handlerKey string, available []string) *ExecSessionError
func NewSessionClosedError ¶
func NewSessionClosedError(sessionID string) *ExecSessionError
func NewSessionExistsError ¶
func NewSessionExistsError(sessionID string) *ExecSessionError
func NewSessionNotFoundError ¶
func NewSessionNotFoundError(sessionID string) *ExecSessionError
func NewTerminalError ¶
func NewTerminalError(sessionID string, err error) *ExecSessionError
func (*ExecSessionError) Error ¶
func (e *ExecSessionError) Error() string
func (*ExecSessionError) Is ¶
func (e *ExecSessionError) Is(target error) bool
Is reports whether target matches this error's code.
func (*ExecSessionError) Unwrap ¶
func (e *ExecSessionError) Unwrap() error
type Handler ¶
type Handler struct {
Plugin string `json:"plugin"`
Resource string `json:"resource"`
TargetBuilder ActionTargetBuilder `json:"target_builder"`
DefaultCommand []string `json:"default_command"`
TTYHandler TTYHandlerFunc `json:"-"`
CommandHandler CommandHandler `json:"-"`
// HandlesResize: if true, resize events are sent through the resize channel
// passed to the TTY handler (see TTYHandlerFunc) instead of via terminal.Resize().
HandlesResize bool `json:"-"`
}
Handler handles running commands and creating sessions for a resource.
func HandlerFromProto ¶
func HandlerFromProto(p *execpb.ExecHandler) Handler
func (Handler) ToProto ¶
func (h Handler) ToProto() *execpb.ExecHandler
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the lifecycle of terminal sessions within a plugin process.
func NewManager ¶
func NewManager(cfg ManagerConfig) *Manager
NewManager creates a new Manager with the given config.
func (*Manager) AttachSession ¶
func (m *Manager) AttachSession( pluginctx *types.PluginContext, sessionID string, ) (*Session, []byte, error)
AttachSession marks a session as attached and returns its current output buffer.
func (*Manager) Close ¶
func (m *Manager) Close()
Close cancels all active sessions and waits for their goroutines to finish.
func (*Manager) CloseSession ¶
func (m *Manager) CloseSession(pluginctx *types.PluginContext, sessionID string) error
CloseSession cancels the session's context, triggering cleanup.
func (*Manager) CreateSession ¶
func (m *Manager) CreateSession( pluginctx *types.PluginContext, opts SessionOptions, ) (*Session, error)
CreateSession creates a new terminal session with a given command.
func (*Manager) DetachSession ¶
DetachSession marks a session as not attached, stopping output broadcast.
func (*Manager) GetSession ¶
GetSession returns a session by ID.
func (*Manager) GetSupportedResources ¶
func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler
func (*Manager) ListSessions ¶
func (m *Manager) ListSessions(_ *types.PluginContext) ([]*Session, error)
ListSessions returns a list of details for all active sessions.
func (*Manager) ResizeSession ¶
func (m *Manager) ResizeSession( _ *types.PluginContext, sessionID string, rows, cols int32, ) error
ResizeSession resizes a session.
func (*Manager) Stream ¶
func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
Stream creates a new stream to multiplex sessions.
type ManagerConfig ¶
type ManagerConfig struct {
Logger logging.Logger
Settings settings.Provider
Handlers map[string]Handler
Sink OutputSink // nil → ChannelSink created by Stream()
TerminalFactory TerminalFactory // nil → NewRealTerminalFactory()
Clock timeutil.Clock // nil → timeutil.RealClock
}
ManagerConfig configures the Manager.
type OutputBuffer ¶
type OutputBuffer struct {
// contains filtered or unexported fields
}
OutputBuffer stores terminal output bytes, providing a fixed-size cyclic buffer.
func NewDefaultOutputBuffer ¶
func NewDefaultOutputBuffer() *OutputBuffer
NewDefaultOutputBuffer creates an OutputBuffer with DefaultOutputBufferSize.
func NewOutputBuffer ¶
func NewOutputBuffer(capacity int) *OutputBuffer
NewOutputBuffer initializes an OutputBuffer with the specified capacity in bytes. If capacity is <= 0, it is treated as 0 and all appends are discarded.
func (*OutputBuffer) Append ¶
func (b *OutputBuffer) Append(data []byte)
Append adds data to the buffer, evicting the oldest bytes when capacity is exceeded. The backing array never grows beyond capacity to avoid retaining oversized memory.
func (*OutputBuffer) GetAll ¶
func (b *OutputBuffer) GetAll() []byte
GetAll retrieves a copy of all stored bytes in the buffer.
func (*OutputBuffer) Len ¶
func (b *OutputBuffer) Len() int
Len returns the current number of bytes in the buffer.
type OutputSink ¶
type OutputSink interface {
OnOutput(output StreamOutput)
}
OutputSink receives stream output from the Manager. Tests inject RecordingOutput; production uses ChannelSink.
type Plugin ¶
func (*Plugin) GRPCClient ¶
func (p *Plugin) GRPCClient( _ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn, ) (interface{}, error)
func (*Plugin) GRPCServer ¶
type PluginClient ¶
type PluginClient struct {
// contains filtered or unexported fields
}
func (*PluginClient) AttachSession ¶
func (c *PluginClient) AttachSession( ctx *types.PluginContext, sessionID string, ) (*Session, []byte, error)
func (*PluginClient) Close ¶
func (c *PluginClient) Close()
Close is a no-op on the client side — the server manages session lifecycle.
func (*PluginClient) CloseSession ¶
func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error
func (*PluginClient) CreateSession ¶
func (c *PluginClient) CreateSession( ctx *types.PluginContext, opts SessionOptions, ) (*Session, error)
func (*PluginClient) DetachSession ¶
func (c *PluginClient) DetachSession( ctx *types.PluginContext, sessionID string, ) (*Session, error)
func (*PluginClient) GetSession ¶
func (c *PluginClient) GetSession( ctx *types.PluginContext, sessionID string, ) (*Session, error)
func (*PluginClient) GetSupportedResources ¶
func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler
func (*PluginClient) ListSessions ¶
func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*Session, error)
func (*PluginClient) ResizeSession ¶
func (c *PluginClient) ResizeSession( ctx *types.PluginContext, sessionID string, rows, cols int32, ) error
func (*PluginClient) Stream ¶
func (c *PluginClient) Stream( ctx context.Context, in chan StreamInput, ) (chan StreamOutput, error)
type PluginOpts ¶
type PluginOpts struct {
Handlers []Handler `json:"handlers"`
}
PluginOpts contains the options for the exec plugin.
type PluginServer ¶
type PluginServer struct {
execpb.UnimplementedExecPluginServer
Impl Provider
// contains filtered or unexported fields
}
func (*PluginServer) AttachSession ¶
func (s *PluginServer) AttachSession( ctx context.Context, in *execpb.AttachSessionRequest, ) (*execpb.AttachSessionResponse, error)
func (*PluginServer) CloseSession ¶
func (s *PluginServer) CloseSession( ctx context.Context, in *execpb.CloseSessionRequest, ) (*execpb.CloseSessionResponse, error)
func (*PluginServer) CreateSession ¶
func (s *PluginServer) CreateSession( ctx context.Context, in *execpb.SessionOptions, ) (*execpb.CreateSessionResponse, error)
func (*PluginServer) DetachSession ¶
func (s *PluginServer) DetachSession( ctx context.Context, in *execpb.AttachSessionRequest, ) (*execpb.AttachSessionResponse, error)
func (*PluginServer) GetSession ¶
func (s *PluginServer) GetSession( ctx context.Context, in *execpb.GetSessionRequest, ) (*execpb.GetSessionResponse, error)
func (*PluginServer) GetSupportedResources ¶
func (s *PluginServer) GetSupportedResources( ctx context.Context, _ *emptypb.Empty, ) (*execpb.GetSupportedResourcesResponse, error)
func (*PluginServer) ListSessions ¶
func (s *PluginServer) ListSessions( ctx context.Context, in *emptypb.Empty, ) (*execpb.ListSessionsResponse, error)
func (*PluginServer) ResizeSession ¶
func (s *PluginServer) ResizeSession( ctx context.Context, in *execpb.ResizeSessionRequest, ) (*execpb.ResizeSessionResponse, error)
func (*PluginServer) Stream ¶
func (s *PluginServer) Stream(stream execpb.ExecPlugin_StreamServer) error
type Provider ¶
type Provider interface {
// GetSupportedResources returns the supported resource types
GetSupportedResources(ctx *types.PluginContext) []Handler
// GetSession returns a session by ID
GetSession(ctx *types.PluginContext, sessionID string) (*Session, error)
// ListSessions returns all of the sessions
ListSessions(ctx *types.PluginContext) ([]*Session, error)
// CreateSession creates a new session
CreateSession(ctx *types.PluginContext, opts SessionOptions) (*Session, error)
// AttachSession attaches a session
AttachSession(ctx *types.PluginContext, sessionID string) (*Session, []byte, error)
// DetachSession detaches a session
DetachSession(ctx *types.PluginContext, sessionID string) (*Session, error)
// CloseSession closes a session
CloseSession(ctx *types.PluginContext, sessionID string) error
// ResizeSession resizes a session
ResizeSession(ctx *types.PluginContext, sessionID string, rows, cols int32) error
// Stream starts a new stream to multiplex sessions
Stream(context.Context, chan StreamInput) (chan StreamOutput, error)
// Close shuts down the provider, releasing all resources.
Close()
}
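Provider is the interface that provides the exec functionality. In this package it is implemented by Manager (plugin side) and PluginClient (host side); PluginServer wraps a Provider in its Impl field to expose it over gRPC.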
type Session ¶
type Session struct {
CreatedAt time.Time `json:"created_at"`
Labels map[string]string `json:"labels"`
Params map[string]string `json:"params"`
ID string `json:"id"`
Command []string `json:"command"`
Attached bool `json:"attached"`
}
Session is a snapshot value type — safe for concurrent reads, returned by the public API. It carries no mutable internal state.
func NewSessionFromProto ¶
type SessionHandler ¶
type SessionHandler func( ctx *types.PluginContext, opts SessionOptions, ) (stdin io.Writer, stdout io.Reader, stderr io.Reader, err error)
SessionHandler is the expected signature for a function that creates a new session, returning the standard input, output, and error streams which will be multiplexed to the client.
type SessionOptions ¶
type SessionOptions struct {
Params map[string]string `json:"params"`
Labels map[string]string `json:"labels"`
ID string `json:"id"`
ResourcePlugin string `json:"resource_plugin"`
ResourceKey string `json:"resource_key"`
ResourceData map[string]interface{} `json:"resource_data"`
Command []string `json:"command"`
TTY bool `json:"tty"`
}
SessionOptions contains options for creating a new terminal session.
func NewSessionOptionsFromProto ¶
func NewSessionOptionsFromProto(opts *execpb.SessionOptions) *SessionOptions
func (*SessionOptions) ToProto ¶
func (o *SessionOptions) ToProto() (*execpb.SessionOptions, error)
type SessionResizeInput ¶
SessionResizeInput carries resize dimensions for a session.
type StreamError ¶
type StreamError struct {
Title string `json:"title"`
Message string `json:"message"`
Suggestion string `json:"suggestion"`
Retryable bool `json:"retryable"`
RetryCommands []string `json:"retry_commands,omitempty"`
}
StreamError contains structured error information from a failed exec session.
func NewStreamErrorFromProto ¶
func NewStreamErrorFromProto(p *execpb.StreamError) *StreamError
func (*StreamError) ToProto ¶
func (e *StreamError) ToProto() *execpb.StreamError
type StreamInput ¶
type StreamInput struct {
// SessionID
SessionID string `json:"session_id"`
// Data
Data []byte `json:"data"`
}
func NewStreamInputFromProto ¶
func NewStreamInputFromProto(p *execpb.StreamInput) StreamInput
func (*StreamInput) ToProto ¶
func (i *StreamInput) ToProto() *execpb.StreamInput
type StreamOutput ¶
type StreamOutput struct {
SessionID string `json:"session_id"`
Data []byte `json:"data"`
Target StreamTarget `json:"target"`
Signal StreamSignal `json:"signal"`
Error *StreamError `json:"error,omitempty"`
}
func NewStreamOutputFromProto ¶
func NewStreamOutputFromProto(p *execpb.StreamOutput) StreamOutput
func (*StreamOutput) ToProto ¶
func (o *StreamOutput) ToProto() *execpb.StreamOutput
type StreamResize ¶
type StreamResize struct {
SessionID string `json:"session_id"`
Cols uint16 `json:"cols"`
Rows uint16 `json:"rows"`
}
func NewStreamResizeFromProto ¶
func NewStreamResizeFromProto(p *execpb.ResizeSessionRequest) (StreamResize, error)
func (*StreamResize) ToProto ¶
func (r *StreamResize) ToProto() *execpb.ResizeSessionRequest
type StreamSignal ¶
type StreamSignal int
const (
// while close isn't a signal, it's used to close the stream.
StreamSignalNone StreamSignal = iota
StreamSignalClose
StreamSignalSigint
StreamSignalSigquit
StreamSignalSigterm
StreamSignalSigkill
StreamSignalSighup
StreamSignalSigusr1
StreamSignalSigusr2
StreamSignalSigwinch
StreamSignalError
)
func NewStreamSignalFromProto ¶
func NewStreamSignalFromProto(p execpb.StreamSignal) StreamSignal
func (StreamSignal) String ¶
func (s StreamSignal) String() string
func (StreamSignal) ToProto ¶
func (s StreamSignal) ToProto() execpb.StreamSignal
ToProto converts a StreamSignal value to its proto representation.
type StreamTarget ¶
type StreamTarget int
const ( StreamTargetStdOut StreamTarget = iota StreamTargetStdErr )
func (StreamTarget) String ¶
func (t StreamTarget) String() string
type TTYHandlerFunc ¶
type TTYHandlerFunc func( ctx *types.PluginContext, opts SessionOptions, tty *os.File, stopCh chan error, resize <-chan SessionResizeInput, ) error
TTYHandlerFunc is the expected signature for a function that creates a new session with a TTY. It is passed the TTY file descriptor, a stop channel for signalling errors, and a receive-only resize channel.
This type was renamed from TTYHandler to avoid confusion with the Handler struct; the Handler.TTYHandler field still holds a value of this type.
type Terminal ¶
type Terminal interface {
// MasterFd returns the PTY (master) side — the manager reads output from here.
MasterFd() *os.File
// SlaveFd returns the TTY (slave) side — passed to the handler.
SlaveFd() *os.File
// Resize sets the terminal dimensions.
Resize(rows, cols uint16) error
// Close releases both file descriptors.
Close() error
}
Terminal abstracts a PTY/TTY pair so tests can inject fakes.
type TerminalFactory ¶
TerminalFactory creates a new Terminal. The default factory opens a real PTY via creack/pty. Tests inject a factory that returns FakeTerminal.
func NewRealTerminalFactory ¶
func NewRealTerminalFactory() TerminalFactory
NewRealTerminalFactory returns a TerminalFactory that creates real PTY terminals. The slave is set to raw mode and the terminal is sized to the given initial dimensions.