dag

package
v0.0.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2025 License: MIT Imports: 33 Imported by: 2

Documentation

Overview

context_keys.go

Index

Constants

View Source
const (
	Delimiter          = "___"
	ContextIndex       = "index"
	DefaultChannelSize = 1000
	RetryInterval      = 5 * time.Second
)

Variables

This section is empty.

Functions

func AddDAG

func AddDAG(key string, handler *DAG)

func AddHandler

func AddHandler(key string, handler func(string) mq.Processor)

func AvailableDAG

func AvailableDAG() []string

func ClearDAG added in v0.0.2

func ClearDAG()

func GetHandler

func GetHandler(key string) func(string) mq.Processor

func GetVal

func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
func Header(c context.Context, headerKey string) (val map[string]any, exists bool)

func HeaderVal

func HeaderVal(c context.Context, headerKey string, key string) (val any)

func WsEvents

func WsEvents(s *sio.Server)

Types

type ActivityAlertHandler added in v0.0.17

type ActivityAlertHandler struct {
	// contains filtered or unexported fields
}

ActivityAlertHandler handles alerts by logging them as activities

func (*ActivityAlertHandler) HandleAlert added in v0.0.17

func (h *ActivityAlertHandler) HandleAlert(alert Alert) error

type ActivityEntry added in v0.0.17

type ActivityEntry struct {
	ID          string                 `json:"id"`
	Timestamp   time.Time              `json:"timestamp"`
	DAGName     string                 `json:"dag_name"`
	Level       ActivityLevel          `json:"level"`
	Type        ActivityType           `json:"type"`
	Message     string                 `json:"message"`
	TaskID      string                 `json:"task_id,omitempty"`
	NodeID      string                 `json:"node_id,omitempty"`
	Duration    time.Duration          `json:"duration,omitempty"`
	Success     *bool                  `json:"success,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Details     map[string]interface{} `json:"details,omitempty"`
	ContextData map[string]interface{} `json:"context_data,omitempty"`
	UserID      string                 `json:"user_id,omitempty"`
	SessionID   string                 `json:"session_id,omitempty"`
	TraceID     string                 `json:"trace_id,omitempty"`
	SpanID      string                 `json:"span_id,omitempty"`
}

ActivityEntry represents a single activity log entry

type ActivityFilter added in v0.0.17

type ActivityFilter struct {
	StartTime    *time.Time      `json:"start_time,omitempty"`
	EndTime      *time.Time      `json:"end_time,omitempty"`
	Levels       []ActivityLevel `json:"levels,omitempty"`
	Types        []ActivityType  `json:"types,omitempty"`
	TaskIDs      []string        `json:"task_ids,omitempty"`
	NodeIDs      []string        `json:"node_ids,omitempty"`
	UserIDs      []string        `json:"user_ids,omitempty"`
	SuccessOnly  *bool           `json:"success_only,omitempty"`
	FailuresOnly *bool           `json:"failures_only,omitempty"`
	Limit        int             `json:"limit,omitempty"`
	Offset       int             `json:"offset,omitempty"`
	SortBy       string          `json:"sort_by,omitempty"`    // timestamp, level, type
	SortOrder    string          `json:"sort_order,omitempty"` // asc, desc
}

ActivityFilter provides filtering options for activity queries

type ActivityHook added in v0.0.17

type ActivityHook interface {
	OnActivity(entry ActivityEntry) error
}

ActivityHook allows custom processing of activity entries

type ActivityLevel added in v0.0.17

type ActivityLevel string

ActivityLevel represents the severity level of an activity

const (
	ActivityLevelDebug ActivityLevel = "debug"
	ActivityLevelInfo  ActivityLevel = "info"
	ActivityLevelWarn  ActivityLevel = "warn"
	ActivityLevelError ActivityLevel = "error"
	ActivityLevelFatal ActivityLevel = "fatal"
)

type ActivityLogger added in v0.0.17

type ActivityLogger struct {
	// contains filtered or unexported fields
}

ActivityLogger provides comprehensive activity logging for DAG operations

func NewActivityLogger added in v0.0.17

func NewActivityLogger(dagName string, config ActivityLoggerConfig, persistence ActivityPersistence, logger logger.Logger) *ActivityLogger

NewActivityLogger creates a new activity logger

func (*ActivityLogger) AddHook added in v0.0.17

func (al *ActivityLogger) AddHook(hook ActivityHook)

AddHook adds an activity hook

func (*ActivityLogger) Flush added in v0.0.17

func (al *ActivityLogger) Flush() error

Flush flushes the buffer to persistence

func (*ActivityLogger) GetActivities added in v0.0.17

func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)

GetActivities retrieves activities based on filter

func (*ActivityLogger) GetStats added in v0.0.17

func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)

GetStats returns activity statistics

func (*ActivityLogger) Log added in v0.0.17

func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

Log logs an activity entry

func (*ActivityLogger) LogNodeExecution added in v0.0.17

func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, duration time.Duration)

LogNodeExecution logs node execution details

func (*ActivityLogger) LogTaskComplete added in v0.0.17

func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)

LogTaskComplete logs task completion activity

func (*ActivityLogger) LogTaskFail added in v0.0.17

func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, duration time.Duration)

LogTaskFail logs task failure activity

func (*ActivityLogger) LogTaskStart added in v0.0.17

func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)

LogTaskStart logs task start activity

func (*ActivityLogger) LogWithContext added in v0.0.17

func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

LogWithContext logs an activity entry with context information

func (*ActivityLogger) RemoveHook added in v0.0.17

func (al *ActivityLogger) RemoveHook(hook ActivityHook)

RemoveHook removes an activity hook

func (*ActivityLogger) Stop added in v0.0.17

func (al *ActivityLogger) Stop()

Stop stops the activity logger

type ActivityLoggerConfig added in v0.0.17

type ActivityLoggerConfig struct {
	BufferSize        int           `json:"buffer_size"`
	FlushInterval     time.Duration `json:"flush_interval"`
	MaxRetries        int           `json:"max_retries"`
	EnableHooks       bool          `json:"enable_hooks"`
	EnableCompression bool          `json:"enable_compression"`
	MaxEntryAge       time.Duration `json:"max_entry_age"`
	AsyncMode         bool          `json:"async_mode"`
}

ActivityLoggerConfig configures the activity logger

func DefaultActivityLoggerConfig added in v0.0.17

func DefaultActivityLoggerConfig() ActivityLoggerConfig

DefaultActivityLoggerConfig returns default configuration

type ActivityPersistence added in v0.0.17

type ActivityPersistence interface {
	Store(entries []ActivityEntry) error
	Query(filter ActivityFilter) ([]ActivityEntry, error)
	GetStats(filter ActivityFilter) (ActivityStats, error)
	Close() error
}

ActivityPersistence defines the interface for persisting activities

type ActivityStats added in v0.0.17

type ActivityStats struct {
	TotalActivities      int64                   `json:"total_activities"`
	ActivitiesByLevel    map[ActivityLevel]int64 `json:"activities_by_level"`
	ActivitiesByType     map[ActivityType]int64  `json:"activities_by_type"`
	ActivitiesByNode     map[string]int64        `json:"activities_by_node"`
	ActivitiesByTask     map[string]int64        `json:"activities_by_task"`
	SuccessRate          float64                 `json:"success_rate"`
	FailureRate          float64                 `json:"failure_rate"`
	AverageDuration      time.Duration           `json:"average_duration"`
	PeakActivitiesPerMin int64                   `json:"peak_activities_per_minute"`
	TimeRange            ActivityTimeRange       `json:"time_range"`
	RecentErrors         []ActivityEntry         `json:"recent_errors"`
	TopFailingNodes      []NodeFailureStats      `json:"top_failing_nodes"`
	HourlyDistribution   map[string]int64        `json:"hourly_distribution"`
}

ActivityStats provides statistics about activities

type ActivityTimeRange added in v0.0.17

type ActivityTimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

ActivityTimeRange represents a time range for activities

type ActivityType added in v0.0.17

type ActivityType string

ActivityType represents the type of activity

const (
	ActivityTypeTaskStart      ActivityType = "task_start"
	ActivityTypeTaskComplete   ActivityType = "task_complete"
	ActivityTypeTaskFail       ActivityType = "task_fail"
	ActivityTypeTaskCancel     ActivityType = "task_cancel"
	ActivityTypeNodeStart      ActivityType = "node_start"
	ActivityTypeNodeComplete   ActivityType = "node_complete"
	ActivityTypeNodeFail       ActivityType = "node_fail"
	ActivityTypeNodeTimeout    ActivityType = "node_timeout"
	ActivityTypeValidation     ActivityType = "validation"
	ActivityTypeConfiguration  ActivityType = "configuration"
	ActivityTypeAlert          ActivityType = "alert"
	ActivityTypeCleanup        ActivityType = "cleanup"
	ActivityTypeTransaction    ActivityType = "transaction"
	ActivityTypeRetry          ActivityType = "retry"
	ActivityTypeCircuitBreaker ActivityType = "circuit_breaker"
	ActivityTypeWebhook        ActivityType = "webhook"
	ActivityTypeCustom         ActivityType = "custom"
)

type Alert added in v0.0.16

type Alert struct {
	ID          string                 `json:"id"`
	Timestamp   time.Time              `json:"timestamp"`
	Severity    AlertSeverity          `json:"severity"`
	Type        AlertType              `json:"type"`
	Message     string                 `json:"message"`
	Details     map[string]interface{} `json:"details"`
	NodeID      string                 `json:"node_id,omitempty"`
	TaskID      string                 `json:"task_id,omitempty"`
	Threshold   interface{}            `json:"threshold,omitempty"`
	ActualValue interface{}            `json:"actual_value,omitempty"`
}

Alert represents a monitoring alert

type AlertHandler added in v0.0.16

type AlertHandler interface {
	HandleAlert(alert Alert) error
}

AlertHandler defines interface for handling alerts

type AlertSeverity added in v0.0.17

type AlertSeverity string
const (
	AlertSeverityInfo     AlertSeverity = "info"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityCritical AlertSeverity = "critical"
)

type AlertThresholds added in v0.0.16

type AlertThresholds struct {
	MaxFailureRate      float64       `json:"max_failure_rate"`
	MaxExecutionTime    time.Duration `json:"max_execution_time"`
	MaxTasksInProgress  int64         `json:"max_tasks_in_progress"`
	MinSuccessRate      float64       `json:"min_success_rate"`
	MaxNodeFailures     int64         `json:"max_node_failures"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
}

