Documentation
¶
Index ¶
- Variables
- type BaseProcessor
- type BaseProcessorInterface
- type BranchTracker
- type Coordinator
- type EngineFileHandler
- type EngineFlowObject
- type EngineInterface
- type Flow
- type FlowManager
- type LeaderSelector
- type LogEntry
- type LoggerFactory
- type PaginatedData
- type PaginationRequest
- type Processor
- type ProcessorFactory
- type ProcessorFileHandler
- type ScheduleConfig
- type ScheduleType
- type SessionUpdate
- type SimpleProcessor
- type SimpleTriggerProcessor
- type StateManager
- type StateManagerFactory
- type StateType
- type TriggerProcessor
- type TriggerProcessorResponse
- type WriteAheadLogger
Constants ¶
This section is empty.
Variables ¶
var ErrInputFileNotInitialized = fmt.Errorf("input file not initialized")
var ErrOutputFileNotInitialized = fmt.Errorf("output file not initialized")
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor struct {
}
BaseProcessor represents a basic processor.
func (*BaseProcessor) DecodeMap ¶
func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error
DecodeMap decodes a map into the output structure. This can be used to decode the configuration (which is a map) into its own struct. Parameters: - input: The input map to decode. - output: The output structure to decode into. Returns: - error: An error if the decoding fails.
type BaseProcessorInterface ¶ added in v0.1.4
type BaseProcessorInterface interface {
// Name returns the name of the processor.
// Returns:
// - string: The name of the processor.
Name() string
// SetConfig sets the configuration for the processor.
// Parameters:
// - config: A map containing the configuration settings.
// Returns:
// - error: An error if setting the configuration fails.
SetConfig(config map[string]interface{}) error
// Close is called when the processor is being stopped or cleaned up.
// Returns:
// - error: An error if the close operation fails.
Close() error
}
type BranchTracker ¶ added in v0.0.20
type BranchTracker interface {
// AddProcessor tracks the processor as scheduled and pending completion.
AddProcessor(sessionID uuid.UUID, processorID uuid.UUID, nextProcessorIDs []uuid.UUID)
// MarkComplete marks the processor as completed.
MarkComplete(sessionID uuid.UUID, processorID uuid.UUID) (allComplete bool)
// IsComplete checks whether all processors in a session are completed.
IsComplete(sessionID uuid.UUID) bool
RestoreState(sessionID uuid.UUID, completedProcessorIDs []uuid.UUID)
GetCompletedProcessors(sessionID uuid.UUID) []uuid.UUID
}
type Coordinator ¶ added in v0.1.1
type Coordinator interface {
// IsLeader checks if the current instance is the leader for the trigger processor.
// If the current instance is the coordinator, it will also assign the leader for the trigger processor.
// Parameters:
// - tpID: The unique identifier of the trigger processor.
// Returns:
// - bool: True if the current instance is the leader, false otherwise.
// - error: An error if the check fails.
IsLeader(tpID uuid.UUID) (bool, error)
// Close terminates the leader election process and stops acting as the coordinator (if this instance currently is the coordinator).
// Returns:
// - error: An error if the termination fails.
Close() error
}
Coordinator defines the interface for assigning a leader node to each trigger processor. The coordinator elects a leader for each trigger processor and monitors the leader nodes. If a leader node drops out, the coordinator is responsible for reassigning the leader.
type EngineFileHandler ¶
type EngineFileHandler interface {
ProcessorFileHandler
// GetInputFile returns the file path of the input file being processed.
GetInputFile() string
// GetOutputFile returns the file path of the output file generated by the processing.
GetOutputFile() string
// Close closes any resources associated with the file handler.
Close()
// GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
// Returns:
// - A new instance of EngineFileHandler.
// - An error if the operation fails.
GenerateNewFileHandler() (EngineFileHandler, error)
}
EngineFileHandler defines the interface for handling files in the engine's processing flow.
type EngineFlowObject ¶
type EngineFlowObject struct {
Metadata map[string]interface{} `json:"metadata"`
TPMark string `json:"tp_mark"` // TPMark is a unique identifier for the flow object given by the trigger processor
}
EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.
func (*EngineFlowObject) EvaluateExpression ¶
func (e *EngineFlowObject) EvaluateExpression(input string, exprOptions ...expr.Option) (string, error)
EvaluateExpression evaluates a string expression using the metadata in the EngineFlowObject. It replaces every `${expression}` with the value of that expression, evaluated with expr (https://github.com/expr-lang/expr). To access the metadata, use the `$env` variable. For example, `${$env["key"]}`. Additional functions are available:
- `uuid()`: Generates a new UUID.
- `random(max)`: Generates a random number between 0 and max.
- `getEnv(key)`: Retrieves an environment variable by key.
Parameters:
- input: The string expression to evaluate.
- exprOptions: Additional options for the expression evaluation. This will allow you to add custom functions or variables.
Returns:
- A string containing the evaluated result.
- An error if the evaluation fails.
type EngineInterface ¶ added in v0.0.10
type Flow ¶ added in v0.0.6
type Flow struct {
ID uuid.UUID `json:"id"` // ID uniquely identifies the flow.
Name string `json:"name"` // Name is the human-readable name of the flow.
Description string `json:"description"` // Description provides a brief explanation of the flow's purpose.
Processors []*SimpleProcessor `json:"processors"` // Processors is the list of processors that are part of the flow, executed in order.
FirstProcessors []*SimpleProcessor `json:"first_processors"` // FirstProcessors is the list of processors that should run first in the flow.
TriggerProcessors []*SimpleTriggerProcessor `json:"trigger_processors"` // TriggerProcessors is the list of trigger processors associated with the flow.
Active bool `json:"active"` // Active indicates whether the flow is currently active.
LastUpdated int64 `json:"last_updated"` // LastUpdated indicates the last update timestamp for the flow.
}
Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.
type FlowManager ¶
type FlowManager interface {
// GetFirstProcessorsForFlow retrieves the first processors for a given flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - []*SimpleProcessor: A slice of the first processors for the flow.
// - error: An error if the retrieval fails.
GetFirstProcessorsForFlow(flowID uuid.UUID) ([]*SimpleProcessor, error)
// GetFlowProcessors retrieves all processors for a given flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - []*SimpleProcessor: A slice of processors for the flow.
// - error: An error if the retrieval fails.
GetFlowProcessors(flowID uuid.UUID) ([]*SimpleProcessor, error)
// GetTriggerProcessorsForFlow retrieves the trigger processors for a given flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - []*SimpleTriggerProcessor: A slice of the trigger processors for the flow.
// - error: An error if the retrieval fails.
GetTriggerProcessorsForFlow(flowID uuid.UUID) ([]*SimpleTriggerProcessor, error)
// GetProcessors retrieves processors by their unique identifiers.
// Parameters:
// - processorIDs: A slice of unique identifiers for the processors.
// Returns:
// - []*SimpleProcessor: A slice of processors with the specified IDs.
// - error: An error if the retrieval fails.
GetProcessors(processorIDs []uuid.UUID) ([]*SimpleProcessor, error)
// ListFlows lists all flows with pagination and a time filter.
// Parameters:
// - pagination: The pagination request containing page number and items per page.
// - since: The time filter to list flows updated since this time.
// Returns:
// - *PaginatedData[*Flow]: The paginated data containing the flows.
// - error: An error if the listing fails.
ListFlows(pagination *PaginationRequest, since time.Time) (*PaginatedData[*Flow], error)
// GetFlowByID retrieves a flow by its unique identifier.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - *Flow: The flow with the specified ID.
// - error: An error if the retrieval fails.
GetFlowByID(flowID uuid.UUID) (*Flow, error)
// GetProcessorByID retrieves a processor by its unique identifier within a flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processorID: The unique identifier of the processor.
// Returns:
// - *SimpleProcessor: The processor with the specified ID.
// - error: An error if the retrieval fails.
GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)
// AddProcessorToFlowBefore adds a processor to a flow before a reference processor.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processor: The processor to add.
// - referenceProcessorID: The unique identifier of the reference processor.
// Returns:
// - error: An error if the addition fails.
AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error
// AddProcessorToFlowAfter adds a processor to the flow after a reference processor.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processor: The processor to add.
// - referenceProcessorID: The unique identifier of the reference processor.
// Returns:
// - error: An error if the addition fails.
AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error
// SaveFlow saves a flow and its processors to the database.
// Parameters:
// - flow: The flow to save.
// Returns:
// - error: An error if the save operation fails.
SaveFlow(flow *Flow) error
// GetLastUpdateTime retrieves the last update time for each of the given flows.
// Parameters:
// - flowIDs: A slice of flow IDs.
// Returns:
// - map[uuid.UUID]time.Time: A map of flow IDs to their last update time.
// - error: An error if the retrieval fails.
GetLastUpdateTime(flowIDs []uuid.UUID) (map[uuid.UUID]time.Time, error)
// SetFlowActive marks a flow as active or inactive.
// Parameters:
// - flowID: The unique identifier of the flow.
// - active: A boolean indicating whether the flow should be marked as active.
// Returns:
// - error: An error if the operation fails.
SetFlowActive(flowID uuid.UUID, active bool) error
}
FlowManager defines the interface for managing and interacting with flows and processors within those flows.
type LeaderSelector ¶ added in v0.1.1
type LeaderSelector interface {
// Start initiates the leader election process.
// Returns:
// - error: An error if the initiation fails.
Start() error
// IsLeader checks if the current participant is the leader.
// Returns:
// - bool: True if the current instance is the leader, false otherwise.
// - error: An error if the check fails.
IsLeader() (bool, error)
// Participants retrieves the list of nodes participating in the leader election.
// Returns:
// - []string: A slice of node identifiers.
// - error: An error if the retrieval fails.
Participants() ([]string, error)
// Close terminates the leader election process.
// Returns:
// - error: An error if the termination fails.
Close() error
// ParticipantName retrieves the name of the current participant.
// Returns:
// - string: The name of the current node.
ParticipantName() string
// ParticipantsChangeChannel returns a channel that receives updates about node changes.
// Returns:
// - <-chan []string: A receive-only channel that provides slices of the nodes.
ParticipantsChangeChannel() <-chan []string
}
LeaderSelector defines the interface for leader election mechanisms.
type LogEntry ¶ added in v0.0.2
type LogEntry struct {
SessionID uuid.UUID `json:"session_id"`
ProcessorName string `json:"processor_name"`
ProcessorID string `json:"processor_id"`
FlowID uuid.UUID `json:"flow_id"`
InputFile string `json:"input_file"`
OutputFile string `json:"output_file"`
FlowObject EngineFlowObject `json:"flow_object"`
RetryCount int `json:"retry_count"`
IsComplete bool `json:"is_complete"` // indicates if the processor completed
CompletedProcessorIDs []uuid.UUID `json:"completed_processor_ids"`
}
LogEntry is a struct that represents a log entry in the write-ahead log.
type LoggerFactory ¶ added in v0.1.12
type LoggerFactory interface {
// GetLogger retrieves a logger based on the specified type and name.
// Parameters:
// - loggerType: The type of logger to retrieve, for example "Human".
// - name: The name of the logger instance. When there are multiple instances of the same
// type (for example, several Human loggers), the name differentiates between them.
// Returns:
// - *logrus.Logger: A pointer to the retrieved logger.
GetLogger(loggerType string, name string) *logrus.Logger
}
type PaginatedData ¶ added in v0.0.8
type PaginatedData[T any] struct {
Data []T `json:"data"` // Data is the slice of items for the current page.
TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}
PaginatedData represents a paginated response. It includes the data and the total count of items.
type PaginationRequest ¶ added in v0.0.8
type PaginationRequest struct {
Page int `json:"page" query:"page"` // Page number of the paginated data.
PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}
PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.
func (*PaginationRequest) Limit ¶ added in v0.0.8
func (r *PaginationRequest) Limit() int
Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.
func (*PaginationRequest) Offset ¶ added in v0.0.8
func (r *PaginationRequest) Offset() int
Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.
type Processor ¶
type Processor interface {
BaseProcessorInterface
// Execute executes the processor logic.
// Parameters:
// - info: The EngineFlowObject containing the execution information.
// - fileHandler: The ProcessorFileHandler for handling file operations.
// - log: The logger for logging information.
// Returns:
// - *EngineFlowObject: The updated EngineFlowObject after execution.
// - error: An error if the execution fails.
Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)
}
Processor defines the interface for a processor.
type ProcessorFactory ¶
type ProcessorFactory interface {
// GetProcessor retrieves a processor by its type name.
// Parameters:
// - id: The ID of the processor to retrieve.
// - typeName: The type name of the processor to retrieve.
// Returns:
// - Processor: The retrieved processor.
// - error: An error if the retrieval fails.
GetProcessor(id uuid.UUID, typeName string) (Processor, error)
// GetTriggerProcessor retrieves a trigger processor by its type name.
// Parameters:
// - id: The ID of the trigger processor to retrieve.
// - typeName: The type name of the trigger processor to retrieve.
// Returns:
// - TriggerProcessor: The retrieved trigger processor.
// - error: An error if the retrieval fails.
GetTriggerProcessor(id uuid.UUID, typeName string) (TriggerProcessor, error)
}
ProcessorFactory defines an interface for retrieving processors and trigger processors.
type ProcessorFileHandler ¶
type ProcessorFileHandler interface {
// Read reads data from a file.
// Returns:
// - io.Reader: The reader for reading data.
// - error: An error if the read operation fails.
Read() (io.Reader, error)
// Write writes data to a file.
// Returns:
// - io.Writer: The writer for writing data.
// - error: An error if the write operation fails.
Write() (io.Writer, error)
}
ProcessorFileHandler defines the interface that processors use to read and write the contents of the file currently being processed.
type ScheduleConfig ¶ added in v0.0.10
type ScheduleConfig struct {
Type ScheduleType
CronExpr string // Used only if Type == CronDriven
}
type ScheduleType ¶ added in v0.0.10
type ScheduleType int
const (
EventDriven ScheduleType = iota
CronDriven
)
type SessionUpdate ¶
type SessionUpdate struct {
SessionID uuid.UUID `json:"session_id"`
Finished bool `json:"finished"`
Error error `json:"error"`
TPMark string `json:"tp_mark"`
}
SessionUpdate represents the update status of a session being processed by the engine.
type SimpleProcessor ¶ added in v0.0.5
type SimpleProcessor struct {
ID uuid.UUID `json:"id"` // ID uniquely identifies the processor.
FlowID uuid.UUID `json:"flow_id"` // FlowID is the ID of the flow to which the processor belongs.
Name string `json:"name"` // Name is the human-readable name of the processor.
Type string `json:"type"` // Type indicates the type of the processor, used to determine its implementation.
Config map[string]interface{} `json:"config"` // Config holds the specific configuration for the processor.
MaxRetries int `json:"max_retries"` // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
BackoffSeconds int `json:"backoff_seconds"` // Specifies the delay in seconds between retry attempts for this processor in case of failure.
LogLevel logrus.Level `json:"log_level"` // LogLevel defines the logging level used for this processor.
Enabled bool `json:"enabled"` // Enabled indicates if the processor is enabled.
NextProcessors []*SimpleProcessor `json:"next_processors"` // NextProcessors explicitly defines the processors that should run after this one.
}
SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.
type SimpleTriggerProcessor ¶ added in v0.0.15
type SimpleTriggerProcessor struct {
ID uuid.UUID `json:"id"` // ID uniquely identifies the trigger processor.
FlowID uuid.UUID `json:"flow_id"` // FlowID is the ID of the flow to which the trigger processor belongs.
Name string `json:"name"` // Name is the human-readable name of the trigger processor.
Type string `json:"type"` // Type indicates the type of the trigger processor, used to determine its implementation.
Config map[string]interface{} `json:"config"` // Config holds the specific configuration for the trigger processor.
CronExpr string `json:"cron_expr"` // CronExpr is the cron expression used for scheduling, applicable if ScheduleType is CronDriven.
LogLevel logrus.Level `json:"log_level"` // LogLevel defines the logging level used for this trigger processor.
SingleNode bool `json:"single_node"` // SingleNode defines whether this trigger processor should run on a single node when running in a cluster.
Enabled bool `json:"enabled"` // Enabled indicates if the trigger processor is enabled.
}
SimpleTriggerProcessor represents a trigger processor within a flow. It contains metadata about the trigger processor, including its configuration and scheduling parameters.
type StateManager ¶ added in v0.1.3
type StateManager interface {
// Reset resets the state manager, closing it first.
// Returns:
// - error: An error if the reset fails.
Reset() error
// Close closes the state manager.
// Returns:
// - error: An error if the close operation fails.
Close() error
// GetState retrieves the state for the given state type.
// Parameters:
// - stateType: The type of state to retrieve.
// Returns:
// - map[string]any: A map representing the state.
// - error: An error if the retrieval fails.
GetState(stateType StateType) (map[string]any, error)
// SetState sets the state for the given state type.
// Parameters:
// - stateType: The type of state to set.
// - value: A map representing the state to set.
// Returns:
// - error: An error if the set operation fails.
SetState(stateType StateType, value map[string]any) error
// WatchState watches for changes in the given state type.
// Parameters:
// - state: The type of state to watch.
// - callback: A callback function to invoke when the state changes.
// Returns:
// - error: An error if the watch operation fails.
WatchState(state StateType, callback func()) error
}
type StateManagerFactory ¶ added in v0.1.3
type StateManagerFactory interface {
CreateStateManager(id uuid.UUID) StateManager
}
type TriggerProcessor ¶ added in v0.0.10
type TriggerProcessor interface {
BaseProcessorInterface
// Execute executes the processor logic.
// Parameters:
// - info: The EngineFlowObject containing the execution information.
// - produceFileHandler: A function to produce a ProcessorFileHandler.
// - log: The logger for logging information.
// Returns:
// - []*TriggerProcessorResponse: The responses produced by the execution, each holding an *EngineFlowObject and its ProcessorFileHandler.
// - error: An error if the execution fails.
Execute(
info *EngineFlowObject,
produceFileHandler func() ProcessorFileHandler,
log *logrus.Logger,
) ([]*TriggerProcessorResponse, error)
// GetScheduleType returns the scheduling type supported by the TriggerProcessor.
GetScheduleType() ScheduleType
// HandleSessionUpdate allows the TriggerProcessor to respond to session updates.
HandleSessionUpdate(update SessionUpdate)
}
TriggerProcessor defines the interface for a trigger processor.
type TriggerProcessorResponse ¶ added in v0.1.5
type TriggerProcessorResponse struct {
EngineFlowObject *EngineFlowObject
FileHandler ProcessorFileHandler
}
type WriteAheadLogger ¶ added in v0.0.2
type WriteAheadLogger interface {
WriteEntry(entry LogEntry)
ReadEntries() ([]LogEntry, error)
ReadLastEntries() ([]LogEntry, error)
}
WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.