logs

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: AGPL-3.0 Imports: 28 Imported by: 2

Documentation

Index

Constants

View Source
const (
	MaxConcurrentStreamsPerSession = 20
	MaxScannerBuffer               = 1024 * 1024 // 1MB max line length

	ReconnectMaxAttempts  = 5
	ReconnectInitialDelay = 1 * time.Second
	ReconnectMaxDelay     = 30 * time.Second
)

Variables

This section is empty.

Functions

func RegisterPlugin

func RegisterPlugin(
	p *sdk.Plugin,
	opts PluginOpts,
) error

RegisterPlugin registers the log capability with the plugin system.

Types

type ActionTargetBuilder

type ActionTargetBuilder struct {
	Label         string            `json:"label"`
	LabelSelector string            `json:"label_selector"`
	Paths         []string          `json:"paths"`
	Selectors     map[string]string `json:"selectors"`
}

ActionTargetBuilder builds a dynamic list of targets for an action.

func ActionTargetBuilderFromProto

func ActionTargetBuilderFromProto(p *commonpb.ActionTargetBuilder) ActionTargetBuilder

ActionTargetBuilderFromProto converts a proto ActionTargetBuilder to the domain type.

func (ActionTargetBuilder) ToProto

ToProto converts ActionTargetBuilder to its proto representation.

type ChannelSink

type ChannelSink struct {
	// contains filtered or unexported fields
}

ChannelSink writes StreamOutput to a channel with context-aware sends. This is the production implementation used by Stream(). If the context is cancelled (e.g., gRPC stream closed), sends are dropped instead of blocking forever on a full channel.

func NewChannelSink

func NewChannelSink(ctx context.Context, out chan StreamOutput) *ChannelSink

NewChannelSink creates a ChannelSink wrapping the given output channel. The context controls the lifetime — sends are dropped after cancellation.

func (*ChannelSink) OnEvent

func (s *ChannelSink) OnEvent(sessionID string, event LogStreamEvent)

func (*ChannelSink) OnLine

func (s *ChannelSink) OnLine(line LogLine)

type CreateSessionOptions

type CreateSessionOptions struct {
	ResourceKey  string                 `json:"resource_key"`
	ResourceID   string                 `json:"resource_id"`
	ResourceData map[string]interface{} `json:"resource_data"`
	Options      LogSessionOptions      `json:"options"`
}

CreateSessionOptions contains everything needed to create a log session.

type Handler

type Handler struct {
	Plugin        string              `json:"plugin"`
	Resource      string              `json:"resource"`
	TargetBuilder ActionTargetBuilder `json:"target_builder"`
	Handler       LogHandlerFunc      `json:"-"`
	SourceBuilder SourceBuilderFunc   `json:"-"`
}

Handler describes a log handler for a specific resource type.

func HandlerFromProto

func HandlerFromProto(p *logspb.LogHandler) Handler

func (Handler) ID

func (h Handler) ID() string

func (Handler) ToProto

func (h Handler) ToProto() *logspb.LogHandler

type LogHandlerFunc

type LogHandlerFunc func(ctx *types.PluginContext, req LogStreamRequest) (io.ReadCloser, error)

LogHandlerFunc opens a log stream for a single source, returning an io.ReadCloser.

type LogLevel

type LogLevel int

LogLevel represents the severity level of a log line.

const (
	LogLevelUnspecified LogLevel = iota
	LogLevelTrace
	LogLevelDebug
	LogLevelInfo
	LogLevelWarn
	LogLevelError
	LogLevelFatal
)

func LogLevelFromProto

func LogLevelFromProto(p logspb.LogLevel) LogLevel

func (LogLevel) ToProto

func (l LogLevel) ToProto() logspb.LogLevel

type LogLine

type LogLine struct {
	SessionID string            `json:"session_id"`
	SourceID  string            `json:"source_id"`
	Labels    map[string]string `json:"labels"`
	Timestamp time.Time         `json:"timestamp"`
	Content   string            `json:"content"`
	Origin    LogLineOrigin     `json:"origin"`
	Level     LogLevel          `json:"level"`
}

LogLine represents a single log line from any source.

func LogLineFromProto

func LogLineFromProto(p *logspb.LogLine) LogLine

