definitions

package
v0.1.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2024 License: MIT Imports: 8 Imported by: 20

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrInputFileNotInitialized reports that the input file has not been initialized.
var ErrInputFileNotInitialized = fmt.Errorf("input file not initialized")
View Source
// ErrOutputFileNotInitialized reports that the output file has not been initialized.
var ErrOutputFileNotInitialized = fmt.Errorf("output file not initialized")

Functions

This section is empty.

Types

type BaseProcessor

// BaseProcessor represents a basic processor. It has no fields; its DecodeMap
// method decodes a configuration map into a caller-supplied struct.
type BaseProcessor struct {
}

BaseProcessor represents a basic processor. It has no fields and provides the DecodeMap helper method.

func (*BaseProcessor) DecodeMap

func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error

DecodeMap decodes a map into the output structure. It can be used to decode the configuration (which is a map) into the processor's own struct. Parameters: - input: The input map to decode. - output: The output structure to decode into. Returns: - error: An error if the decoding fails.

type BaseProcessorInterface added in v0.1.4

// BaseProcessorInterface groups the methods shared by every processor kind;
// it is embedded by both Processor and TriggerProcessor.
type BaseProcessorInterface interface {
	// Name returns the name of the processor.
	// Returns:
	// - string: The name of the processor.
	Name() string

	// SetConfig sets the configuration for the processor.
	// Parameters:
	// - config: A map containing the configuration settings.
	// Returns:
	// - error: An error if setting the configuration fails.
	SetConfig(config map[string]any) error

	// Close is called when the processor is being stopped or cleaned up.
	// Returns:
	// - error: An error if the close operation fails.
	Close() error
}

type BranchTracker added in v0.0.20

// BranchTracker tracks, per session, which processors have been scheduled and
// which have completed.
type BranchTracker interface {
	// AddProcessor tracks the processor as scheduled and pending completion.
	// NOTE(review): nextProcessorIDs presumably lists the processors that
	// follow processorID in the flow — confirm against the implementation.
	AddProcessor(sessionID uuid.UUID, processorID uuid.UUID, nextProcessorIDs []uuid.UUID)

	// MarkComplete marks the processor as completed. The allComplete result
	// presumably reports whether every tracked processor in the session is now
	// complete — confirm against the implementation.
	MarkComplete(sessionID uuid.UUID, processorID uuid.UUID) (allComplete bool)

	// IsComplete checks if all processors in a session are completed.
	IsComplete(sessionID uuid.UUID) bool

	// RestoreState seeds the session with the given completed processor IDs.
	// NOTE(review): presumably used to rebuild state from the write-ahead log
	// (LogEntry.CompletedProcessorIDs) — confirm.
	RestoreState(sessionID uuid.UUID, completedProcessorIDs []uuid.UUID)

	// GetCompletedProcessors returns the IDs of the session's processors that
	// have been marked complete.
	GetCompletedProcessors(sessionID uuid.UUID) []uuid.UUID
}

type Coordinator added in v0.1.1

// Coordinator assigns a leader node to each trigger processor and monitors the
// leaders; if a node drops, the coordinator reassigns its leadership.
type Coordinator interface {
	// IsLeader checks if the current instance is the leader for the trigger processor.
	// If the current instance is the coordinator, it will also assign the leader for the trigger processor.
	// Parameters:
	// - tpID: The unique identifier of the trigger processor.
	// Returns:
	// - bool: True if the current instance is the leader, false otherwise.
	// - error: An error if the check fails.
	IsLeader(tpID uuid.UUID) (bool, error)

	// Close terminates the leader election process and stops acting as the
	// coordinator (if this instance currently is the coordinator).
	// Returns:
	// - error: An error if the termination fails.
	Close() error
}

Coordinator defines the interface for assigning leader nodes for each trigger processor. The coordinator elects a leader for each trigger processor and monitors the leader nodes. If a node drops, the coordinator is responsible for reassigning the leader.

type EngineFileHandler

// EngineFileHandler defines the interface for handling files in the engine's
// processing flow. It extends ProcessorFileHandler with access to the
// underlying input/output file paths and resource management.
type EngineFileHandler interface {
	ProcessorFileHandler

	// GetInputFile returns the file path of the input file being processed.
	GetInputFile() string

	// GetOutputFile returns the file path of the output file generated by the processing.
	GetOutputFile() string

	// Close closes any resources associated with the file handler.
	// Unlike most Close methods in this package, it returns no error.
	Close()

	// GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
	// Returns:
	//   - A new instance of EngineFileHandler.
	//   - An error if the operation fails.
	GenerateNewFileHandler() (EngineFileHandler, error)
}

EngineFileHandler defines the interface for handling files in the engine's processing flow.

type EngineFlowObject

// EngineFlowObject represents the flow of data through the engine, carrying
// metadata for the processing context.
type EngineFlowObject struct {
	Metadata map[string]any `json:"metadata"` // Metadata is the processing context; EvaluateExpression exposes it as $env.
	TPMark   string         `json:"tp_mark"`  // TPMark is a unique identifier for the flow object given by the trigger processor
}

EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.

func (*EngineFlowObject) EvaluateExpression

func (e *EngineFlowObject) EvaluateExpression(input string, exprOptions ...expr.Option) (string, error)

EvaluateExpression evaluates the expressions embedded in a string using the metadata in the EngineFlowObject. Each `${expression}` is replaced with its value, computed by expr (https://github.com/expr-lang/expr). To access the metadata, use the `$env` variable, for example `${$env["key"]}`. The following additional functions are available:

  • `uuid()`: Generates a new UUID.
  • `random(max)`: Generates a random number between 0 and max.
  • `getEnv(key)`: Retrieves an environment variable by key.

Parameters:

  • input: The string expression to evaluate.
  • exprOptions: Additional options for the expression evaluation. This will allow you to add custom functions or variables.

Returns:

  • A string containing the evaluated result.
  • An error if the evaluation fails.

type EngineInterface added in v0.0.10

// EngineInterface exposes submitting work to the engine and observing the
// progress of the resulting sessions.
type EngineInterface interface {
	// Submit hands the engine input for the flow identified by flowID, together
	// with initial metadata and a reader over the input data.
	// NOTE(review): the returned uuid.UUID is presumably the new session's ID
	// (SessionUpdate carries a SessionID) — confirm.
	Submit(flowID uuid.UUID, metadata map[string]interface{}, reader io.Reader) uuid.UUID
	// SessionUpdates returns a receive-only channel of SessionUpdate values.
	SessionUpdates() <-chan SessionUpdate
}

type Flow added in v0.0.6

// Flow represents a flow of processors that are executed in a specific order.
// It contains the flow's metadata (ID, name, description) and the processors
// that belong to it.
type Flow struct {
	ID                uuid.UUID                 `json:"id"`                 // ID uniquely identifies the flow.
	Name              string                    `json:"name"`               // Name is the human-readable name of the flow.
	Description       string                    `json:"description"`        // Description provides a brief explanation of the flow's purpose.
	Processors        []*SimpleProcessor        `json:"processors"`         // Processors is the list of processors that are part of the flow, executed in order.
	FirstProcessors   []*SimpleProcessor        `json:"first_processors"`   // FirstProcessors is the list of processors that should run first in the flow.
	TriggerProcessors []*SimpleTriggerProcessor `json:"trigger_processors"` // TriggerProcessors is the list of trigger processors associated with the flow.
	Active            bool                      `json:"active"`             // Active indicates whether the flow is currently active.
	LastUpdated       int64                     `json:"last_updated"`       // LastUpdated is the last update timestamp for the flow (unit not shown here; presumably Unix time — confirm).
}

Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.

type FlowManager

// FlowManager defines the interface for managing and interacting with flows
// and the processors within those flows.
type FlowManager interface {
	// GetFirstProcessorsForFlow retrieves the first processors for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - []*SimpleProcessor: A slice of the first processors for the flow.
	// - error: An error if the retrieval fails.
	GetFirstProcessorsForFlow(flowID uuid.UUID) ([]*SimpleProcessor, error)

	// GetFlowProcessors retrieves all processors for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - []*SimpleProcessor: A slice of processors for the flow.
	// - error: An error if the retrieval fails.
	GetFlowProcessors(flowID uuid.UUID) ([]*SimpleProcessor, error)

	// GetTriggerProcessorsForFlow retrieves the trigger processors for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - []*SimpleTriggerProcessor: A slice of the trigger processors for the flow.
	// - error: An error if the retrieval fails.
	GetTriggerProcessorsForFlow(flowID uuid.UUID) ([]*SimpleTriggerProcessor, error)

	// GetProcessors retrieves processors by their unique identifiers.
	// Parameters:
	// - processorIDs: A slice of unique identifiers for the processors.
	// Returns:
	// - []*SimpleProcessor: A slice of processors with the specified IDs.
	// - error: An error if the retrieval fails.
	GetProcessors(processorIDs []uuid.UUID) ([]*SimpleProcessor, error)

	// ListFlows lists all flows with pagination and a time filter.
	// Parameters:
	// - pagination: The pagination request containing page number and items per page.
	// - since: The time filter to list flows updated since this time.
	// Returns:
	// - *PaginatedData[*Flow]: The paginated data containing the flows.
	// - error: An error if the listing fails.
	ListFlows(pagination *PaginationRequest, since time.Time) (*PaginatedData[*Flow], error)

	// GetFlowByID retrieves a flow by its unique identifier.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - *Flow: The flow with the specified ID.
	// - error: An error if the retrieval fails.
	GetFlowByID(flowID uuid.UUID) (*Flow, error)

	// GetProcessorByID retrieves a processor by its unique identifier within a flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processorID: The unique identifier of the processor.
	// Returns:
	// - *SimpleProcessor: The processor with the specified ID.
	// - error: An error if the retrieval fails.
	GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)

	// AddProcessorToFlowBefore adds a processor to a flow before a reference processor.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processor: The processor to add.
	// - referenceProcessorID: The unique identifier of the reference processor.
	// Returns:
	// - error: An error if the addition fails.
	AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// AddProcessorToFlowAfter adds a processor to the flow after a reference processor.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processor: The processor to add.
	// - referenceProcessorID: The unique identifier of the reference processor.
	// Returns:
	// - error: An error if the addition fails.
	AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// SaveFlow saves a flow and its processors to the database.
	// Parameters:
	// - flow: The flow to save.
	// Returns:
	// - error: An error if the save operation fails.
	SaveFlow(flow *Flow) error

	// GetLastUpdateTime retrieves the last update time for each of the given flows.
	// Parameters:
	// - flowIDs: A slice of flow IDs.
	// Returns:
	// - map[uuid.UUID]time.Time: A map of flow IDs to their last update time.
	// - error: An error if the retrieval fails.
	GetLastUpdateTime(flowIDs []uuid.UUID) (map[uuid.UUID]time.Time, error)

	// SetFlowActive marks a flow as active or inactive.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - active: A boolean indicating whether the flow should be marked as active.
	// Returns:
	// - error: An error if the operation fails.
	SetFlowActive(flowID uuid.UUID, active bool) error
}

