Documentation ¶
Index ¶
- Variables
- type BaseProcessor
- type BaseProcessorInterface
- type BranchTracker
- type Coordinator
- type EngineFileHandler
- type EngineFlowObject
- type EngineInterface
- type Flow
- type FlowManager
- type LeaderSelector
- type LogEntry
- type LoggerFactory
- type PaginatedData
- type PaginationRequest
- type Processor
- type ProcessorFactory
- type ProcessorFileHandler
- type ScheduleConfig
- type ScheduleType
- type SessionUpdate
- type SimpleProcessor
- type SimpleTriggerProcessor
- type StateManager
- type StateManagerFactory
- type StateType
- type TriggerProcessor
- type TriggerProcessorResponse
- type WriteAheadLogger
Constants ¶
This section is empty.
Variables ¶
var ErrInputFileNotInitialized = fmt.Errorf("input file not initialized")
var ErrOutputFileNotInitialized = fmt.Errorf("output file not initialized")
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor struct { }
BaseProcessor represents a basic processor.
func (*BaseProcessor) DecodeMap ¶
func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error
DecodeMap decodes a map into the output structure. It can be used to decode a processor's configuration (which is a map) into a dedicated struct.
Parameters:
- input: The input map to decode.
- output: The output structure to decode into.
Returns:
- error: An error if the decoding fails.
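The decoding step can be illustrated with a minimal sketch. The reference above does not say how DecodeMap is implemented, so `decodeMap` below is a hypothetical stand-in that round-trips the map through `encoding/json`, and `copyConfig` is an invented configuration struct; neither is part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeMap is a hypothetical stand-in for BaseProcessor.DecodeMap. It
// round-trips the input through JSON; the real method may decode differently.
func decodeMap(input interface{}, output interface{}) error {
	raw, err := json.Marshal(input)
	if err != nil {
		return fmt.Errorf("encode input: %w", err)
	}
	if err := json.Unmarshal(raw, output); err != nil {
		return fmt.Errorf("decode into output: %w", err)
	}
	return nil
}

// copyConfig is an invented configuration struct for an imaginary processor.
type copyConfig struct {
	Destination string `json:"destination"`
	Overwrite   bool   `json:"overwrite"`
}

func main() {
	// The kind of map a processor might receive through SetConfig.
	config := map[string]interface{}{
		"destination": "/tmp/out",
		"overwrite":   true,
	}

	var cfg copyConfig
	if err := decodeMap(config, &cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

A processor that embeds BaseProcessor would presumably make the equivalent call inside its own SetConfig, passing a pointer to its configuration struct as the output.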
type BaseProcessorInterface ¶ added in v0.1.4
type BaseProcessorInterface interface {
    // Name returns the name of the processor.
    // Returns:
    // - string: The name of the processor.
    Name() string

    // SetConfig sets the configuration for the processor.
    // Parameters:
    // - config: A map containing the configuration settings.
    // Returns:
    // - error: An error if setting the configuration fails.
    SetConfig(config map[string]interface{}) error

    // Close is called when the processor is being stopped or cleaned up.
    // Returns:
    // - error: An error if the close operation fails.
    Close() error
}
type BranchTracker ¶ added in v0.0.20
type BranchTracker interface {
    // AddProcessor tracks the processor as scheduled and pending completion.
    AddProcessor(sessionID uuid.UUID, processorID uuid.UUID, nextProcessorIDs []uuid.UUID)

    // MarkComplete marks the processor as completed.
    MarkComplete(sessionID uuid.UUID, processorID uuid.UUID) (allComplete bool)

    // IsComplete checks whether all processors in a session are completed.
    IsComplete(sessionID uuid.UUID) bool

    RestoreState(sessionID uuid.UUID, completedProcessorIDs []uuid.UUID)

    GetCompletedProcessors(sessionID uuid.UUID) []uuid.UUID
}
type Coordinator ¶ added in v0.1.1
type Coordinator interface {
    // IsLeader checks if the current instance is the leader for the trigger processor.
    // If the current instance is the coordinator, it will also assign the leader for the trigger processor.
    // Parameters:
    // - tpID: The unique identifier of the trigger processor.
    // Returns:
    // - bool: True if the current instance is the leader, false otherwise.
    // - error: An error if the check fails.
    IsLeader(tpID uuid.UUID) (bool, error)

    // Close terminates the leader election process and, if this instance is currently the coordinator, stops it from acting as coordinator.
    // Returns:
    // - error: An error if the termination fails.
    Close() error
}
Coordinator defines the interface for assigning a leader node to each trigger processor. The coordinator elects a leader for each trigger processor and monitors the leader nodes. If a node drops, the coordinator is responsible for reassigning the leader.
type EngineFileHandler ¶
type EngineFileHandler interface {
    ProcessorFileHandler

    // GetInputFile returns the file path of the input file being processed.
    GetInputFile() string

    // GetOutputFile returns the file path of the output file generated by the processing.
    GetOutputFile() string

    // Close closes any resources associated with the file handler.
    Close()

    // GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
    // Returns:
    // - A new instance of EngineFileHandler.
    // - An error if the operation fails.
    GenerateNewFileHandler() (EngineFileHandler, error)
}
EngineFileHandler defines the interface for handling files in the engine's processing flow.
type EngineFlowObject ¶
type EngineFlowObject struct {
    Metadata map[string]interface{} `json:"metadata"`
    TPMark   string                 `json:"tp_mark"` // TPMark is a unique identifier for the flow object given by the trigger processor
}
EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.
func (*EngineFlowObject) EvaluateExpression ¶
func (e *EngineFlowObject) EvaluateExpression(input string, exprOptions ...expr.Option) (string, error)
EvaluateExpression evaluates a string expression using the metadata in the EngineFlowObject. It replaces each `${expression}` with the value of that expression, evaluated with expr (https://github.com/expr-lang/expr). To access the metadata, use the `$env` variable, for example `${$env["key"]}`. Additional functions are available:
- `uuid()`: Generates a new UUID.
- `random(max)`: Generates a random number between 0 and max.
- `getEnv(key)`: Retrieves an environment variable by key.
Parameters:
- input: The string expression to evaluate.
- exprOptions: Additional options for the expression evaluation. This will allow you to add custom functions or variables.
Returns:
- A string containing the evaluated result.
- An error if the evaluation fails.
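To make the `${expression}` substitution concrete, here is a deliberately simplified sketch. It does not use expr and is not how EvaluateExpression is implemented: the hypothetical `expand` function only understands the `$env["key"]` form, does not provide `uuid()`, `random(max)`, or `getEnv(key)`, and cannot handle expressions that contain a closing brace.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches one `${...}` span and captures the expression inside it.
var placeholder = regexp.MustCompile(`\$\{([^}]*)\}`)

// envLookup matches the only expression form this sketch understands: $env["key"].
var envLookup = regexp.MustCompile(`^\s*\$env\["([^"]+)"\]\s*$`)

// expand is a hypothetical, simplified stand-in for EvaluateExpression.
// It substitutes metadata values for `${$env["key"]}` placeholders.
func expand(input string, metadata map[string]interface{}) (string, error) {
	var firstErr error
	fail := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}

	out := placeholder.ReplaceAllStringFunc(input, func(span string) string {
		inner := placeholder.FindStringSubmatch(span)[1]
		m := envLookup.FindStringSubmatch(inner)
		if m == nil {
			fail(fmt.Errorf("unsupported expression %q", inner))
			return span
		}
		value, ok := metadata[m[1]]
		if !ok {
			fail(fmt.Errorf("metadata key %q not found", m[1]))
			return span
		}
		return fmt.Sprint(value)
	})

	if firstErr != nil {
		return "", firstErr
	}
	return out, nil
}

func main() {
	metadata := map[string]interface{}{"name": "report"}
	result, err := expand(`out/${$env["name"]}.csv`, metadata)
	if err != nil {
		fmt.Println("evaluation failed:", err)
		return
	}
	fmt.Println(result)
}
```

With the real method, the same input string would be passed to EvaluateExpression on an EngineFlowObject whose Metadata contains the `name` key.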
type EngineInterface ¶ added in v0.0.10
type Flow ¶ added in v0.0.6
type Flow struct {
    ID                uuid.UUID                 `json:"id"`                 // ID uniquely identifies the flow.
    Name              string                    `json:"name"`               // Name is the human-readable name of the flow.
    Description       string                    `json:"description"`        // Description provides a brief explanation of the flow's purpose.
    Processors        []*SimpleProcessor        `json:"processors"`         // Processors is the list of processors that are part of the flow, executed in order.
    FirstProcessors   []*SimpleProcessor        `json:"first_processors"`   // FirstProcessors is the list of processors that should run first in the flow.
    TriggerProcessors []*SimpleTriggerProcessor `json:"trigger_processors"` // TriggerProcessors is the list of trigger processors associated with the flow.
    Active            bool                      `json:"active"`             // Active indicates whether the flow is currently active.
    LastUpdated       int64                     `json:"last_updated"`       // LastUpdated indicates the last update timestamp for the flow.
}
Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.
type FlowManager ¶
type FlowManager interface {
    // GetFirstProcessorsForFlow retrieves the first processors for a given flow.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // Returns:
    // - []*SimpleProcessor: A slice of the first processors for the flow.
    // - error: An error if the retrieval fails.
    GetFirstProcessorsForFlow(flowID uuid.UUID) ([]*SimpleProcessor, error)

    // GetFlowProcessors retrieves all processors for a given flow.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // Returns:
    // - []*SimpleProcessor: A slice of processors for the flow.
    // - error: An error if the retrieval fails.
    GetFlowProcessors(flowID uuid.UUID) ([]*SimpleProcessor, error)

    // GetTriggerProcessorsForFlow retrieves the trigger processors for a given flow.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // Returns:
    // - []*SimpleTriggerProcessor: A slice of the trigger processors for the flow.
    // - error: An error if the retrieval fails.
    GetTriggerProcessorsForFlow(flowID uuid.UUID) ([]*SimpleTriggerProcessor, error)

    // GetProcessors retrieves processors by their unique identifiers.
    // Parameters:
    // - processorIDs: A slice of unique identifiers for the processors.
    // Returns:
    // - []*SimpleProcessor: A slice of processors with the specified IDs.
    // - error: An error if the retrieval fails.
    GetProcessors(processorIDs []uuid.UUID) ([]*SimpleProcessor, error)

    // ListFlows lists all flows with pagination and a time filter.
    // Parameters:
    // - pagination: The pagination request containing page number and items per page.
    // - since: The time filter to list flows updated since this time.
    // Returns:
    // - *PaginatedData[*Flow]: The paginated data containing the flows.
    // - error: An error if the listing fails.
    ListFlows(pagination *PaginationRequest, since time.Time) (*PaginatedData[*Flow], error)

    // GetFlowByID retrieves a flow by its unique identifier.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // Returns:
    // - *Flow: The flow with the specified ID.
    // - error: An error if the retrieval fails.
    GetFlowByID(flowID uuid.UUID) (*Flow, error)

    // GetProcessorByID retrieves a processor by its unique identifier within a flow.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // - processorID: The unique identifier of the processor.
    // Returns:
    // - *SimpleProcessor: The processor with the specified ID.
    // - error: An error if the retrieval fails.
    GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)

    // AddProcessorToFlowBefore adds a processor to a flow before a reference processor.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // - processor: The processor to add.
    // - referenceProcessorID: The unique identifier of the reference processor.
    // Returns:
    // - error: An error if the addition fails.
    AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

    // AddProcessorToFlowAfter adds a processor to a flow after a reference processor.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // - processor: The processor to add.
    // - referenceProcessorID: The unique identifier of the reference processor.
    // Returns:
    // - error: An error if the addition fails.
    AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

    // SaveFlow saves a flow and its processors to the database.
    // Parameters:
    // - flow: The flow to save.
    // Returns:
    // - error: An error if the save operation fails.
    SaveFlow(flow *Flow) error

    // GetLastUpdateTime retrieves the last update time for each of the given flows.
    // Parameters:
    // - flowIDs: A slice of flow IDs.
    // Returns:
    // - map[uuid.UUID]time.Time: A map of flow IDs to their last update time.
    // - error: An error if the retrieval fails.
    GetLastUpdateTime(flowIDs []uuid.UUID) (map[uuid.UUID]time.Time, error)

    // SetFlowActive marks a flow as active or inactive.
    // Parameters:
    // - flowID: The unique identifier of the flow.
    // - active: A boolean indicating whether the flow should be marked as active.
    // Returns:
    // - error: An error if the operation fails.
    SetFlowActive(flowID uuid.UUID, active bool) error
}
FlowManager defines the interface for managing and interacting with flows and processors within those flows.
type LeaderSelector ¶ added in v0.1.1
type LeaderSelector interface {
    // Start initiates the leader election process.
    // Returns:
    // - error: An error if the initiation fails.
    Start() error

    // IsLeader checks if the current participant is the leader.
    // Returns:
    // - bool: True if the current instance is the leader, false otherwise.
    // - error: An error if the check fails.
    IsLeader() (bool, error)

    // Participants retrieves the list of nodes participating in the leader election.
    // Returns:
    // - []string: A slice of node identifiers.
    // - error: An error if the retrieval fails.
    Participants() ([]string, error)

    // Close terminates the leader election process.
    // Returns:
    // - error: An error if the termination fails.
    Close() error

    // ParticipantName retrieves the name of the current participant.
    // Returns:
    // - string: The name of the current node.
    ParticipantName() string

    // ParticipantsChangeChannel returns a channel that receives updates about node changes.
    // Returns:
    // - <-chan []string: A receive-only channel that provides slices of the nodes.
    ParticipantsChangeChannel() <-chan []string
}
LeaderSelector defines the interface for leader election mechanisms.
type LogEntry ¶ added in v0.0.2
type LogEntry struct {
    SessionID             uuid.UUID        `json:"session_id"`
    ProcessorName         string           `json:"processor_name"`
    ProcessorID           string           `json:"processor_id"`
    FlowID                uuid.UUID        `json:"flow_id"`
    InputFile             string           `json:"input_file"`
    OutputFile            string           `json:"output_file"`
    FlowObject            EngineFlowObject `json:"flow_object"`
    RetryCount            int              `json:"retry_count"`
    IsComplete            bool             `json:"is_complete"` // indicates if the processor completed
    CompletedProcessorIDs []uuid.UUID      `json:"completed_processor_ids"`
}
LogEntry represents a single entry in the write-ahead log.
type LoggerFactory ¶ added in v0.1.12
type LoggerFactory interface {
    // GetLogger retrieves a logger based on the specified type and name.
    // Parameters:
    // - loggerType: The type of logger to retrieve. For example: Human
    // - name: The name of the logger instance. For example, when there are multiple instances of the Human logger,
    //   the name differentiates between them.
    // Returns:
    // - *logrus.Logger: A pointer to the retrieved logger.
    GetLogger(loggerType string, name string) *logrus.Logger
}
type PaginatedData ¶ added in v0.0.8
type PaginatedData[T any] struct {
    Data       []T `json:"data"`       // Data is the slice of items for the current page.
    TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}
PaginatedData represents a paginated response. It includes the data and the total count of items.
type PaginationRequest ¶ added in v0.0.8
type PaginationRequest struct {
    Page    int `json:"page" query:"page"`        // Page number of the paginated data.
    PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}
PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.
func (*PaginationRequest) Limit ¶ added in v0.0.8
func (r *PaginationRequest) Limit() int
Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.
func (*PaginationRequest) Offset ¶ added in v0.0.8
func (r *PaginationRequest) Offset() int
Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.
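The relationship between Page, PerPage, Limit, and Offset can be sketched as follows. The reference does not state which values count as valid, so this sketch assumes 1-based page numbers, a default of 10 items per page, and a cap of 100. Those constants and the `paginate` helper are hypothetical, and the two types are local copies rather than the package's own.

```go
package main

import "fmt"

// Local copies of the documented types, so the sketch is self-contained.
type PaginationRequest struct {
	Page    int `json:"page"`
	PerPage int `json:"perPage"`
}

type PaginatedData[T any] struct {
	Data       []T `json:"data"`
	TotalCount int `json:"totalCount"`
}

// Assumed bounds; the package's actual defaults are not documented here.
const (
	defaultPerPage = 10
	maxPerPage     = 100
)

// Limit returns a usable page size, replacing invalid values with the assumed bounds.
func (r *PaginationRequest) Limit() int {
	switch {
	case r.PerPage < 1:
		return defaultPerPage
	case r.PerPage > maxPerPage:
		return maxPerPage
	default:
		return r.PerPage
	}
}

// Offset returns how many items to skip, treating pages as 1-based.
func (r *PaginationRequest) Offset() int {
	page := r.Page
	if page < 1 {
		page = 1
	}
	return (page - 1) * r.Limit()
}

// paginate is a hypothetical helper that slices an in-memory list into one page.
func paginate[T any](items []T, r *PaginationRequest) PaginatedData[T] {
	start := r.Offset()
	if start > len(items) {
		start = len(items)
	}
	end := start + r.Limit()
	if end > len(items) {
		end = len(items)
	}
	return PaginatedData[T]{Data: items[start:end], TotalCount: len(items)}
}

func main() {
	req := &PaginationRequest{Page: 2, PerPage: 3}
	page := paginate([]int{1, 2, 3, 4, 5, 6, 7, 8}, req)
	fmt.Println(page.Data, page.TotalCount)
}
```

A database-backed FlowManager would more likely pass Limit and Offset to its query and fill TotalCount from a separate count, rather than slicing in memory.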
type Processor ¶
type Processor interface {
    BaseProcessorInterface

    // Execute executes the processor logic.
    // Parameters:
    // - info: The EngineFlowObject containing the execution information.
    // - fileHandler: The ProcessorFileHandler for handling file operations.
    // - log: The logger for logging information.
    // Returns:
    // - *EngineFlowObject: The updated EngineFlowObject after execution.
    // - error: An error if the execution fails.
    Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)
}
Processor defines the interface for a processor.
type ProcessorFactory ¶
type ProcessorFactory interface {
    // GetProcessor retrieves a processor by its type name.
    // Parameters:
    // - id: The ID of the processor to retrieve.
    // - typeName: The type name of the processor to retrieve.
    // Returns:
    // - Processor: The retrieved processor.
    // - error: An error if the retrieval fails.
    GetProcessor(id uuid.UUID, typeName string) (Processor, error)

    // GetTriggerProcessor retrieves a trigger processor by its type name.
    // Parameters:
    // - id: The ID of the trigger processor to retrieve.
    // - typeName: The type name of the trigger processor to retrieve.
    // Returns:
    // - TriggerProcessor: The retrieved trigger processor.
    // - error: An error if the retrieval fails.
    GetTriggerProcessor(id uuid.UUID, typeName string) (TriggerProcessor, error)
}
ProcessorFactory defines an interface for retrieving processors and trigger processors.
type ProcessorFileHandler ¶
type ProcessorFileHandler interface {
    // Read reads data from a file.
    // Returns:
    // - io.Reader: The reader for reading data.
    // - error: An error if the read operation fails.
    Read() (io.Reader, error)

    // Write writes data to a file.
    // Returns:
    // - io.Writer: The writer for writing data.
    // - error: An error if the write operation fails.
    Write() (io.Writer, error)
}
ProcessorFileHandler defines the interface a processor uses to read the current contents and write new contents.
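As a sketch of how the Read and Write pair might be used, the example below defines a hypothetical in-memory handler (`memoryHandler`) and a hypothetical processing step (`upperCase`). The interface is copied locally so the code is self-contained. The engine's own handlers are described as file-backed, so this in-memory version is only a convenience for illustration or testing.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// ProcessorFileHandler is a local copy of the documented interface.
type ProcessorFileHandler interface {
	Read() (io.Reader, error)
	Write() (io.Writer, error)
}

// memoryHandler is a hypothetical in-memory implementation: reads come from
// a fixed input and writes accumulate in a buffer.
type memoryHandler struct {
	input  []byte
	output bytes.Buffer
}

func (m *memoryHandler) Read() (io.Reader, error) {
	return bytes.NewReader(m.input), nil
}

func (m *memoryHandler) Write() (io.Writer, error) {
	return &m.output, nil
}

// upperCase shows what the body of a processor's Execute method might do
// with the handler: read everything, transform it, write the result.
func upperCase(fh ProcessorFileHandler) error {
	r, err := fh.Read()
	if err != nil {
		return fmt.Errorf("open input: %w", err)
	}
	w, err := fh.Write()
	if err != nil {
		return fmt.Errorf("open output: %w", err)
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return fmt.Errorf("read input: %w", err)
	}
	_, err = io.WriteString(w, strings.ToUpper(string(data)))
	return err
}

func main() {
	fh := &memoryHandler{input: []byte("hello")}
	if err := upperCase(fh); err != nil {
		fmt.Println("processing failed:", err)
		return
	}
	fmt.Println(fh.output.String())
}
```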
type ScheduleConfig ¶ added in v0.0.10
type ScheduleConfig struct {
    Type     ScheduleType
    CronExpr string // Used only if Type == CronDriven
}
type ScheduleType ¶ added in v0.0.10
type ScheduleType int
const (
    EventDriven ScheduleType = iota
    CronDriven
)
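Because CronExpr is only meaningful when Type is CronDriven, a caller may want to check a ScheduleConfig before using it. The `validate` helper below is a hypothetical sketch of such a check, not part of this package, and it only checks that the expression is present, not that it is valid cron syntax. The types are copied locally so the code is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types.
type ScheduleType int

const (
	EventDriven ScheduleType = iota
	CronDriven
)

type ScheduleConfig struct {
	Type     ScheduleType
	CronExpr string // Used only if Type == CronDriven
}

// validate is a hypothetical helper that rejects a cron-driven schedule
// without an expression and any schedule type it does not recognize.
func validate(c ScheduleConfig) error {
	switch c.Type {
	case EventDriven:
		return nil
	case CronDriven:
		if c.CronExpr == "" {
			return errors.New("cron-driven schedule requires a cron expression")
		}
		return nil
	default:
		return fmt.Errorf("unknown schedule type %d", c.Type)
	}
}

func main() {
	configs := []ScheduleConfig{
		{Type: EventDriven},
		{Type: CronDriven, CronExpr: "*/5 * * * *"},
		{Type: CronDriven},
	}
	for _, c := range configs {
		fmt.Println(validate(c))
	}
}
```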
type SessionUpdate ¶
type SessionUpdate struct {
    SessionID uuid.UUID `json:"session_id"`
    Finished  bool      `json:"finished"`
    Error     error     `json:"error"`
    TPMark    string    `json:"tp_mark"`
}
SessionUpdate represents the update status of a session being processed by the engine.
type SimpleProcessor ¶ added in v0.0.5
type SimpleProcessor struct {
    ID             uuid.UUID              `json:"id"`              // ID uniquely identifies the processor.
    FlowID         uuid.UUID              `json:"flow_id"`         // FlowID is the ID of the flow to which the processor belongs.
    Name           string                 `json:"name"`            // Name is the human-readable name of the processor.
    Type           string                 `json:"type"`            // Type indicates the type of the processor, used to determine its implementation.
    Config         map[string]interface{} `json:"config"`          // Config holds the specific configuration for the processor.
    MaxRetries     int                    `json:"max_retries"`     // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
    BackoffSeconds int                    `json:"backoff_seconds"` // BackoffSeconds specifies the delay in seconds between retry attempts for this processor in case of failure.
    LogLevel       logrus.Level           `json:"log_level"`       // LogLevel defines the logging level used for this processor.
    Enabled        bool                   `json:"enabled"`         // Enabled indicates if the processor is enabled.
    NextProcessors []*SimpleProcessor     `json:"next_processors"` // NextProcessors explicitly defines the processors that should run after this one.
}
SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.
type SimpleTriggerProcessor ¶ added in v0.0.15
type SimpleTriggerProcessor struct {
    ID         uuid.UUID              `json:"id"`          // ID uniquely identifies the trigger processor.
    FlowID     uuid.UUID              `json:"flow_id"`     // FlowID is the ID of the flow to which the trigger processor belongs.
    Name       string                 `json:"name"`        // Name is the human-readable name of the trigger processor.
    Type       string                 `json:"type"`        // Type indicates the type of the trigger processor, used to determine its implementation.
    Config     map[string]interface{} `json:"config"`      // Config holds the specific configuration for the trigger processor.
    CronExpr   string                 `json:"cron_expr"`   // CronExpr is the cron expression used for scheduling, applicable if ScheduleType is CronDriven.
    LogLevel   logrus.Level           `json:"log_level"`   // LogLevel defines the logging level used for this trigger processor.
    SingleNode bool                   `json:"single_node"` // SingleNode defines whether this trigger processor should run on a single node when running in a cluster.
    Enabled    bool                   `json:"enabled"`     // Enabled indicates if the processor is enabled.
}
SimpleTriggerProcessor represents a trigger processor within a flow. It contains metadata about the trigger processor, including its configuration and scheduling parameters.
type StateManager ¶ added in v0.1.3
type StateManager interface {
    // Reset resets the state manager, closing it first.
    // Returns:
    // - error: An error if the reset fails.
    Reset() error

    // Close closes the state manager.
    // Returns:
    // - error: An error if the close operation fails.
    Close() error

    // GetState retrieves the state for the given state type.
    // Parameters:
    // - stateType: The type of state to retrieve.
    // Returns:
    // - map[string]any: A map representing the state.
    // - error: An error if the retrieval fails.
    GetState(stateType StateType) (map[string]any, error)

    // SetState sets the state for the given state type.
    // Parameters:
    // - stateType: The type of state to set.
    // - value: A map representing the state to set.
    // Returns:
    // - error: An error if the set operation fails.
    SetState(stateType StateType, value map[string]any) error

    // WatchState watches for changes in the given state type.
    // Parameters:
    // - state: The type of state to watch.
    // - callback: A callback function to invoke when the state changes.
    // Returns:
    // - error: An error if the watch operation fails.
    WatchState(state StateType, callback func()) error
}
type StateManagerFactory ¶ added in v0.1.3
type StateManagerFactory interface {
CreateStateManager(id uuid.UUID) StateManager
}
type TriggerProcessor ¶ added in v0.0.10
type TriggerProcessor interface {
    BaseProcessorInterface

    // Execute executes the processor logic.
    // Parameters:
    // - info: The EngineFlowObject containing the execution information.
    // - produceFileHandler: A function to produce a ProcessorFileHandler.
    // - log: The logger for logging information.
    // Returns:
    // - []*TriggerProcessorResponse: A list of responses, each pairing an *EngineFlowObject with its ProcessorFileHandler.
    // - error: An error if the execution fails.
    Execute(
        info *EngineFlowObject,
        produceFileHandler func() ProcessorFileHandler,
        log *logrus.Logger,
    ) ([]*TriggerProcessorResponse, error)

    // GetScheduleType returns the scheduling type supported by the TriggerProcessor.
    GetScheduleType() ScheduleType

    // HandleSessionUpdate allows the TriggerProcessor to respond to session updates.
    HandleSessionUpdate(update SessionUpdate)
}
TriggerProcessor defines the interface for a trigger processor.
type TriggerProcessorResponse ¶ added in v0.1.5
type TriggerProcessorResponse struct {
    EngineFlowObject *EngineFlowObject
    FileHandler      ProcessorFileHandler
}
type WriteAheadLogger ¶ added in v0.0.2
type WriteAheadLogger interface {
    WriteEntry(entry LogEntry)
    ReadEntries() ([]LogEntry, error)
    ReadLastEntries() ([]LogEntry, error)
}
WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.