func (*LogLine) ToProto

func (l *LogLine) ToProto() *logspb.LogLine

type LogLineOrigin

type LogLineOrigin int

LogLineOrigin indicates the origin of a log line.

const (
	LogLineOriginCurrent LogLineOrigin = iota
	LogLineOriginPrevious
	LogLineOriginSystem
)

func LogLineOriginFromProto

func LogLineOriginFromProto(p logspb.LogLineOrigin) LogLineOrigin

func (LogLineOrigin) ToProto

func (o LogLineOrigin) ToProto() logspb.LogLineOrigin

type LogSession

type LogSession struct {
	ID            string            `json:"id"`
	PluginID      string            `json:"plugin_id"`
	ConnectionID  string            `json:"connection_id"`
	ResourceKey   string            `json:"resource_key"`
	ResourceID    string            `json:"resource_id"`
	Options       LogSessionOptions `json:"options"`
	Status        LogSessionStatus  `json:"status"`
	ActiveSources []LogSource       `json:"active_sources"`
	CreatedAt     time.Time         `json:"created_at"`
}

LogSession represents an active log viewing session.

func LogSessionFromProto

func LogSessionFromProto(p *logspb.LogSession) *LogSession

func (*LogSession) ToProto

func (s *LogSession) ToProto() *logspb.LogSession

type LogSessionOptions

type LogSessionOptions struct {
	Target              string            `json:"target"`
	Follow              bool              `json:"follow"`
	IncludePrevious     bool              `json:"include_previous"`
	IncludeTimestamps   bool              `json:"include_timestamps"`
	TailLines           int64             `json:"tail_lines"`
	SinceSeconds        int64             `json:"since_seconds"`
	SinceTime           *time.Time        `json:"since_time,omitempty"`
	LimitBytes          int64             `json:"limit_bytes"`
	IncludeSourceEvents bool              `json:"include_source_events"`
	Params              map[string]string `json:"params"`
}

LogSessionOptions contains options for creating or updating a log session.

func LogSessionOptionsFromProto

func LogSessionOptionsFromProto(p *logspb.LogSessionOptions) LogSessionOptions

func (*LogSessionOptions) ToProto

type LogSessionStatus

type LogSessionStatus int

LogSessionStatus represents the status of a log session.

const (
	LogSessionStatusActive LogSessionStatus = iota
	LogSessionStatusPaused
	LogSessionStatusClosed
	LogSessionStatusError
	LogSessionStatusConnecting
	LogSessionStatusInitializing
)

func LogSessionStatusFromProto

func LogSessionStatusFromProto(p logspb.LogSessionStatus) LogSessionStatus

func (LogSessionStatus) ToProto

type LogSource

type LogSource struct {
	ID     string            `json:"id"`
	Labels map[string]string `json:"labels"`
}

LogSource is a generic log-producing entity.

func LogSourceFromProto

func LogSourceFromProto(p *logspb.LogSource) LogSource

func (*LogSource) ToProto

func (s *LogSource) ToProto() *logspb.LogSource

type LogStreamCommand

type LogStreamCommand int

LogStreamCommand represents a command from the client to control the stream.

const (
	StreamCommandPause LogStreamCommand = iota
	StreamCommandResume
	StreamCommandClose
)

func LogStreamCommandFromProto

func LogStreamCommandFromProto(p logspb.LogStreamCommand) LogStreamCommand

func (LogStreamCommand) ToProto

type LogStreamEvent

type LogStreamEvent struct {
	Type      LogStreamEventType `json:"type"`
	SourceID  string             `json:"source_id"`
	Message   string             `json:"message"`
	Timestamp time.Time          `json:"timestamp"`
}

LogStreamEvent represents a lifecycle event during streaming.

func LogStreamEventFromProto

func LogStreamEventFromProto(p *logspb.LogStreamEvent) LogStreamEvent

func (*LogStreamEvent) ToProto

func (e *LogStreamEvent) ToProto() *logspb.LogStreamEvent

type LogStreamEventType

type LogStreamEventType int

LogStreamEventType represents the type of log stream event.