FlowManager defines the interface for managing and interacting with flows and processors within those flows.

type LeaderSelector added in v0.1.1

// LeaderSelector defines the interface for leader election mechanisms: it
// runs an election among participants and reports whether this participant
// currently leads.
type LeaderSelector interface {
	// Start initiates the leader election process.
	// Returns:
	// - error: An error if the initiation fails.
	Start() error

	// IsLeader checks if the current participant is the leader.
	// Returns:
	// - bool: True if the current instance is the leader, false otherwise.
	// - error: An error if the check fails.
	IsLeader() (bool, error)

	// Participants retrieves the list of nodes participating in the leader election.
	// Returns:
	// - []string: A slice of node identifiers.
	// - error: An error if the retrieval fails.
	Participants() ([]string, error)

	// Close terminates the leader election process.
	// Returns:
	// - error: An error if the termination fails.
	Close() error

	// ParticipantName retrieves the name of the current participant.
	// Returns:
	// - string: The name of the current node.
	ParticipantName() string

	// ParticipantsChangeChannel returns a channel that receives updates about node changes.
	// Returns:
	// - <-chan []string: A receive-only channel that provides slices of the nodes.
	ParticipantsChangeChannel() <-chan []string
}

LeaderSelector defines the interface for leader election mechanisms.

type LogEntry added in v0.0.2

