definitions

package
v0.0.20 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2024 License: MIT Imports: 6 Imported by: 20

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseProcessor

// BaseProcessor represents a basic processor with an ID.
//
// NOTE(review): ID is a plain string here, whereas SimpleProcessor.ID is a
// uuid.UUID; presumably this holds the string form of that UUID — confirm
// against the code that populates it.
type BaseProcessor struct {
	ID string // ID is the unique identifier of the processor.
}

BaseProcessor represents a basic processor with an ID.

func (*BaseProcessor) DecodeMap

func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error

DecodeMap decodes a map into the output structure. This can be used to decode the configuration (which is a map) into its own struct. Parameters: - input: The input map to decode. - output: The output structure to decode into. Returns: - error: An error if the decoding fails.

func (*BaseProcessor) GetID

func (b *BaseProcessor) GetID() string

GetID returns the ID of the processor. Returns: - string: The ID of the processor.

type BranchTracker added in v0.0.20

// BranchTracker is an interface that tracks the state of the processor
// branches for a flow. All methods are keyed by session ID.
type BranchTracker interface {
	// MarkProcessorComplete marks a processor as complete for a session.
	// It returns whether the current processor was the last one for this level and if the next level should be scheduled.
	//
	// NOTE(review): both facts above are conveyed by the single bool result;
	// presumably true means "last processor of the level, schedule the next
	// level" — confirm against the implementation.
	// Returns:
	// - bool: See the note above.
	// - error: An error if the processor could not be marked complete.
	MarkProcessorComplete(sessionID uuid.UUID, processorID uuid.UUID) (bool, error)

	// AddProcessorsForSession adds a set of processors to be tracked for the session at the current level.
	// It returns nothing, so failures cannot be reported to the caller.
	AddProcessorsForSession(sessionID uuid.UUID, processorIDs []uuid.UUID)

	// HasPendingProcessors returns true if there are processors in the current level that haven't completed.
	HasPendingProcessors(sessionID uuid.UUID) bool

	// ClearSession removes a session from the tracker once it's completed.
	ClearSession(sessionID uuid.UUID)
}

BranchTracker is an interface that tracks the state of the processor branches for a flow.

type EngineFileHandler

// EngineFileHandler defines the interface for handling files in the engine's
// processing flow. It embeds ProcessorFileHandler, so it also exposes that
// interface's Read and Write methods.
type EngineFileHandler interface {
	ProcessorFileHandler

	// GetInputFile returns the file path of the input file being processed.
	GetInputFile() string

	// GetOutputFile returns the file path of the output file generated by the processing.
	GetOutputFile() string

	// Close closes any resources associated with the file handler.
	// It has no return value, so a failure while closing is not reported to the caller.
	Close()

	// GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
	// Returns:
	//   - A new instance of EngineFileHandler.
	//   - An error if the operation fails.
	GenerateNewFileHandler() (EngineFileHandler, error)
}

EngineFileHandler defines the interface for handling files in the engine's processing flow.

type EngineFlowObject

// EngineFlowObject represents the flow of data through the engine, containing
// metadata for the processing context.
type EngineFlowObject struct {
	// Metadata holds arbitrary key/value pairs describing the processing
	// context. It is serialized under the JSON key "metadata".
	Metadata map[string]interface{} `json:"metadata"`
}

EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.

func (*EngineFlowObject) EvaluateExpression

func (e *EngineFlowObject) EvaluateExpression(input string) (string, error)

EvaluateExpression evaluates a string expression using the metadata in the EngineFlowObject. It will turn any `${expression}` into the value of the expression using [expr](https://github.com/expr-lang/expr). To access the metadata, use the `$env` variable. For example, `${$env["key"]}`. Additional functions are available:

  • `uuid()`: Generates a new UUID.
  • `random(max)`: Generates a random number between 0 and max.
  • `getEnv(key)`: Retrieves an environment variable by key.

Parameters:

  • input: The string expression to evaluate.

Returns:

  • A string containing the evaluated result.
  • An error if the evaluation fails.

type EngineIncomingObject

// EngineIncomingObject represents an incoming object for the engine to
// process, including metadata, a data reader, and a session ID.
type EngineIncomingObject struct {
	// FlowID identifies a flow; presumably the flow that should process this
	// object — confirm against the engine implementation.
	FlowID uuid.UUID
	// Metadata holds arbitrary key/value pairs attached to the incoming object.
	Metadata map[string]interface{}
	// Reader provides the data of the incoming object.
	Reader io.Reader
	// SessionID is the session ID associated with this object.
	SessionID uuid.UUID
}

EngineIncomingObject represents an incoming object for the engine to process, including metadata, a data reader, and a session ID.

type EngineInterface added in v0.0.10

// EngineInterface is the interface for handing work to the engine and
// receiving session updates from it.
type EngineInterface interface {
	// Submit hands a flow ID, a metadata map, and a data reader to the engine.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - metadata: Arbitrary key/value pairs to accompany the data.
	// - reader: The reader supplying the data.
	// Returns:
	// - uuid.UUID: Presumably the ID of the session created for this
	//   submission — TODO confirm against the implementation.
	// No error is returned, so submission failures cannot be reported here.
	Submit(flowID uuid.UUID, metadata map[string]interface{}, reader io.Reader) uuid.UUID
	// SessionUpdates returns a receive-only channel of SessionUpdate values.
	SessionUpdates() <-chan SessionUpdate
}

type Flow added in v0.0.6

// Flow represents a flow of processors that are executed in a specific order.
// It contains the flow's metadata such as its ID, name, description, and the
// processors that belong to it.
//
// NOTE(review): LastUpdated is a bare int64; its unit (seconds, milliseconds,
// ...) is not visible here — confirm before comparing it with time.Time values.
type Flow struct {
	ID                uuid.UUID                `json:"id"`                 // ID uniquely identifies the flow.
	Name              string                   `json:"name"`               // Name is the human-readable name of the flow.
	Description       string                   `json:"description"`        // Description provides a brief explanation of the flow's purpose.
	Processors        []SimpleProcessor        `json:"processors"`         // Processors is the list of processors that are part of the flow, executed in order.
	TriggerProcessors []SimpleTriggerProcessor `json:"trigger_processors"` // TriggerProcessors is the list of trigger processors associated with the flow.
	Active            bool                     `json:"active"`             // Active indicates whether the flow is currently active.
	LastUpdated       int64                    `json:"last_updated"`       // LastUpdated indicates the last update timestamp for the flow.
}

Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.

type FlowManager

// FlowManager defines the interface for managing and interacting with flows
// and processors within those flows.
type FlowManager interface {
	// GetFirstProcessorsForFlow retrieves the first processors for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - []SimpleProcessor: A slice of the first processors for the flow.
	// - error: An error if the retrieval fails.
	GetFirstProcessorsForFlow(flowID uuid.UUID) ([]SimpleProcessor, error)

	// GetLastProcessorForFlow retrieves the last processor for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - *SimpleProcessor: The last processor for the flow.
	// - error: An error if the retrieval fails.
	GetLastProcessorForFlow(flowID uuid.UUID) (*SimpleProcessor, error)

	// GetTriggerProcessorsForFlow retrieves the trigger processors for a given flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - []*SimpleTriggerProcessor: A slice of the trigger processors for the flow.
	// - error: An error if the retrieval fails.
	GetTriggerProcessorsForFlow(flowID uuid.UUID) ([]*SimpleTriggerProcessor, error)

	// ListFlows lists all flows with pagination and a time filter.
	// Parameters:
	// - pagination: The pagination request containing page number and items per page.
	// - since: The time filter to list flows updated since this time.
	// Returns:
	// - PaginatedData[*Flow]: The paginated data containing the flows.
	// - error: An error if the listing fails.
	ListFlows(pagination *PaginationRequest, since time.Time) (PaginatedData[*Flow], error)

	// GetFlowByID retrieves a flow by its unique identifier.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// Returns:
	// - *Flow: The flow with the specified ID.
	// - error: An error if the retrieval fails.
	GetFlowByID(flowID uuid.UUID) (*Flow, error)

	// GetProcessorByID retrieves a processor by its unique identifier within a flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processorID: The unique identifier of the processor.
	// Returns:
	// - *SimpleProcessor: The processor with the specified ID.
	// - error: An error if the retrieval fails.
	GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)

	// GetNextProcessors retrieves the next processors for a given processor within a flow.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processorID: The unique identifier of the processor.
	// Returns:
	// - []SimpleProcessor: A slice of the next processors for the specified processor.
	// - error: An error if the retrieval fails.
	GetNextProcessors(flowID uuid.UUID, processorID uuid.UUID) ([]SimpleProcessor, error)

	// AddProcessorToFlowBefore adds a processor to a flow before a reference processor.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processor: The processor to add.
	// - referenceProcessorID: The unique identifier of the reference processor.
	// Returns:
	// - error: An error if the addition fails.
	AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// AddProcessorToFlowAfter adds a processor to a flow after a reference processor.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - processor: The processor to add.
	// - referenceProcessorID: The unique identifier of the reference processor.
	// Returns:
	// - error: An error if the addition fails.
	AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// SaveFlow saves a flow.
	// Parameters:
	// - flow: The flow to save.
	// Returns:
	// - error: An error if the save operation fails.
	SaveFlow(flow *Flow) error

	// GetLastUpdateTime retrieves the last update times for a set of flows.
	// Parameters:
	// - flowIDs: The unique identifiers of the flows to look up.
	// Returns:
	// - map[uuid.UUID]time.Time: The last update times, keyed by UUID
	//   (presumably the flow ID; whether unknown IDs are omitted from the
	//   map or cause an error is not visible here — confirm against the
	//   implementation).
	// - error: An error if the retrieval fails.
	GetLastUpdateTime(flowIDs []uuid.UUID) (map[uuid.UUID]time.Time, error)

	// SetFlowActive marks a flow as active or inactive.
	// Parameters:
	// - flowID: The unique identifier of the flow.
	// - active: A boolean indicating whether the flow should be marked as active.
	// Returns:
	// - error: An error if the operation fails.
	SetFlowActive(flowID uuid.UUID, active bool) error
}

FlowManager defines the interface for managing and interacting with flows and processors within those flows.

type LogEntry added in v0.0.2

// LogEntry is a struct that represents a log entry in the write ahead log.
//
// NOTE(review): ProcessorID is a string while SessionID and FlowID are
// uuid.UUID; presumably it mirrors the string returned by Processor.GetID —
// confirm against the code that writes entries.
type LogEntry struct {
	SessionID     uuid.UUID        `json:"session_id"`     // SessionID is the session ID recorded in this entry.
	ProcessorName string           `json:"processor_name"` // ProcessorName is the processor name recorded in this entry.
	ProcessorID   string           `json:"processor_id"`   // ProcessorID is the processor ID recorded in this entry, as a string.
	FlowID        uuid.UUID        `json:"flow_id"`        // FlowID is the flow ID recorded in this entry.
	InputFile     string           `json:"input_file"`     // InputFile is presumably the path of the input file — confirm.
	OutputFile    string           `json:"output_file"`    // OutputFile is presumably the path of the output file — confirm.
	FlowObject    EngineFlowObject `json:"flow_object"`    // FlowObject is the EngineFlowObject (metadata) stored with the entry.
	RetryCount    int              `json:"retry_count"`    // RetryCount is presumably the number of retries attempted so far — confirm.
}

LogEntry is a struct that represents a log entry in the write ahead log.

type PaginatedData added in v0.0.8

// PaginatedData represents a paginated response. It includes the data and the
// total count of items. T is the element type of the page.
type PaginatedData[T any] struct {
	Data       []T `json:"data"`       // Data is the slice of items for the current page.
	TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}

PaginatedData represents a paginated response. It includes the data and the total count of items.

type PaginationRequest added in v0.0.8

// PaginationRequest represents a request for paginated data. It includes the
// page number and the number of items per page.
//
// NOTE(review): PerPage is named "perPage" in JSON but "per_page" in the query
// tag; the mismatch may be intentional, but confirm that clients expect both.
type PaginationRequest struct {
	Page    int `json:"page" query:"page"`        // Page number of the paginated data.
	PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}

PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.

func (*PaginationRequest) Limit added in v0.0.8

func (r *PaginationRequest) Limit() int

Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.

func (*PaginationRequest) Offset added in v0.0.8

func (r *PaginationRequest) Offset() int

Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.

type Processor

// Processor defines the interface for a processor: it exposes an ID and a
// name, accepts a configuration map, and executes against an EngineFlowObject
// using a ProcessorFileHandler.
type Processor interface {
	// GetID returns the ID of the processor.
	// Returns:
	// - string: The ID of the processor.
	GetID() string

	// Name returns the name of the processor.
	// Returns:
	// - string: The name of the processor.
	Name() string

	// Execute executes the processor logic.
	// Parameters:
	// - info: The EngineFlowObject containing the execution information.
	// - fileHandler: The ProcessorFileHandler for handling file operations.
	// - log: The logger for logging information.
	// Returns:
	// - *EngineFlowObject: The updated EngineFlowObject after execution.
	// - error: An error if the execution fails.
	Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)

	// SetConfig sets the configuration for the processor.
	// Parameters:
	// - config: A map containing the configuration settings.
	// Returns:
	// - error: An error if setting the configuration fails.
	SetConfig(config map[string]interface{}) error
}

Processor defines the interface for a processor.

type ProcessorFactory

// ProcessorFactory defines an interface for retrieving processors and trigger
// processors by their type name.
type ProcessorFactory interface {
	// GetProcessor retrieves a processor by its type name.
	// Parameters:
	// - typeName: The type name of the processor to retrieve.
	// Returns:
	// - Processor: The retrieved processor.
	// - error: An error if the retrieval fails.
	GetProcessor(typeName string) (Processor, error)

	// GetTriggerProcessor retrieves a trigger processor by its type name.
	// Parameters:
	// - typeName: The type name of the trigger processor to retrieve.
	// Returns:
	// - TriggerProcessor: The retrieved trigger processor.
	// - error: An error if the retrieval fails.
	GetTriggerProcessor(typeName string) (TriggerProcessor, error)
}

ProcessorFactory defines an interface for retrieving processors and trigger processors.

type ProcessorFileHandler

// ProcessorFileHandler defines the interface for handling the current
// contents: Read yields a reader for them and Write yields a writer.
type ProcessorFileHandler interface {
	// Read reads data from a file.
	// Returns:
	// - io.Reader: The reader for reading data.
	// - error: An error if the read operation fails.
	Read() (io.Reader, error)

	// Write writes data to a file.
	// Returns:
	// - io.Writer: The writer for writing data.
	// - error: An error if the write operation fails.
	Write() (io.Writer, error)
}

ProcessorFileHandler defines the interface a processor uses to read and write the current file contents.

type ScheduleConfig added in v0.0.10

// ScheduleConfig pairs a ScheduleType with the cron expression that applies
// when the type is CronDriven.
type ScheduleConfig struct {
	// Type is the scheduling type (EventDriven or CronDriven).
	Type     ScheduleType
	CronExpr string // Used only if Type == CronDriven
}

type ScheduleType added in v0.0.10

// ScheduleType enumerates the scheduling types. The zero value is EventDriven.
type ScheduleType int
const (
	// EventDriven is the event-driven scheduling type (value 0).
	EventDriven ScheduleType = iota
	// CronDriven is the cron-driven scheduling type (value 1).
	CronDriven
)

type SessionUpdate

// SessionUpdate represents the update status of a session being processed by
// the engine.
//
// NOTE(review): Error is an error interface value with a JSON tag.
// encoding/json encodes an interface by its dynamic value, so common error
// types with no exported fields (e.g. those from errors.New) encode as {} and
// lose the message. Confirm whether this struct is ever marshalled; if it is,
// a string field may be needed.
type SessionUpdate struct {
	SessionID uuid.UUID `json:"session_id"` // SessionID is the ID of the session this update refers to.
	Finished  bool      `json:"finished"`   // Finished presumably reports that the session has completed — confirm.
	Error     error     `json:"error"`      // Error presumably carries the session's failure, nil if none — confirm.
}

SessionUpdate represents the update status of a session being processed by the engine.

type SimpleProcessor added in v0.0.5

// SimpleProcessor represents a processor within a flow. It contains metadata
// about the processor, including its configuration and execution parameters.
type SimpleProcessor struct {
	ID         uuid.UUID              `json:"id"`          // ID uniquely identifies the processor.
	FlowID     uuid.UUID              `json:"flow_id"`     // FlowID is the ID of the flow to which the processor belongs.
	Name       string                 `json:"name"`        // Name is the human-readable name of the processor.
	Type       string                 `json:"type"`        // Type indicates the type of the processor, used to determine its implementation.
	FlowOrder  int                    `json:"flow_order"`  // FlowOrder specifies the order in which the processor is executed within the flow.
	Config     map[string]interface{} `json:"config"`      // Config holds the specific configuration for the processor.
	MaxRetries int                    `json:"max_retries"` // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
	LogLevel   logrus.Level           `json:"log_level"`   // LogLevel defines the logging level used for this processor.
	Enabled    bool                   `json:"enabled"`     // Enabled indicates if the processor is enabled.
}

SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.

type SimpleTriggerProcessor added in v0.0.15

// SimpleTriggerProcessor represents a trigger processor within a flow. It
// contains metadata about the trigger processor, including its configuration
// and scheduling parameters. CronExpr applies only when ScheduleType is
// CronDriven, and EventName only when it is EventDriven.
type SimpleTriggerProcessor struct {
	ID           uuid.UUID              `json:"id"`            // ID uniquely identifies the trigger processor.
	FlowID       uuid.UUID              `json:"flow_id"`       // FlowID is the ID of the flow to which the trigger processor belongs.
	Name         string                 `json:"name"`          // Name is the human-readable name of the trigger processor.
	Type         string                 `json:"type"`          // Type indicates the type of the trigger processor, used to determine its implementation.
	Config       map[string]interface{} `json:"config"`        // Config holds the specific configuration for the trigger processor.
	ScheduleType ScheduleType           `json:"schedule_type"` // ScheduleType defines the type of scheduling (event-driven or cron-driven).
	CronExpr     string                 `json:"cron_expr"`     // CronExpr is the cron expression used for scheduling, applicable if ScheduleType is CronDriven.
	EventName    string                 `json:"event_name"`    // EventName is the name of the event to listen to, applicable if ScheduleType is EventDriven.
	LogLevel     logrus.Level           `json:"log_level"`     // LogLevel defines the logging level used for this trigger processor.
	Enabled      bool                   `json:"enabled"`       // Enabled indicates if the processor is enabled.
}

SimpleTriggerProcessor represents a trigger processor within a flow. It contains metadata about the trigger processor, including its configuration and scheduling parameters.

type TriggerProcessor added in v0.0.10

// TriggerProcessor defines the interface for a trigger processor. It embeds
// Processor and adds scheduling, session-update handling, and cleanup.
type TriggerProcessor interface {
	Processor
	// GetScheduleType returns the scheduling type supported by the TriggerProcessor.
	GetScheduleType() ScheduleType
	// HandleSessionUpdate allows the TriggerProcessor to respond to session updates.
	HandleSessionUpdate(update SessionUpdate)
	// Close is called when the TriggerProcessor is being stopped or cleaned up.
	// Returns:
	// - error: An error if the cleanup fails.
	Close() error
}

TriggerProcessor defines the interface for a trigger processor.

type WriteAheadLogger added in v0.0.2

// WriteAheadLogger is an interface for writing log entries to a file and
// reading them back. The log entries are used to keep track of the state of
// the processing of a flow.
type WriteAheadLogger interface {
	// WriteEntry writes a log entry to the log.
	// It has no return value, so a failed write is not reported to the caller.
	WriteEntry(entry LogEntry)
	// ReadEntries reads the log entries back.
	// Returns:
	// - []LogEntry: The entries read from the log.
	// - error: An error if reading fails.
	ReadEntries() ([]LogEntry, error)
}

WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL