Documentation
¶
Index ¶
- Constants
- type FileLogManager
- func (f *FileLogManager) GetRawLogs(ctx context.Context, execID string, w io.Writer) error
- func (f *FileLogManager) LoggerExists(execID string) bool
- func (f *FileLogManager) NewLogger(id string) (Logger, error)
- func (f *FileLogManager) Run(ctx context.Context, l *slog.Logger) error
- func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
- type FileLogManagerCfg
- type FileLogger
- func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
- func (fl *FileLogger) Close() error
- func (fl *FileLogger) GetID() string
- func (fl *FileLogger) IsClosed() bool
- func (fl *FileLogger) SetActionID(id string)
- func (fl *FileLogger) SetRetry(retry int32)
- func (fl *FileLogger) Write(p []byte) (int, error)
- type LogManager
- type Logger
- type MessageType
- type NodeContextLogger
- func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
- func (n *NodeContextLogger) Close() error
- func (n *NodeContextLogger) GetID() string
- func (n *NodeContextLogger) SetActionID(id string)
- func (n *NodeContextLogger) SetRetry(retry int32)
- func (n *NodeContextLogger) Write(p []byte) (int, error)
- type StreamMessage
Constants ¶
const FileSyncInterval = 100 * time.Millisecond
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FileLogManager ¶
type FileLogManager struct {
// contains filtered or unexported fields
}
func (*FileLogManager) GetRawLogs ¶ added in v0.10.0
GetRawLogs writes all raw log file segments for the given execID to w, in order. Returns an error if the logger for this execID is still active (execution still running).
func (*FileLogManager) LoggerExists ¶
func (f *FileLogManager) LoggerExists(execID string) bool
LoggerExists checks if an active logger exists for the given exec ID
func (*FileLogManager) NewLogger ¶
func (f *FileLogManager) NewLogger(id string) (Logger, error)
NewLogger creates a new FileLogger. This is used by the task handler to create a new logger for each flow execution.
func (*FileLogManager) Run ¶
Run starts the scan loop. This is a blocking call and should be run from a goroutine.
func (*FileLogManager) StreamLogs ¶
func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
StreamLogs creates and returns a channel that streams log lines for the given exec ID. It filters logs to show only the highest retry attempt for each action.
type FileLogManagerCfg ¶
type FileLogManagerCfg struct {
// RetentionTime is used to determine files that are old enough to be deleted.
// If the file modification is older than RetentionTime, it will be deleted.
RetentionTime time.Duration
// MaxSizeBytes is used by the FileLogger to rotate files.
// If the written bytes exceed this value, a new file will be created
MaxSizeBytes int64
// ScanInterval is the interval at which the FileLogManager scans the log directory to determine
// files that should be deleted
ScanInterval time.Duration
// LogDir stores the log files created by the FileLogger
LogDir string
}
type FileLogger ¶
type FileLogger struct {
// ExecID is the execution ID of the associated flow
ExecID string
// Retry is the retry count for the current action
Retry atomic.Int32
// contains filtered or unexported fields
}
FileLogger implements io.Writer and is meant to be used for a single execution
func (*FileLogger) Checkpoint ¶
func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Checkpoint can be used to set checkpoints for an action on a node, such as results, logs, errors, etc.
func (*FileLogger) Close ¶
func (fl *FileLogger) Close() error
Close flushes the buffer and closes the logger and file
func (*FileLogger) IsClosed ¶
func (fl *FileLogger) IsClosed() bool
func (*FileLogger) SetActionID ¶
func (fl *FileLogger) SetActionID(id string)
SetActionID sets the action ID
func (*FileLogger) SetRetry ¶ added in v0.3.0
func (fl *FileLogger) SetRetry(retry int32)
SetRetry sets the retry count for the current action
type LogManager ¶
type LogManager interface {
NewLogger(id string) (Logger, error)
LoggerExists(execID string) bool
StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
GetRawLogs(ctx context.Context, execID string, w io.Writer) error
Run(ctx context.Context, logger *slog.Logger) error
}
LogManager manages multiple loggers and can be used to enforce retention, log rotation, etc.
func NewFileLogManager ¶
func NewFileLogManager(cfg FileLogManagerCfg) LogManager
NewFileLogManager creates a log manager that uses files as the storage backend. FileLogManager supports retention time to clean up old log files
type Logger ¶
type Logger interface {
io.Writer
GetID() string
// SetActionID sets the global action ID that is used in Write calls
SetActionID(id string)
// SetRetry sets the retry count for the current action
SetRetry(retry int32)
// Checkpoint is an underlying function to log different message types. Used by Write calls too. If the id is set, it will
// override the global action ID
Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Close() error
}
Logger is used to write individual execution logs to different backends
type MessageType ¶
type MessageType string
const ( LogMessageType MessageType = "log" ErrMessageType MessageType = "error" ResultMessageType MessageType = "result" StateMessageType MessageType = "state" CancelledMessageType MessageType = "cancelled" )
type NodeContextLogger ¶
type NodeContextLogger struct {
// contains filtered or unexported fields
}
NodeContextLogger wraps a Logger to provide node context for concurrent execution
func NewNodeContextLogger ¶
func NewNodeContextLogger(logger Logger, actionID, nodeID string) *NodeContextLogger
NewNodeContextLogger creates a new NodeContextLogger.
func (*NodeContextLogger) Checkpoint ¶
func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Checkpoint delegates to the underlying logger with node context. If id is empty, uses the stored actionID.
func (*NodeContextLogger) Close ¶
func (n *NodeContextLogger) Close() error
Close delegates to the underlying logger.
func (*NodeContextLogger) GetID ¶
func (n *NodeContextLogger) GetID() string
GetID delegates to the underlying logger.
func (*NodeContextLogger) SetActionID ¶
func (n *NodeContextLogger) SetActionID(id string)
SetActionID updates the action ID for this node context.
func (*NodeContextLogger) SetRetry ¶ added in v0.3.0
func (n *NodeContextLogger) SetRetry(retry int32)
SetRetry delegates to the underlying logger.