// LogEntry is a single record in the write-ahead log (see WriteAheadLogger),
// capturing where a session is in a flow: the processor, its input/output
// files, the flow object, the retry count and which processors have completed.
//
// NOTE(review): ProcessorID is a string while SessionID and FlowID are
// uuid.UUID — confirm whether the difference is intentional.
type LogEntry struct {
	SessionID             uuid.UUID        `json:"session_id"`
	ProcessorName         string           `json:"processor_name"`
	ProcessorID           string           `json:"processor_id"`
	FlowID                uuid.UUID        `json:"flow_id"`
	InputFile             string           `json:"input_file"`
	OutputFile            string           `json:"output_file"`
	FlowObject            EngineFlowObject `json:"flow_object"`
	RetryCount            int              `json:"retry_count"`
	IsComplete            bool             `json:"is_complete"` // indicates if the processor completed
	CompletedProcessorIDs []uuid.UUID      `json:"completed_processor_ids"`
}

LogEntry is a struct that represents a log entry in the write-ahead log.

type LoggerFactory added in v0.1.12

// LoggerFactory provides *logrus.Logger instances keyed by logger type and
// instance name.
type LoggerFactory interface {
	// GetLogger retrieves a logger based on the specified type and name.
	// Parameters:
	// - loggerType: The type of logger to retrieve, for example "Human".
	// - name: The name of the logger instance. When several instances of the
	//   same type exist (e.g. multiple "Human" loggers), the name distinguishes
	//   them.
	// Returns:
	// - *logrus.Logger: A pointer to the retrieved logger.
	GetLogger(loggerType string, name string) *logrus.Logger
}

type PaginatedData added in v0.0.8

// PaginatedData represents a paginated response: one page of items plus the
// total number of items available.
//
// Note: its JSON keys are camelCase ("totalCount"), unlike the snake_case keys
// used by most other structs in this package.
type PaginatedData[T any] struct {
	Data       []T `json:"data"`       // Data is the slice of items for the current page.
	TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}

PaginatedData represents a paginated response. It includes the data and the total count of items.

type PaginationRequest added in v0.0.8

// PaginationRequest represents a request for paginated data: the page number
// and the number of items per page.
//
// Note: PerPage is read from the query-string key "per_page" but encoded in
// JSON as "perPage".
type PaginationRequest struct {
	Page    int `json:"page" query:"page"`        // Page number of the paginated data.
	PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}

PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.

func (*PaginationRequest) Limit added in v0.0.8

func (r *PaginationRequest) Limit() int

Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.

func (*PaginationRequest) Offset added in v0.0.8

func (r *PaginationRequest) Offset() int

Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.

type Processor

// Processor defines the interface for a (non-trigger) processor: the shared
// BaseProcessorInterface methods plus Execute, which transforms one
// EngineFlowObject using a ProcessorFileHandler.
type Processor interface {
	BaseProcessorInterface

	// Execute executes the processor logic.
	// Parameters:
	// - info: The EngineFlowObject containing the execution information.
	// - fileHandler: The ProcessorFileHandler for handling file operations.
	// - log: The logger for logging information.
	// Returns:
	// - *EngineFlowObject: The updated EngineFlowObject after execution.
	// - error: An error if the execution fails.
	Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)
}

Processor defines the interface for a processor.

type ProcessorFactory

// ProcessorFactory defines an interface for retrieving processors and trigger
// processors by ID and type name.
type ProcessorFactory interface {
	// GetProcessor retrieves the processor identified by id, of the given type name.
	// Parameters:
	// - id: The ID of the processor to retrieve.
	// - typeName: The type name of the processor to retrieve.
	// Returns:
	// - Processor: The retrieved processor.
	// - error: An error if the retrieval fails.
	GetProcessor(id uuid.UUID, typeName string) (Processor, error)

	// GetTriggerProcessor retrieves the trigger processor identified by id, of the given type name.
	// Parameters:
	// - id: The ID of the trigger processor to retrieve.
	// - typeName: The type name of the trigger processor to retrieve.
	// Returns:
	// - TriggerProcessor: The retrieved trigger processor.
	// - error: An error if the retrieval fails.
	GetTriggerProcessor(id uuid.UUID, typeName string) (TriggerProcessor, error)
}

ProcessorFactory defines an interface for retrieving processors and trigger processors.

type ProcessorFileHandler

// ProcessorFileHandler gives a processor an io.Reader over the contents being
// processed and an io.Writer for the contents it produces.
type ProcessorFileHandler interface {
	// Read reads data from a file.
	// Returns:
	// - io.Reader: The reader for reading data.
	// - error: An error if the read operation fails.
	Read() (io.Reader, error)

	// Write writes data to a file.
	// Returns:
	// - io.Writer: The writer for writing data.
	// - error: An error if the write operation fails.
	Write() (io.Writer, error)
}

ProcessorFileHandler defines the interface for reading and writing the contents currently being processed.

type ScheduleConfig added in v0.0.10

// ScheduleConfig pairs a ScheduleType with the cron expression used when that
// type is CronDriven.
type ScheduleConfig struct {
	Type     ScheduleType
	CronExpr string // Used only if Type == CronDriven
}

type ScheduleType added in v0.0.10

// ScheduleType is the scheduling mode reported by TriggerProcessor.GetScheduleType.
// The numeric values are written out explicitly so that reordering the
// constants can never silently renumber them.
type ScheduleType int

const (
	EventDriven ScheduleType = 0 // EventDriven is the default (zero) value.
	CronDriven  ScheduleType = 1 // CronDriven schedules by a cron expression (ScheduleConfig.CronExpr).
)

type SessionUpdate

// SessionUpdate represents the update status of a session being processed by
// the engine.
//
// NOTE(review): Error is an interface value; encoding/json marshals most error
// implementations (e.g. those from errors.New) as {} because they have no
// exported fields, so the message is likely lost when serialized — confirm
// whether this field is meant to survive JSON encoding.
type SessionUpdate struct {
	SessionID uuid.UUID `json:"session_id"`
	Finished  bool      `json:"finished"`
	Error     error     `json:"error"`
	TPMark    string    `json:"tp_mark"`
}

SessionUpdate represents the update status of a session being processed by the engine.

type SimpleProcessor added in v0.0.5

// SimpleProcessor represents a processor within a flow. It contains metadata
// about the processor, including its configuration and execution parameters.
type SimpleProcessor struct {
	ID             uuid.UUID          `json:"id"`              // ID uniquely identifies the processor.
	FlowID         uuid.UUID          `json:"flow_id"`         // FlowID is the ID of the flow to which the processor belongs.
	Name           string             `json:"name"`            // Name is the human-readable name of the processor.
	Type           string             `json:"type"`            // Type indicates the type of the processor, used to determine its implementation.
	Config         map[string]any     `json:"config"`          // Config holds the specific configuration for the processor.
	MaxRetries     int                `json:"max_retries"`     // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
	BackoffSeconds int                `json:"backoff_seconds"` // BackoffSeconds specifies the delay in seconds between retry attempts for this processor in case of failure.
	LogLevel       logrus.Level       `json:"log_level"`       // LogLevel defines the logging level used for this processor.
	Enabled        bool               `json:"enabled"`         // Enabled indicates if the processor is enabled.
	NextProcessors []*SimpleProcessor `json:"next_processors"` // NextProcessors explicitly defines the processors that should run after this one.
}

SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.

type SimpleTriggerProcessor added in v0.0.15

// SimpleTriggerProcessor represents a trigger processor within a flow. It
// contains metadata about the trigger processor, including its configuration
// and scheduling parameters.
type SimpleTriggerProcessor struct {
	ID         uuid.UUID      `json:"id"`          // ID uniquely identifies the trigger processor.
	FlowID     uuid.UUID      `json:"flow_id"`     // FlowID is the ID of the flow to which the trigger processor belongs.
	Name       string         `json:"name"`        // Name is the human-readable name of the trigger processor.
	Type       string         `json:"type"`        // Type indicates the type of the trigger processor, used to determine its implementation.
	Config     map[string]any `json:"config"`      // Config holds the specific configuration for the trigger processor.
	CronExpr   string         `json:"cron_expr"`   // CronExpr is the cron expression used for scheduling, applicable if ScheduleType is CronDriven.
	LogLevel   logrus.Level   `json:"log_level"`   // LogLevel defines the logging level used for this trigger processor.
	SingleNode bool           `json:"single_node"` // SingleNode defines whether this trigger processor should run on a single node when running in a cluster.
	Enabled    bool           `json:"enabled"`     // Enabled indicates if the processor is enabled.
}

SimpleTriggerProcessor represents a trigger processor within a flow. It contains metadata about the trigger processor, including its configuration and scheduling parameters.

type StateManager added in v0.1.3

// StateManager reads, writes and watches state maps, one per StateType.
type StateManager interface {
	// Reset closes the state manager and then resets it.
	// Returns:
	// - error: An error if the reset fails.
	Reset() error

	// Close closes the state manager.
	// Returns:
	// - error: An error if the close operation fails.
	Close() error

	// GetState retrieves the state for the given state type.
	// Parameters:
	// - stateType: The type of state to retrieve.
	// Returns:
	// - map[string]any: A map representing the state.
	// - error: An error if the retrieval fails.
	GetState(stateType StateType) (map[string]any, error)

	// SetState sets the state for the given state type.
	// Parameters:
	// - stateType: The type of state to set.
	// - value: A map representing the state to set.
	// Returns:
	// - error: An error if the set operation fails.
	SetState(stateType StateType, value map[string]any) error

	// WatchState watches for changes in the given state type.
	// Parameters:
	// - stateType: The type of state to watch.
	// - callback: A callback function to invoke when the state changes.
	// Returns:
	// - error: An error if the watch operation fails.
	WatchState(stateType StateType, callback func()) error
}

type StateManagerFactory added in v0.1.3

// StateManagerFactory creates StateManager instances.
type StateManagerFactory interface {
	// CreateStateManager returns a StateManager for the given id.
	// NOTE(review): id is presumably a processor ID — confirm with callers.
	CreateStateManager(id uuid.UUID) StateManager
}

type StateType added in v0.1.3

// StateType names the kind of state a StateManager reads, writes or watches.
type StateType string

// StateTypeCluster and StateTypeLocal are the StateType values defined by
// this package.
const (
	StateTypeCluster = StateType("cluster")
	StateTypeLocal   = StateType("local")
)

type TriggerProcessor added in v0.0.10

// TriggerProcessor defines the interface for a trigger processor: the shared
// BaseProcessorInterface methods plus execution, scheduling and session-update
// handling.
type TriggerProcessor interface {
	BaseProcessorInterface

	// Execute executes the processor logic.
	// Parameters:
	// - info: The EngineFlowObject containing the execution information.
	// - produceFileHandler: A function to produce a ProcessorFileHandler.
	// - log: The logger for logging information.
	// Returns:
	// - []*TriggerProcessorResponse: The results of the execution, each pairing
	//   an EngineFlowObject with its ProcessorFileHandler.
	// - error: An error if the execution fails.
	Execute(
		info *EngineFlowObject,
		produceFileHandler func() ProcessorFileHandler,
		log *logrus.Logger,
	) ([]*TriggerProcessorResponse, error)

	// GetScheduleType returns the scheduling type supported by the TriggerProcessor.
	GetScheduleType() ScheduleType

	// HandleSessionUpdate allows the TriggerProcessor to respond to session updates.
	HandleSessionUpdate(update SessionUpdate)
}

TriggerProcessor defines the interface for a trigger processor.

type TriggerProcessorResponse added in v0.1.5

// TriggerProcessorResponse is one result returned by TriggerProcessor.Execute:
// an EngineFlowObject together with the ProcessorFileHandler for its contents.
type TriggerProcessorResponse struct {
	EngineFlowObject *EngineFlowObject
	FileHandler      ProcessorFileHandler
}

type WriteAheadLogger added in v0.0.2

// WriteAheadLogger writes LogEntry records and reads them back; the entries
// track the processing state of a flow.
type WriteAheadLogger interface {
	// WriteEntry writes entry to the log. It returns no error, so write
	// failures are not reported to the caller.
	WriteEntry(entry LogEntry)
	// ReadEntries reads the log entries back.
	ReadEntries() ([]LogEntry, error)
	// ReadLastEntries reads log entries back.
	// NOTE(review): presumably only the most recent entry per session/processor
	// — confirm the exact selection against the implementation.
	ReadLastEntries() ([]LogEntry, error)
}

WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL