Documentation
¶
Index ¶
- Constants
- type FileLogManager
- func (f *FileLogManager) LoggerExists(execID string) bool
- func (f *FileLogManager) NewLogger(id string) (Logger, error)
- func (f *FileLogManager) Run(ctx context.Context, l *slog.Logger) error
- func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
- type FileLogManagerCfg
- type FileLogger
- func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
- func (fl *FileLogger) Close() error
- func (fl *FileLogger) GetID() string
- func (fl *FileLogger) IsClosed() bool
- func (fl *FileLogger) SetActionID(id string)
- func (fl *FileLogger) SetRetry(retry int32)
- func (fl *FileLogger) Write(p []byte) (int, error)
- type LogManager
- type Logger
- type MessageType
- type NodeContextLogger
- func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
- func (n *NodeContextLogger) Close() error
- func (n *NodeContextLogger) GetID() string
- func (n *NodeContextLogger) SetActionID(id string)
- func (n *NodeContextLogger) SetRetry(retry int32)
- func (n *NodeContextLogger) Write(p []byte) (int, error)
- type StreamMessage
Constants ¶
const FileSyncInterval = 100 * time.Millisecond
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FileLogManager ¶
type FileLogManager struct {
// contains filtered or unexported fields
}
func (*FileLogManager) LoggerExists ¶
func (f *FileLogManager) LoggerExists(execID string) bool
LoggerExists checks if an active logger exists for the given exec ID.
func (*FileLogManager) NewLogger ¶
func (f *FileLogManager) NewLogger(id string) (Logger, error)
NewLogger creates a new FileLogger. This is used by the task handler to create a new logger for each flow execution.
func (*FileLogManager) Run ¶
func (f *FileLogManager) Run(ctx context.Context, l *slog.Logger) error
Run starts the scan loop. This is a blocking call and should be run from a goroutine.
func (*FileLogManager) StreamLogs ¶
func (f *FileLogManager) StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
StreamLogs creates and returns a channel that streams log lines for the given exec ID. It filters logs to show only the highest retry attempt for each action.
type FileLogManagerCfg ¶
type FileLogManagerCfg struct {
// RetentionTime is used to determine files that are old enough to be deleted.
// If the file modification is older than RetentionTime, it will be deleted.
RetentionTime time.Duration
// MaxSizeBytes is used by the FileLogger to rotate files.
// If the written bytes exceed this value, a new file will be created
MaxSizeBytes int64
// ScanInterval is the interval with which the FileLogManager scans the log directory to determine
// files that should be deleted
ScanInterval time.Duration
// LogDir stores the log files created by the FileLogger
LogDir string
}
type FileLogger ¶
type FileLogger struct {
// ExecID is the execution ID of the associated flow
ExecID string
// Retry is the retry count for the current action
Retry atomic.Int32
// contains filtered or unexported fields
}
FileLogger implements io.Writer and is meant to be used for a single execution
func (*FileLogger) Checkpoint ¶
func (fl *FileLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Checkpoint can be used to set checkpoints for an action on a node, such as results, logs, errors, etc.
func (*FileLogger) Close ¶
func (fl *FileLogger) Close() error
Close flushes the buffer and closes the logger and file
func (*FileLogger) IsClosed ¶
func (fl *FileLogger) IsClosed() bool
func (*FileLogger) SetActionID ¶
func (fl *FileLogger) SetActionID(id string)
SetActionID sets the action ID
func (*FileLogger) SetRetry ¶ added in v0.3.0
func (fl *FileLogger) SetRetry(retry int32)
SetRetry sets the retry count for the current action
type LogManager ¶
type LogManager interface {
NewLogger(id string) (Logger, error)
LoggerExists(execID string) bool
StreamLogs(ctx context.Context, execID string, actionRetries map[string]int32) (<-chan string, error)
Run(ctx context.Context, logger *slog.Logger) error
}
LogManager manages multiple loggers and can be used to enforce retention, log rotation, etc.
func NewFileLogManager ¶
func NewFileLogManager(cfg FileLogManagerCfg) LogManager
NewFileLogManager creates a log manager that uses files as the storage backend. FileLogManager supports retention time to clean up old log files.
type Logger ¶
type Logger interface {
io.Writer
GetID() string
// SetActionID sets the global action ID that is used in Write calls
SetActionID(id string)
// SetRetry sets the retry count for the current action
SetRetry(retry int32)
// Checkpoint is an underlying function to log different message types. Used by Write calls too. If the id is set, it will
// override the global action ID
Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Close() error
}
Logger is used to write individual execution logs to different backends
type MessageType ¶
type MessageType string
const ( LogMessageType MessageType = "log" ErrMessageType MessageType = "error" ResultMessageType MessageType = "result" StateMessageType MessageType = "state" CancelledMessageType MessageType = "cancelled" )
type NodeContextLogger ¶
type NodeContextLogger struct {
// contains filtered or unexported fields
}
NodeContextLogger wraps a Logger to provide node context for concurrent execution
func NewNodeContextLogger ¶
func NewNodeContextLogger(logger Logger, actionID, nodeID string) *NodeContextLogger
NewNodeContextLogger creates a new NodeContextLogger.
func (*NodeContextLogger) Checkpoint ¶
func (n *NodeContextLogger) Checkpoint(id string, nodeID string, val interface{}, mtype MessageType) error
Checkpoint delegates to the underlying logger with node context. If id is empty, uses the stored actionID.
func (*NodeContextLogger) Close ¶
func (n *NodeContextLogger) Close() error
Close delegates to the underlying logger.
func (*NodeContextLogger) GetID ¶
func (n *NodeContextLogger) GetID() string
GetID delegates to the underlying logger.
func (*NodeContextLogger) SetActionID ¶
func (n *NodeContextLogger) SetActionID(id string)
SetActionID updates the action ID for this node context.
func (*NodeContextLogger) SetRetry ¶ added in v0.3.0
func (n *NodeContextLogger) SetRetry(retry int32)
SetRetry delegates to the underlying logger.