AlertThresholds defines thresholds for alerting

type AlertType added in v0.0.17

type AlertType string
const (
	AlertTypeFailureRate    AlertType = "failure_rate"
	AlertTypeExecutionTime  AlertType = "execution_time"
	AlertTypeTaskLoad       AlertType = "task_load"
	AlertTypeNodeFailures   AlertType = "node_failures"
	AlertTypeCircuitBreaker AlertType = "circuit_breaker"
	AlertTypeHealthCheck    AlertType = "health_check"
)

type AlertWebhookHandler added in v0.0.16

type AlertWebhookHandler struct {
	// contains filtered or unexported fields
}

AlertWebhookHandler handles webhook alerts

func NewAlertWebhookHandler added in v0.0.16

func NewAlertWebhookHandler(logger logger.Logger) *AlertWebhookHandler

NewAlertWebhookHandler creates a new alert webhook handler

func (*AlertWebhookHandler) HandleAlert added in v0.0.16

func (h *AlertWebhookHandler) HandleAlert(alert Alert) error

HandleAlert implements the AlertHandler interface

type BatchProcessor added in v0.0.16

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor handles batch processing of tasks

func NewBatchProcessor added in v0.0.16

func NewBatchProcessor(dag *DAG, batchSize int, batchTimeout time.Duration, logger logger.Logger) *BatchProcessor

NewBatchProcessor creates a new batch processor

func (*BatchProcessor) AddTask added in v0.0.16

func (bp *BatchProcessor) AddTask(task *mq.Task) error

AddTask adds a task to the batch

func (*BatchProcessor) SetProcessFunc added in v0.0.16

func (bp *BatchProcessor) SetProcessFunc(fn func([]*mq.Task) error)

SetProcessFunc sets the function to process batches

func (*BatchProcessor) Stop added in v0.0.16

func (bp *BatchProcessor) Stop()

Stop stops the batch processor

type CacheConfig added in v0.0.16

type CacheConfig struct {
	Enabled bool          `json:"enabled"`
	TTL     time.Duration `json:"ttl"`
	MaxSize int           `json:"max_size"`
}

CacheConfig holds cache configuration

type CacheEntry added in v0.0.16

type CacheEntry struct {
	Value       interface{}
	ExpiresAt   time.Time
	AccessCount int64
	LastAccess  time.Time
}

CacheEntry represents a cached item

type CircuitBreaker added in v0.0.16

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements circuit breaker pattern for nodes

func NewCircuitBreaker added in v0.0.16

func NewCircuitBreaker(config *CircuitBreakerConfig, logger logger.Logger) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute added in v0.0.16

func (cb *CircuitBreaker) Execute(fn func() error) error

Execute executes a function with circuit breaker protection

func (*CircuitBreaker) GetState added in v0.0.16

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current circuit breaker state

func (*CircuitBreaker) Reset added in v0.0.16

func (cb *CircuitBreaker) Reset()

Reset manually resets the circuit breaker

type CircuitBreakerConfig added in v0.0.16

type CircuitBreakerConfig struct {
	FailureThreshold int
	ResetTimeout     time.Duration
	HalfOpenMaxCalls int
}

CircuitBreakerConfig defines circuit breaker behavior

type CircuitBreakerState added in v0.0.16

type CircuitBreakerState int

Circuit Breaker Implementation

const (
	CircuitClosed CircuitBreakerState = iota
	CircuitOpen
	CircuitHalfOpen
)

type CleanupManager added in v0.0.16

type CleanupManager struct {
	// contains filtered or unexported fields
}

CleanupManager handles cleanup of completed tasks and resources

func NewCleanupManager added in v0.0.16

func NewCleanupManager(dag *DAG, cleanupInterval, retentionPeriod time.Duration, maxEntries int, logger logger.Logger) *CleanupManager

NewCleanupManager creates a new cleanup manager

func (*CleanupManager) Start added in v0.0.16

func (cm *CleanupManager) Start(ctx context.Context)

Start begins the cleanup routine

func (*CleanupManager) Stop added in v0.0.16

func (cm *CleanupManager) Stop()

Stop stops the cleanup routine

type Condition

type Condition interface {
	Match(data any) bool
}

type ConditionProcessor

type ConditionProcessor interface {
	Processor
	SetConditions(map[string]Condition)
}

type ConfigManager added in v0.0.16

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager handles dynamic DAG configuration

func NewConfigManager added in v0.0.16

func NewConfigManager(logger logger.Logger) *ConfigManager

NewConfigManager creates a new configuration manager

func (*ConfigManager) AddWatcher added in v0.0.16

func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)

AddWatcher adds a configuration watcher

func (*ConfigManager) GetConfig added in v0.0.16

func (cm *ConfigManager) GetConfig() *DAGConfig

GetConfig returns a copy of the current configuration

func (*ConfigManager) UpdateConfig added in v0.0.16

func (cm *ConfigManager) UpdateConfig(newConfig *DAGConfig) error

UpdateConfig updates the configuration

func (*ConfigManager) UpdateConfiguration added in v0.0.17

func (cm *ConfigManager) UpdateConfiguration(config *DAGConfig) error

UpdateConfiguration updates the DAG configuration (alias for UpdateConfig)

type ConfigWatcher added in v0.0.16

type ConfigWatcher interface {
	OnConfigChange(oldConfig, newConfig *DAGConfig) error
}

ConfigWatcher interface for configuration change notifications

type DAG

type DAG struct {
	Error error

	Notifier *sio.Server

	// New hook fields:
	PreProcessHook  func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context
	PostProcessHook func(ctx context.Context, node *Node, taskID string, result mq.Result)
	// contains filtered or unexported fields
}

func GetDAG

func GetDAG(key string) *DAG

func NewDAG

func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG

func (*DAG) AddActivityHook added in v0.0.17

func (tm *DAG) AddActivityHook(hook ActivityHook)

AddActivityHook adds an activity hook

func (*DAG) AddAlertHandler added in v0.0.16

func (tm *DAG) AddAlertHandler(handler AlertHandler)

AddAlertHandler adds an alert handler

func (*DAG) AddCondition

func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG

func (*DAG) AddConfigWatcher added in v0.0.17

func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)

AddConfigWatcher adds a configuration change watcher

func (*DAG) AddDAGNode

func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG

func (*DAG) AddDeferredNode

func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error

func (*DAG) AddEdge

func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG

func (*DAG) AddNode

func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, startNode ...bool) *DAG

func (*DAG) AddNodeWithRetry added in v0.0.16

func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, retryConfig *RetryConfig, startNode ...bool) *DAG

AddNodeWithRetry adds a node with specific retry configuration

func (*DAG) AddWebhook added in v0.0.17

func (tm *DAG) AddWebhook(event string, config WebhookConfig)

AddWebhook adds a webhook configuration

func (*DAG) AssignTopic

func (tm *DAG) AssignTopic(topic string)

func (*DAG) BaseURI added in v0.0.10

func (tm *DAG) BaseURI() string

func (*DAG) BeginTransaction added in v0.0.16

func (tm *DAG) BeginTransaction(taskID string) *Transaction

BeginTransaction starts a new transaction

func (*DAG) CancelTask added in v0.0.11

func (tm *DAG) CancelTask(taskID string) error

New method to cancel a running task.

func (*DAG) CheckRateLimit added in v0.0.17

func (tm *DAG) CheckRateLimit(nodeID string) bool

CheckRateLimit checks if request is allowed for a node

func (*DAG) ClassifyEdges

func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)

func (*DAG) Clone added in v0.0.17

func (tm *DAG) Clone() *DAG

Clone creates a deep copy of the DAG

func (*DAG) Close

func (tm *DAG) Close() error

func (*DAG) CommitTransaction added in v0.0.16

func (tm *DAG) CommitTransaction(txID string) error

CommitTransaction commits a transaction

func (*DAG) Consume

func (tm *DAG) Consume(ctx context.Context) error

func (*DAG) ContainsPageNodes added in v0.0.18

func (d *DAG) ContainsPageNodes() bool

ContainsPageNodes iterates through all nodes to check if any are Page nodes This method provides an alternative way to check for Page nodes by examining the actual nodes rather than relying on the cached hasPageNode field

func (*DAG) Export added in v0.0.17

func (tm *DAG) Export() map[string]interface{}

Export exports the DAG structure to a serializable format

func (*DAG) ExportDOT

func (tm *DAG) ExportDOT(direction ...Direction) string

func (*DAG) FlushActivityLogs added in v0.0.17

func (tm *DAG) FlushActivityLogs() error

FlushActivityLogs flushes activity logs to persistence

func (*DAG) GetActivities added in v0.0.17

func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)

GetActivities retrieves activities based on filter

func (*DAG) GetActivityLogger added in v0.0.17

func (tm *DAG) GetActivityLogger() *ActivityLogger

GetActivityLogger returns the activity logger instance

func (*DAG) GetActivityStats added in v0.0.17

func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)

GetActivityStats returns activity statistics

func (*DAG) GetAllNodes added in v0.0.17

func (tm *DAG) GetAllNodes() map[string]*Node

GetAllNodes returns all nodes in the DAG

func (*DAG) GetCircuitBreakerStatus added in v0.0.17

func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState

GetCircuitBreakerStatus returns circuit breaker status for a node

func (*DAG) GetConfiguration added in v0.0.16

func (tm *DAG) GetConfiguration() *DAGConfig

GetConfiguration returns current DAG configuration

func (*DAG) GetCriticalPath added in v0.0.16

func (tm *DAG) GetCriticalPath() ([]string, error)

GetCriticalPath returns the critical path of the DAG

func (*DAG) GetDAGStatistics added in v0.0.16

func (tm *DAG) GetDAGStatistics() map[string]interface{}

GetDAGStatistics returns comprehensive DAG statistics

func (*DAG) GetEdgeCount added in v0.0.17

func (tm *DAG) GetEdgeCount() int

GetEdgeCount returns the total number of edges

func (*DAG) GetKey

func (tm *DAG) GetKey() string

func (*DAG) GetLastNodes added in v0.0.2

func (tm *DAG) GetLastNodes() ([]*Node, error)

func (*DAG) GetMonitoringMetrics added in v0.0.16

func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics

GetMonitoringMetrics returns current monitoring metrics

func (*DAG) GetNextNodes added in v0.0.2

func (tm *DAG) GetNextNodes(nodeID string) ([]*Node, error)

GetNextNodes returns the next nodes for a given node

func (*DAG) GetNodeByID added in v0.0.17

func (tm *DAG) GetNodeByID(nodeID string) (*Node, error)

GetNodeByID returns a node by its ID

func (*DAG) GetNodeCount added in v0.0.17

func (tm *DAG) GetNodeCount() int

GetNodeCount returns the total number of nodes

func (*DAG) GetNodeStats added in v0.0.16

func (tm *DAG) GetNodeStats(nodeID string) *NodeStats

GetNodeStats returns statistics for a specific node

func (*DAG) GetPreviousNodes added in v0.0.2

func (tm *DAG) GetPreviousNodes(nodeID string) ([]*Node, error)

GetPreviousNodes returns the previous nodes for a given node

func (*DAG) GetReport

func (tm *DAG) GetReport() string

func (*DAG) GetStartNode

func (tm *DAG) GetStartNode() string

func (*DAG) GetStatus added in v0.0.11

func (tm *DAG) GetStatus() map[string]interface{}

GetStatus returns a summary of the DAG including node and task counts.

func (*DAG) GetTaskMetrics added in v0.0.11

func (d *DAG) GetTaskMetrics() TaskMetrics

Getter for task metrics.

func (*DAG) GetTopologicalOrder added in v0.0.16

func (tm *DAG) GetTopologicalOrder() ([]string, error)

GetTopologicalOrder returns nodes in topological order

func (*DAG) GetTransaction added in v0.0.17

func (tm *DAG) GetTransaction(txID string) (*Transaction, error)

GetTransaction retrieves transaction details

func (*DAG) GetType

func (tm *DAG) GetType() string

func (*DAG) Handlers

func (tm *DAG) Handlers(app any, prefix string)

Handlers initializes route handlers.

func (*DAG) HasPageNode added in v0.0.18

func (d *DAG) HasPageNode() bool

HasPageNode checks if the DAG contains any Page nodes

func (*DAG) InitializeActivityLogger added in v0.0.17

func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)

InitializeActivityLogger initializes the activity logger for the DAG

func (*DAG) IsLastNode added in v0.0.2

func (tm *DAG) IsLastNode(nodeID string) (bool, error)

IsLastNode checks if a node is the last node in the DAG

func (*DAG) IsReady

func (tm *DAG) IsReady() bool

func (*DAG) LogActivity added in v0.0.17

func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

LogActivity logs an activity entry

func (*DAG) Logger added in v0.0.10

func (tm *DAG) Logger() logger.Logger

func (*DAG) OptimizePerformance added in v0.0.16

func (tm *DAG) OptimizePerformance() error

OptimizePerformance triggers performance optimization

func (*DAG) Pause

func (tm *DAG) Pause(_ context.Context) error

func (*DAG) PauseConsumer

func (tm *DAG) PauseConsumer(ctx context.Context, id string)

func (*DAG) PrintGraph

func (tm *DAG) PrintGraph()

func (*DAG) Process

func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result

func (*DAG) ProcessTask

func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) ProcessTaskNew added in v0.0.10

func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) RemoveNode added in v0.0.16

func (tm *DAG) RemoveNode(nodeID string) error

RemoveNode removes the node with the given nodeID and adjusts the edges. For example, if A -> B and B -> C exist and B is removed, a new edge A -> C is created.

func (*DAG) ReportNodeResult

func (tm *DAG) ReportNodeResult(callback func(mq.Result))

func (*DAG) Reset added in v0.0.11

func (tm *DAG) Reset()

New method to reset the DAG state.

func (*DAG) Resume

func (tm *DAG) Resume(_ context.Context) error

func (*DAG) ResumeConsumer

func (tm *DAG) ResumeConsumer(ctx context.Context, id string)

func (*DAG) RollbackTransaction added in v0.0.16

func (tm *DAG) RollbackTransaction(txID string) error

RollbackTransaction rolls back a transaction

func (*DAG) SVGViewerHTML added in v0.0.18

func (tm *DAG) SVGViewerHTML(svgContent string) string

SVGViewerHTML creates the HTML with advanced SVG zoom and pan functionality

func (*DAG) SaveDOTFile

func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error

func (*DAG) SavePNG

func (tm *DAG) SavePNG(pngFile string) error

func (*DAG) SaveSVG

func (tm *DAG) SaveSVG(svgFile string) error

func (*DAG) ScheduleTask

func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result

func (*DAG) SetAlertThresholds added in v0.0.16

func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)

SetAlertThresholds configures alert thresholds

func (*DAG) SetBatchProcessingEnabled added in v0.0.17

func (tm *DAG) SetBatchProcessingEnabled(enabled bool)

SetBatchProcessingEnabled enables or disables batch processing

func (*DAG) SetKey

func (tm *DAG) SetKey(key string)

func (*DAG) SetNotifyResponse

func (tm *DAG) SetNotifyResponse(callback mq.Callback)

func (*DAG) SetPostProcessHook added in v0.0.11

func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))

SetPostProcessHook configures a function to be called after each node is processed.

func (*DAG) SetPreProcessHook added in v0.0.11

func (tm *DAG) SetPreProcessHook(hook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context)

SetPreProcessHook configures a function to be called before each node is processed.

func (*DAG) SetRateLimit added in v0.0.16

func (tm *DAG) SetRateLimit(nodeID string, requestsPerSecond float64, burst int)

SetRateLimit sets rate limit for a specific node

func (*DAG) SetRetryConfig added in v0.0.16

func (tm *DAG) SetRetryConfig(config *RetryConfig)

SetRetryConfig sets the retry configuration

func (*DAG) SetStartNode

func (tm *DAG) SetStartNode(node string)

func (*DAG) SetWebhookManager added in v0.0.16

func (tm *DAG) SetWebhookManager(manager *WebhookManager)

SetWebhookManager sets the webhook manager

func (*DAG) SetupWS

func (tm *DAG) SetupWS() *sio.Server

func (*DAG) Start

func (tm *DAG) Start(ctx context.Context, addr string) error

func (*DAG) StartCleanup added in v0.0.17

func (tm *DAG) StartCleanup(ctx context.Context)

StartCleanup starts the cleanup manager

func (*DAG) StartMonitoring added in v0.0.16

func (tm *DAG) StartMonitoring(ctx context.Context)

StartMonitoring starts the monitoring system

func (*DAG) Stop

func (tm *DAG) Stop(ctx context.Context) error

func (*DAG) StopCleanup added in v0.0.17

func (tm *DAG) StopCleanup()

StopCleanup stops the cleanup manager

func (*DAG) StopEnhanced added in v0.0.17

func (tm *DAG) StopEnhanced(ctx context.Context) error

Enhanced Stop method with proper cleanup

func (*DAG) StopMonitoring added in v0.0.16

func (tm *DAG) StopMonitoring()

StopMonitoring stops the monitoring system

func (*DAG) TopologicalSort

func (tm *DAG) TopologicalSort() (stack []string)

func (*DAG) UpdateConfiguration added in v0.0.16

func (tm *DAG) UpdateConfiguration(config *DAGConfig) error

UpdateConfiguration updates the DAG configuration

func (*DAG) Validate

func (tm *DAG) Validate() error

func (*DAG) ValidateDAG added in v0.0.16

func (tm *DAG) ValidateDAG() error

ValidateDAG validates the DAG structure using the enhanced validator

type DAGCache added in v0.0.16

type DAGCache struct {
	// contains filtered or unexported fields
}

DAGCache provides caching capabilities for DAG operations

func NewDAGCache added in v0.0.16

func NewDAGCache(ttl time.Duration, maxSize int, logger logger.Logger) *DAGCache

NewDAGCache creates a new DAG cache

func (*DAGCache) GetNode added in v0.0.16

func (dc *DAGCache) GetNode(key string) (*Node, bool)

GetNode retrieves a cached node

func (*DAGCache) GetNodeResult added in v0.0.16

func (dc *DAGCache) GetNodeResult(key string) (interface{}, bool)

GetNodeResult retrieves a cached node result

func (*DAGCache) SetNode added in v0.0.16

func (dc *DAGCache) SetNode(key string, node *Node)

SetNode caches a node

func (*DAGCache) SetNodeResult added in v0.0.16

func (dc *DAGCache) SetNodeResult(key string, value interface{})

SetNodeResult caches a node result

func (*DAGCache) Stop added in v0.0.16

func (dc *DAGCache) Stop()

Stop stops the cache cleanup routine

type DAGConfig added in v0.0.16

type DAGConfig struct {
	MaxConcurrentTasks     int              `json:"max_concurrent_tasks"`
	TaskTimeout            time.Duration    `json:"task_timeout"`
	NodeTimeout            time.Duration    `json:"node_timeout"`
	RetryConfig            *RetryConfig     `json:"retry_config"`
	CacheConfig            *CacheConfig     `json:"cache_config"`
	RateLimitConfig        *RateLimitConfig `json:"rate_limit_config"`
	MonitoringEnabled      bool             `json:"monitoring_enabled"`
	AlertingEnabled        bool             `json:"alerting_enabled"`
	CleanupInterval        time.Duration    `json:"cleanup_interval"`
	TransactionTimeout     time.Duration    `json:"transaction_timeout"`
	BatchProcessingEnabled bool             `json:"batch_processing_enabled"`
	BatchSize              int              `json:"batch_size"`
	BatchTimeout           time.Duration    `json:"batch_timeout"`
}

DAGConfig holds dynamic configuration for DAG

func DefaultDAGConfig added in v0.0.16

func DefaultDAGConfig() *DAGConfig

DefaultDAGConfig returns default DAG configuration

type DAGValidator added in v0.0.16

type DAGValidator struct {
	// contains filtered or unexported fields
}

DAGValidator provides validation capabilities for DAG structure

func NewDAGValidator added in v0.0.16

func NewDAGValidator(dag *DAG) *DAGValidator

NewDAGValidator creates a new DAG validator

func (*DAGValidator) GetCriticalPath added in v0.0.16

func (v *DAGValidator) GetCriticalPath() ([]string, error)

GetCriticalPath finds the longest path in the DAG

func (*DAGValidator) GetNodeStatistics added in v0.0.16

func (v *DAGValidator) GetNodeStatistics() map[string]interface{}

GetNodeStatistics returns DAG statistics

func (*DAGValidator) GetTopologicalOrder added in v0.0.16

func (v *DAGValidator) GetTopologicalOrder() ([]string, error)

GetTopologicalOrder returns nodes in topological order

func (*DAGValidator) ValidateStructure added in v0.0.16

func (v *DAGValidator) ValidateStructure() error

ValidateStructure performs comprehensive DAG structure validation

type Debugger added in v0.0.10

type Debugger interface {
	Debug(context.Context, *mq.Task)
}

type Direction added in v0.0.7

type Direction string
const (
	TB Direction = "TB"
	LR Direction = "LR"
)

type Edge

type Edge struct {
	From       *Node
	FromSource string
	To         *Node
	Label      string
	Type       EdgeType
}

type EdgeLevel added in v0.0.17

type EdgeLevel struct {
	Level int
	Type  EdgeType
}

EdgeLevel represents the depth level of an edge in the DAG

type EdgeStyleConfig added in v0.0.17

type EdgeStyleConfig struct {
	Color     string
	Style     string
	PenWidth  string
	ArrowSize string
	FontSize  string
}

EdgeStyleConfig contains styling information for edges

type EdgeType

type EdgeType int
const (
	Simple EdgeType = iota
	Iterator
)

func (EdgeType) IsValid

func (c EdgeType) IsValid() bool

func (EdgeType) String added in v0.0.17

func (c EdgeType) String() string

type EnhancedAPIHandler added in v0.0.16

type EnhancedAPIHandler struct {
	// contains filtered or unexported fields
}

EnhancedAPIHandler provides enhanced API endpoints for DAG management

func NewEnhancedAPIHandler added in v0.0.16

func NewEnhancedAPIHandler(dag *DAG) *EnhancedAPIHandler

NewEnhancedAPIHandler creates a new enhanced API handler

func (*EnhancedAPIHandler) RegisterRoutes added in v0.0.16

func (h *EnhancedAPIHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all enhanced API routes

type HTTPClient added in v0.0.16

type HTTPClient interface {
	Post(url string, contentType string, body []byte, headers map[string]string) error
}

HTTPClient interface for HTTP requests

type Handler added in v0.0.14

type Handler struct {
	Name string   `json:"name"`
	Tags []string `json:"tags"`
}

func AvailableHandlers

func AvailableHandlers() []Handler

type List

type List struct {
	Handlers map[string]*DAG
	// contains filtered or unexported fields
}

type MemoryActivityPersistence added in v0.0.17

type MemoryActivityPersistence struct {
	// contains filtered or unexported fields
}

MemoryActivityPersistence provides in-memory activity persistence for testing

func NewMemoryActivityPersistence added in v0.0.17

func NewMemoryActivityPersistence() *MemoryActivityPersistence

NewMemoryActivityPersistence creates a new in-memory persistence

func (*MemoryActivityPersistence) Close added in v0.0.17

func (mp *MemoryActivityPersistence) Close() error

Close closes the persistence

func (*MemoryActivityPersistence) GetStats added in v0.0.17

GetStats returns statistics for the filtered entries

func (*MemoryActivityPersistence) Query added in v0.0.17

Query queries activity entries with filter

func (*MemoryActivityPersistence) Store added in v0.0.17

func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error

Store stores activity entries in memory

type Monitor added in v0.0.16

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor provides comprehensive monitoring capabilities for DAG

func NewMonitor added in v0.0.16

func NewMonitor(dag *DAG, logger logger.Logger) *Monitor

NewMonitor creates a new DAG monitor

func (*Monitor) AddAlertHandler added in v0.0.16

func (m *Monitor) AddAlertHandler(handler AlertHandler)

AddAlertHandler adds an alert handler

func (*Monitor) GetMetrics added in v0.0.16

func (m *Monitor) GetMetrics() *MonitoringMetrics

GetMetrics returns current metrics

func (*Monitor) SetAlertThresholds added in v0.0.16

func (m *Monitor) SetAlertThresholds(thresholds *AlertThresholds)

SetAlertThresholds updates alert thresholds

func (*Monitor) Start added in v0.0.16

func (m *Monitor) Start(ctx context.Context)

Start begins monitoring

func (*Monitor) Stop added in v0.0.16

func (m *Monitor) Stop()

Stop stops monitoring

type MonitoringMetrics added in v0.0.16

type MonitoringMetrics struct {
	TasksTotal           int64
	TasksCompleted       int64
	TasksFailed          int64
	TasksCancelled       int64
	TasksInProgress      int64
	NodesExecuted        map[string]int64
	NodeExecutionTimes   map[string][]time.Duration
	NodeFailures         map[string]int64
	AverageExecutionTime time.Duration
	TotalExecutionTime   time.Duration
	StartTime            time.Time
	LastTaskCompletedAt  time.Time
	ActiveTasks          map[string]time.Time
	NodeProcessingStats  map[string]*NodeStats
	// contains filtered or unexported fields
}

MonitoringMetrics holds comprehensive metrics for DAG monitoring

func NewMonitoringMetrics added in v0.0.16

func NewMonitoringMetrics() *MonitoringMetrics

NewMonitoringMetrics creates a new metrics instance

func (*MonitoringMetrics) GetNodeStats added in v0.0.16

func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats

GetNodeStats returns statistics for a specific node

func (*MonitoringMetrics) GetSnapshot added in v0.0.16

func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics

GetSnapshot returns a snapshot of current metrics

func (*MonitoringMetrics) RecordNodeEnd added in v0.0.16

func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)

RecordNodeEnd records when a node finishes processing

func (*MonitoringMetrics) RecordNodeExecution added in v0.0.16

func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)

RecordNodeExecution records node execution metrics

func (*MonitoringMetrics) RecordNodeStart added in v0.0.16

func (m *MonitoringMetrics) RecordNodeStart(nodeID string)

RecordNodeStart records when a node starts processing

func (*MonitoringMetrics) RecordTaskCompletion added in v0.0.16

func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)

RecordTaskCompletion records task completion

func (*MonitoringMetrics) RecordTaskStart added in v0.0.16

func (m *MonitoringMetrics) RecordTaskStart(taskID string)

RecordTaskStart records the start of a task

type Node

type Node struct {
	Label    string
	ID       string
	Edges    []Edge
	NodeType NodeType

	Timeout time.Duration // ...new field for node-level timeout...
	// contains filtered or unexported fields
}

func (*Node) SetTimeout added in v0.0.11

func (n *Node) SetTimeout(d time.Duration)

SetTimeout allows setting a maximum processing duration for the node.

type NodeFailureStats added in v0.0.17

type NodeFailureStats struct {
	NodeID       string    `json:"node_id"`
	FailureCount int64     `json:"failure_count"`
	FailureRate  float64   `json:"failure_rate"`
	LastFailure  time.Time `json:"last_failure"`
}

NodeFailureStats represents failure statistics for a node

type NodeRateLimit added in v0.0.16

type NodeRateLimit struct {
	RequestsPerSecond float64 `json:"requests_per_second"`
	Burst             int     `json:"burst"`
}

NodeRateLimit holds rate limit settings for a specific node

type NodeRetryManager added in v0.0.16

type NodeRetryManager struct {
	// contains filtered or unexported fields
}

NodeRetryManager handles retry logic for individual nodes

func NewNodeRetryManager added in v0.0.16

func NewNodeRetryManager(config *RetryConfig, logger logger.Logger) *NodeRetryManager

NewNodeRetryManager creates a new retry manager

func (*NodeRetryManager) GetAttempts added in v0.0.16

func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int

GetAttempts returns the number of attempts for a task/node combination

func (*NodeRetryManager) GetRetryDelay added in v0.0.16

func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration

GetRetryDelay calculates the delay before the next retry

func (*NodeRetryManager) RecordAttempt added in v0.0.16

func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)

RecordAttempt records a retry attempt

func (*NodeRetryManager) Reset added in v0.0.16

func (rm *NodeRetryManager) Reset(taskID, nodeID string)

Reset clears retry attempts for a task/node combination

func (*NodeRetryManager) ResetTask added in v0.0.16

func (rm *NodeRetryManager) ResetTask(taskID string)

ResetTask clears all retry attempts for a task

func (*NodeRetryManager) SetGlobalConfig added in v0.0.17

func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)

SetGlobalConfig sets the global retry configuration

func (*NodeRetryManager) SetNodeConfig added in v0.0.17

func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)

SetNodeConfig sets retry configuration for a specific node

func (*NodeRetryManager) ShouldRetry added in v0.0.16

func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool

ShouldRetry determines if a failed node should be retried

type NodeStats added in v0.0.16

type NodeStats struct {
	ExecutionCount   int64
	SuccessCount     int64
	FailureCount     int64
	TotalDuration    time.Duration
	AverageDuration  time.Duration
	MinDuration      time.Duration
	MaxDuration      time.Duration
	LastExecuted     time.Time
	LastSuccess      time.Time
	LastFailure      time.Time
	CurrentlyRunning int64
}

NodeStats holds statistics for individual nodes

type NodeType added in v0.0.2

type NodeType int
const (
	Function NodeType = iota
	Page
)

func (NodeType) IsValid added in v0.0.2

func (c NodeType) IsValid() bool

func (NodeType) String added in v0.0.3

func (c NodeType) String() string

type Operation

type Operation struct {
	ID              string `json:"id"`
	Key             string `json:"key"`
	Payload         Payload
	RequiredFields  []string `json:"required_fields"`
	OptionalFields  []string `json:"optional_fields"`
	GeneratedFields []string `json:"generated_fields"`
	Type            NodeType `json:"type"`
	Tags            []string `json:"tags"`
}

func (*Operation) Close

func (e *Operation) Close() error

func (*Operation) Consume

func (e *Operation) Consume(_ context.Context) error

func (*Operation) Debug added in v0.0.10

func (e *Operation) Debug(ctx context.Context, task *mq.Task)

func (*Operation) GetKey

func (e *Operation) GetKey() string

func (*Operation) GetMappedData added in v0.0.10

func (e *Operation) GetMappedData(ctx context.Context, task *mq.Task) map[string]any

func (*Operation) GetTags added in v0.0.13

func (e *Operation) GetTags() []string

func (*Operation) GetType

func (e *Operation) GetType() string

func (*Operation) Pause

func (e *Operation) Pause(_ context.Context) error

func (*Operation) ProcessTask

func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result

func (*Operation) Resume

func (e *Operation) Resume(_ context.Context) error

func (*Operation) SetConfig

func (e *Operation) SetConfig(payload Payload)

func (*Operation) SetKey

func (e *Operation) SetKey(key string)

func (*Operation) SetTags added in v0.0.13

func (e *Operation) SetTags(tag ...string)

func (*Operation) Stop

func (e *Operation) Stop(_ context.Context) error

func (*Operation) ValidateFields

func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)

type Operations

type Operations struct {
	Handlers map[string]func(string) mq.Processor
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Data            map[string]any    `json:"data"`
	Mapping         map[string]string `json:"mapping"`
	GeneratedFields []string          `json:"generated_fields"`
	Providers       []Provider        `json:"providers"`
}

type PerformanceOptimizer added in v0.0.16

type PerformanceOptimizer struct {
	// contains filtered or unexported fields
}

PerformanceOptimizer optimizes DAG performance based on metrics

func NewPerformanceOptimizer added in v0.0.16

func NewPerformanceOptimizer(dag *DAG, monitor *Monitor, config *ConfigManager, logger logger.Logger) *PerformanceOptimizer

NewPerformanceOptimizer creates a new performance optimizer

func (*PerformanceOptimizer) OptimizePerformance added in v0.0.16

func (po *PerformanceOptimizer) OptimizePerformance() error

OptimizePerformance analyzes metrics and adjusts configuration

type Processor

type Processor interface {
	mq.Processor
	SetConfig(Payload)
	SetTags(tag ...string)
	GetTags() []string
}

type Provider

type Provider struct {
	Mapping       map[string]any `json:"mapping"`
	UpdateMapping map[string]any `json:"update_mapping"`
	InsertMapping map[string]any `json:"insert_mapping"`
	Defaults      map[string]any `json:"defaults"`
	ProviderType  string         `json:"provider_type"`
	Database      string         `json:"database"`
	Source        string         `json:"source"`
	Query         string         `json:"query"`
}

type RateLimitConfig added in v0.0.16

type RateLimitConfig struct {
	Enabled     bool                     `json:"enabled"`
	GlobalLimit float64                  `json:"global_limit"`
	GlobalBurst int                      `json:"global_burst"`
	NodeLimits  map[string]NodeRateLimit `json:"node_limits"`
}

RateLimitConfig holds rate limiting configuration

type RateLimiter added in v0.0.16

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter provides rate limiting for DAG operations

func NewRateLimiter added in v0.0.16

func NewRateLimiter(logger logger.Logger) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow added in v0.0.16

func (rl *RateLimiter) Allow(nodeID string) bool

Allow checks if the request is allowed for the given node

func (*RateLimiter) SetNodeLimit added in v0.0.16

func (rl *RateLimiter) SetNodeLimit(nodeID string, requestsPerSecond float64, burst int)

SetNodeLimit sets rate limit for a specific node

func (*RateLimiter) Wait added in v0.0.16

func (rl *RateLimiter) Wait(ctx context.Context, nodeID string) error

Wait waits until the request can be processed for the given node

type RetryConfig added in v0.0.16

type RetryConfig struct {
	MaxRetries     int
	InitialDelay   time.Duration
	MaxDelay       time.Duration
	BackoffFactor  float64
	Jitter         bool
	RetryCondition func(err error) bool
}

RetryConfig defines retry behavior for failed nodes

func DefaultRetryConfig added in v0.0.16

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a sensible default retry configuration

type RetryableProcessor added in v0.0.16

type RetryableProcessor struct {
	// contains filtered or unexported fields
}

RetryableProcessor wraps a processor with retry logic

func NewRetryableProcessor added in v0.0.16

func NewRetryableProcessor(processor mq.Processor, config *RetryConfig, logger logger.Logger) *RetryableProcessor

NewRetryableProcessor creates a processor with retry capabilities

func (*RetryableProcessor) Close added in v0.0.16

func (rp *RetryableProcessor) Close() error

Close closes the processor

func (*RetryableProcessor) Consume added in v0.0.16

func (rp *RetryableProcessor) Consume(ctx context.Context) error

Consume starts consuming messages

func (*RetryableProcessor) GetKey added in v0.0.16

func (rp *RetryableProcessor) GetKey() string

GetKey returns the processor key

func (*RetryableProcessor) GetType added in v0.0.16

func (rp *RetryableProcessor) GetType() string

GetType returns the processor type

func (*RetryableProcessor) Pause added in v0.0.16

func (rp *RetryableProcessor) Pause(ctx context.Context) error

Pause pauses the processor

func (*RetryableProcessor) ProcessTask added in v0.0.16

func (rp *RetryableProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

ProcessTask processes a task with retry logic

func (*RetryableProcessor) Resume added in v0.0.16

func (rp *RetryableProcessor) Resume(ctx context.Context) error

Resume resumes the processor

func (*RetryableProcessor) SetKey added in v0.0.16

func (rp *RetryableProcessor) SetKey(key string)

SetKey sets the processor key

func (*RetryableProcessor) Stop added in v0.0.16

func (rp *RetryableProcessor) Stop(ctx context.Context) error

Stop stops the processor

type RollbackHandler added in v0.0.16

type RollbackHandler interface {
	Rollback(operation TransactionOperation) error
}

RollbackHandler defines how to rollback operations

type SavePoint added in v0.0.17

type SavePoint struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Timestamp time.Time              `json:"timestamp"`
	State     map[string]interface{} `json:"state"`
}

SavePoint represents a save point in a transaction

type SimpleHTTPClient added in v0.0.16

type SimpleHTTPClient struct {
	// contains filtered or unexported fields
}

SimpleHTTPClient implements HTTPClient interface for webhook manager

func NewSimpleHTTPClient added in v0.0.16

func NewSimpleHTTPClient(timeout time.Duration) *SimpleHTTPClient

NewSimpleHTTPClient creates a new simple HTTP client

func (*SimpleHTTPClient) Post added in v0.0.16

func (c *SimpleHTTPClient) Post(url string, contentType string, body []byte, headers map[string]string) error

Post sends a POST request to the specified URL

type TaskError added in v0.0.10

type TaskError struct {
	Err         error
	Recoverable bool
}

TaskError is used by node processors to indicate whether an error is recoverable.

func (TaskError) Error added in v0.0.10

func (te TaskError) Error() string

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

func NewTaskManager

func NewTaskManager(dag *DAG, taskID string, resultCh chan mq.Result, iteratorNodes storage.IMap[string, []Edge]) *TaskManager

func (*TaskManager) Pause added in v0.0.10

func (tm *TaskManager) Pause()

func (*TaskManager) ProcessTask added in v0.0.2

func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)

func (*TaskManager) Resume added in v0.0.10

func (tm *TaskManager) Resume()

func (*TaskManager) Stop added in v0.0.2

func (tm *TaskManager) Stop()

Stop gracefully stops the task manager

type TaskManagerConfig added in v0.0.10

type TaskManagerConfig struct {
	MaxRetries      int
	BaseBackoff     time.Duration
	RecoveryHandler func(ctx context.Context, result mq.Result) error
}

type TaskMetrics added in v0.0.11

type TaskMetrics struct {
	NotStarted int
	Queued     int
	Cancelled  int
	Completed  int
	Failed     int
	// contains filtered or unexported fields
}

New task metrics type.

type TaskState added in v0.0.2

type TaskState struct {
	UpdatedAt time.Time

	NodeID string
	Status mq.Status
	Result mq.Result
	// contains filtered or unexported fields
}

TaskState holds state and intermediate results for a given task (identified by a node ID).

type Transaction added in v0.0.16

type Transaction struct {
	ID         string                 `json:"id"`
	TaskID     string                 `json:"task_id"`
	Status     TransactionStatus      `json:"status"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time,omitempty"`
	Operations []TransactionOperation `json:"operations"`
	SavePoints []SavePoint            `json:"save_points"`
	Metadata   map[string]interface{} `json:"metadata,omitempty"`
}

Transaction represents a transactional DAG execution

type TransactionManager added in v0.0.16

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager handles transaction-like operations for DAG execution

func NewTransactionManager added in v0.0.16

func NewTransactionManager(dag *DAG, logger logger.Logger) *TransactionManager

NewTransactionManager creates a new transaction manager

func (*TransactionManager) AddOperation added in v0.0.17

func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error

AddOperation adds an operation to a transaction

func (*TransactionManager) AddSavePoint added in v0.0.16

func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error

AddSavePoint adds a save point to the transaction

func (*TransactionManager) BeginTransaction added in v0.0.16

func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction

BeginTransaction starts a new transaction

func (*TransactionManager) CommitTransaction added in v0.0.16

func (tm *TransactionManager) CommitTransaction(txID string) error

CommitTransaction commits a transaction

func (*TransactionManager) GetTransaction added in v0.0.17

func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)

GetTransaction retrieves a transaction by ID

func (*TransactionManager) RollbackTransaction added in v0.0.16

func (tm *TransactionManager) RollbackTransaction(txID string) error

RollbackTransaction rolls back a transaction

type TransactionOperation added in v0.0.17

type TransactionOperation struct {
	ID              string                 `json:"id"`
	Type            string                 `json:"type"`
	NodeID          string                 `json:"node_id"`
	Data            map[string]interface{} `json:"data"`
	Timestamp       time.Time              `json:"timestamp"`
	RollbackHandler RollbackHandler        `json:"-"`
}

TransactionOperation represents an operation within a transaction

type TransactionStatus added in v0.0.16

type TransactionStatus string

TransactionStatus represents the status of a transaction

const (
	TransactionStatusStarted    TransactionStatus = "started"
	TransactionStatusCommitted  TransactionStatus = "committed"
	TransactionStatusRolledBack TransactionStatus = "rolled_back"
	TransactionStatusFailed     TransactionStatus = "failed"
)

type WebSocketHandler added in v0.0.16

type WebSocketHandler struct {
	// contains filtered or unexported fields
}

WebSocketHandler provides real-time monitoring via WebSocket

func NewWebSocketHandler added in v0.0.16

func NewWebSocketHandler(dag *DAG) *WebSocketHandler

NewWebSocketHandler creates a new WebSocket handler

func (*WebSocketHandler) HandleWebSocket added in v0.0.16

func (h *WebSocketHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket handles WebSocket connections for real-time monitoring

type WebhookConfig added in v0.0.16

type WebhookConfig struct {
	URL        string            `json:"url"`
	Headers    map[string]string `json:"headers"`
	Method     string            `json:"method"`
	RetryCount int               `json:"retry_count"`
	Timeout    time.Duration     `json:"timeout"`
	Events     []string          `json:"events"`
}

WebhookConfig defines webhook configuration

type WebhookEvent added in v0.0.16

type WebhookEvent struct {
	Type      string                 `json:"type"`
	TaskID    string                 `json:"task_id"`
	NodeID    string                 `json:"node_id,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
}

WebhookEvent represents an event to send via webhook

type WebhookManager added in v0.0.16

type WebhookManager struct {
	// contains filtered or unexported fields
}

WebhookManager handles webhook notifications

func NewWebhookManager added in v0.0.16

func NewWebhookManager(httpClient HTTPClient, logger logger.Logger) *WebhookManager

NewWebhookManager creates a new webhook manager

func (*WebhookManager) AddWebhook added in v0.0.16

func (wm *WebhookManager) AddWebhook(event string, config WebhookConfig)

AddWebhook adds a webhook configuration

func (*WebhookManager) TriggerWebhook added in v0.0.16

func (wm *WebhookManager) TriggerWebhook(event WebhookEvent)

TriggerWebhook sends webhook notifications for an event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL