Documentation
Index ¶
- type BaseProcessor
- type EngineFileHandler
- type EngineFlowObject
- type EngineIncomingObject
- type EngineInterface
- type Flow
- type FlowManager
- type LogEntry
- type PaginatedData
- type PaginationRequest
- type Processor
- type ProcessorFactory
- type ProcessorFileHandler
- type ScheduleConfig
- type ScheduleType
- type SessionUpdate
- type SimpleProcessor
- type TriggerProcessor
- type WriteAheadLogger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseProcessor ¶
type BaseProcessor struct {
ID string // ID is the unique identifier of the processor.
}
BaseProcessor represents a basic processor with an ID.
func (*BaseProcessor) DecodeMap ¶
func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error
DecodeMap decodes a map into the output structure. It can be used to decode a processor's configuration (which is a map) into a typed struct.
Parameters:
- input: The input map to decode.
- output: The output structure to decode into.
Returns:
- error: An error if the decoding fails.
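The pattern DecodeMap supports can be sketched as follows: a generic configuration map is turned into a typed struct. The `decodeMap` helper and the `httpConfig` struct are hypothetical names introduced for illustration. The helper round-trips through `encoding/json` as a stand-in and is not necessarily how `BaseProcessor.DecodeMap` is implemented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// httpConfig is a hypothetical typed configuration for a processor.
type httpConfig struct {
	URL     string `json:"url"`
	Timeout int    `json:"timeout"`
}

// decodeMap is a stand-in for BaseProcessor.DecodeMap (assumption: the real
// method may use a different decoder). It copies the values of a generic map
// into output by round-tripping through JSON.
func decodeMap(input interface{}, output interface{}) error {
	raw, err := json.Marshal(input)
	if err != nil {
		return fmt.Errorf("encode input: %w", err)
	}
	if err := json.Unmarshal(raw, output); err != nil {
		return fmt.Errorf("decode into output: %w", err)
	}
	return nil
}

func main() {
	config := map[string]interface{}{"url": "https://example.com", "timeout": 30}

	var cfg httpConfig
	if err := decodeMap(config, &cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.URL, cfg.Timeout) // prints "https://example.com 30"
}
```

A processor would typically run this kind of decoding inside its SetConfig method, so that Execute can work with typed fields instead of map lookups.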
func (*BaseProcessor) GetID ¶
func (b *BaseProcessor) GetID() string
GetID returns the ID of the processor.
Returns:
- string: The ID of the processor.
type EngineFileHandler ¶
type EngineFileHandler interface {
// GetInputFile returns the file path of the input file being processed.
GetInputFile() string
// GetOutputFile returns the file path of the output file generated by the processing.
GetOutputFile() string
// Close closes any resources associated with the file handler.
Close()
// GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
// Returns:
// - A new instance of EngineFileHandler.
// - An error if the operation fails.
GenerateNewFileHandler() (EngineFileHandler, error)
}
EngineFileHandler defines the interface for handling files in the engine's processing flow.
type EngineFlowObject ¶
type EngineFlowObject struct {
Metadata map[string]interface{} `json:"metadata"`
}
EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.
func (*EngineFlowObject) EvaluateExpression ¶
func (e *EngineFlowObject) EvaluateExpression(input string) (string, error)
EvaluateExpression evaluates a string expression using the metadata in the EngineFlowObject. It will turn any `${expression}` into the value of the expression using [expr](https://github.com/expr-lang/expr). To access the metadata, use the `$env` variable. For example, `${$env["key"]}`. Additional functions are available:
- `uuid()`: Generates a new UUID.
- `random(max)`: Generates a random number between 0 and max.
- `getEnv(key)`: Retrieves an environment variable by key.
Parameters:
- input: The string expression to evaluate.
Returns:
- A string containing the evaluated result.
- An error if the evaluation fails.
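To illustrate the `${...}` placeholder syntax, here is a deliberately simplified sketch. It does not use expr and only handles direct lookups of the form `${$env["key"]}`; the real EvaluateExpression evaluates full expressions. The names `expandMetadata` and `envLookup` are hypothetical, and returning an error for a missing key is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"regexp"
)

// envLookup matches placeholders of the exact form ${$env["key"]}.
var envLookup = regexp.MustCompile(`\$\{\$env\["([^"]+)"\]\}`)

// expandMetadata replaces each ${$env["key"]} placeholder in input with the
// matching metadata value. It is a simplified stand-in for expression
// evaluation and reports the first key that is not present in metadata.
func expandMetadata(input string, metadata map[string]interface{}) (string, error) {
	var missing string
	out := envLookup.ReplaceAllStringFunc(input, func(match string) string {
		key := envLookup.FindStringSubmatch(match)[1]
		value, ok := metadata[key]
		if !ok {
			if missing == "" {
				missing = key
			}
			return match
		}
		return fmt.Sprint(value)
	})
	if missing != "" {
		return "", fmt.Errorf("metadata key %q not found", missing)
	}
	return out, nil
}

func main() {
	metadata := map[string]interface{}{"name": "report", "year": 2024}
	path, err := expandMetadata(`out/${$env["name"]}-${$env["year"]}.csv`, metadata)
	if err != nil {
		fmt.Println("expand failed:", err)
		return
	}
	fmt.Println(path) // prints "out/report-2024.csv"
}
```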
type EngineIncomingObject ¶
type EngineIncomingObject struct {
FlowID uuid.UUID
Metadata map[string]interface{}
Reader io.Reader
SessionID uuid.UUID
}
EngineIncomingObject represents an incoming object for the engine to process, including metadata, a data reader, and a session ID.
type EngineInterface ¶ added in v0.0.10
type Flow ¶ added in v0.0.6
type Flow struct {
ID uuid.UUID `json:"id"` // ID uniquely identifies the flow.
Name string `json:"name"` // Name is the human-readable name of the flow.
Description string `json:"description"` // Description provides a brief explanation of the flow's purpose.
Processors []SimpleProcessor `json:"processors"` // Processors is the list of processors that are part of the flow, executed in order.
}
Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.
type FlowManager ¶
type FlowManager interface {
// GetFirstProcessorsForFlow retrieves the first processors for a given flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - []SimpleProcessor: A slice of the first processors for the flow.
// - error: An error if the retrieval fails.
GetFirstProcessorsForFlow(flowID uuid.UUID) ([]SimpleProcessor, error)
// GetLastProcessorForFlow retrieves the last processor for a given flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - *SimpleProcessor: The last processor for the flow.
// - error: An error if the retrieval fails.
GetLastProcessorForFlow(flowID uuid.UUID) (*SimpleProcessor, error)
// ListFlows lists all flows with pagination and a time filter.
// Parameters:
// - pagination: The pagination request containing page number and items per page.
// - since: The time filter to list flows updated since this time.
// Returns:
// - PaginatedData[*Flow]: The paginated data containing the flows.
// - error: An error if the listing fails.
ListFlows(pagination *PaginationRequest, since time.Time) (PaginatedData[*Flow], error)
// GetFlowByID retrieves a flow by its unique identifier.
// Parameters:
// - flowID: The unique identifier of the flow.
// Returns:
// - *Flow: The flow with the specified ID.
// - error: An error if the retrieval fails.
GetFlowByID(flowID uuid.UUID) (*Flow, error)
// GetProcessorByID retrieves a processor by its unique identifier within a flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processorID: The unique identifier of the processor.
// Returns:
// - *SimpleProcessor: The processor with the specified ID.
// - error: An error if the retrieval fails.
GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)
// GetNextProcessors retrieves the next processors for a given processor within a flow.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processorID: The unique identifier of the processor.
// Returns:
// - []SimpleProcessor: A slice of the next processors for the specified processor.
// - error: An error if the retrieval fails.
GetNextProcessors(flowID uuid.UUID, processorID uuid.UUID) ([]SimpleProcessor, error)
// AddProcessorToFlowBefore adds a processor to a flow before a reference processor.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processor: The processor to add.
// - referenceProcessorID: The unique identifier of the reference processor.
// Returns:
// - error: An error if the addition fails.
AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error
// AddProcessorToFlowAfter adds a processor to a flow after a reference processor.
// Parameters:
// - flowID: The unique identifier of the flow.
// - processor: The processor to add.
// - referenceProcessorID: The unique identifier of the reference processor.
// Returns:
// - error: An error if the addition fails.
AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error
// SaveFlow saves a flow.
// Parameters:
// - flow: The flow to save.
// Returns:
// - error: An error if the save operation fails.
SaveFlow(flow *Flow) error
// GetLastUpdateTime retrieves the last update time for each of the given flows.
// Parameters:
// - flowIDs: The unique identifiers of the flows.
// Returns:
// - map[uuid.UUID]time.Time: The last update time of each flow, keyed by flow ID.
// - error: An error if the retrieval fails.
GetLastUpdateTime(flowIDs []uuid.UUID) (map[uuid.UUID]time.Time, error)
// SetFlowActive marks a flow as active or inactive.
// Parameters:
// - flowID: The unique identifier of the flow.
// - active: A boolean indicating whether the flow should be marked as active.
// Returns:
// - error: An error if the operation fails.
SetFlowActive(flowID uuid.UUID, active bool) error
}
FlowManager defines the interface for managing and interacting with flows and processors within those flows.
type LogEntry ¶ added in v0.0.2
type LogEntry struct {
SessionID uuid.UUID `json:"session_id"`
ProcessorName string `json:"processor_name"`
ProcessorID string `json:"processor_id"`
FlowID uuid.UUID `json:"flow_id"`
InputFile string `json:"input_file"`
OutputFile string `json:"output_file"`
FlowObject EngineFlowObject `json:"flow_object"`
RetryCount int `json:"retry_count"`
}
LogEntry represents a single entry in the write-ahead log.
type PaginatedData ¶ added in v0.0.8
type PaginatedData[T any] struct {
Data []T `json:"data"` // Data is the slice of items for the current page.
TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}
PaginatedData represents a paginated response. It includes the data and the total count of items.
type PaginationRequest ¶ added in v0.0.8
type PaginationRequest struct {
Page int `json:"page" query:"page"` // Page number of the paginated data.
PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}
PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.
func (*PaginationRequest) Limit ¶ added in v0.0.8
func (r *PaginationRequest) Limit() int
Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.
func (*PaginationRequest) Offset ¶ added in v0.0.8
func (r *PaginationRequest) Offset() int
Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.
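One plausible way to compute these values is sketched below. The documentation does not state the actual defaults, so the 1-indexed pages and the fallback of 10 items per page are assumptions, and the lowercase type and method names are local stand-ins rather than the package's own code.

```go
package main

import "fmt"

// defaultPerPage is an assumed fallback; the package's real default is not documented here.
const defaultPerPage = 10

// paginationRequest is a local stand-in for PaginationRequest.
type paginationRequest struct {
	Page    int
	PerPage int
}

// limit returns PerPage, falling back to defaultPerPage when it is not positive.
func (r *paginationRequest) limit() int {
	if r.PerPage < 1 {
		return defaultPerPage
	}
	return r.PerPage
}

// offset returns the number of items to skip, treating pages as 1-indexed
// and any page below 1 as the first page.
func (r *paginationRequest) offset() int {
	page := r.Page
	if page < 1 {
		page = 1
	}
	return (page - 1) * r.limit()
}

func main() {
	req := &paginationRequest{Page: 3, PerPage: 20}
	fmt.Println(req.offset(), req.limit()) // prints "40 20"

	empty := &paginationRequest{}
	fmt.Println(empty.offset(), empty.limit()) // prints "0 10"
}
```

The two values map directly onto the LIMIT and OFFSET clauses of a SQL query, which is presumably why the methods carry those names.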
type Processor ¶
type Processor interface {
// GetID returns the ID of the processor.
// Returns:
// - string: The ID of the processor.
GetID() string
// Name returns the name of the processor.
// Returns:
// - string: The name of the processor.
Name() string
// Execute executes the processor logic.
// Parameters:
// - info: The EngineFlowObject containing the execution information.
// - fileHandler: The ProcessorFileHandler for handling file operations.
// - log: The logger for logging information.
// Returns:
// - *EngineFlowObject: The updated EngineFlowObject after execution.
// - error: An error if the execution fails.
Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)
// SetConfig sets the configuration for the processor.
// Parameters:
// - config: A map containing the configuration settings.
// Returns:
// - error: An error if setting the configuration fails.
SetConfig(config map[string]interface{}) error
}
Processor defines the interface for a processor.
type ProcessorFactory ¶
type ProcessorFactory interface {
// GetProcessor retrieves a processor by its type name.
// Parameters:
// - typeName: The type name of the processor to retrieve.
// Returns:
// - Processor: The retrieved processor.
// - error: An error if the retrieval fails.
GetProcessor(typeName string) (Processor, error)
// RegisterProcessor registers a new processor.
// Parameters:
// - processor: The processor to register.
RegisterProcessor(processor Processor)
// RegisterProcessorWithTypeName registers a new processor with a specific type name.
// Parameters:
// - typeName: The type name to associate with the processor.
// - processor: The processor to register.
RegisterProcessorWithTypeName(typeName string, processor Processor)
}
ProcessorFactory defines an interface for registering processors and retrieving them by type name, which allows arbitrary processor implementations to be supported.
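A map-backed registry is one straightforward way to satisfy this kind of interface. The sketch below is an illustration under assumptions: `processor` is a trimmed stand-in for Processor that only keeps Name(), `registry` and `upper` are hypothetical types, and registering under the processor's own name in RegisterProcessor is a guess at the intended behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a trimmed stand-in for the Processor interface.
type processor interface {
	Name() string
}

// registry is a hypothetical map-backed factory keyed by type name.
type registry struct {
	mu     sync.RWMutex
	byType map[string]processor
}

func newRegistry() *registry {
	return &registry{byType: make(map[string]processor)}
}

// RegisterProcessor registers p under its own name (assumed behavior).
func (r *registry) RegisterProcessor(p processor) {
	r.RegisterProcessorWithTypeName(p.Name(), p)
}

// RegisterProcessorWithTypeName registers p under an explicit type name.
func (r *registry) RegisterProcessorWithTypeName(typeName string, p processor) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byType[typeName] = p
}

// GetProcessor looks up a processor by type name.
func (r *registry) GetProcessor(typeName string) (processor, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.byType[typeName]
	if !ok {
		return nil, fmt.Errorf("no processor registered for type %q", typeName)
	}
	return p, nil
}

// upper is a hypothetical processor used only for the demonstration.
type upper struct{}

func (upper) Name() string { return "upper" }

func main() {
	reg := newRegistry()
	reg.RegisterProcessor(upper{})
	reg.RegisterProcessorWithTypeName("shout", upper{})

	p, err := reg.GetProcessor("shout")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(p.Name()) // prints "upper"

	_, err = reg.GetProcessor("missing")
	fmt.Println(err != nil) // prints "true"
}
```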
type ProcessorFileHandler ¶
type ProcessorFileHandler interface {
// Read reads data from a file.
// Returns:
// - io.Reader: The reader for reading data.
// - error: An error if the read operation fails.
Read() (io.Reader, error)
// Write writes data to a file.
// Returns:
// - io.Writer: The writer for writing data.
// - error: An error if the write operation fails.
Write() (io.Writer, error)
}
ProcessorFileHandler defines the interface a processor uses to read and write the contents currently being processed.
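For tests, an in-memory implementation of this interface can be handy. The sketch below redeclares the interface locally and adds a hypothetical `memoryFileHandler` plus an `upperCase` function showing the read-transform-write shape a processor's Execute body might take. None of these helpers are part of the package.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// ProcessorFileHandler mirrors the documented interface.
type ProcessorFileHandler interface {
	Read() (io.Reader, error)
	Write() (io.Writer, error)
}

// memoryFileHandler is a hypothetical in-memory implementation for tests.
type memoryFileHandler struct {
	input  []byte
	output bytes.Buffer
}

// Read returns a reader over the fixed input bytes.
func (m *memoryFileHandler) Read() (io.Reader, error) {
	return bytes.NewReader(m.input), nil
}

// Write returns a writer that appends to the in-memory output buffer.
func (m *memoryFileHandler) Write() (io.Writer, error) {
	return &m.output, nil
}

// upperCase reads everything from the handler, upper-cases it, and writes
// the result back through the handler.
func upperCase(fh ProcessorFileHandler) error {
	r, err := fh.Read()
	if err != nil {
		return err
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	w, err := fh.Write()
	if err != nil {
		return err
	}
	_, err = io.WriteString(w, strings.ToUpper(string(data)))
	return err
}

func main() {
	fh := &memoryFileHandler{input: []byte("hello")}
	if err := upperCase(fh); err != nil {
		fmt.Println("processing failed:", err)
		return
	}
	fmt.Println(fh.output.String()) // prints "HELLO"
}
```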
type ScheduleConfig ¶ added in v0.0.10
type ScheduleConfig struct {
Type ScheduleType
CronExpr string // Used only if Type == CronDriven
}
type ScheduleType ¶ added in v0.0.10
type ScheduleType int
const (
EventDriven ScheduleType = iota
CronDriven
)
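Since CronExpr is only meaningful for cron-driven schedules, a caller may want to check a ScheduleConfig before using it. The sketch below redeclares the types locally; the `validate` function is hypothetical and the rule it enforces (a cron-driven schedule needs a non-empty expression) is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// ScheduleType and its constants mirror the documented declarations.
type ScheduleType int

const (
	EventDriven ScheduleType = iota // 0
	CronDriven                      // 1
)

// ScheduleConfig mirrors the documented struct.
type ScheduleConfig struct {
	Type     ScheduleType
	CronExpr string // Used only if Type == CronDriven
}

// validate is a hypothetical check that a cron-driven schedule carries a
// cron expression. It does not parse or verify the expression itself.
func validate(cfg ScheduleConfig) error {
	if cfg.Type == CronDriven && cfg.CronExpr == "" {
		return errors.New("cron-driven schedule requires a cron expression")
	}
	return nil
}

func main() {
	fmt.Println(validate(ScheduleConfig{Type: EventDriven}))                       // prints "<nil>"
	fmt.Println(validate(ScheduleConfig{Type: CronDriven, CronExpr: "0 * * * *"})) // prints "<nil>"
	fmt.Println(validate(ScheduleConfig{Type: CronDriven}) != nil)                 // prints "true"
}
```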
type SessionUpdate ¶
type SessionUpdate struct {
SessionID uuid.UUID `json:"session_id"`
Finished bool `json:"finished"`
Error error `json:"error"`
}
SessionUpdate represents the update status of a session being processed by the engine.
type SimpleProcessor ¶ added in v0.0.5
type SimpleProcessor struct {
ID uuid.UUID `json:"id"` // ID uniquely identifies the processor.
FlowID uuid.UUID `json:"flow_id"` // FlowID is the ID of the flow to which the processor belongs.
Name string `json:"name"` // Name is the human-readable name of the processor.
Type string `json:"type"` // Type indicates the type of the processor, typically used to determine its implementation.
FlowOrder int `json:"flow_order"` // FlowOrder specifies the order in which the processor is executed within the flow.
Config map[string]interface{} `json:"config"` // Config holds the specific configuration for the processor.
MaxRetries int `json:"max_retries"` // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
LogLevel logrus.Level `json:"log_level"` // LogLevel defines the logging level used for this processor.
}
SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.
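The FlowOrder field suggests that processors can be arranged by sorting on it. A minimal sketch of that idea follows, using a trimmed hypothetical `step` struct in place of SimpleProcessor so the example needs no external packages. Whether the engine itself orders processors this way is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// step is a trimmed, hypothetical stand-in for SimpleProcessor.
type step struct {
	Name      string
	FlowOrder int
}

// orderSteps returns a copy of steps sorted by ascending FlowOrder.
// The sort is stable, so steps with equal FlowOrder keep their input order.
func orderSteps(steps []step) []step {
	ordered := make([]step, len(steps))
	copy(ordered, steps)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].FlowOrder < ordered[j].FlowOrder
	})
	return ordered
}

func main() {
	steps := []step{
		{Name: "write", FlowOrder: 3},
		{Name: "read", FlowOrder: 1},
		{Name: "transform", FlowOrder: 2},
	}
	for _, s := range orderSteps(steps) {
		fmt.Println(s.FlowOrder, s.Name)
	}
}
```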
type TriggerProcessor ¶ added in v0.0.10
type TriggerProcessor interface {
Processor
GetScheduleConfig() ScheduleConfig
HandleSessionUpdate(update SessionUpdate)
}
type WriteAheadLogger ¶ added in v0.0.2
WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.