Documentation
¶
Index ¶
- Constants
- func RegisterPlugin(p *sdk.Plugin, opts PluginOpts) error
- type ActionTargetBuilder
- type ChannelSink
- type CreateSessionOptions
- type Handler
- type LogHandlerFunc
- type LogLevel
- type LogLine
- type LogLineOrigin
- type LogSession
- type LogSessionOptions
- type LogSessionStatus
- type LogSource
- type LogStreamCommand
- type LogStreamEvent
- type LogStreamEventType
- type LogStreamRequest
- type Manager
- func (m *Manager) Close()
- func (m *Manager) CloseSession(_ *types.PluginContext, sessionID string) error
- func (m *Manager) CreateSession(pluginctx *types.PluginContext, opts CreateSessionOptions) (*LogSession, error)
- func (m *Manager) GetSession(_ *types.PluginContext, sessionID string) (*LogSession, error)
- func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler
- func (m *Manager) ListSessions(_ *types.PluginContext) ([]*LogSession, error)
- func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
- func (m *Manager) UpdateSessionOptions(_ *types.PluginContext, sessionID string, opts LogSessionOptions) (*LogSession, error)
- func (m *Manager) Wait()
- type ManagerConfig
- type OutputSink
- type Plugin
- type PluginClient
- func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error
- func (c *PluginClient) CreateSession(ctx *types.PluginContext, opts CreateSessionOptions) (*LogSession, error)
- func (c *PluginClient) GetSession(ctx *types.PluginContext, sessionID string) (*LogSession, error)
- func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler
- func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*LogSession, error)
- func (c *PluginClient) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
- func (c *PluginClient) UpdateSessionOptions(ctx *types.PluginContext, sessionID string, opts LogSessionOptions) (*LogSession, error)
- type PluginOpts
- type PluginServer
- func (s *PluginServer) CloseSession(ctx context.Context, in *logspb.LogSessionByIdRequest) (*logspb.CloseLogSessionResponse, error)
- func (s *PluginServer) CreateSession(ctx context.Context, in *logspb.CreateLogSessionRequest) (*logspb.CreateLogSessionResponse, error)
- func (s *PluginServer) GetSession(ctx context.Context, in *logspb.LogSessionByIdRequest) (*logspb.LogSessionByIdResponse, error)
- func (s *PluginServer) GetSupportedResources(ctx context.Context, _ *emptypb.Empty) (*logspb.GetSupportedLogResourcesResponse, error)
- func (s *PluginServer) ListSessions(ctx context.Context, _ *emptypb.Empty) (*logspb.ListLogSessionsResponse, error)
- func (s *PluginServer) Stream(stream logspb.LogPlugin_StreamServer) error
- func (s *PluginServer) UpdateSessionOptions(ctx context.Context, in *logspb.UpdateLogSessionOptionsRequest) (*logspb.LogSessionByIdResponse, error)
- type Provider
- type SourceBuilderFunc
- type SourceEvent
- type SourceEventType
- type SourceResolver
- type SourceResolverOptions
- type SourceResolverResult
- type StreamInput
- type StreamOutput
Constants ¶
const (
MaxConcurrentStreamsPerSession = 20
MaxScannerBuffer = 1024 * 1024 // 1MB max line length
ReconnectMaxAttempts = 5
ReconnectInitialDelay = 1 * time.Second
ReconnectMaxDelay = 30 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func RegisterPlugin ¶
func RegisterPlugin( p *sdk.Plugin, opts PluginOpts, ) error
RegisterPlugin registers the log capability with the plugin system.
Types ¶
type ActionTargetBuilder ¶
type ActionTargetBuilder struct {
Label string `json:"label"`
LabelSelector string `json:"label_selector"`
Paths []string `json:"paths"`
Selectors map[string]string `json:"selectors"`
}
ActionTargetBuilder builds a dynamic list of targets for an action.
func ActionTargetBuilderFromProto ¶
func ActionTargetBuilderFromProto(p *commonpb.ActionTargetBuilder) ActionTargetBuilder
ActionTargetBuilderFromProto converts a proto ActionTargetBuilder to the domain type.
func (ActionTargetBuilder) ToProto ¶
func (a ActionTargetBuilder) ToProto() *commonpb.ActionTargetBuilder
ToProto converts ActionTargetBuilder to its proto representation.
type ChannelSink ¶
type ChannelSink struct {
// contains filtered or unexported fields
}
ChannelSink writes StreamOutput to a channel with context-aware sends. This is the production implementation used by Stream(). If the context is cancelled (e.g., gRPC stream closed), sends are dropped instead of blocking forever on a full channel.
func NewChannelSink ¶
func NewChannelSink(ctx context.Context, out chan StreamOutput) *ChannelSink
NewChannelSink creates a ChannelSink wrapping the given output channel. The context controls the lifetime — sends are dropped after cancellation.
func (*ChannelSink) OnEvent ¶
func (s *ChannelSink) OnEvent(sessionID string, event LogStreamEvent)
func (*ChannelSink) OnLine ¶
func (s *ChannelSink) OnLine(line LogLine)
type CreateSessionOptions ¶
type CreateSessionOptions struct {
ResourceKey string `json:"resource_key"`
ResourceID string `json:"resource_id"`
ResourceData map[string]interface{} `json:"resource_data"`
Options LogSessionOptions `json:"options"`
}
CreateSessionOptions contains everything needed to create a log session.
type Handler ¶
type Handler struct {
Plugin string `json:"plugin"`
Resource string `json:"resource"`
TargetBuilder ActionTargetBuilder `json:"target_builder"`
Handler LogHandlerFunc `json:"-"`
SourceBuilder SourceBuilderFunc `json:"-"`
}
Handler describes a log handler for a specific resource type.
func HandlerFromProto ¶
func HandlerFromProto(p *logspb.LogHandler) Handler
func (Handler) ToProto ¶
func (h Handler) ToProto() *logspb.LogHandler
type LogHandlerFunc ¶
type LogHandlerFunc func(ctx *types.PluginContext, req LogStreamRequest) (io.ReadCloser, error)
LogHandlerFunc opens a log stream for a single source, returning an io.ReadCloser.
type LogLevel ¶
type LogLevel int
LogLevel represents the severity level of a log line.
func LogLevelFromProto ¶
type LogLine ¶
type LogLine struct {
SessionID string `json:"session_id"`
SourceID string `json:"source_id"`
Labels map[string]string `json:"labels"`
Timestamp time.Time `json:"timestamp"`
Content string `json:"content"`
Origin LogLineOrigin `json:"origin"`
Level LogLevel `json:"level"`
}
LogLine represents a single log line from any source.
func LogLineFromProto ¶
type LogLineOrigin ¶
type LogLineOrigin int
LogLineOrigin indicates the origin of a log line.
const (
LogLineOriginCurrent LogLineOrigin = iota
LogLineOriginPrevious
LogLineOriginSystem
)
func LogLineOriginFromProto ¶
func LogLineOriginFromProto(p logspb.LogLineOrigin) LogLineOrigin
func (LogLineOrigin) ToProto ¶
func (o LogLineOrigin) ToProto() logspb.LogLineOrigin
type LogSession ¶
type LogSession struct {
ID string `json:"id"`
PluginID string `json:"plugin_id"`
ConnectionID string `json:"connection_id"`
ResourceKey string `json:"resource_key"`
ResourceID string `json:"resource_id"`
Options LogSessionOptions `json:"options"`
Status LogSessionStatus `json:"status"`
ActiveSources []LogSource `json:"active_sources"`
CreatedAt time.Time `json:"created_at"`
}
LogSession represents an active log viewing session.
func LogSessionFromProto ¶
func LogSessionFromProto(p *logspb.LogSession) *LogSession
func (*LogSession) ToProto ¶
func (s *LogSession) ToProto() *logspb.LogSession
type LogSessionOptions ¶
type LogSessionOptions struct {
Target string `json:"target"`
Follow bool `json:"follow"`
IncludePrevious bool `json:"include_previous"`
IncludeTimestamps bool `json:"include_timestamps"`
TailLines int64 `json:"tail_lines"`
SinceSeconds int64 `json:"since_seconds"`
SinceTime *time.Time `json:"since_time,omitempty"`
LimitBytes int64 `json:"limit_bytes"`
IncludeSourceEvents bool `json:"include_source_events"`
Params map[string]string `json:"params"`
}
LogSessionOptions contains options for creating or updating a log session.
func LogSessionOptionsFromProto ¶
func LogSessionOptionsFromProto(p *logspb.LogSessionOptions) LogSessionOptions
func (*LogSessionOptions) ToProto ¶
func (o *LogSessionOptions) ToProto() *logspb.LogSessionOptions
type LogSessionStatus ¶
type LogSessionStatus int
LogSessionStatus represents the status of a log session.
const (
LogSessionStatusActive LogSessionStatus = iota
LogSessionStatusPaused
LogSessionStatusClosed
LogSessionStatusError
LogSessionStatusConnecting
LogSessionStatusInitializing
)
func LogSessionStatusFromProto ¶
func LogSessionStatusFromProto(p logspb.LogSessionStatus) LogSessionStatus
func (LogSessionStatus) ToProto ¶
func (s LogSessionStatus) ToProto() logspb.LogSessionStatus
type LogStreamCommand ¶
type LogStreamCommand int
LogStreamCommand represents a command from the client to control the stream.
const (
StreamCommandPause LogStreamCommand = iota
StreamCommandResume
StreamCommandClose
)
func LogStreamCommandFromProto ¶
func LogStreamCommandFromProto(p logspb.LogStreamCommand) LogStreamCommand
func (LogStreamCommand) ToProto ¶
func (c LogStreamCommand) ToProto() logspb.LogStreamCommand
type LogStreamEvent ¶
type LogStreamEvent struct {
Type LogStreamEventType `json:"type"`
SourceID string `json:"source_id"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
}
LogStreamEvent represents a lifecycle event during streaming.
func LogStreamEventFromProto ¶
func LogStreamEventFromProto(p *logspb.LogStreamEvent) LogStreamEvent
func (*LogStreamEvent) ToProto ¶
func (e *LogStreamEvent) ToProto() *logspb.LogStreamEvent
type LogStreamEventType ¶
type LogStreamEventType int
LogStreamEventType represents the type of log stream event.
const (
StreamEventSourceAdded LogStreamEventType = iota
StreamEventSourceRemoved
StreamEventStreamError
StreamEventReconnecting
StreamEventReconnected
StreamEventStreamEnded
StreamEventSessionReady
)
func LogStreamEventTypeFromProto ¶
func LogStreamEventTypeFromProto(p logspb.LogStreamEventType) LogStreamEventType
func (LogStreamEventType) String ¶
func (t LogStreamEventType) String() string
func (LogStreamEventType) ToProto ¶
func (t LogStreamEventType) ToProto() logspb.LogStreamEventType
type LogStreamRequest ¶
type LogStreamRequest struct {
SourceID string
Labels map[string]string
ResourceData map[string]interface{}
Target string
Follow bool
IncludePrevious bool
IncludeTimestamps bool
TailLines int64
SinceSeconds int64
SinceTime *time.Time
LimitBytes int64
Params map[string]string
}
LogStreamRequest is the generic request to open a log stream for one source.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the lifecycle of log sessions within a plugin process.
func NewManager ¶
func NewManager(cfg ManagerConfig) *Manager
NewManager creates a new Manager with the given config.
func (*Manager) Close ¶
func (m *Manager) Close()
Close cancels all active sessions and waits for their goroutines to finish.
func (*Manager) CloseSession ¶
func (m *Manager) CloseSession(_ *types.PluginContext, sessionID string) error
func (*Manager) CreateSession ¶
func (m *Manager) CreateSession( pluginctx *types.PluginContext, opts CreateSessionOptions, ) (*LogSession, error)
func (*Manager) GetSession ¶
func (m *Manager) GetSession(_ *types.PluginContext, sessionID string) (*LogSession, error)
func (*Manager) GetSupportedResources ¶
func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler
func (*Manager) ListSessions ¶
func (m *Manager) ListSessions(_ *types.PluginContext) ([]*LogSession, error)
func (*Manager) Stream ¶
func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
func (*Manager) UpdateSessionOptions ¶
func (m *Manager) UpdateSessionOptions( _ *types.PluginContext, sessionID string, opts LogSessionOptions, ) (*LogSession, error)
type ManagerConfig ¶
type ManagerConfig struct {
Logger logging.Logger
Settings settings.Provider
Handlers map[string]Handler
Resolvers map[string]SourceResolver
Sink OutputSink // optional: nil → ChannelSink created by Stream()
Clock timeutil.Clock // optional: nil → timeutil.RealClock
}
ManagerConfig configures the Manager.
type OutputSink ¶
type OutputSink interface {
OnLine(line LogLine)
OnEvent(sessionID string, event LogStreamEvent)
}
OutputSink receives log lines and stream events from the Manager. Tests inject RecordingOutput; production uses ChannelSink.
type Plugin ¶
Plugin implements the HashiCorp go-plugin interfaces for the log capability.
func (*Plugin) GRPCClient ¶
func (p *Plugin) GRPCClient( _ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn, ) (interface{}, error)
func (*Plugin) GRPCServer ¶
type PluginClient ¶
type PluginClient struct {
// contains filtered or unexported fields
}
func (*PluginClient) CloseSession ¶
func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error
func (*PluginClient) CreateSession ¶
func (c *PluginClient) CreateSession( ctx *types.PluginContext, opts CreateSessionOptions, ) (*LogSession, error)
func (*PluginClient) GetSession ¶
func (c *PluginClient) GetSession( ctx *types.PluginContext, sessionID string, ) (*LogSession, error)
func (*PluginClient) GetSupportedResources ¶
func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler
func (*PluginClient) ListSessions ¶
func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*LogSession, error)
func (*PluginClient) Stream ¶
func (c *PluginClient) Stream( ctx context.Context, in chan StreamInput, ) (chan StreamOutput, error)
func (*PluginClient) UpdateSessionOptions ¶
func (c *PluginClient) UpdateSessionOptions( ctx *types.PluginContext, sessionID string, opts LogSessionOptions, ) (*LogSession, error)
type PluginOpts ¶
type PluginOpts struct {
// Handlers maps resource keys to their log handlers.
// For example: "core::v1::Pod" -> Handler
Handlers map[string]Handler `json:"handlers"`
// SourceResolvers maps resource keys to source resolvers that resolve
// group resources into individual log sources.
// For example: "apps::v1::Deployment" -> SourceResolver
SourceResolvers map[string]SourceResolver `json:"-"`
}
PluginOpts contains the options for the log plugin.
type PluginServer ¶
type PluginServer struct {
Impl Provider
// contains filtered or unexported fields
}
func (*PluginServer) CloseSession ¶
func (s *PluginServer) CloseSession( ctx context.Context, in *logspb.LogSessionByIdRequest, ) (*logspb.CloseLogSessionResponse, error)
func (*PluginServer) CreateSession ¶
func (s *PluginServer) CreateSession( ctx context.Context, in *logspb.CreateLogSessionRequest, ) (*logspb.CreateLogSessionResponse, error)
func (*PluginServer) GetSession ¶
func (s *PluginServer) GetSession( ctx context.Context, in *logspb.LogSessionByIdRequest, ) (*logspb.LogSessionByIdResponse, error)
func (*PluginServer) GetSupportedResources ¶
func (s *PluginServer) GetSupportedResources( ctx context.Context, _ *emptypb.Empty, ) (*logspb.GetSupportedLogResourcesResponse, error)
func (*PluginServer) ListSessions ¶
func (s *PluginServer) ListSessions( ctx context.Context, _ *emptypb.Empty, ) (*logspb.ListLogSessionsResponse, error)
func (*PluginServer) Stream ¶
func (s *PluginServer) Stream(stream logspb.LogPlugin_StreamServer) error
func (*PluginServer) UpdateSessionOptions ¶
func (s *PluginServer) UpdateSessionOptions( ctx context.Context, in *logspb.UpdateLogSessionOptionsRequest, ) (*logspb.LogSessionByIdResponse, error)
type Provider ¶
type Provider interface {
GetSupportedResources(ctx *types.PluginContext) []Handler
CreateSession(ctx *types.PluginContext, opts CreateSessionOptions) (*LogSession, error)
GetSession(ctx *types.PluginContext, sessionID string) (*LogSession, error)
ListSessions(ctx *types.PluginContext) ([]*LogSession, error)
CloseSession(ctx *types.PluginContext, sessionID string) error
UpdateSessionOptions(ctx *types.PluginContext, sessionID string, opts LogSessionOptions) (*LogSession, error)
Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
}
Provider is the interface for log viewing functionality. Its method set matches the methods of Manager and PluginClient; PluginServer holds a Provider in its Impl field.
type SourceBuilderFunc ¶
type SourceBuilderFunc func( resourceID string, resourceData map[string]interface{}, opts LogSessionOptions, ) []LogSource
SourceBuilderFunc builds LogSources from resource data for direct handler streaming. Plugins implement this to translate their resource data into properly labeled sources that the LogHandlerFunc can consume. For example, a K8s plugin extracts pod name, namespace, and container names from the pod spec.
type SourceEvent ¶
type SourceEvent struct {
Type SourceEventType
Source LogSource
}
SourceEvent represents a source lifecycle change.
type SourceEventType ¶
type SourceEventType int
SourceEventType represents the type of source lifecycle event.
const ( SourceAdded SourceEventType = iota SourceRemoved )
type SourceResolver ¶
type SourceResolver func( ctx *types.PluginContext, resourceData map[string]interface{}, opts SourceResolverOptions, ) (*SourceResolverResult, error)
SourceResolver resolves a "group" resource into individual log sources.
type SourceResolverOptions ¶
type SourceResolverOptions struct {
Watch bool // if true, also return a channel for source changes
Target string // optional filter (e.g., specific container name)
Params map[string]string // plugin-specific resolver params
}
SourceResolverOptions configures how sources are resolved.
type SourceResolverResult ¶
type SourceResolverResult struct {
Sources []LogSource // current sources
Events <-chan SourceEvent // source lifecycle stream (nil if Watch=false)
}
SourceResolverResult contains the resolved sources and optional event channel.
type StreamInput ¶
type StreamInput struct {
SessionID string `json:"session_id"`
Command LogStreamCommand `json:"command"`
}
StreamInput is a control message from the client to the stream.
func StreamInputFromProto ¶
func StreamInputFromProto(p *logspb.LogStreamInput) StreamInput
func (*StreamInput) ToProto ¶
func (i *StreamInput) ToProto() *logspb.LogStreamInput
type StreamOutput ¶
type StreamOutput struct {
SessionID string
Line *LogLine
Event *LogStreamEvent
}
StreamOutput is a log line or event from the stream.