const (
	StreamEventSourceAdded LogStreamEventType = iota
	StreamEventSourceRemoved
	StreamEventStreamError
	StreamEventReconnecting
	StreamEventReconnected
	StreamEventStreamEnded
	StreamEventSessionReady
)

func (LogStreamEventType) String

func (t LogStreamEventType) String() string

func (LogStreamEventType) ToProto

type LogStreamRequest

type LogStreamRequest struct {
	SourceID          string
	Labels            map[string]string
	ResourceData      map[string]interface{}
	Target            string
	Follow            bool
	IncludePrevious   bool
	IncludeTimestamps bool
	TailLines         int64
	SinceSeconds      int64
	SinceTime         *time.Time
	LimitBytes        int64
	Params            map[string]string
}

LogStreamRequest is the generic request to open a log stream for one source.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the lifecycle of log sessions within a plugin process.

func NewManager

func NewManager(cfg ManagerConfig) *Manager

NewManager creates a new Manager with the given config.

func (*Manager) Close

func (m *Manager) Close()

Close cancels all active sessions and waits for their goroutines to finish.

func (*Manager) CloseSession

func (m *Manager) CloseSession(_ *types.PluginContext, sessionID string) error

func (*Manager) CreateSession

func (m *Manager) CreateSession(
	pluginctx *types.PluginContext,
	opts CreateSessionOptions,
) (*LogSession, error)

func (*Manager) GetSession

func (m *Manager) GetSession(_ *types.PluginContext, sessionID string) (*LogSession, error)

func (*Manager) GetSupportedResources

func (m *Manager) GetSupportedResources(_ *types.PluginContext) []Handler

func (*Manager) ListSessions

func (m *Manager) ListSessions(_ *types.PluginContext) ([]*LogSession, error)

func (*Manager) Stream

func (m *Manager) Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)

func (*Manager) UpdateSessionOptions

func (m *Manager) UpdateSessionOptions(
	_ *types.PluginContext,
	sessionID string,
	opts LogSessionOptions,
) (*LogSession, error)

func (*Manager) Wait

func (m *Manager) Wait()

Wait blocks until all session goroutines finish.

type ManagerConfig

type ManagerConfig struct {
	Logger    logging.Logger
	Settings  settings.Provider
	Handlers  map[string]Handler
	Resolvers map[string]SourceResolver
	Sink      OutputSink     // optional: nil → ChannelSink created by Stream()
	Clock     timeutil.Clock // optional: nil → timeutil.RealClock
}

ManagerConfig configures the Manager.

type OutputSink

type OutputSink interface {
	OnLine(line LogLine)
	OnEvent(sessionID string, event LogStreamEvent)
}

OutputSink receives log lines and stream events from the Manager. Tests inject RecordingOutput; production uses ChannelSink.

type Plugin

type Plugin struct {
	plugin.Plugin
	Impl Provider
}

Plugin implements the hashicorp go-plugin interfaces for log capability.

func (*Plugin) GRPCClient

func (p *Plugin) GRPCClient(
	_ context.Context,
	_ *plugin.GRPCBroker,
	c *grpc.ClientConn,
) (interface{}, error)

func (*Plugin) GRPCServer

func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error

type PluginClient

type PluginClient struct {
	// contains filtered or unexported fields
}

func (*PluginClient) CloseSession

func (c *PluginClient) CloseSession(ctx *types.PluginContext, sessionID string) error

func (*PluginClient) CreateSession

func (c *PluginClient) CreateSession(
	ctx *types.PluginContext,
	opts CreateSessionOptions,
) (*LogSession, error)

func (*PluginClient) GetSession

func (c *PluginClient) GetSession(
	ctx *types.PluginContext,
	sessionID string,
) (*LogSession, error)

func (*PluginClient) GetSupportedResources

func (c *PluginClient) GetSupportedResources(ctx *types.PluginContext) []Handler

func (*PluginClient) ListSessions

func (c *PluginClient) ListSessions(ctx *types.PluginContext) ([]*LogSession, error)

func (*PluginClient) Stream

func (c *PluginClient) Stream(
	ctx context.Context,
	in chan StreamInput,
) (chan StreamOutput, error)

func (*PluginClient) UpdateSessionOptions

func (c *PluginClient) UpdateSessionOptions(
	ctx *types.PluginContext,
	sessionID string,
	opts LogSessionOptions,
) (*LogSession, error)

type PluginOpts

type PluginOpts struct {
	// Handlers maps resource keys to their log handlers.
	// For example: "core::v1::Pod" -> Handler
	Handlers map[string]Handler `json:"handlers"`

	// SourceResolvers maps resource keys to source resolvers that resolve
	// group resources into individual log sources.
	// For example: "apps::v1::Deployment" -> SourceResolver
	SourceResolvers map[string]SourceResolver `json:"-"`
}

PluginOpts contains the options for the log plugin.

type PluginServer

type PluginServer struct {
	Impl Provider
	// contains filtered or unexported fields
}

func (*PluginServer) CloseSession

func (*PluginServer) CreateSession

func (*PluginServer) GetSession

func (*PluginServer) GetSupportedResources

func (s *PluginServer) GetSupportedResources(
	ctx context.Context,
	_ *emptypb.Empty,
) (*logspb.GetSupportedLogResourcesResponse, error)

func (*PluginServer) ListSessions

func (s *PluginServer) ListSessions(
	ctx context.Context,
	_ *emptypb.Empty,
) (*logspb.ListLogSessionsResponse, error)

func (*PluginServer) Stream

func (s *PluginServer) Stream(stream logspb.LogPlugin_StreamServer) error

func (*PluginServer) UpdateSessionOptions

type Provider

type Provider interface {
	GetSupportedResources(ctx *types.PluginContext) []Handler
	CreateSession(ctx *types.PluginContext, opts CreateSessionOptions) (*LogSession, error)
	GetSession(ctx *types.PluginContext, sessionID string) (*LogSession, error)
	ListSessions(ctx *types.PluginContext) ([]*LogSession, error)
	CloseSession(ctx *types.PluginContext, sessionID string) error
	UpdateSessionOptions(ctx *types.PluginContext, sessionID string, opts LogSessionOptions) (*LogSession, error)
	Stream(ctx context.Context, in chan StreamInput) (chan StreamOutput, error)
}

Provider is the interface satisfied by the plugin server and client to provide log viewing functionality.

type SourceBuilderFunc

type SourceBuilderFunc func(
	resourceID string,
	resourceData map[string]interface{},
	opts LogSessionOptions,
) []LogSource

SourceBuilderFunc builds LogSources from resource data for direct handler streaming. Plugins implement this to translate their resource data into properly-labeled sources that the LogHandlerFunc can consume. For example, a K8s plugin extracts pod name, namespace, and container names from the pod spec.

type SourceEvent

type SourceEvent struct {
	Type   SourceEventType
	Source LogSource
}

SourceEvent represents a source lifecycle change.

type SourceEventType

type SourceEventType int

SourceEventType represents the type of source lifecycle event.

const (
	SourceAdded SourceEventType = iota
	SourceRemoved
)

type SourceResolver

type SourceResolver func(
	ctx *types.PluginContext,
	resourceData map[string]interface{},
	opts SourceResolverOptions,
) (*SourceResolverResult, error)

SourceResolver resolves a "group" resource into individual log sources.

type SourceResolverOptions

type SourceResolverOptions struct {
	Watch  bool              // if true, also return a channel for source changes
	Target string            // optional filter (e.g., specific container name)
	Params map[string]string // plugin-specific resolver params
}

SourceResolverOptions configures how sources are resolved.

type SourceResolverResult

type SourceResolverResult struct {
	Sources []LogSource        // current sources
	Events  <-chan SourceEvent // source lifecycle stream (nil if Watch=false)
}

SourceResolverResult contains the resolved sources and optional event channel.

type StreamInput

type StreamInput struct {
	SessionID string           `json:"session_id"`
	Command   LogStreamCommand `json:"command"`
}

StreamInput is a control message from the client to the stream.

func StreamInputFromProto

func StreamInputFromProto(p *logspb.LogStreamInput) StreamInput

func (*StreamInput) ToProto

func (i *StreamInput) ToProto() *logspb.LogStreamInput

type StreamOutput

type StreamOutput struct {
	SessionID string
	Line      *LogLine
	Event     *LogStreamEvent
}

StreamOutput is a log line or event from the stream.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL