streamlogger

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const FileSyncInterval = 100 * time.Millisecond

Variables

This section is empty.

Functions

This section is empty.

Types

type FileLogManager

type FileLogManager struct {
	// contains filtered or unexported fields
}

func (*FileLogManager) GetRawLogs added in v0.10.0

func (f *FileLogManager) GetRawLogs(ctx context.Context, execID string, w io.Writer) error

GetRawLogs writes all raw log file segments for the given execID to w, in order. Returns an error if the logger for this execID is still active (execution still running).

func (*FileLogManager) LoggerExists

func (f *FileLogManager) LoggerExists(execID string) bool

LoggerExists checks if an active logger exists for the given exec ID

func (*FileLogManager) NewLogger

func (f *FileLogManager) NewLogger(id string) (Logger, error)

NewLogger creates a new FileLogger. This is used by the task handler to create a new logger for each flow execution.

func (*FileLogManager) Run

func (f *FileLogManager) Run(ctx context.Context, l *slog.Logger) error

Run starts the scan loop. This is a blocking call and should be run from a goroutine.

func (*FileLogManager) StreamLogs

func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)

StreamLogs creates and returns a channel that streams log lines for the given exec ID It filters logs to show only the highest retry attempt for each action

type FileLogManagerCfg

type FileLogManagerCfg struct {
	// RetentionTime is used to determine files that are old enough to be deleted.
	// If the file modification is older than RetentionTime, it will be deleted.
	RetentionTime time.Duration

	// MaxSizeBytes is used by the FileLogger to rotate files.
	// If the written bytes exceed this value, a new file will be created
	MaxSizeBytes int64

	// ScanInterval is the interval with which the FileLogManager scans the log directory to determine
	// files that should be deleted
	ScanInterval time.Duration

	// LogDir stores the log files created by the FileLogger
	LogDir string
}

type FileLogger

type FileLogger struct {
	// ExecID is the execution ID of the associated flow
	ExecID string

	// Retry is the retry count for the current action
	Retry atomic.Int32
	// contains filtered or unexported fields
}

FileLogger implements io.Writer and is meant to be used for a single execution

func (*FileLogger) Checkpoint

func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

Checkpoint can be used to set checkpoints for an action on a node like resuls, logs, errors etc.

func (*FileLogger) Close

func (fl *FileLogger) Close() error

Close flushes the buffer and closes the logger and file

func (*FileLogger) GetID

func (fl *FileLogger) GetID() string

GetID returns the exec ID

func (*FileLogger) IsClosed

func (fl *FileLogger) IsClosed() bool

func (*FileLogger) SetActionID

func (fl *FileLogger) SetActionID(id string)

SetActionID sets the action ID

func (*FileLogger) SetRetry added in v0.3.0

func (fl *FileLogger) SetRetry(retry int32)

SetRetry sets the retry count for the current action

func (*FileLogger) Write

func (fl *FileLogger) Write(p []byte) (int, error)

type LogManager

type LogManager interface {
	NewLogger(id string) (Logger, error)
	LoggerExists(execID string) bool
	StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
	GetRawLogs(ctx context.Context, execID string, w io.Writer) error
	Run(ctx context.Context, logger *slog.Logger) error
}

LogManager manages multiple loggers and can be used for enforce retention, log rotation etc.

func NewFileLogManager

func NewFileLogManager(cfg FileLogManagerCfg) LogManager

NewFileLogManager creates a log manager that uses files as the storage backend. FileLogManager supports retention time to clean up old log files

type Logger

type Logger interface {
	io.Writer
	GetID() string
	// SetActionID is a global value that is used in Write calls
	SetActionID(id string)
	// SetRetry sets the retry count for the current action
	SetRetry(retry int32)
	// Checkpoint is an underlying function to log different message types. Used by Write calls too. If the id is set, it will
	// override the global action ID
	Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

	Close() error
}

Logger is used to write individual execution logs to different backends

type MessageType

type MessageType string
const (
	LogMessageType       MessageType = "log"
	ErrMessageType       MessageType = "error"
	ResultMessageType    MessageType = "result"
	StateMessageType     MessageType = "state"
	CancelledMessageType MessageType = "cancelled"
)

type NodeContextLogger

type NodeContextLogger struct {
	// contains filtered or unexported fields
}

NodeContextLogger wraps a Logger to provide node context for concurrent execution

func NewNodeContextLogger

func NewNodeContextLogger(logger Logger, actionID, nodeID string) *NodeContextLogger

NewNodeContextLogger creates a new NodeContextLogger.

func (*NodeContextLogger) Checkpoint

func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

Checkpoint delegates to the underlying logger with node context. If id is empty, uses the stored actionID.

func (*NodeContextLogger) Close

func (n *NodeContextLogger) Close() error

Close delegates to the underlying logger.

func (*NodeContextLogger) GetID

func (n *NodeContextLogger) GetID() string

GetID delegates to the underlying logger.

func (*NodeContextLogger) SetActionID

func (n *NodeContextLogger) SetActionID(id string)

SetActionID updates the action ID for this node context.

func (*NodeContextLogger) SetRetry added in v0.3.0

func (n *NodeContextLogger) SetRetry(retry int32)

SetRetry delegates to the underlying logger.

func (*NodeContextLogger) Write

func (n *NodeContextLogger) Write(p []byte) (int, error)

Write implements io.Writer by delegating to Checkpoint with node context.

type StreamMessage

type StreamMessage struct {
	ActionID  string      `json:"action_id"`
	MType     MessageType `json:"message_type"`
	NodeID    string      `json:"node_id"`
	Val       string      `json:"value"`
	Timestamp string      `json:"timestamp"`
	Retry     int32       `json:"retry"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL