definitions

package
v0.0.11 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 2, 2024 | License: MIT | Imports: 5 | Imported by: 20

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseProcessor

type BaseProcessor struct {
	ID string // ID is the unique identifier of the processor.
}

BaseProcessor represents a basic processor with an ID.

func (*BaseProcessor) DecodeMap

func (b *BaseProcessor) DecodeMap(input interface{}, output interface{}) error

DecodeMap decodes a map into the output structure. A processor can use it to decode its configuration (which is a map) into its own struct.

Parameters:

  • input: The input map to decode.
  • output: The output structure to decode into.

Returns:

  • error: An error if the decoding fails.
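To illustrate the pattern, the sketch below decodes a configuration map into a typed struct. This documentation does not show how DecodeMap is implemented, so `decodeMap` is a stand-in that round-trips through `encoding/json`; that choice, the `httpConfig` struct, and its field names are assumptions made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// httpConfig is a hypothetical processor configuration struct.
type httpConfig struct {
	URL     string `json:"url"`
	Timeout int    `json:"timeout"`
}

// decodeMap is a stand-in for BaseProcessor.DecodeMap. It converts the
// input map to JSON and back into output (assumed approach, not the
// package's actual implementation). output must be a pointer.
func decodeMap(input interface{}, output interface{}) error {
	raw, err := json.Marshal(input)
	if err != nil {
		return fmt.Errorf("marshal input: %w", err)
	}
	if err := json.Unmarshal(raw, output); err != nil {
		return fmt.Errorf("unmarshal into output: %w", err)
	}
	return nil
}

func main() {
	// The kind of map a processor might receive through SetConfig.
	config := map[string]interface{}{"url": "https://example.com", "timeout": 30}

	var cfg httpConfig
	if err := decodeMap(config, &cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.URL, cfg.Timeout)
}
```

A processor would typically make a call like this from its SetConfig method, so that the rest of its code works with typed fields rather than map lookups.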

func (*BaseProcessor) GetID

func (b *BaseProcessor) GetID() string

GetID returns the ID of the processor.

Returns:

  • string: The ID of the processor.

type EngineFileHandler

type EngineFileHandler interface {
	// GetInputFile returns the file path of the input file being processed.
	GetInputFile() string

	// GetOutputFile returns the file path of the output file generated by the processing.
	GetOutputFile() string

	// Close closes any resources associated with the file handler.
	Close()

	// GenerateNewFileHandler creates and returns a new file handler, typically used for handling parallel processing paths.
	// Returns:
	//   - A new instance of EngineFileHandler.
	//   - An error if the operation fails.
	GenerateNewFileHandler() (EngineFileHandler, error)
}

EngineFileHandler defines the interface for handling files in the engine's processing flow.

type EngineFlowObject

type EngineFlowObject struct {
	Metadata map[string]interface{} `json:"metadata"`
}

EngineFlowObject represents the flow of data through the engine, containing metadata for the processing context.

func (*EngineFlowObject) EvaluateExpression

func (e *EngineFlowObject) EvaluateExpression(input string) (string, error)

EvaluateExpression evaluates a string expression using the metadata in the EngineFlowObject. It replaces each `${expression}` placeholder with the value of the expression, evaluated with [expr](https://github.com/expr-lang/expr). The metadata is available through the `$env` variable, for example `${$env["key"]}`. Additional functions are available:

  • `uuid()`: Generates a new UUID.
  • `random(max)`: Generates a random number between 0 and max.
  • `getEnv(key)`: Retrieves an environment variable by key.

Parameters:

  • input: The string expression to evaluate.

Returns:

  • A string containing the evaluated result.
  • An error if the evaluation fails.
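The placeholder substitution described above can be sketched with the standard library alone. This is a deliberately simplified stand-in, not the package's implementation: it does not use expr, it only understands the `$env["key"]` lookup form, and it rejects everything else (including `uuid()`, `random(max)`, and `getEnv(key)`). The `evaluate` function and both regular expressions are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var (
	// placeholder matches ${...} and captures the expression inside.
	placeholder = regexp.MustCompile(`\$\{([^}]*)\}`)
	// envLookup matches the single supported form: $env["key"].
	envLookup = regexp.MustCompile(`^\$env\["([^"]+)"\]$`)
)

// evaluate replaces every ${$env["key"]} placeholder in input with the
// matching metadata value. Any other expression is reported as an error.
func evaluate(input string, metadata map[string]interface{}) (string, error) {
	var firstErr error
	fail := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}
	result := placeholder.ReplaceAllStringFunc(input, func(match string) string {
		expression := strings.TrimSpace(placeholder.FindStringSubmatch(match)[1])
		lookup := envLookup.FindStringSubmatch(expression)
		if lookup == nil {
			fail(fmt.Errorf("unsupported expression %q", expression))
			return match
		}
		value, ok := metadata[lookup[1]]
		if !ok {
			fail(fmt.Errorf("metadata key %q not found", lookup[1]))
			return match
		}
		return fmt.Sprint(value)
	})
	if firstErr != nil {
		return "", firstErr
	}
	return result, nil
}

func main() {
	metadata := map[string]interface{}{"name": "report"}
	out, err := evaluate(`out/${$env["name"]}.txt`, metadata)
	if err != nil {
		fmt.Println("evaluation failed:", err)
		return
	}
	fmt.Println(out) // prints "out/report.txt"
}
```

Text outside the placeholders passes through unchanged, which is what makes this style convenient for building file names and similar strings from flow metadata.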

type EngineIncomingObject

type EngineIncomingObject struct {
	FlowID    uuid.UUID
	Metadata  map[string]interface{}
	Reader    io.Reader
	SessionID uuid.UUID
}

EngineIncomingObject represents an incoming object for the engine to process, including metadata, a data reader, and a session ID.

type EngineInterface added in v0.0.10

type EngineInterface interface {
	Submit(flowID uuid.UUID, metadata map[string]interface{}, reader io.Reader) uuid.UUID
	SessionUpdates() <-chan SessionUpdate
}

type Flow added in v0.0.6

type Flow struct {
	ID          uuid.UUID         `json:"id"`          // ID uniquely identifies the flow.
	Name        string            `json:"name"`        // Name is the human-readable name of the flow.
	Description string            `json:"description"` // Description provides a brief explanation of the flow's purpose.
	Processors  []SimpleProcessor `json:"processors"`  // Processors is the list of processors that are part of the flow, executed in order.
}

Flow represents a flow of processors that are executed in a specific order. It contains the flow's metadata such as its ID, name, description, and the processors that belong to it.

type FlowManager

type FlowManager interface {
	// GetFirstProcessorsForFlow retrieves the first processors in the execution order for a given flow.
	// Parameters:
	//   - flowID: The UUID of the flow.
	// Returns:
	//   - A slice of SimpleProcessor representing the first processors in the flow.
	//   - An error if the operation fails.
	GetFirstProcessorsForFlow(flowID uuid.UUID) ([]SimpleProcessor, error)

	// GetLastProcessorForFlow retrieves the last processor in the execution order for a given flow.
	// Parameters:
	//   - flowID: The UUID of the flow.
	// Returns:
	//   - A pointer to a SimpleProcessor representing the last processor in the flow.
	//   - An error if the operation fails.
	GetLastProcessorForFlow(flowID uuid.UUID) (*SimpleProcessor, error)

	// ListFlows retrieves a paginated list of flows.
	// Parameters:
	//   - pagination: A pointer to a PaginationRequest containing pagination information.
	// Returns:
	//   - A PaginatedData struct containing a slice of Flow pointers and the total count of flows.
	//   - An error if the operation fails.
	ListFlows(pagination *PaginationRequest) (PaginatedData[*Flow], error)

	// GetFlowByID retrieves a flow by its UUID.
	// Parameters:
	//   - flowID: The UUID of the flow to retrieve.
	// Returns:
	//   - A pointer to the Flow struct representing the retrieved flow.
	//   - An error if the operation fails.
	GetFlowByID(flowID uuid.UUID) (*Flow, error)

	// GetProcessorByID retrieves a processor by its UUID within a specific flow.
	// Parameters:
	//   - flowID: The UUID of the flow containing the processor.
	//   - processorID: The UUID of the processor to retrieve.
	// Returns:
	//   - A pointer to a SimpleProcessor representing the retrieved processor.
	//   - An error if the operation fails.
	GetProcessorByID(flowID uuid.UUID, processorID uuid.UUID) (*SimpleProcessor, error)

	// GetNextProcessors retrieves the processors that follow a specific processor in a flow.
	// Parameters:
	//   - flowID: The UUID of the flow containing the processors.
	//   - processorID: The UUID of the processor whose successors are to be retrieved.
	// Returns:
	//   - A slice of SimpleProcessor representing the next processors in the flow.
	//   - An error if the operation fails.
	GetNextProcessors(flowID uuid.UUID, processorID uuid.UUID) ([]SimpleProcessor, error)

	// AddProcessorToFlowBefore adds a processor to a flow before a specified reference processor.
	// Parameters:
	//   - flowID: The UUID of the flow to which the processor will be added.
	//   - processor: A pointer to the SimpleProcessor to add to the flow.
	//   - referenceProcessorID: The UUID of the processor before which the new processor will be added.
	// Returns:
	//   - An error if the operation fails.
	AddProcessorToFlowBefore(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// AddProcessorToFlowAfter adds a processor to a flow after a specified reference processor.
	// Parameters:
	//   - flowID: The UUID of the flow to which the processor will be added.
	//   - processor: A pointer to the SimpleProcessor to add to the flow.
	//   - referenceProcessorID: The UUID of the processor after which the new processor will be added.
	// Returns:
	//   - An error if the operation fails.
	AddProcessorToFlowAfter(flowID uuid.UUID, processor *SimpleProcessor, referenceProcessorID uuid.UUID) error

	// SaveFlow saves the given flow and its associated processors to the persistent storage.
	// Parameters:
	//   - flow: A pointer to the Flow struct representing the flow to be saved.
	// Returns:
	//   - An error if the operation fails.
	SaveFlow(flow *Flow) error
}

FlowManager defines the interface for managing and interacting with flows and processors within those flows.
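The slice return types of GetFirstProcessorsForFlow and GetNextProcessors suggest that several processors can occupy the same position in a flow. The sketch below shows one way such lookups could work over an in-memory list, assuming that processors sharing a FlowOrder value run as parallel paths. That interpretation, the string IDs (used in place of uuid.UUID to stay within the standard library), and the helper names are all assumptions, not behaviour confirmed by this documentation.

```go
package main

import "fmt"

// simpleProcessor is a trimmed, hypothetical stand-in for SimpleProcessor.
type simpleProcessor struct {
	ID        string
	FlowOrder int
}

// withOrder returns every processor whose FlowOrder equals order.
func withOrder(ps []simpleProcessor, order int) []simpleProcessor {
	var out []simpleProcessor
	for _, p := range ps {
		if p.FlowOrder == order {
			out = append(out, p)
		}
	}
	return out
}

// firstProcessors returns the processors with the lowest FlowOrder.
func firstProcessors(ps []simpleProcessor) []simpleProcessor {
	if len(ps) == 0 {
		return nil
	}
	lowest := ps[0].FlowOrder
	for _, p := range ps[1:] {
		if p.FlowOrder < lowest {
			lowest = p.FlowOrder
		}
	}
	return withOrder(ps, lowest)
}

// nextProcessors returns the processors at the next higher FlowOrder
// after the processor identified by id, or nil if it is already last.
func nextProcessors(ps []simpleProcessor, id string) ([]simpleProcessor, error) {
	current, found := 0, false
	for _, p := range ps {
		if p.ID == id {
			current, found = p.FlowOrder, true
			break
		}
	}
	if !found {
		return nil, fmt.Errorf("processor %q not found", id)
	}
	next, ok := 0, false
	for _, p := range ps {
		if p.FlowOrder > current && (!ok || p.FlowOrder < next) {
			next, ok = p.FlowOrder, true
		}
	}
	if !ok {
		return nil, nil
	}
	return withOrder(ps, next), nil
}

func main() {
	flow := []simpleProcessor{{"read", 0}, {"gzip", 1}, {"hash", 1}, {"write", 2}}
	fmt.Println(firstProcessors(flow))
	next, _ := nextProcessors(flow, "read")
	fmt.Println(next)
}
```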

type LogEntry added in v0.0.2

type LogEntry struct {
	SessionID     uuid.UUID        `json:"session_id"`
	ProcessorName string           `json:"processor_name"`
	ProcessorID   string           `json:"processor_id"`
	FlowID        uuid.UUID        `json:"flow_id"`
	InputFile     string           `json:"input_file"`
	OutputFile    string           `json:"output_file"`
	FlowObject    EngineFlowObject `json:"flow_object"`
	RetryCount    int              `json:"retry_count"`
}

LogEntry represents an entry in the write-ahead log.

type PaginatedData added in v0.0.8

type PaginatedData[T any] struct {
	Data       []T `json:"data"`       // Data is the slice of items for the current page.
	TotalCount int `json:"totalCount"` // TotalCount is the total number of items available.
}

PaginatedData represents a paginated response. It includes the data and the total count of items.

type PaginationRequest added in v0.0.8

type PaginationRequest struct {
	Page    int `json:"page" query:"page"`        // Page number of the paginated data.
	PerPage int `json:"perPage" query:"per_page"` // Number of items per page.
}

PaginationRequest represents a request for paginated data. It includes the page number and the number of items per page.

func (*PaginationRequest) Limit added in v0.0.8

func (r *PaginationRequest) Limit() int

Limit calculates the limit for the paginated data. It ensures that the items per page are valid. Returns the limit as an integer.

func (*PaginationRequest) Offset added in v0.0.8

func (r *PaginationRequest) Offset() int

Offset calculates the offset for the paginated data. It ensures that the page number and items per page are valid. Returns the offset as an integer.
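The relationship between Page, PerPage, Limit, and Offset can be sketched as follows. The documentation only says that invalid values are corrected, not how, so the fallback values used here (page 1 and 10 items per page) are assumptions, as are the lowercase stand-in types and the `paginate` helper.

```go
package main

import "fmt"

// defaultPerPage is an assumed fallback; the real package may differ.
const defaultPerPage = 10

// paginationRequest is a hypothetical stand-in for PaginationRequest.
type paginationRequest struct {
	Page    int
	PerPage int
}

// Limit returns the page size, falling back to defaultPerPage when
// PerPage is not positive.
func (r *paginationRequest) Limit() int {
	if r.PerPage < 1 {
		return defaultPerPage
	}
	return r.PerPage
}

// Offset returns the number of items to skip, treating any page
// below 1 as page 1.
func (r *paginationRequest) Offset() int {
	page := r.Page
	if page < 1 {
		page = 1
	}
	return (page - 1) * r.Limit()
}

// paginatedData is a hypothetical stand-in for PaginatedData.
type paginatedData[T any] struct {
	Data       []T
	TotalCount int
}

// paginate slices items according to the request, clamping both ends
// of the window to the length of items.
func paginate[T any](items []T, r *paginationRequest) paginatedData[T] {
	start := r.Offset()
	if start > len(items) {
		start = len(items)
	}
	end := start + r.Limit()
	if end > len(items) {
		end = len(items)
	}
	return paginatedData[T]{Data: items[start:end], TotalCount: len(items)}
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	page := paginate(items, &paginationRequest{Page: 2, PerPage: 2})
	fmt.Println(page.Data, page.TotalCount) // prints "[c d] 5"
}
```

In a database-backed FlowManager the same two values would normally be passed to the query as its limit and offset rather than applied to an in-memory slice.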

type Processor

type Processor interface {
	// GetID returns the ID of the processor.
	// Returns:
	// - string: The ID of the processor.
	GetID() string

	// Name returns the name of the processor.
	// Returns:
	// - string: The name of the processor.
	Name() string

	// Execute executes the processor logic.
	// Parameters:
	// - info: The EngineFlowObject containing the execution information.
	// - fileHandler: The ProcessorFileHandler for handling file operations.
	// - log: The logger for logging information.
	// Returns:
	// - *EngineFlowObject: The updated EngineFlowObject after execution.
	// - error: An error if the execution fails.
	Execute(info *EngineFlowObject, fileHandler ProcessorFileHandler, log *logrus.Logger) (*EngineFlowObject, error)

	// SetConfig sets the configuration for the processor.
	// Parameters:
	// - config: A map containing the configuration settings.
	// Returns:
	// - error: An error if setting the configuration fails.
	SetConfig(config map[string]interface{}) error
}

Processor defines the interface for a processor.

type ProcessorFactory

type ProcessorFactory interface {
	// GetProcessor retrieves a processor by its type name.
	// Parameters:
	// - typeName: The type name of the processor to retrieve.
	// Returns:
	// - Processor: The retrieved processor.
	// - error: An error if the retrieval fails.
	GetProcessor(typeName string) (Processor, error)

	// RegisterProcessor registers a new processor.
	// Parameters:
	// - processor: The processor to register.
	RegisterProcessor(processor Processor)

	// RegisterProcessorWithTypeName registers a new processor with a specific type name.
	// Parameters:
	// - typeName: The type name to associate with the processor.
	// - processor: The processor to register.
	RegisterProcessorWithTypeName(typeName string, processor Processor)
}

ProcessorFactory defines an interface for registering processors and retrieving them by type name.
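A factory of this shape is commonly backed by a map keyed by type name. The sketch below shows that idea with a reduced, hypothetical `processor` interface that only has Name, so the example stays within the standard library. Using Name() as the key in RegisterProcessor is an assumption, and the sketch returns the registered instance itself rather than a fresh copy, which the real factory may or may not do.

```go
package main

import "fmt"

// processor is a reduced, hypothetical stand-in for the Processor interface.
type processor interface {
	Name() string
}

// upperCase is an example processor type used only for this sketch.
type upperCase struct{}

func (upperCase) Name() string { return "UpperCase" }

// registry is a minimal map-backed factory. It is not safe for
// concurrent use; a real implementation would likely add locking.
type registry struct {
	processors map[string]processor
}

func newRegistry() *registry {
	return &registry{processors: make(map[string]processor)}
}

// RegisterProcessor stores p under its own name (assumed behaviour).
func (r *registry) RegisterProcessor(p processor) {
	r.processors[p.Name()] = p
}

// RegisterProcessorWithTypeName stores p under an explicit type name.
func (r *registry) RegisterProcessorWithTypeName(typeName string, p processor) {
	r.processors[typeName] = p
}

// GetProcessor looks up a processor by type name.
func (r *registry) GetProcessor(typeName string) (processor, error) {
	p, ok := r.processors[typeName]
	if !ok {
		return nil, fmt.Errorf("no processor registered for type %q", typeName)
	}
	return p, nil
}

func main() {
	reg := newRegistry()
	reg.RegisterProcessor(upperCase{})
	reg.RegisterProcessorWithTypeName("upper", upperCase{})

	p, err := reg.GetProcessor("upper")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(p.Name()) // prints "UpperCase"
}
```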

type ProcessorFileHandler

type ProcessorFileHandler interface {
	// Read reads data from a file.
	// Returns:
	// - io.Reader: The reader for reading data.
	// - error: An error if the read operation fails.
	Read() (io.Reader, error)

	// Write writes data to a file.
	// Returns:
	// - io.Writer: The writer for writing data.
	// - error: An error if the write operation fails.
	Write() (io.Writer, error)
}

ProcessorFileHandler defines the interface a processor uses to read the current contents and to write its output.
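Because the interface is expressed in terms of io.Reader and io.Writer, it is easy to substitute an in-memory implementation, for instance in tests. The sketch below assumes such a handler and a small processing step that upper-cases its input; `memoryFileHandler`, `fileHandler`, and `upperCaseStep` are hypothetical names, not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// fileHandler mirrors the two methods of ProcessorFileHandler.
type fileHandler interface {
	Read() (io.Reader, error)
	Write() (io.Writer, error)
}

// memoryFileHandler is a hypothetical in-memory implementation:
// input is served from a byte slice and output is collected in a buffer.
type memoryFileHandler struct {
	input  []byte
	output bytes.Buffer
}

func (h *memoryFileHandler) Read() (io.Reader, error) {
	return bytes.NewReader(h.input), nil
}

func (h *memoryFileHandler) Write() (io.Writer, error) {
	return &h.output, nil
}

// upperCaseStep reads everything from the handler, upper-cases it,
// and writes the result back through the handler.
func upperCaseStep(h fileHandler) error {
	r, err := h.Read()
	if err != nil {
		return fmt.Errorf("open input: %w", err)
	}
	w, err := h.Write()
	if err != nil {
		return fmt.Errorf("open output: %w", err)
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return fmt.Errorf("read input: %w", err)
	}
	_, err = io.WriteString(w, strings.ToUpper(string(data)))
	return err
}

func main() {
	h := &memoryFileHandler{input: []byte("hello flow")}
	if err := upperCaseStep(h); err != nil {
		fmt.Println("step failed:", err)
		return
	}
	fmt.Println(h.output.String()) // prints "HELLO FLOW"
}
```

For large inputs a real processor would stream with io.Copy or a buffered reader instead of loading everything with io.ReadAll.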

type ScheduleConfig added in v0.0.10

type ScheduleConfig struct {
	Type     ScheduleType
	CronExpr string // Used only if Type == CronDriven
}

type ScheduleType added in v0.0.10

type ScheduleType int

const (
	EventDriven ScheduleType = iota
	CronDriven
)

type SessionUpdate

type SessionUpdate struct {
	SessionID uuid.UUID `json:"session_id"`
	Finished  bool      `json:"finished"`
	Error     error     `json:"error"`
}

SessionUpdate represents the update status of a session being processed by the engine.

type SimpleProcessor added in v0.0.5

type SimpleProcessor struct {
	ID         uuid.UUID              `json:"id"`          // ID uniquely identifies the processor.
	FlowID     uuid.UUID              `json:"flow_id"`     // FlowID is the ID of the flow to which the processor belongs.
	Name       string                 `json:"name"`        // Name is the human-readable name of the processor.
	Type       string                 `json:"type"`        // Type indicates the type of the processor, typically used to determine its implementation.
	FlowOrder  int                    `json:"flow_order"`  // FlowOrder specifies the order in which the processor is executed within the flow.
	Config     map[string]interface{} `json:"config"`      // Config holds the specific configuration for the processor.
	MaxRetries int                    `json:"max_retries"` // MaxRetries specifies the maximum number of retries allowed for this processor in case of failure.
	LogLevel   logrus.Level           `json:"log_level"`   // LogLevel defines the logging level used for this processor.
}

SimpleProcessor represents a processor within a flow. It contains metadata about the processor, including its configuration and execution parameters.

type TriggerProcessor added in v0.0.10

type TriggerProcessor interface {
	Processor
	GetScheduleConfig() ScheduleConfig
	HandleSessionUpdate(update SessionUpdate)
}

type WriteAheadLogger added in v0.0.2

type WriteAheadLogger interface {
	WriteEntry(entry LogEntry)
	ReadEntries() ([]LogEntry, error)
}

WriteAheadLogger is an interface for writing log entries to a file and reading them back. The log entries are used to keep track of the state of the processing of a flow.
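One common way to satisfy an interface like this is to append each entry as a line of JSON and replay the lines on startup. The sketch below does that against an in-memory buffer instead of a file. The storage format, the `memoryWAL` type, and the trimmed `logEntry` struct (with string IDs in place of uuid.UUID) are assumptions for illustration; the package's actual on-disk format is not described here.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"sync"
)

// logEntry is a trimmed, hypothetical stand-in for LogEntry.
type logEntry struct {
	SessionID   string `json:"session_id"`
	ProcessorID string `json:"processor_id"`
	RetryCount  int    `json:"retry_count"`
}

// memoryWAL appends entries as JSON lines to an in-memory buffer.
type memoryWAL struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// WriteEntry appends one entry. The interface has no error return,
// so this sketch logs encoding failures instead of returning them.
func (w *memoryWAL) WriteEntry(entry logEntry) {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Encode writes the JSON value followed by a newline.
	if err := json.NewEncoder(&w.buf).Encode(entry); err != nil {
		log.Printf("write-ahead log: encode entry: %v", err)
	}
}

// ReadEntries replays every entry in the order it was written.
func (w *memoryWAL) ReadEntries() ([]logEntry, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	var entries []logEntry
	// Scan a read-only view so the buffer itself is not consumed.
	scanner := bufio.NewScanner(bytes.NewReader(w.buf.Bytes()))
	for scanner.Scan() {
		var entry logEntry
		if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
			return nil, fmt.Errorf("decode entry: %w", err)
		}
		entries = append(entries, entry)
	}
	return entries, scanner.Err()
}

func main() {
	wal := &memoryWAL{}
	wal.WriteEntry(logEntry{SessionID: "s1", ProcessorID: "p1"})
	wal.WriteEntry(logEntry{SessionID: "s1", ProcessorID: "p2", RetryCount: 1})

	entries, err := wal.ReadEntries()
	if err != nil {
		fmt.Println("replay failed:", err)
		return
	}
	fmt.Println(len(entries), entries[1].ProcessorID) // prints "2 p2"
}
```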
