dag

package
Version: v0.0.17
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2025 License: MIT Imports: 33 Imported by: 2

Documentation

Overview

context_keys.go

Index

Constants

View Source
const (
	Delimiter          = "___"
	ContextIndex       = "index"
	DefaultChannelSize = 1000
	RetryInterval      = 5 * time.Second
)

Variables

This section is empty.

Functions

func AddDAG

func AddDAG(key string, handler *DAG)

func AddHandler

func AddHandler(key string, handler func(string) mq.Processor)

func AvailableDAG

func AvailableDAG() []string

func ClearDAG added in v0.0.2

func ClearDAG()

func GetHandler

func GetHandler(key string) func(string) mq.Processor

func GetVal

func GetVal(c context.Context, v string, data map[string]any) (key string, val any)

func Header

func Header(c context.Context, headerKey string) (val map[string]any, exists bool)

func HeaderVal

func HeaderVal(c context.Context, headerKey string, key string) (val any)

func WsEvents

func WsEvents(s *sio.Server)

Types

type ActivityAlertHandler added in v0.0.17

type ActivityAlertHandler struct {
	// contains filtered or unexported fields
}

ActivityAlertHandler handles alerts by logging them as activities

func (*ActivityAlertHandler) HandleAlert added in v0.0.17

func (h *ActivityAlertHandler) HandleAlert(alert Alert) error

type ActivityEntry added in v0.0.17

type ActivityEntry struct {
	ID          string                 `json:"id"`
	Timestamp   time.Time              `json:"timestamp"`
	DAGName     string                 `json:"dag_name"`
	Level       ActivityLevel          `json:"level"`
	Type        ActivityType           `json:"type"`
	Message     string                 `json:"message"`
	TaskID      string                 `json:"task_id,omitempty"`
	NodeID      string                 `json:"node_id,omitempty"`
	Duration    time.Duration          `json:"duration,omitempty"`
	Success     *bool                  `json:"success,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Details     map[string]interface{} `json:"details,omitempty"`
	ContextData map[string]interface{} `json:"context_data,omitempty"`
	UserID      string                 `json:"user_id,omitempty"`
	SessionID   string                 `json:"session_id,omitempty"`
	TraceID     string                 `json:"trace_id,omitempty"`
	SpanID      string                 `json:"span_id,omitempty"`
}

ActivityEntry represents a single activity log entry

type ActivityFilter added in v0.0.17

type ActivityFilter struct {
	StartTime    *time.Time      `json:"start_time,omitempty"`
	EndTime      *time.Time      `json:"end_time,omitempty"`
	Levels       []ActivityLevel `json:"levels,omitempty"`
	Types        []ActivityType  `json:"types,omitempty"`
	TaskIDs      []string        `json:"task_ids,omitempty"`
	NodeIDs      []string        `json:"node_ids,omitempty"`
	UserIDs      []string        `json:"user_ids,omitempty"`
	SuccessOnly  *bool           `json:"success_only,omitempty"`
	FailuresOnly *bool           `json:"failures_only,omitempty"`
	Limit        int             `json:"limit,omitempty"`
	Offset       int             `json:"offset,omitempty"`
	SortBy       string          `json:"sort_by,omitempty"`    // timestamp, level, type
	SortOrder    string          `json:"sort_order,omitempty"` // asc, desc
}

ActivityFilter provides filtering options for activity queries

type ActivityHook added in v0.0.17

type ActivityHook interface {
	OnActivity(entry ActivityEntry) error
}

ActivityHook allows custom processing of activity entries

type ActivityLevel added in v0.0.17

type ActivityLevel string

ActivityLevel represents the severity level of an activity

const (
	ActivityLevelDebug ActivityLevel = "debug"
	ActivityLevelInfo  ActivityLevel = "info"
	ActivityLevelWarn  ActivityLevel = "warn"
	ActivityLevelError ActivityLevel = "error"
	ActivityLevelFatal ActivityLevel = "fatal"
)

type ActivityLogger added in v0.0.17

type ActivityLogger struct {
	// contains filtered or unexported fields
}

ActivityLogger provides comprehensive activity logging for DAG operations

func NewActivityLogger added in v0.0.17

func NewActivityLogger(dagName string, config ActivityLoggerConfig, persistence ActivityPersistence, logger logger.Logger) *ActivityLogger

NewActivityLogger creates a new activity logger

func (*ActivityLogger) AddHook added in v0.0.17

func (al *ActivityLogger) AddHook(hook ActivityHook)

AddHook adds an activity hook

func (*ActivityLogger) Flush added in v0.0.17

func (al *ActivityLogger) Flush() error

Flush flushes the buffer to persistence

func (*ActivityLogger) GetActivities added in v0.0.17

func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)

GetActivities retrieves activities based on filter

func (*ActivityLogger) GetStats added in v0.0.17

func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)

GetStats returns activity statistics

func (*ActivityLogger) Log added in v0.0.17

func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

Log logs an activity entry

func (*ActivityLogger) LogNodeExecution added in v0.0.17

func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, duration time.Duration)

LogNodeExecution logs node execution details

func (*ActivityLogger) LogTaskComplete added in v0.0.17

func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)

LogTaskComplete logs task completion activity

func (*ActivityLogger) LogTaskFail added in v0.0.17

func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, duration time.Duration)

LogTaskFail logs task failure activity

func (*ActivityLogger) LogTaskStart added in v0.0.17

func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)

LogTaskStart logs task start activity

func (*ActivityLogger) LogWithContext added in v0.0.17

func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

LogWithContext logs an activity entry with context information

func (*ActivityLogger) RemoveHook added in v0.0.17

func (al *ActivityLogger) RemoveHook(hook ActivityHook)

RemoveHook removes an activity hook

func (*ActivityLogger) Stop added in v0.0.17

func (al *ActivityLogger) Stop()

Stop stops the activity logger

type ActivityLoggerConfig added in v0.0.17

type ActivityLoggerConfig struct {
	BufferSize        int           `json:"buffer_size"`
	FlushInterval     time.Duration `json:"flush_interval"`
	MaxRetries        int           `json:"max_retries"`
	EnableHooks       bool          `json:"enable_hooks"`
	EnableCompression bool          `json:"enable_compression"`
	MaxEntryAge       time.Duration `json:"max_entry_age"`
	AsyncMode         bool          `json:"async_mode"`
}

ActivityLoggerConfig configures the activity logger

func DefaultActivityLoggerConfig added in v0.0.17

func DefaultActivityLoggerConfig() ActivityLoggerConfig

DefaultActivityLoggerConfig returns default configuration

type ActivityPersistence added in v0.0.17

type ActivityPersistence interface {
	Store(entries []ActivityEntry) error
	Query(filter ActivityFilter) ([]ActivityEntry, error)
	GetStats(filter ActivityFilter) (ActivityStats, error)
	Close() error
}

ActivityPersistence defines the interface for persisting activities

type ActivityStats added in v0.0.17

type ActivityStats struct {
	TotalActivities      int64                   `json:"total_activities"`
	ActivitiesByLevel    map[ActivityLevel]int64 `json:"activities_by_level"`
	ActivitiesByType     map[ActivityType]int64  `json:"activities_by_type"`
	ActivitiesByNode     map[string]int64        `json:"activities_by_node"`
	ActivitiesByTask     map[string]int64        `json:"activities_by_task"`
	SuccessRate          float64                 `json:"success_rate"`
	FailureRate          float64                 `json:"failure_rate"`
	AverageDuration      time.Duration           `json:"average_duration"`
	PeakActivitiesPerMin int64                   `json:"peak_activities_per_minute"`
	TimeRange            ActivityTimeRange       `json:"time_range"`
	RecentErrors         []ActivityEntry         `json:"recent_errors"`
	TopFailingNodes      []NodeFailureStats      `json:"top_failing_nodes"`
	HourlyDistribution   map[string]int64        `json:"hourly_distribution"`
}

ActivityStats provides statistics about activities

type ActivityTimeRange added in v0.0.17

type ActivityTimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

ActivityTimeRange represents a time range for activities

type ActivityType added in v0.0.17

type ActivityType string

ActivityType represents the type of activity

const (
	ActivityTypeTaskStart      ActivityType = "task_start"
	ActivityTypeTaskComplete   ActivityType = "task_complete"
	ActivityTypeTaskFail       ActivityType = "task_fail"
	ActivityTypeTaskCancel     ActivityType = "task_cancel"
	ActivityTypeNodeStart      ActivityType = "node_start"
	ActivityTypeNodeComplete   ActivityType = "node_complete"
	ActivityTypeNodeFail       ActivityType = "node_fail"
	ActivityTypeNodeTimeout    ActivityType = "node_timeout"
	ActivityTypeValidation     ActivityType = "validation"
	ActivityTypeConfiguration  ActivityType = "configuration"
	ActivityTypeAlert          ActivityType = "alert"
	ActivityTypeCleanup        ActivityType = "cleanup"
	ActivityTypeTransaction    ActivityType = "transaction"
	ActivityTypeRetry          ActivityType = "retry"
	ActivityTypeCircuitBreaker ActivityType = "circuit_breaker"
	ActivityTypeWebhook        ActivityType = "webhook"
	ActivityTypeCustom         ActivityType = "custom"
)

type Alert added in v0.0.16

type Alert struct {
	ID          string                 `json:"id"`
	Timestamp   time.Time              `json:"timestamp"`
	Severity    AlertSeverity          `json:"severity"`
	Type        AlertType              `json:"type"`
	Message     string                 `json:"message"`
	Details     map[string]interface{} `json:"details"`
	NodeID      string                 `json:"node_id,omitempty"`
	TaskID      string                 `json:"task_id,omitempty"`
	Threshold   interface{}            `json:"threshold,omitempty"`
	ActualValue interface{}            `json:"actual_value,omitempty"`
}

Alert represents a monitoring alert

type AlertHandler added in v0.0.16

type AlertHandler interface {
	HandleAlert(alert Alert) error
}

AlertHandler defines interface for handling alerts

type AlertSeverity added in v0.0.17

type AlertSeverity string

const (
	AlertSeverityInfo     AlertSeverity = "info"
	AlertSeverityWarning  AlertSeverity = "warning"
	AlertSeverityCritical AlertSeverity = "critical"
)

type AlertThresholds added in v0.0.16

type AlertThresholds struct {
	MaxFailureRate      float64       `json:"max_failure_rate"`
	MaxExecutionTime    time.Duration `json:"max_execution_time"`
	MaxTasksInProgress  int64         `json:"max_tasks_in_progress"`
	MinSuccessRate      float64       `json:"min_success_rate"`
	MaxNodeFailures     int64         `json:"max_node_failures"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
}

AlertThresholds defines thresholds for alerting

type AlertType added in v0.0.17

type AlertType string

const (
	AlertTypeFailureRate    AlertType = "failure_rate"
	AlertTypeExecutionTime  AlertType = "execution_time"
	AlertTypeTaskLoad       AlertType = "task_load"
	AlertTypeNodeFailures   AlertType = "node_failures"
	AlertTypeCircuitBreaker AlertType = "circuit_breaker"
	AlertTypeHealthCheck    AlertType = "health_check"
)

type AlertWebhookHandler added in v0.0.16

type AlertWebhookHandler struct {
	// contains filtered or unexported fields
}

AlertWebhookHandler handles webhook alerts

func NewAlertWebhookHandler added in v0.0.16

func NewAlertWebhookHandler(logger logger.Logger) *AlertWebhookHandler

NewAlertWebhookHandler creates a new alert webhook handler

func (*AlertWebhookHandler) HandleAlert added in v0.0.16

func (h *AlertWebhookHandler) HandleAlert(alert Alert) error

HandleAlert implements the AlertHandler interface

type BatchProcessor added in v0.0.16

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor handles batch processing of tasks

func NewBatchProcessor added in v0.0.16

func NewBatchProcessor(dag *DAG, batchSize int, batchTimeout time.Duration, logger logger.Logger) *BatchProcessor

NewBatchProcessor creates a new batch processor

func (*BatchProcessor) AddTask added in v0.0.16

func (bp *BatchProcessor) AddTask(task *mq.Task) error

AddTask adds a task to the batch

func (*BatchProcessor) SetProcessFunc added in v0.0.16

func (bp *BatchProcessor) SetProcessFunc(fn func([]*mq.Task) error)

SetProcessFunc sets the function to process batches

func (*BatchProcessor) Stop added in v0.0.16

func (bp *BatchProcessor) Stop()

Stop stops the batch processor

type CacheConfig added in v0.0.16

type CacheConfig struct {
	Enabled bool          `json:"enabled"`
	TTL     time.Duration `json:"ttl"`
	MaxSize int           `json:"max_size"`
}

CacheConfig holds cache configuration

type CacheEntry added in v0.0.16

type CacheEntry struct {
	Value       interface{}
	ExpiresAt   time.Time
	AccessCount int64
	LastAccess  time.Time
}

CacheEntry represents a cached item

type CircuitBreaker added in v0.0.16

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements circuit breaker pattern for nodes

func NewCircuitBreaker added in v0.0.16

func NewCircuitBreaker(config *CircuitBreakerConfig, logger logger.Logger) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute added in v0.0.16

func (cb *CircuitBreaker) Execute(fn func() error) error

Execute executes a function with circuit breaker protection

func (*CircuitBreaker) GetState added in v0.0.16

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current circuit breaker state

func (*CircuitBreaker) Reset added in v0.0.16

func (cb *CircuitBreaker) Reset()

Reset manually resets the circuit breaker

type CircuitBreakerConfig added in v0.0.16

type CircuitBreakerConfig struct {
	FailureThreshold int
	ResetTimeout     time.Duration
	HalfOpenMaxCalls int
}

CircuitBreakerConfig defines circuit breaker behavior

type CircuitBreakerState added in v0.0.16

type CircuitBreakerState int

CircuitBreakerState represents the state of a circuit breaker: closed, open, or half-open.

const (
	CircuitClosed CircuitBreakerState = iota
	CircuitOpen
	CircuitHalfOpen
)

type CleanupManager added in v0.0.16

type CleanupManager struct {
	// contains filtered or unexported fields
}

CleanupManager handles cleanup of completed tasks and resources

func NewCleanupManager added in v0.0.16

func NewCleanupManager(dag *DAG, cleanupInterval, retentionPeriod time.Duration, maxEntries int, logger logger.Logger) *CleanupManager

NewCleanupManager creates a new cleanup manager

func (*CleanupManager) Start added in v0.0.16

func (cm *CleanupManager) Start(ctx context.Context)

Start begins the cleanup routine

func (*CleanupManager) Stop added in v0.0.16

func (cm *CleanupManager) Stop()

Stop stops the cleanup routine

type Condition

type Condition interface {
	Match(data any) bool
}

type ConditionProcessor

type ConditionProcessor interface {
	Processor
	SetConditions(map[string]Condition)
}

type ConfigManager added in v0.0.16

type ConfigManager struct {
	// contains filtered or unexported fields
}

ConfigManager handles dynamic DAG configuration

func NewConfigManager added in v0.0.16

func NewConfigManager(logger logger.Logger) *ConfigManager

NewConfigManager creates a new configuration manager

func (*ConfigManager) AddWatcher added in v0.0.16

func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)

AddWatcher adds a configuration watcher

func (*ConfigManager) GetConfig added in v0.0.16

func (cm *ConfigManager) GetConfig() *DAGConfig

GetConfig returns a copy of the current configuration

func (*ConfigManager) UpdateConfig added in v0.0.16

func (cm *ConfigManager) UpdateConfig(newConfig *DAGConfig) error

UpdateConfig updates the configuration

func (*ConfigManager) UpdateConfiguration added in v0.0.17

func (cm *ConfigManager) UpdateConfiguration(config *DAGConfig) error

UpdateConfiguration updates the DAG configuration (alias for UpdateConfig)

type ConfigWatcher added in v0.0.16

type ConfigWatcher interface {
	OnConfigChange(oldConfig, newConfig *DAGConfig) error
}

ConfigWatcher interface for configuration change notifications

type DAG

type DAG struct {
	Error error

	Notifier *sio.Server

	// New hook fields:
	PreProcessHook  func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context
	PostProcessHook func(ctx context.Context, node *Node, taskID string, result mq.Result)
	// contains filtered or unexported fields
}

func GetDAG

func GetDAG(key string) *DAG

func NewDAG

func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG

func (*DAG) AddActivityHook added in v0.0.17

func (tm *DAG) AddActivityHook(hook ActivityHook)

AddActivityHook adds an activity hook

func (*DAG) AddAlertHandler added in v0.0.16

func (tm *DAG) AddAlertHandler(handler AlertHandler)

AddAlertHandler adds an alert handler

func (*DAG) AddCondition

func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG

func (*DAG) AddConfigWatcher added in v0.0.17

func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)

AddConfigWatcher adds a configuration change watcher

func (*DAG) AddDAGNode

func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG

func (*DAG) AddDeferredNode

func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error

func (*DAG) AddEdge

func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG

func (*DAG) AddNode

func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, startNode ...bool) *DAG

func (*DAG) AddNodeWithRetry added in v0.0.16

func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, retryConfig *RetryConfig, startNode ...bool) *DAG

AddNodeWithRetry adds a node with specific retry configuration

func (*DAG) AddWebhook added in v0.0.17

func (tm *DAG) AddWebhook(event string, config WebhookConfig)

AddWebhook adds a webhook configuration

func (*DAG) AssignTopic

func (tm *DAG) AssignTopic(topic string)

func (*DAG) BaseURI added in v0.0.10

func (tm *DAG) BaseURI() string

func (*DAG) BeginTransaction added in v0.0.16

func (tm *DAG) BeginTransaction(taskID string) *Transaction

BeginTransaction starts a new transaction

func (*DAG) CancelTask added in v0.0.11

func (tm *DAG) CancelTask(taskID string) error

CancelTask cancels a running task.

func (*DAG) CheckRateLimit added in v0.0.17

func (tm *DAG) CheckRateLimit(nodeID string) bool

CheckRateLimit checks if request is allowed for a node

func (*DAG) ClassifyEdges

func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)

func (*DAG) Clone added in v0.0.17

func (tm *DAG) Clone() *DAG

Clone creates a deep copy of the DAG

func (*DAG) Close

func (tm *DAG) Close() error

func (*DAG) CommitTransaction added in v0.0.16

func (tm *DAG) CommitTransaction(txID string) error

CommitTransaction commits a transaction

func (*DAG) Consume

func (tm *DAG) Consume(ctx context.Context) error

func (*DAG) Export added in v0.0.17

func (tm *DAG) Export() map[string]interface{}

Export exports the DAG structure to a serializable format

func (*DAG) ExportDOT

func (tm *DAG) ExportDOT(direction ...Direction) string

func (*DAG) FlushActivityLogs added in v0.0.17

func (tm *DAG) FlushActivityLogs() error

FlushActivityLogs flushes activity logs to persistence

func (*DAG) GetActivities added in v0.0.17

func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)

GetActivities retrieves activities based on filter

func (*DAG) GetActivityLogger added in v0.0.17

func (tm *DAG) GetActivityLogger() *ActivityLogger

GetActivityLogger returns the activity logger instance

func (*DAG) GetActivityStats added in v0.0.17

func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)

GetActivityStats returns activity statistics

func (*DAG) GetAllNodes added in v0.0.17

func (tm *DAG) GetAllNodes() map[string]*Node

GetAllNodes returns all nodes in the DAG

func (*DAG) GetCircuitBreakerStatus added in v0.0.17

func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState

GetCircuitBreakerStatus returns circuit breaker status for a node

func (*DAG) GetConfiguration added in v0.0.16

func (tm *DAG) GetConfiguration() *DAGConfig

GetConfiguration returns current DAG configuration

func (*DAG) GetCriticalPath added in v0.0.16

func (tm *DAG) GetCriticalPath() ([]string, error)

GetCriticalPath returns the critical path of the DAG

func (*DAG) GetDAGStatistics added in v0.0.16

func (tm *DAG) GetDAGStatistics() map[string]interface{}

GetDAGStatistics returns comprehensive DAG statistics

func (*DAG) GetEdgeCount added in v0.0.17

func (tm *DAG) GetEdgeCount() int

GetEdgeCount returns the total number of edges

func (*DAG) GetKey

func (tm *DAG) GetKey() string

func (*DAG) GetLastNodes added in v0.0.2

func (tm *DAG) GetLastNodes() ([]*Node, error)

func (*DAG) GetMonitoringMetrics added in v0.0.16

func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics

GetMonitoringMetrics returns current monitoring metrics

func (*DAG) GetNextNodes added in v0.0.2

func (tm *DAG) GetNextNodes(nodeID string) ([]*Node, error)

GetNextNodes returns the next nodes for a given node

func (*DAG) GetNodeByID added in v0.0.17

func (tm *DAG) GetNodeByID(nodeID string) (*Node, error)

GetNodeByID returns a node by its ID

func (*DAG) GetNodeCount added in v0.0.17

func (tm *DAG) GetNodeCount() int

GetNodeCount returns the total number of nodes

func (*DAG) GetNodeStats added in v0.0.16

func (tm *DAG) GetNodeStats(nodeID string) *NodeStats

GetNodeStats returns statistics for a specific node

func (*DAG) GetPreviousNodes added in v0.0.2

func (tm *DAG) GetPreviousNodes(nodeID string) ([]*Node, error)

GetPreviousNodes returns the previous nodes for a given node

func (*DAG) GetReport

func (tm *DAG) GetReport() string

func (*DAG) GetStartNode

func (tm *DAG) GetStartNode() string

func (*DAG) GetStatus added in v0.0.11

func (tm *DAG) GetStatus() map[string]interface{}

GetStatus returns a summary of the DAG including node and task counts.

func (*DAG) GetTaskMetrics added in v0.0.11

func (d *DAG) GetTaskMetrics() TaskMetrics

GetTaskMetrics returns the task metrics.

func (*DAG) GetTopologicalOrder added in v0.0.16

func (tm *DAG) GetTopologicalOrder() ([]string, error)

GetTopologicalOrder returns nodes in topological order

func (*DAG) GetTransaction added in v0.0.17

func (tm *DAG) GetTransaction(txID string) (*Transaction, error)

GetTransaction retrieves transaction details

func (*DAG) GetType

func (tm *DAG) GetType() string

func (*DAG) Handlers

func (tm *DAG) Handlers(app any, prefix string)

Handlers initializes route handlers.

func (*DAG) InitializeActivityLogger added in v0.0.17

func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)

InitializeActivityLogger initializes the activity logger for the DAG

func (*DAG) IsLastNode added in v0.0.2

func (tm *DAG) IsLastNode(nodeID string) (bool, error)

IsLastNode checks if a node is the last node in the DAG

func (*DAG) IsReady

func (tm *DAG) IsReady() bool

func (*DAG) LogActivity added in v0.0.17

func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})

LogActivity logs an activity entry

func (*DAG) Logger added in v0.0.10

func (tm *DAG) Logger() logger.Logger

func (*DAG) OptimizePerformance added in v0.0.16

func (tm *DAG) OptimizePerformance() error

OptimizePerformance triggers performance optimization

func (*DAG) Pause

func (tm *DAG) Pause(_ context.Context) error

func (*DAG) PauseConsumer

func (tm *DAG) PauseConsumer(ctx context.Context, id string)

func (*DAG) PrintGraph

func (tm *DAG) PrintGraph()

func (*DAG) Process

func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result

func (*DAG) ProcessTask

func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) ProcessTaskNew added in v0.0.10

func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) RemoveNode added in v0.0.16

func (tm *DAG) RemoveNode(nodeID string) error

RemoveNode removes the node with the given nodeID and adjusts the edges. For example, if A -> B and B -> C exist and B is removed, a new edge A -> C is created.

func (*DAG) ReportNodeResult

func (tm *DAG) ReportNodeResult(callback func(mq.Result))

func (*DAG) Reset added in v0.0.11

func (tm *DAG) Reset()

Reset resets the DAG state.

func (*DAG) Resume

func (tm *DAG) Resume(_ context.Context) error

func (*DAG) ResumeConsumer

func (tm *DAG) ResumeConsumer(ctx context.Context, id string)

func (*DAG) RollbackTransaction added in v0.0.16

func (tm *DAG) RollbackTransaction(txID string) error

RollbackTransaction rolls back a transaction

func (*DAG) SaveDOTFile

func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error

func (*DAG) SavePNG

func (tm *DAG) SavePNG(pngFile string) error

func (*DAG) SaveSVG

func (tm *DAG) SaveSVG(svgFile string) error

func (*DAG) ScheduleTask

func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result

func (*DAG) SetAlertThresholds added in v0.0.16

func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)

SetAlertThresholds configures alert thresholds

func (*DAG) SetBatchProcessingEnabled added in v0.0.17

func (tm *DAG) SetBatchProcessingEnabled(enabled bool)

SetBatchProcessingEnabled enables or disables batch processing

func (*DAG) SetKey

func (tm *DAG) SetKey(key string)

func (*DAG) SetNotifyResponse

func (tm *DAG) SetNotifyResponse(callback mq.Callback)

func (*DAG) SetPostProcessHook added in v0.0.11

func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))

SetPostProcessHook configures a function to be called after each node is processed.

func (*DAG) SetPreProcessHook added in v0.0.11

func (tm *DAG) SetPreProcessHook(hook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context)

SetPreProcessHook configures a function to be called before each node is processed.

func (*DAG) SetRateLimit added in v0.0.16

func (tm *DAG) SetRateLimit(nodeID string, requestsPerSecond float64, burst int)

SetRateLimit sets rate limit for a specific node

func (*DAG) SetRetryConfig added in v0.0.16

func (tm *DAG) SetRetryConfig(config *RetryConfig)

SetRetryConfig sets the retry configuration

func (*DAG) SetStartNode

func (tm *DAG) SetStartNode(node string)

func (*DAG) SetWebhookManager added in v0.0.16

func (tm *DAG) SetWebhookManager(manager *WebhookManager)

SetWebhookManager sets the webhook manager

func (*DAG) SetupWS

func (tm *DAG) SetupWS() *sio.Server

func (*DAG) Start

func (tm *DAG) Start(ctx context.Context, addr string) error

func (*DAG) StartCleanup added in v0.0.17

func (tm *DAG) StartCleanup(ctx context.Context)

StartCleanup starts the cleanup manager

func (*DAG) StartMonitoring added in v0.0.16

func (tm *DAG) StartMonitoring(ctx context.Context)

StartMonitoring starts the monitoring system

func (*DAG) Stop

func (tm *DAG) Stop(ctx context.Context) error

func (*DAG) StopCleanup added in v0.0.17

func (tm *DAG) StopCleanup()

StopCleanup stops the cleanup manager

func (*DAG) StopEnhanced added in v0.0.17

func (tm *DAG) StopEnhanced(ctx context.Context) error

StopEnhanced is an enhanced version of Stop that performs proper cleanup.

func (*DAG) StopMonitoring added in v0.0.16

func (tm *DAG) StopMonitoring()

StopMonitoring stops the monitoring system

func (*DAG) TopologicalSort

func (tm *DAG) TopologicalSort() (stack []string)

func (*DAG) UpdateConfiguration added in v0.0.16

func (tm *DAG) UpdateConfiguration(config *DAGConfig) error

UpdateConfiguration updates the DAG configuration

func (*DAG) Validate

func (tm *DAG) Validate() error

func (*DAG) ValidateDAG added in v0.0.16

func (tm *DAG) ValidateDAG() error

ValidateDAG validates the DAG structure using the enhanced validator

type DAGCache added in v0.0.16

type DAGCache struct {
	// contains filtered or unexported fields
}

DAGCache provides caching capabilities for DAG operations

func NewDAGCache added in v0.0.16

func NewDAGCache(ttl time.Duration, maxSize int, logger logger.Logger) *DAGCache

NewDAGCache creates a new DAG cache

func (*DAGCache) GetNode added in v0.0.16

func (dc *DAGCache) GetNode(key string) (*Node, bool)

GetNode retrieves a cached node

func (*DAGCache) GetNodeResult added in v0.0.16

func (dc *DAGCache) GetNodeResult(key string) (interface{}, bool)

GetNodeResult retrieves a cached node result

func (*DAGCache) SetNode added in v0.0.16

func (dc *DAGCache) SetNode(key string, node *Node)

SetNode caches a node

func (*DAGCache) SetNodeResult added in v0.0.16

func (dc *DAGCache) SetNodeResult(key string, value interface{})

SetNodeResult caches a node result

func (*DAGCache) Stop added in v0.0.16

func (dc *DAGCache) Stop()

Stop stops the cache cleanup routine

type DAGConfig added in v0.0.16

type DAGConfig struct {
	MaxConcurrentTasks     int              `json:"max_concurrent_tasks"`
	TaskTimeout            time.Duration    `json:"task_timeout"`
	NodeTimeout            time.Duration    `json:"node_timeout"`
	RetryConfig            *RetryConfig     `json:"retry_config"`
	CacheConfig            *CacheConfig     `json:"cache_config"`
	RateLimitConfig        *RateLimitConfig `json:"rate_limit_config"`
	MonitoringEnabled      bool             `json:"monitoring_enabled"`
	AlertingEnabled        bool             `json:"alerting_enabled"`
	CleanupInterval        time.Duration    `json:"cleanup_interval"`
	TransactionTimeout     time.Duration    `json:"transaction_timeout"`
	BatchProcessingEnabled bool             `json:"batch_processing_enabled"`
	BatchSize              int              `json:"batch_size"`
	BatchTimeout           time.Duration    `json:"batch_timeout"`
}

DAGConfig holds dynamic configuration for DAG

func DefaultDAGConfig added in v0.0.16

func DefaultDAGConfig() *DAGConfig

DefaultDAGConfig returns default DAG configuration

type DAGValidator added in v0.0.16

type DAGValidator struct {
	// contains filtered or unexported fields
}

DAGValidator provides validation capabilities for DAG structure

func NewDAGValidator added in v0.0.16

func NewDAGValidator(dag *DAG) *DAGValidator

NewDAGValidator creates a new DAG validator

func (*DAGValidator) GetCriticalPath added in v0.0.16

func (v *DAGValidator) GetCriticalPath() ([]string, error)

GetCriticalPath finds the longest path in the DAG

func (*DAGValidator) GetNodeStatistics added in v0.0.16

func (v *DAGValidator) GetNodeStatistics() map[string]interface{}

GetNodeStatistics returns DAG statistics

func (*DAGValidator) GetTopologicalOrder added in v0.0.16

func (v *DAGValidator) GetTopologicalOrder() ([]string, error)

GetTopologicalOrder returns nodes in topological order

func (*DAGValidator) ValidateStructure added in v0.0.16

func (v *DAGValidator) ValidateStructure() error

ValidateStructure performs comprehensive DAG structure validation

type Debugger added in v0.0.10

type Debugger interface {
	Debug(context.Context, *mq.Task)
}

type Direction added in v0.0.7

type Direction string
const (
	TB Direction = "TB"
	LR Direction = "LR"
)

type Edge

type Edge struct {
	From       *Node
	FromSource string
	To         *Node
	Label      string
	Type       EdgeType
}

type EdgeLevel added in v0.0.17

type EdgeLevel struct {
	Level int
	Type  EdgeType
}

EdgeLevel represents the depth level of an edge in the DAG

type EdgeStyleConfig added in v0.0.17

type EdgeStyleConfig struct {
	Color     string
	Style     string
	PenWidth  string
	ArrowSize string
	FontSize  string
}

EdgeStyleConfig contains styling information for edges

type EdgeType

type EdgeType int
const (
	Simple EdgeType = iota
	Iterator
)

func (EdgeType) IsValid

func (c EdgeType) IsValid() bool

func (EdgeType) String added in v0.0.17

func (c EdgeType) String() string

type EnhancedAPIHandler added in v0.0.16

type EnhancedAPIHandler struct {
	// contains filtered or unexported fields
}

EnhancedAPIHandler provides enhanced API endpoints for DAG management

func NewEnhancedAPIHandler added in v0.0.16

func NewEnhancedAPIHandler(dag *DAG) *EnhancedAPIHandler

NewEnhancedAPIHandler creates a new enhanced API handler

func (*EnhancedAPIHandler) RegisterRoutes added in v0.0.16

func (h *EnhancedAPIHandler) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all enhanced API routes

type HTTPClient added in v0.0.16

type HTTPClient interface {
	Post(url string, contentType string, body []byte, headers map[string]string) error
}

HTTPClient is the interface used to send HTTP POST requests.

type Handler added in v0.0.14

type Handler struct {
	Name string   `json:"name"`
	Tags []string `json:"tags"`
}

func AvailableHandlers

func AvailableHandlers() []Handler

type List

type List struct {
	Handlers map[string]*DAG
	// contains filtered or unexported fields
}

type MemoryActivityPersistence added in v0.0.17

type MemoryActivityPersistence struct {
	// contains filtered or unexported fields
}

MemoryActivityPersistence provides in-memory activity persistence for testing

func NewMemoryActivityPersistence added in v0.0.17

func NewMemoryActivityPersistence() *MemoryActivityPersistence

NewMemoryActivityPersistence creates a new in-memory activity persistence store.

func (*MemoryActivityPersistence) Close added in v0.0.17

func (mp *MemoryActivityPersistence) Close() error

Close closes the persistence

func (*MemoryActivityPersistence) GetStats added in v0.0.17

GetStats returns statistics for the filtered entries

func (*MemoryActivityPersistence) Query added in v0.0.17

Query queries activity entries with filter

func (*MemoryActivityPersistence) Store added in v0.0.17

func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error

Store stores activity entries in memory

type Monitor added in v0.0.16

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor provides comprehensive monitoring capabilities for DAG

func NewMonitor added in v0.0.16

func NewMonitor(dag *DAG, logger logger.Logger) *Monitor

NewMonitor creates a new DAG monitor

func (*Monitor) AddAlertHandler added in v0.0.16

func (m *Monitor) AddAlertHandler(handler AlertHandler)

AddAlertHandler adds an alert handler

func (*Monitor) GetMetrics added in v0.0.16

func (m *Monitor) GetMetrics() *MonitoringMetrics

GetMetrics returns current metrics

func (*Monitor) SetAlertThresholds added in v0.0.16

func (m *Monitor) SetAlertThresholds(thresholds *AlertThresholds)

SetAlertThresholds updates alert thresholds

func (*Monitor) Start added in v0.0.16

func (m *Monitor) Start(ctx context.Context)

Start begins monitoring

func (*Monitor) Stop added in v0.0.16

func (m *Monitor) Stop()

Stop stops monitoring

type MonitoringMetrics added in v0.0.16

type MonitoringMetrics struct {
	TasksTotal           int64
	TasksCompleted       int64
	TasksFailed          int64
	TasksCancelled       int64
	TasksInProgress      int64
	NodesExecuted        map[string]int64
	NodeExecutionTimes   map[string][]time.Duration
	NodeFailures         map[string]int64
	AverageExecutionTime time.Duration
	TotalExecutionTime   time.Duration
	StartTime            time.Time
	LastTaskCompletedAt  time.Time
	ActiveTasks          map[string]time.Time
	NodeProcessingStats  map[string]*NodeStats
	// contains filtered or unexported fields
}

MonitoringMetrics holds comprehensive metrics for DAG monitoring

func NewMonitoringMetrics added in v0.0.16

func NewMonitoringMetrics() *MonitoringMetrics

NewMonitoringMetrics creates a new metrics instance

func (*MonitoringMetrics) GetNodeStats added in v0.0.16

func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats

GetNodeStats returns statistics for a specific node

func (*MonitoringMetrics) GetSnapshot added in v0.0.16

func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics

GetSnapshot returns a snapshot of current metrics

func (*MonitoringMetrics) RecordNodeEnd added in v0.0.16

func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)

RecordNodeEnd records when a node finishes processing

func (*MonitoringMetrics) RecordNodeExecution added in v0.0.16

func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)

RecordNodeExecution records node execution metrics

func (*MonitoringMetrics) RecordNodeStart added in v0.0.16

func (m *MonitoringMetrics) RecordNodeStart(nodeID string)

RecordNodeStart records when a node starts processing

func (*MonitoringMetrics) RecordTaskCompletion added in v0.0.16

func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)

RecordTaskCompletion records task completion

func (*MonitoringMetrics) RecordTaskStart added in v0.0.16

func (m *MonitoringMetrics) RecordTaskStart(taskID string)

RecordTaskStart records the start of a task

type Node

type Node struct {
	Label    string
	ID       string
	Edges    []Edge
	NodeType NodeType

	Timeout time.Duration // node-level timeout
	// contains filtered or unexported fields
}

func (*Node) SetTimeout added in v0.0.11

func (n *Node) SetTimeout(d time.Duration)

SetTimeout allows setting a maximum processing duration for the node.

type NodeFailureStats added in v0.0.17

type NodeFailureStats struct {
	NodeID       string    `json:"node_id"`
	FailureCount int64     `json:"failure_count"`
	FailureRate  float64   `json:"failure_rate"`
	LastFailure  time.Time `json:"last_failure"`
}

NodeFailureStats represents failure statistics for a node

type NodeRateLimit added in v0.0.16

type NodeRateLimit struct {
	RequestsPerSecond float64 `json:"requests_per_second"`
	Burst             int     `json:"burst"`
}

NodeRateLimit holds rate limit settings for a specific node

type NodeRetryManager added in v0.0.16

type NodeRetryManager struct {
	// contains filtered or unexported fields
}

NodeRetryManager handles retry logic for individual nodes

func NewNodeRetryManager added in v0.0.16

func NewNodeRetryManager(config *RetryConfig, logger logger.Logger) *NodeRetryManager

NewNodeRetryManager creates a new retry manager

func (*NodeRetryManager) GetAttempts added in v0.0.16

func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int

GetAttempts returns the number of attempts for a task/node combination

func (*NodeRetryManager) GetRetryDelay added in v0.0.16

func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration

GetRetryDelay calculates the delay before the next retry

func (*NodeRetryManager) RecordAttempt added in v0.0.16

func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)

RecordAttempt records a retry attempt

func (*NodeRetryManager) Reset added in v0.0.16

func (rm *NodeRetryManager) Reset(taskID, nodeID string)

Reset clears retry attempts for a task/node combination

func (*NodeRetryManager) ResetTask added in v0.0.16

func (rm *NodeRetryManager) ResetTask(taskID string)

ResetTask clears all retry attempts for a task

func (*NodeRetryManager) SetGlobalConfig added in v0.0.17

func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)

SetGlobalConfig sets the global retry configuration

func (*NodeRetryManager) SetNodeConfig added in v0.0.17

func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)

SetNodeConfig sets retry configuration for a specific node

func (*NodeRetryManager) ShouldRetry added in v0.0.16

func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool

ShouldRetry determines if a failed node should be retried

type NodeStats added in v0.0.16

type NodeStats struct {
	ExecutionCount   int64
	SuccessCount     int64
	FailureCount     int64
	TotalDuration    time.Duration
	AverageDuration  time.Duration
	MinDuration      time.Duration
	MaxDuration      time.Duration
	LastExecuted     time.Time
	LastSuccess      time.Time
	LastFailure      time.Time
	CurrentlyRunning int64
}

NodeStats holds statistics for individual nodes

type NodeType added in v0.0.2

type NodeType int
const (
	Function NodeType = iota
	Page
)

func (NodeType) IsValid added in v0.0.2

func (c NodeType) IsValid() bool

func (NodeType) String added in v0.0.3

func (c NodeType) String() string

type Operation

type Operation struct {
	ID              string `json:"id"`
	Key             string `json:"key"`
	Payload         Payload
	RequiredFields  []string `json:"required_fields"`
	OptionalFields  []string `json:"optional_fields"`
	GeneratedFields []string `json:"generated_fields"`
	Type            NodeType `json:"type"`
	Tags            []string `json:"tags"`
}

func (*Operation) Close

func (e *Operation) Close() error

func (*Operation) Consume

func (e *Operation) Consume(_ context.Context) error

func (*Operation) Debug added in v0.0.10

func (e *Operation) Debug(ctx context.Context, task *mq.Task)

func (*Operation) GetKey

func (e *Operation) GetKey() string

func (*Operation) GetMappedData added in v0.0.10

func (e *Operation) GetMappedData(ctx context.Context, task *mq.Task) map[string]any

func (*Operation) GetTags added in v0.0.13

func (e *Operation) GetTags() []string

func (*Operation) GetType

func (e *Operation) GetType() string

func (*Operation) Pause

func (e *Operation) Pause(_ context.Context) error

func (*Operation) ProcessTask

func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result

func (*Operation) Resume

func (e *Operation) Resume(_ context.Context) error

func (*Operation) SetConfig

func (e *Operation) SetConfig(payload Payload)

func (*Operation) SetKey

func (e *Operation) SetKey(key string)

func (*Operation) SetTags added in v0.0.13

func (e *Operation) SetTags(tag ...string)

func (*Operation) Stop

func (e *Operation) Stop(_ context.Context) error

func (*Operation) ValidateFields

func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)

type Operations

type Operations struct {
	Handlers map[string]func(string) mq.Processor
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Data            map[string]any    `json:"data"`
	Mapping         map[string]string `json:"mapping"`
	GeneratedFields []string          `json:"generated_fields"`
	Providers       []Provider        `json:"providers"`
}

type PerformanceOptimizer added in v0.0.16

type PerformanceOptimizer struct {
	// contains filtered or unexported fields
}

PerformanceOptimizer optimizes DAG performance based on metrics

func NewPerformanceOptimizer added in v0.0.16

func NewPerformanceOptimizer(dag *DAG, monitor *Monitor, config *ConfigManager, logger logger.Logger) *PerformanceOptimizer

NewPerformanceOptimizer creates a new performance optimizer

func (*PerformanceOptimizer) OptimizePerformance added in v0.0.16

func (po *PerformanceOptimizer) OptimizePerformance() error

OptimizePerformance analyzes metrics and adjusts configuration

type Processor

type Processor interface {
	mq.Processor
	SetConfig(Payload)
	SetTags(tag ...string)
	GetTags() []string
}

type Provider

type Provider struct {
	Mapping       map[string]any `json:"mapping"`
	UpdateMapping map[string]any `json:"update_mapping"`
	InsertMapping map[string]any `json:"insert_mapping"`
	Defaults      map[string]any `json:"defaults"`
	ProviderType  string         `json:"provider_type"`
	Database      string         `json:"database"`
	Source        string         `json:"source"`
	Query         string         `json:"query"`
}

type RateLimitConfig added in v0.0.16

type RateLimitConfig struct {
	Enabled     bool                     `json:"enabled"`
	GlobalLimit float64                  `json:"global_limit"`
	GlobalBurst int                      `json:"global_burst"`
	NodeLimits  map[string]NodeRateLimit `json:"node_limits"`
}

RateLimitConfig holds rate limiting configuration

type RateLimiter added in v0.0.16

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter provides rate limiting for DAG operations

func NewRateLimiter added in v0.0.16

func NewRateLimiter(logger logger.Logger) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Allow added in v0.0.16

func (rl *RateLimiter) Allow(nodeID string) bool

Allow checks if the request is allowed for the given node

func (*RateLimiter) SetNodeLimit added in v0.0.16

func (rl *RateLimiter) SetNodeLimit(nodeID string, requestsPerSecond float64, burst int)

SetNodeLimit sets rate limit for a specific node

func (*RateLimiter) Wait added in v0.0.16

func (rl *RateLimiter) Wait(ctx context.Context, nodeID string) error

Wait waits until the request can be processed for the given node

type RetryConfig added in v0.0.16

type RetryConfig struct {
	MaxRetries     int
	InitialDelay   time.Duration
	MaxDelay       time.Duration
	BackoffFactor  float64
	Jitter         bool
	RetryCondition func(err error) bool
}

RetryConfig defines retry behavior for failed nodes

func DefaultRetryConfig added in v0.0.16

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a sensible default retry configuration

type RetryableProcessor added in v0.0.16

type RetryableProcessor struct {
	// contains filtered or unexported fields
}

RetryableProcessor wraps a processor with retry logic

func NewRetryableProcessor added in v0.0.16

func NewRetryableProcessor(processor mq.Processor, config *RetryConfig, logger logger.Logger) *RetryableProcessor

NewRetryableProcessor creates a processor with retry capabilities

func (*RetryableProcessor) Close added in v0.0.16

func (rp *RetryableProcessor) Close() error

Close closes the processor

func (*RetryableProcessor) Consume added in v0.0.16

func (rp *RetryableProcessor) Consume(ctx context.Context) error

Consume starts consuming messages

func (*RetryableProcessor) GetKey added in v0.0.16

func (rp *RetryableProcessor) GetKey() string

GetKey returns the processor key

func (*RetryableProcessor) GetType added in v0.0.16

func (rp *RetryableProcessor) GetType() string

GetType returns the processor type

func (*RetryableProcessor) Pause added in v0.0.16

func (rp *RetryableProcessor) Pause(ctx context.Context) error

Pause pauses the processor

func (*RetryableProcessor) ProcessTask added in v0.0.16

func (rp *RetryableProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

ProcessTask processes a task with retry logic

func (*RetryableProcessor) Resume added in v0.0.16

func (rp *RetryableProcessor) Resume(ctx context.Context) error

Resume resumes the processor

func (*RetryableProcessor) SetKey added in v0.0.16

func (rp *RetryableProcessor) SetKey(key string)

SetKey sets the processor key

func (*RetryableProcessor) Stop added in v0.0.16

func (rp *RetryableProcessor) Stop(ctx context.Context) error

Stop stops the processor

type RollbackHandler added in v0.0.16

type RollbackHandler interface {
	Rollback(operation TransactionOperation) error
}

RollbackHandler defines how to rollback operations

type SavePoint added in v0.0.17

type SavePoint struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Timestamp time.Time              `json:"timestamp"`
	State     map[string]interface{} `json:"state"`
}

SavePoint represents a save point in a transaction

type SimpleHTTPClient added in v0.0.16

type SimpleHTTPClient struct {
	// contains filtered or unexported fields
}

SimpleHTTPClient implements the HTTPClient interface for the webhook manager.

func NewSimpleHTTPClient added in v0.0.16

func NewSimpleHTTPClient(timeout time.Duration) *SimpleHTTPClient

NewSimpleHTTPClient creates a new simple HTTP client

func (*SimpleHTTPClient) Post added in v0.0.16

func (c *SimpleHTTPClient) Post(url string, contentType string, body []byte, headers map[string]string) error

Post sends a POST request to the specified URL

type TaskError added in v0.0.10

type TaskError struct {
	Err         error
	Recoverable bool
}

TaskError is used by node processors to indicate whether an error is recoverable.

func (TaskError) Error added in v0.0.10

func (te TaskError) Error() string

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

func NewTaskManager

func NewTaskManager(dag *DAG, taskID string, resultCh chan mq.Result, iteratorNodes storage.IMap[string, []Edge]) *TaskManager

func (*TaskManager) Pause added in v0.0.10

func (tm *TaskManager) Pause()

func (*TaskManager) ProcessTask added in v0.0.2

func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)

func (*TaskManager) Resume added in v0.0.10

func (tm *TaskManager) Resume()

func (*TaskManager) Stop added in v0.0.2

func (tm *TaskManager) Stop()

Stop gracefully stops the task manager

type TaskManagerConfig added in v0.0.10

type TaskManagerConfig struct {
	MaxRetries      int
	BaseBackoff     time.Duration
	RecoveryHandler func(ctx context.Context, result mq.Result) error
}

type TaskMetrics added in v0.0.11

type TaskMetrics struct {
	NotStarted int
	Queued     int
	Cancelled  int
	Completed  int
	Failed     int
	// contains filtered or unexported fields
}

TaskMetrics holds per-status task counters: not started, queued, cancelled, completed, and failed.

type TaskState added in v0.0.2

type TaskState struct {
	UpdatedAt time.Time

	NodeID string
	Status mq.Status
	Result mq.Result
	// contains filtered or unexported fields
}

TaskState holds state and intermediate results for a given task (identified by a node ID).

type Transaction added in v0.0.16

type Transaction struct {
	ID         string                 `json:"id"`
	TaskID     string                 `json:"task_id"`
	Status     TransactionStatus      `json:"status"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time,omitempty"`
	Operations []TransactionOperation `json:"operations"`
	SavePoints []SavePoint            `json:"save_points"`
	Metadata   map[string]interface{} `json:"metadata,omitempty"`
}

Transaction represents a transactional DAG execution

type TransactionManager added in v0.0.16

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager handles transaction-like operations for DAG execution

func NewTransactionManager added in v0.0.16

func NewTransactionManager(dag *DAG, logger logger.Logger) *TransactionManager

NewTransactionManager creates a new transaction manager

func (*TransactionManager) AddOperation added in v0.0.17

func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error

AddOperation adds an operation to a transaction

func (*TransactionManager) AddSavePoint added in v0.0.16

func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error

AddSavePoint adds a save point to the transaction

func (*TransactionManager) BeginTransaction added in v0.0.16

func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction

BeginTransaction starts a new transaction

func (*TransactionManager) CommitTransaction added in v0.0.16

func (tm *TransactionManager) CommitTransaction(txID string) error

CommitTransaction commits a transaction

func (*TransactionManager) GetTransaction added in v0.0.17

func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)

GetTransaction retrieves a transaction by ID

func (*TransactionManager) RollbackTransaction added in v0.0.16

func (tm *TransactionManager) RollbackTransaction(txID string) error

RollbackTransaction rolls back a transaction

type TransactionOperation added in v0.0.17

type TransactionOperation struct {
	ID              string                 `json:"id"`
	Type            string                 `json:"type"`
	NodeID          string                 `json:"node_id"`
	Data            map[string]interface{} `json:"data"`
	Timestamp       time.Time              `json:"timestamp"`
	RollbackHandler RollbackHandler        `json:"-"`
}

TransactionOperation represents an operation within a transaction

type TransactionStatus added in v0.0.16

type TransactionStatus string

TransactionStatus represents the status of a transaction

const (
	TransactionStatusStarted    TransactionStatus = "started"
	TransactionStatusCommitted  TransactionStatus = "committed"
	TransactionStatusRolledBack TransactionStatus = "rolled_back"
	TransactionStatusFailed     TransactionStatus = "failed"
)

type WebSocketHandler added in v0.0.16

type WebSocketHandler struct {
	// contains filtered or unexported fields
}

WebSocketHandler provides real-time monitoring via WebSocket

func NewWebSocketHandler added in v0.0.16

func NewWebSocketHandler(dag *DAG) *WebSocketHandler

NewWebSocketHandler creates a new WebSocket handler

func (*WebSocketHandler) HandleWebSocket added in v0.0.16

func (h *WebSocketHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket handles WebSocket connections for real-time monitoring

type WebhookConfig added in v0.0.16

type WebhookConfig struct {
	URL        string            `json:"url"`
	Headers    map[string]string `json:"headers"`
	Method     string            `json:"method"`
	RetryCount int               `json:"retry_count"`
	Timeout    time.Duration     `json:"timeout"`
	Events     []string          `json:"events"`
}

WebhookConfig defines webhook configuration

type WebhookEvent added in v0.0.16

type WebhookEvent struct {
	Type      string                 `json:"type"`
	TaskID    string                 `json:"task_id"`
	NodeID    string                 `json:"node_id,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
}

WebhookEvent represents an event to send via webhook

type WebhookManager added in v0.0.16

type WebhookManager struct {
	// contains filtered or unexported fields
}

WebhookManager handles webhook notifications

func NewWebhookManager added in v0.0.16

func NewWebhookManager(httpClient HTTPClient, logger logger.Logger) *WebhookManager

NewWebhookManager creates a new webhook manager

func (*WebhookManager) AddWebhook added in v0.0.16

func (wm *WebhookManager) AddWebhook(event string, config WebhookConfig)

AddWebhook adds a webhook configuration

func (*WebhookManager) TriggerWebhook added in v0.0.16

func (wm *WebhookManager) TriggerWebhook(event WebhookEvent)

TriggerWebhook sends webhook notifications for an event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL