streamlogger

package
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// FileSyncInterval is a fixed interval of 100 milliseconds.
// NOTE(review): presumably the period at which file-backed loggers sync buffered
// output to disk — the usage is not visible here; confirm against the implementation.
const FileSyncInterval = 100 * time.Millisecond

Variables

This section is empty.

Functions

This section is empty.

Types

type FileLogManager

// FileLogManager exposes NewLogger, LoggerExists, StreamLogs and Run, the same
// method set as the LogManager interface. All of its fields are unexported.
type FileLogManager struct {
	// contains filtered or unexported fields
}

func (*FileLogManager) LoggerExists

func (f *FileLogManager) LoggerExists(execID string) bool

LoggerExists checks if an active logger exists for the given exec ID

func (*FileLogManager) NewLogger

func (f *FileLogManager) NewLogger(id string) (Logger, error)

NewLogger creates a new FileLogger. This is used by the task handler to create a new logger for each flow execution.

func (*FileLogManager) Run

func (f *FileLogManager) Run(ctx context.Context, l *slog.Logger) error

Run starts the scan loop. This is a blocking call and should be run from a goroutine.

func (*FileLogManager) StreamLogs

func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)

StreamLogs creates and returns a channel that streams log lines for the given exec ID. It filters logs to show only the highest retry attempt for each action.

type FileLogManagerCfg

// FileLogManagerCfg is the configuration passed to NewFileLogManager. It controls
// where log files are stored, when they are rotated, and when they are deleted.
type FileLogManagerCfg struct {
	// RetentionTime is used to determine files that are old enough to be deleted.
	// If the file modification time is older than RetentionTime, it will be deleted.
	RetentionTime time.Duration

	// MaxSizeBytes is used by the FileLogger to rotate files.
	// If the written bytes exceed this value, a new file will be created.
	MaxSizeBytes int64

	// ScanInterval is the interval at which the FileLogManager scans the log directory to determine
	// files that should be deleted.
	ScanInterval time.Duration

	// LogDir is the directory that stores the log files created by the FileLogger.
	LogDir string
}

type FileLogger

// FileLogger implements io.Writer and is meant to be used for a single execution.
type FileLogger struct {
	// ExecID is the execution ID of the associated flow.
	ExecID string

	// Retry is the retry count for the current action.
	Retry atomic.Int32
	// contains filtered or unexported fields
}

FileLogger implements io.Writer and is meant to be used for a single execution

func (*FileLogger) Checkpoint

func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

Checkpoint can be used to set checkpoints for an action on a node, such as results, logs, errors, etc.

func (*FileLogger) Close

func (fl *FileLogger) Close() error

Close flushes the buffer and closes the logger and file

func (*FileLogger) GetID

func (fl *FileLogger) GetID() string

GetID returns the exec ID

func (*FileLogger) IsClosed

func (fl *FileLogger) IsClosed() bool

func (*FileLogger) SetActionID

func (fl *FileLogger) SetActionID(id string)

SetActionID sets the action ID

func (*FileLogger) SetRetry added in v0.3.0

func (fl *FileLogger) SetRetry(retry int32)

SetRetry sets the retry count for the current action

func (*FileLogger) Write

func (fl *FileLogger) Write(p []byte) (int, error)

type LogManager

// LogManager manages multiple loggers. NewFileLogManager returns an implementation
// that uses files as the storage backend.
type LogManager interface {
	// NewLogger creates a new Logger for the given id.
	NewLogger(id string) (Logger, error)
	// LoggerExists reports whether an active logger exists for the given exec ID.
	LoggerExists(execID string) bool
	// StreamLogs returns a channel that streams log lines for the given exec ID.
	// NOTE(review): the file-backed implementation is documented as showing only the
	// highest retry attempt per action; presumably actionRetries maps action ID to
	// that retry count — confirm.
	StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
	// Run is documented on the file-backed implementation as a blocking scan loop
	// that should be started from a goroutine.
	Run(ctx context.Context, logger *slog.Logger) error
}

LogManager manages multiple loggers and can be used to enforce retention, log rotation, etc.

func NewFileLogManager

func NewFileLogManager(cfg FileLogManagerCfg) LogManager

NewFileLogManager creates a log manager that uses files as the storage backend. FileLogManager supports retention time to clean up old log files

type Logger

// Logger is used to write individual execution logs to different backends.
type Logger interface {
	io.Writer
	// GetID returns the exec ID.
	GetID() string
	// SetActionID sets the global action ID that is used in Write calls.
	SetActionID(id string)
	// SetRetry sets the retry count for the current action.
	SetRetry(retry int32)
	// Checkpoint is the underlying function used to log different message types; Write calls use it too.
	// If id is set, it overrides the global action ID.
	Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

	// Close closes the logger. The file-backed FileLogger also flushes its buffer and closes its file.
	Close() error
}

Logger is used to write individual execution logs to different backends

type MessageType

// MessageType labels the kind of message passed to Logger.Checkpoint and stored
// in StreamMessage.MType.
type MessageType string
// Message types defined by this package; the string value is what appears in the
// "message_type" JSON field of StreamMessage.
const (
	LogMessageType       MessageType = "log"
	ErrMessageType       MessageType = "error"
	ResultMessageType    MessageType = "result"
	StateMessageType     MessageType = "state"
	CancelledMessageType MessageType = "cancelled"
)

type NodeContextLogger

// NodeContextLogger wraps a Logger to provide node context for concurrent execution.
// It is created with NewNodeContextLogger; all of its fields are unexported.
type NodeContextLogger struct {
	// contains filtered or unexported fields
}

NodeContextLogger wraps a Logger to provide node context for concurrent execution

func NewNodeContextLogger

func NewNodeContextLogger(logger Logger, actionID, nodeID string) *NodeContextLogger

NewNodeContextLogger creates a new NodeContextLogger.

func (*NodeContextLogger) Checkpoint

func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error

Checkpoint delegates to the underlying logger with node context. If id is empty, uses the stored actionID.

func (*NodeContextLogger) Close

func (n *NodeContextLogger) Close() error

Close delegates to the underlying logger.

func (*NodeContextLogger) GetID

func (n *NodeContextLogger) GetID() string

GetID delegates to the underlying logger.

func (*NodeContextLogger) SetActionID

func (n *NodeContextLogger) SetActionID(id string)

SetActionID updates the action ID for this node context.

func (*NodeContextLogger) SetRetry added in v0.3.0

func (n *NodeContextLogger) SetRetry(retry int32)

SetRetry delegates to the underlying logger.

func (*NodeContextLogger) Write

func (n *NodeContextLogger) Write(p []byte) (int, error)

Write implements io.Writer by delegating to Checkpoint with node context.

type StreamMessage

// StreamMessage is a JSON-serializable log record carrying an action ID, message
// type, node ID, value, timestamp and retry count.
// NOTE(review): presumably this is the record behind each line emitted by
// StreamLogs, and Timestamp's string format is not shown here — confirm both
// against the implementation.
type StreamMessage struct {
	ActionID  string      `json:"action_id"`
	MType     MessageType `json:"message_type"`
	NodeID    string      `json:"node_id"`
	Val       string      `json:"value"`
	Timestamp string      `json:"timestamp"`
	Retry     int32       `json:"retry"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL