Documentation
¶
Overview ¶
context_keys.go
Index ¶
- Constants
- func AddDAG(key string, handler *DAG)
- func AddHandler(key string, handler func(string) mq.Processor)
- func AvailableDAG() []string
- func ClearDAG()
- func GetHandler(key string) func(string) mq.Processor
- func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
- func Header(c context.Context, headerKey string) (val map[string]any, exists bool)
- func HeaderVal(c context.Context, headerKey string, key string) (val any)
- func WsEvents(s *sio.Server)
- type ActivityAlertHandler
- type ActivityEntry
- type ActivityFilter
- type ActivityHook
- type ActivityLevel
- type ActivityLogger
- func (al *ActivityLogger) AddHook(hook ActivityHook)
- func (al *ActivityLogger) Flush() error
- func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
- func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)
- func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, ...)
- func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, ...)
- func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)
- func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, ...)
- func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)
- func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, ...)
- func (al *ActivityLogger) RemoveHook(hook ActivityHook)
- func (al *ActivityLogger) Stop()
- type ActivityLoggerConfig
- type ActivityPersistence
- type ActivityStats
- type ActivityTimeRange
- type ActivityType
- type Alert
- type AlertHandler
- type AlertSeverity
- type AlertThresholds
- type AlertType
- type AlertWebhookHandler
- type BatchProcessor
- type CacheConfig
- type CacheEntry
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CleanupManager
- type Condition
- type ConditionProcessor
- type ConfigManager
- type ConfigWatcher
- type DAG
- func (tm *DAG) AddActivityHook(hook ActivityHook)
- func (tm *DAG) AddAlertHandler(handler AlertHandler)
- func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG
- func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)
- func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG
- func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error
- func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG
- func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AddWebhook(event string, config WebhookConfig)
- func (tm *DAG) AssignTopic(topic string)
- func (tm *DAG) BaseURI() string
- func (tm *DAG) BeginTransaction(taskID string) *Transaction
- func (tm *DAG) CancelTask(taskID string) error
- func (tm *DAG) CheckRateLimit(nodeID string) bool
- func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)
- func (tm *DAG) Clone() *DAG
- func (tm *DAG) Close() error
- func (tm *DAG) CommitTransaction(txID string) error
- func (tm *DAG) Consume(ctx context.Context) error
- func (tm *DAG) Export() map[string]interface{}
- func (tm *DAG) ExportDOT(direction ...Direction) string
- func (tm *DAG) FlushActivityLogs() error
- func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
- func (tm *DAG) GetActivityLogger() *ActivityLogger
- func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)
- func (tm *DAG) GetAllNodes() map[string]*Node
- func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState
- func (tm *DAG) GetConfiguration() *DAGConfig
- func (tm *DAG) GetCriticalPath() ([]string, error)
- func (tm *DAG) GetDAGStatistics() map[string]interface{}
- func (tm *DAG) GetEdgeCount() int
- func (tm *DAG) GetKey() string
- func (tm *DAG) GetLastNodes() ([]*Node, error)
- func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics
- func (tm *DAG) GetNextNodes(nodeID string) ([]*Node, error)
- func (tm *DAG) GetNodeByID(nodeID string) (*Node, error)
- func (tm *DAG) GetNodeCount() int
- func (tm *DAG) GetNodeStats(nodeID string) *NodeStats
- func (tm *DAG) GetPreviousNodes(nodeID string) ([]*Node, error)
- func (tm *DAG) GetReport() string
- func (tm *DAG) GetStartNode() string
- func (tm *DAG) GetStatus() map[string]interface{}
- func (d *DAG) GetTaskMetrics() TaskMetrics
- func (tm *DAG) GetTopologicalOrder() ([]string, error)
- func (tm *DAG) GetTransaction(txID string) (*Transaction, error)
- func (tm *DAG) GetType() string
- func (tm *DAG) Handlers(app any, prefix string)
- func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)
- func (tm *DAG) IsLastNode(nodeID string) (bool, error)
- func (tm *DAG) IsReady() bool
- func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, ...)
- func (tm *DAG) Logger() logger.Logger
- func (tm *DAG) OptimizePerformance() error
- func (tm *DAG) Pause(_ context.Context) error
- func (tm *DAG) PauseConsumer(ctx context.Context, id string)
- func (tm *DAG) PrintGraph()
- func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result
- func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) RemoveNode(nodeID string) error
- func (tm *DAG) ReportNodeResult(callback func(mq.Result))
- func (tm *DAG) Reset()
- func (tm *DAG) Resume(_ context.Context) error
- func (tm *DAG) ResumeConsumer(ctx context.Context, id string)
- func (tm *DAG) RollbackTransaction(txID string) error
- func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error
- func (tm *DAG) SavePNG(pngFile string) error
- func (tm *DAG) SaveSVG(svgFile string) error
- func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result
- func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)
- func (tm *DAG) SetBatchProcessingEnabled(enabled bool)
- func (tm *DAG) SetKey(key string)
- func (tm *DAG) SetNotifyResponse(callback mq.Callback)
- func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
- func (tm *DAG) SetPreProcessHook(...)
- func (tm *DAG) SetRateLimit(nodeID string, requestsPerSecond float64, burst int)
- func (tm *DAG) SetRetryConfig(config *RetryConfig)
- func (tm *DAG) SetStartNode(node string)
- func (tm *DAG) SetWebhookManager(manager *WebhookManager)
- func (tm *DAG) SetupWS() *sio.Server
- func (tm *DAG) Start(ctx context.Context, addr string) error
- func (tm *DAG) StartCleanup(ctx context.Context)
- func (tm *DAG) StartMonitoring(ctx context.Context)
- func (tm *DAG) Stop(ctx context.Context) error
- func (tm *DAG) StopCleanup()
- func (tm *DAG) StopEnhanced(ctx context.Context) error
- func (tm *DAG) StopMonitoring()
- func (tm *DAG) TopologicalSort() (stack []string)
- func (tm *DAG) UpdateConfiguration(config *DAGConfig) error
- func (tm *DAG) Validate() error
- func (tm *DAG) ValidateDAG() error
- type DAGCache
- type DAGConfig
- type DAGValidator
- type Debugger
- type Direction
- type Edge
- type EdgeLevel
- type EdgeStyleConfig
- type EdgeType
- type EnhancedAPIHandler
- type HTTPClient
- type Handler
- type List
- type MemoryActivityPersistence
- func (mp *MemoryActivityPersistence) Close() error
- func (mp *MemoryActivityPersistence) GetStats(filter ActivityFilter) (ActivityStats, error)
- func (mp *MemoryActivityPersistence) Query(filter ActivityFilter) ([]ActivityEntry, error)
- func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error
- type Monitor
- type MonitoringMetrics
- func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats
- func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics
- func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)
- func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)
- func (m *MonitoringMetrics) RecordNodeStart(nodeID string)
- func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)
- func (m *MonitoringMetrics) RecordTaskStart(taskID string)
- type Node
- type NodeFailureStats
- type NodeRateLimit
- type NodeRetryManager
- func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int
- func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration
- func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)
- func (rm *NodeRetryManager) Reset(taskID, nodeID string)
- func (rm *NodeRetryManager) ResetTask(taskID string)
- func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)
- func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)
- func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool
- type NodeStats
- type NodeType
- type Operation
- func (e *Operation) Close() error
- func (e *Operation) Consume(_ context.Context) error
- func (e *Operation) Debug(ctx context.Context, task *mq.Task)
- func (e *Operation) GetKey() string
- func (e *Operation) GetMappedData(ctx context.Context, task *mq.Task) map[string]any
- func (e *Operation) GetTags() []string
- func (e *Operation) GetType() string
- func (e *Operation) Pause(_ context.Context) error
- func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result
- func (e *Operation) Resume(_ context.Context) error
- func (e *Operation) SetConfig(payload Payload)
- func (e *Operation) SetKey(key string)
- func (e *Operation) SetTags(tag ...string)
- func (e *Operation) Stop(_ context.Context) error
- func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)
- type Operations
- type Payload
- type PerformanceOptimizer
- type Processor
- type Provider
- type RateLimitConfig
- type RateLimiter
- type RetryConfig
- type RetryableProcessor
- func (rp *RetryableProcessor) Close() error
- func (rp *RetryableProcessor) Consume(ctx context.Context) error
- func (rp *RetryableProcessor) GetKey() string
- func (rp *RetryableProcessor) GetType() string
- func (rp *RetryableProcessor) Pause(ctx context.Context) error
- func (rp *RetryableProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (rp *RetryableProcessor) Resume(ctx context.Context) error
- func (rp *RetryableProcessor) SetKey(key string)
- func (rp *RetryableProcessor) Stop(ctx context.Context) error
- type RollbackHandler
- type SavePoint
- type SimpleHTTPClient
- type TaskError
- type TaskManager
- type TaskManagerConfig
- type TaskMetrics
- type TaskState
- type Transaction
- type TransactionManager
- func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error
- func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error
- func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction
- func (tm *TransactionManager) CommitTransaction(txID string) error
- func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)
- func (tm *TransactionManager) RollbackTransaction(txID string) error
- type TransactionOperation
- type TransactionStatus
- type WebSocketHandler
- type WebhookConfig
- type WebhookEvent
- type WebhookManager
Constants ¶
const ( Delimiter = "___" ContextIndex = "index" DefaultChannelSize = 1000 RetryInterval = 5 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func AvailableDAG ¶
func AvailableDAG() []string
Types ¶
type ActivityAlertHandler ¶ added in v0.0.17
type ActivityAlertHandler struct {
// contains filtered or unexported fields
}
ActivityAlertHandler handles alerts by logging them as activities
func (*ActivityAlertHandler) HandleAlert ¶ added in v0.0.17
func (h *ActivityAlertHandler) HandleAlert(alert Alert) error
type ActivityEntry ¶ added in v0.0.17
type ActivityEntry struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
DAGName string `json:"dag_name"`
Level ActivityLevel `json:"level"`
Type ActivityType `json:"type"`
Message string `json:"message"`
TaskID string `json:"task_id,omitempty"`
NodeID string `json:"node_id,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
Success *bool `json:"success,omitempty"`
Error string `json:"error,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
ContextData map[string]interface{} `json:"context_data,omitempty"`
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
}
ActivityEntry represents a single activity log entry
type ActivityFilter ¶ added in v0.0.17
type ActivityFilter struct {
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
Levels []ActivityLevel `json:"levels,omitempty"`
Types []ActivityType `json:"types,omitempty"`
TaskIDs []string `json:"task_ids,omitempty"`
NodeIDs []string `json:"node_ids,omitempty"`
UserIDs []string `json:"user_ids,omitempty"`
SuccessOnly *bool `json:"success_only,omitempty"`
FailuresOnly *bool `json:"failures_only,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
SortBy string `json:"sort_by,omitempty"` // timestamp, level, type
SortOrder string `json:"sort_order,omitempty"` // asc, desc
}
ActivityFilter provides filtering options for activity queries
type ActivityHook ¶ added in v0.0.17
type ActivityHook interface {
OnActivity(entry ActivityEntry) error
}
ActivityHook allows custom processing of activity entries
type ActivityLevel ¶ added in v0.0.17
type ActivityLevel string
ActivityLevel represents the severity level of an activity
const ( ActivityLevelDebug ActivityLevel = "debug" ActivityLevelInfo ActivityLevel = "info" ActivityLevelWarn ActivityLevel = "warn" ActivityLevelError ActivityLevel = "error" ActivityLevelFatal ActivityLevel = "fatal" )
type ActivityLogger ¶ added in v0.0.17
type ActivityLogger struct {
// contains filtered or unexported fields
}
ActivityLogger provides comprehensive activity logging for DAG operations
func NewActivityLogger ¶ added in v0.0.17
func NewActivityLogger(dagName string, config ActivityLoggerConfig, persistence ActivityPersistence, logger logger.Logger) *ActivityLogger
NewActivityLogger creates a new activity logger
func (*ActivityLogger) AddHook ¶ added in v0.0.17
func (al *ActivityLogger) AddHook(hook ActivityHook)
AddHook adds an activity hook
func (*ActivityLogger) Flush ¶ added in v0.0.17
func (al *ActivityLogger) Flush() error
Flush flushes the buffer to persistence
func (*ActivityLogger) GetActivities ¶ added in v0.0.17
func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
GetActivities retrieves activities based on filter
func (*ActivityLogger) GetStats ¶ added in v0.0.17
func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)
GetStats returns activity statistics
func (*ActivityLogger) Log ¶ added in v0.0.17
func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
Log logs an activity entry
func (*ActivityLogger) LogNodeExecution ¶ added in v0.0.17
func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, duration time.Duration)
LogNodeExecution logs node execution details
func (*ActivityLogger) LogTaskComplete ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)
LogTaskComplete logs task completion activity
func (*ActivityLogger) LogTaskFail ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, duration time.Duration)
LogTaskFail logs task failure activity
func (*ActivityLogger) LogTaskStart ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)
LogTaskStart logs task start activity
func (*ActivityLogger) LogWithContext ¶ added in v0.0.17
func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
LogWithContext logs an activity entry with context information
func (*ActivityLogger) RemoveHook ¶ added in v0.0.17
func (al *ActivityLogger) RemoveHook(hook ActivityHook)
RemoveHook removes an activity hook
func (*ActivityLogger) Stop ¶ added in v0.0.17
func (al *ActivityLogger) Stop()
Stop stops the activity logger
type ActivityLoggerConfig ¶ added in v0.0.17
type ActivityLoggerConfig struct {
BufferSize int `json:"buffer_size"`
FlushInterval time.Duration `json:"flush_interval"`
MaxRetries int `json:"max_retries"`
EnableHooks bool `json:"enable_hooks"`
EnableCompression bool `json:"enable_compression"`
MaxEntryAge time.Duration `json:"max_entry_age"`
AsyncMode bool `json:"async_mode"`
}
ActivityLoggerConfig configures the activity logger
func DefaultActivityLoggerConfig ¶ added in v0.0.17
func DefaultActivityLoggerConfig() ActivityLoggerConfig
DefaultActivityLoggerConfig returns default configuration
type ActivityPersistence ¶ added in v0.0.17
type ActivityPersistence interface {
Store(entries []ActivityEntry) error
Query(filter ActivityFilter) ([]ActivityEntry, error)
GetStats(filter ActivityFilter) (ActivityStats, error)
Close() error
}
ActivityPersistence defines the interface for persisting activities
type ActivityStats ¶ added in v0.0.17
type ActivityStats struct {
TotalActivities int64 `json:"total_activities"`
ActivitiesByLevel map[ActivityLevel]int64 `json:"activities_by_level"`
ActivitiesByType map[ActivityType]int64 `json:"activities_by_type"`
ActivitiesByNode map[string]int64 `json:"activities_by_node"`
ActivitiesByTask map[string]int64 `json:"activities_by_task"`
SuccessRate float64 `json:"success_rate"`
FailureRate float64 `json:"failure_rate"`
AverageDuration time.Duration `json:"average_duration"`
PeakActivitiesPerMin int64 `json:"peak_activities_per_minute"`
TimeRange ActivityTimeRange `json:"time_range"`
RecentErrors []ActivityEntry `json:"recent_errors"`
TopFailingNodes []NodeFailureStats `json:"top_failing_nodes"`
HourlyDistribution map[string]int64 `json:"hourly_distribution"`
}
ActivityStats provides statistics about activities
type ActivityTimeRange ¶ added in v0.0.17
ActivityTimeRange represents a time range for activities
type ActivityType ¶ added in v0.0.17
type ActivityType string
ActivityType represents the type of activity
const ( ActivityTypeTaskStart ActivityType = "task_start" ActivityTypeTaskComplete ActivityType = "task_complete" ActivityTypeTaskFail ActivityType = "task_fail" ActivityTypeTaskCancel ActivityType = "task_cancel" ActivityTypeNodeStart ActivityType = "node_start" ActivityTypeNodeComplete ActivityType = "node_complete" ActivityTypeNodeFail ActivityType = "node_fail" ActivityTypeNodeTimeout ActivityType = "node_timeout" ActivityTypeValidation ActivityType = "validation" ActivityTypeConfiguration ActivityType = "configuration" ActivityTypeAlert ActivityType = "alert" ActivityTypeCleanup ActivityType = "cleanup" ActivityTypeTransaction ActivityType = "transaction" ActivityTypeRetry ActivityType = "retry" ActivityTypeCircuitBreaker ActivityType = "circuit_breaker" ActivityTypeWebhook ActivityType = "webhook" ActivityTypeCustom ActivityType = "custom" )
type Alert ¶ added in v0.0.16
type Alert struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Severity AlertSeverity `json:"severity"`
Type AlertType `json:"type"`
Message string `json:"message"`
Details map[string]interface{} `json:"details"`
NodeID string `json:"node_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
Threshold interface{} `json:"threshold,omitempty"`
ActualValue interface{} `json:"actual_value,omitempty"`
}
Alert represents a monitoring alert
type AlertHandler ¶ added in v0.0.16
AlertHandler defines interface for handling alerts
type AlertSeverity ¶ added in v0.0.17
type AlertSeverity string
const ( AlertSeverityInfo AlertSeverity = "info" AlertSeverityWarning AlertSeverity = "warning" AlertSeverityCritical AlertSeverity = "critical" )
type AlertThresholds ¶ added in v0.0.16
type AlertThresholds struct {
MaxFailureRate float64 `json:"max_failure_rate"`
MaxExecutionTime time.Duration `json:"max_execution_time"`
MaxTasksInProgress int64 `json:"max_tasks_in_progress"`
MinSuccessRate float64 `json:"min_success_rate"`
MaxNodeFailures int64 `json:"max_node_failures"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
}
AlertThresholds defines thresholds for alerting
type AlertType ¶ added in v0.0.17
type AlertType string
const ( AlertTypeFailureRate AlertType = "failure_rate" AlertTypeExecutionTime AlertType = "execution_time" AlertTypeTaskLoad AlertType = "task_load" AlertTypeNodeFailures AlertType = "node_failures" AlertTypeCircuitBreaker AlertType = "circuit_breaker" AlertTypeHealthCheck AlertType = "health_check" )
type AlertWebhookHandler ¶ added in v0.0.16
type AlertWebhookHandler struct {
// contains filtered or unexported fields
}
AlertWebhookHandler handles webhook alerts
func NewAlertWebhookHandler ¶ added in v0.0.16
func NewAlertWebhookHandler(logger logger.Logger) *AlertWebhookHandler
NewAlertWebhookHandler creates a new alert webhook handler
func (*AlertWebhookHandler) HandleAlert ¶ added in v0.0.16
func (h *AlertWebhookHandler) HandleAlert(alert Alert) error
HandleAlert implements the AlertHandler interface
type BatchProcessor ¶ added in v0.0.16
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor handles batch processing of tasks
func NewBatchProcessor ¶ added in v0.0.16
func NewBatchProcessor(dag *DAG, batchSize int, batchTimeout time.Duration, logger logger.Logger) *BatchProcessor
NewBatchProcessor creates a new batch processor
func (*BatchProcessor) AddTask ¶ added in v0.0.16
func (bp *BatchProcessor) AddTask(task *mq.Task) error
AddTask adds a task to the batch
func (*BatchProcessor) SetProcessFunc ¶ added in v0.0.16
func (bp *BatchProcessor) SetProcessFunc(fn func([]*mq.Task) error)
SetProcessFunc sets the function to process batches
func (*BatchProcessor) Stop ¶ added in v0.0.16
func (bp *BatchProcessor) Stop()
Stop stops the batch processor
type CacheConfig ¶ added in v0.0.16
type CacheConfig struct {
Enabled bool `json:"enabled"`
TTL time.Duration `json:"ttl"`
MaxSize int `json:"max_size"`
}
CacheConfig holds cache configuration
type CacheEntry ¶ added in v0.0.16
type CacheEntry struct {
Value interface{}
ExpiresAt time.Time
AccessCount int64
LastAccess time.Time
}
CacheEntry represents a cached item
type CircuitBreaker ¶ added in v0.0.16
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements circuit breaker pattern for nodes
func NewCircuitBreaker ¶ added in v0.0.16
func NewCircuitBreaker(config *CircuitBreakerConfig, logger logger.Logger) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Execute ¶ added in v0.0.16
func (cb *CircuitBreaker) Execute(fn func() error) error
Execute executes a function with circuit breaker protection
func (*CircuitBreaker) GetState ¶ added in v0.0.16
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current circuit breaker state
func (*CircuitBreaker) Reset ¶ added in v0.0.16
func (cb *CircuitBreaker) Reset()
Reset manually resets the circuit breaker
type CircuitBreakerConfig ¶ added in v0.0.16
type CircuitBreakerConfig struct {
FailureThreshold int
ResetTimeout time.Duration
HalfOpenMaxCalls int
}
CircuitBreakerConfig defines circuit breaker behavior
type CircuitBreakerState ¶ added in v0.0.16
type CircuitBreakerState int
Circuit Breaker Implementation
const ( CircuitClosed CircuitBreakerState = iota CircuitOpen CircuitHalfOpen )
type CleanupManager ¶ added in v0.0.16
type CleanupManager struct {
// contains filtered or unexported fields
}
CleanupManager handles cleanup of completed tasks and resources
func NewCleanupManager ¶ added in v0.0.16
func NewCleanupManager(dag *DAG, cleanupInterval, retentionPeriod time.Duration, maxEntries int, logger logger.Logger) *CleanupManager
NewCleanupManager creates a new cleanup manager
func (*CleanupManager) Start ¶ added in v0.0.16
func (cm *CleanupManager) Start(ctx context.Context)
Start begins the cleanup routine
func (*CleanupManager) Stop ¶ added in v0.0.16
func (cm *CleanupManager) Stop()
Stop stops the cleanup routine
type ConditionProcessor ¶
type ConfigManager ¶ added in v0.0.16
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager handles dynamic DAG configuration
func NewConfigManager ¶ added in v0.0.16
func NewConfigManager(logger logger.Logger) *ConfigManager
NewConfigManager creates a new configuration manager
func (*ConfigManager) AddWatcher ¶ added in v0.0.16
func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
AddWatcher adds a configuration watcher
func (*ConfigManager) GetConfig ¶ added in v0.0.16
func (cm *ConfigManager) GetConfig() *DAGConfig
GetConfig returns a copy of the current configuration
func (*ConfigManager) UpdateConfig ¶ added in v0.0.16
func (cm *ConfigManager) UpdateConfig(newConfig *DAGConfig) error
UpdateConfig updates the configuration
func (*ConfigManager) UpdateConfiguration ¶ added in v0.0.17
func (cm *ConfigManager) UpdateConfiguration(config *DAGConfig) error
UpdateConfiguration updates the DAG configuration (alias for UpdateConfig)
type ConfigWatcher ¶ added in v0.0.16
ConfigWatcher interface for configuration change notifications
type DAG ¶
type DAG struct {
Error error
Notifier *sio.Server
// New hook fields:
PreProcessHook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context
PostProcessHook func(ctx context.Context, node *Node, taskID string, result mq.Result)
// contains filtered or unexported fields
}
func (*DAG) AddActivityHook ¶ added in v0.0.17
func (tm *DAG) AddActivityHook(hook ActivityHook)
AddActivityHook adds an activity hook
func (*DAG) AddAlertHandler ¶ added in v0.0.16
func (tm *DAG) AddAlertHandler(handler AlertHandler)
AddAlertHandler adds an alert handler
func (*DAG) AddCondition ¶
func (*DAG) AddConfigWatcher ¶ added in v0.0.17
func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)
AddConfigWatcher adds a configuration change watcher
func (*DAG) AddDAGNode ¶
func (*DAG) AddDeferredNode ¶
func (*DAG) AddNodeWithRetry ¶ added in v0.0.16
func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, retryConfig *RetryConfig, startNode ...bool) *DAG
AddNodeWithRetry adds a node with specific retry configuration
func (*DAG) AddWebhook ¶ added in v0.0.17
func (tm *DAG) AddWebhook(event string, config WebhookConfig)
AddWebhook adds a webhook configuration
func (*DAG) AssignTopic ¶
func (*DAG) BeginTransaction ¶ added in v0.0.16
func (tm *DAG) BeginTransaction(taskID string) *Transaction
BeginTransaction starts a new transaction
func (*DAG) CancelTask ¶ added in v0.0.11
New method to cancel a running task.
func (*DAG) CheckRateLimit ¶ added in v0.0.17
CheckRateLimit checks if request is allowed for a node
func (*DAG) ClassifyEdges ¶
func (*DAG) CommitTransaction ¶ added in v0.0.16
CommitTransaction commits a transaction
func (*DAG) FlushActivityLogs ¶ added in v0.0.17
FlushActivityLogs flushes activity logs to persistence
func (*DAG) GetActivities ¶ added in v0.0.17
func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
GetActivities retrieves activities based on filter
func (*DAG) GetActivityLogger ¶ added in v0.0.17
func (tm *DAG) GetActivityLogger() *ActivityLogger
GetActivityLogger returns the activity logger instance
func (*DAG) GetActivityStats ¶ added in v0.0.17
func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)
GetActivityStats returns activity statistics
func (*DAG) GetAllNodes ¶ added in v0.0.17
GetAllNodes returns all nodes in the DAG
func (*DAG) GetCircuitBreakerStatus ¶ added in v0.0.17
func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState
GetCircuitBreakerStatus returns circuit breaker status for a node
func (*DAG) GetConfiguration ¶ added in v0.0.16
GetConfiguration returns current DAG configuration
func (*DAG) GetCriticalPath ¶ added in v0.0.16
GetCriticalPath returns the critical path of the DAG
func (*DAG) GetDAGStatistics ¶ added in v0.0.16
GetDAGStatistics returns comprehensive DAG statistics
func (*DAG) GetEdgeCount ¶ added in v0.0.17
GetEdgeCount returns the total number of edges
func (*DAG) GetLastNodes ¶ added in v0.0.2
func (*DAG) GetMonitoringMetrics ¶ added in v0.0.16
func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics
GetMonitoringMetrics returns current monitoring metrics
func (*DAG) GetNextNodes ¶ added in v0.0.2
GetNextNodes returns the next nodes for a given node
func (*DAG) GetNodeByID ¶ added in v0.0.17
GetNodeByID returns a node by its ID
func (*DAG) GetNodeCount ¶ added in v0.0.17
GetNodeCount returns the total number of nodes
func (*DAG) GetNodeStats ¶ added in v0.0.16
GetNodeStats returns statistics for a specific node
func (*DAG) GetPreviousNodes ¶ added in v0.0.2
GetPreviousNodes returns the previous nodes for a given node
func (*DAG) GetStartNode ¶
func (*DAG) GetStatus ¶ added in v0.0.11
GetStatus returns a summary of the DAG including node and task counts.
func (*DAG) GetTaskMetrics ¶ added in v0.0.11
func (d *DAG) GetTaskMetrics() TaskMetrics
Getter for task metrics.
func (*DAG) GetTopologicalOrder ¶ added in v0.0.16
GetTopologicalOrder returns nodes in topological order
func (*DAG) GetTransaction ¶ added in v0.0.17
func (tm *DAG) GetTransaction(txID string) (*Transaction, error)
GetTransaction retrieves transaction details
func (*DAG) InitializeActivityLogger ¶ added in v0.0.17
func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)
InitializeActivityLogger initializes the activity logger for the DAG
func (*DAG) IsLastNode ¶ added in v0.0.2
IsLastNode checks if a node is the last node in the DAG
func (*DAG) LogActivity ¶ added in v0.0.17
func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
LogActivity logs an activity entry
func (*DAG) OptimizePerformance ¶ added in v0.0.16
OptimizePerformance triggers performance optimization
func (*DAG) PrintGraph ¶
func (tm *DAG) PrintGraph()
func (*DAG) ProcessTaskNew ¶ added in v0.0.10
func (*DAG) RemoveNode ¶ added in v0.0.16
RemoveNode removes the node with the given nodeID and adjusts the edges. For example, if A -> B and B -> C exist and B is removed, a new edge A -> C is created.
func (*DAG) ReportNodeResult ¶
func (*DAG) RollbackTransaction ¶ added in v0.0.16
RollbackTransaction rolls back a transaction
func (*DAG) ScheduleTask ¶
func (*DAG) SetAlertThresholds ¶ added in v0.0.16
func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)
SetAlertThresholds configures alert thresholds
func (*DAG) SetBatchProcessingEnabled ¶ added in v0.0.17
SetBatchProcessingEnabled enables or disables batch processing
func (*DAG) SetNotifyResponse ¶
func (*DAG) SetPostProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
SetPostProcessHook configures a function to be called after each node is processed.
func (*DAG) SetPreProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPreProcessHook(hook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context)
SetPreProcessHook configures a function to be called before each node is processed.
func (*DAG) SetRateLimit ¶ added in v0.0.16
SetRateLimit sets rate limit for a specific node
func (*DAG) SetRetryConfig ¶ added in v0.0.16
func (tm *DAG) SetRetryConfig(config *RetryConfig)
SetRetryConfig sets the retry configuration
func (*DAG) SetStartNode ¶
func (*DAG) SetWebhookManager ¶ added in v0.0.16
func (tm *DAG) SetWebhookManager(manager *WebhookManager)
SetWebhookManager sets the webhook manager
func (*DAG) StartCleanup ¶ added in v0.0.17
StartCleanup starts the cleanup manager
func (*DAG) StartMonitoring ¶ added in v0.0.16
StartMonitoring starts the monitoring system
func (*DAG) StopCleanup ¶ added in v0.0.17
func (tm *DAG) StopCleanup()
StopCleanup stops the cleanup manager
func (*DAG) StopEnhanced ¶ added in v0.0.17
Enhanced Stop method with proper cleanup
func (*DAG) StopMonitoring ¶ added in v0.0.16
func (tm *DAG) StopMonitoring()
StopMonitoring stops the monitoring system
func (*DAG) TopologicalSort ¶
func (*DAG) UpdateConfiguration ¶ added in v0.0.16
UpdateConfiguration updates the DAG configuration
func (*DAG) ValidateDAG ¶ added in v0.0.16
ValidateDAG validates the DAG structure using the enhanced validator
type DAGCache ¶ added in v0.0.16
type DAGCache struct {
// contains filtered or unexported fields
}
DAGCache provides caching capabilities for DAG operations
func NewDAGCache ¶ added in v0.0.16
NewDAGCache creates a new DAG cache
func (*DAGCache) GetNodeResult ¶ added in v0.0.16
GetNodeResult retrieves a cached node result
func (*DAGCache) SetNodeResult ¶ added in v0.0.16
SetNodeResult caches a node result
type DAGConfig ¶ added in v0.0.16
type DAGConfig struct {
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
TaskTimeout time.Duration `json:"task_timeout"`
NodeTimeout time.Duration `json:"node_timeout"`
RetryConfig *RetryConfig `json:"retry_config"`
CacheConfig *CacheConfig `json:"cache_config"`
RateLimitConfig *RateLimitConfig `json:"rate_limit_config"`
MonitoringEnabled bool `json:"monitoring_enabled"`
AlertingEnabled bool `json:"alerting_enabled"`
CleanupInterval time.Duration `json:"cleanup_interval"`
TransactionTimeout time.Duration `json:"transaction_timeout"`
BatchProcessingEnabled bool `json:"batch_processing_enabled"`
BatchSize int `json:"batch_size"`
BatchTimeout time.Duration `json:"batch_timeout"`
}
DAGConfig holds dynamic configuration for DAG
func DefaultDAGConfig ¶ added in v0.0.16
func DefaultDAGConfig() *DAGConfig
DefaultDAGConfig returns default DAG configuration
type DAGValidator ¶ added in v0.0.16
type DAGValidator struct {
// contains filtered or unexported fields
}
DAGValidator provides validation capabilities for DAG structure
func NewDAGValidator ¶ added in v0.0.16
func NewDAGValidator(dag *DAG) *DAGValidator
NewDAGValidator creates a new DAG validator
func (*DAGValidator) GetCriticalPath ¶ added in v0.0.16
func (v *DAGValidator) GetCriticalPath() ([]string, error)
GetCriticalPath finds the longest path in the DAG
func (*DAGValidator) GetNodeStatistics ¶ added in v0.0.16
func (v *DAGValidator) GetNodeStatistics() map[string]interface{}
GetNodeStatistics returns DAG statistics
func (*DAGValidator) GetTopologicalOrder ¶ added in v0.0.16
func (v *DAGValidator) GetTopologicalOrder() ([]string, error)
GetTopologicalOrder returns nodes in topological order
func (*DAGValidator) ValidateStructure ¶ added in v0.0.16
func (v *DAGValidator) ValidateStructure() error
ValidateStructure performs comprehensive DAG structure validation
type EdgeStyleConfig ¶ added in v0.0.17
type EdgeStyleConfig struct {
Color string
Style string
PenWidth string
ArrowSize string
FontSize string
}
EdgeStyleConfig contains styling information for edges
type EnhancedAPIHandler ¶ added in v0.0.16
type EnhancedAPIHandler struct {
// contains filtered or unexported fields
}
EnhancedAPIHandler provides enhanced API endpoints for DAG management
func NewEnhancedAPIHandler ¶ added in v0.0.16
func NewEnhancedAPIHandler(dag *DAG) *EnhancedAPIHandler
NewEnhancedAPIHandler creates a new enhanced API handler
func (*EnhancedAPIHandler) RegisterRoutes ¶ added in v0.0.16
func (h *EnhancedAPIHandler) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers all enhanced API routes
type HTTPClient ¶ added in v0.0.16
type HTTPClient interface {
Post(url string, contentType string, body []byte, headers map[string]string) error
}
HTTPClient interface for HTTP requests
type Handler ¶ added in v0.0.14
func AvailableHandlers ¶
func AvailableHandlers() []Handler
type MemoryActivityPersistence ¶ added in v0.0.17
type MemoryActivityPersistence struct {
// contains filtered or unexported fields
}
MemoryActivityPersistence provides in-memory activity persistence for testing
func NewMemoryActivityPersistence ¶ added in v0.0.17
func NewMemoryActivityPersistence() *MemoryActivityPersistence
NewMemoryActivityPersistence creates a new in-memory persistence
func (*MemoryActivityPersistence) Close ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Close() error
Close closes the persistence
func (*MemoryActivityPersistence) GetStats ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) GetStats(filter ActivityFilter) (ActivityStats, error)
GetStats returns statistics for the filtered entries
func (*MemoryActivityPersistence) Query ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Query(filter ActivityFilter) ([]ActivityEntry, error)
Query queries activity entries with filter
func (*MemoryActivityPersistence) Store ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error
Store stores activity entries in memory
type Monitor ¶ added in v0.0.16
type Monitor struct {
// contains filtered or unexported fields
}
Monitor provides comprehensive monitoring capabilities for DAG
func NewMonitor ¶ added in v0.0.16
NewMonitor creates a new DAG monitor
func (*Monitor) AddAlertHandler ¶ added in v0.0.16
func (m *Monitor) AddAlertHandler(handler AlertHandler)
AddAlertHandler adds an alert handler
func (*Monitor) GetMetrics ¶ added in v0.0.16
func (m *Monitor) GetMetrics() *MonitoringMetrics
GetMetrics returns current metrics
func (*Monitor) SetAlertThresholds ¶ added in v0.0.16
func (m *Monitor) SetAlertThresholds(thresholds *AlertThresholds)
SetAlertThresholds updates alert thresholds
type MonitoringMetrics ¶ added in v0.0.16
type MonitoringMetrics struct {
TasksTotal int64
TasksCompleted int64
TasksFailed int64
TasksCancelled int64
TasksInProgress int64
NodesExecuted map[string]int64
NodeExecutionTimes map[string][]time.Duration
NodeFailures map[string]int64
AverageExecutionTime time.Duration
TotalExecutionTime time.Duration
StartTime time.Time
LastTaskCompletedAt time.Time
ActiveTasks map[string]time.Time
NodeProcessingStats map[string]*NodeStats
// contains filtered or unexported fields
}
MonitoringMetrics holds comprehensive metrics for DAG monitoring
func NewMonitoringMetrics ¶ added in v0.0.16
func NewMonitoringMetrics() *MonitoringMetrics
NewMonitoringMetrics creates a new metrics instance
func (*MonitoringMetrics) GetNodeStats ¶ added in v0.0.16
func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats
GetNodeStats returns statistics for a specific node
func (*MonitoringMetrics) GetSnapshot ¶ added in v0.0.16
func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics
GetSnapshot returns a snapshot of current metrics
func (*MonitoringMetrics) RecordNodeEnd ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)
RecordNodeEnd records when a node finishes processing
func (*MonitoringMetrics) RecordNodeExecution ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)
RecordNodeExecution records node execution metrics
func (*MonitoringMetrics) RecordNodeStart ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeStart(nodeID string)
RecordNodeStart records when a node starts processing
func (*MonitoringMetrics) RecordTaskCompletion ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)
RecordTaskCompletion records task completion
func (*MonitoringMetrics) RecordTaskStart ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordTaskStart(taskID string)
RecordTaskStart records the start of a task
type Node ¶
type Node struct {
Label string
ID string
Edges []Edge
NodeType NodeType
Timeout time.Duration // ...new field for node-level timeout...
// contains filtered or unexported fields
}
func (*Node) SetTimeout ¶ added in v0.0.11
SetTimeout allows setting a maximum processing duration for the node.
type NodeFailureStats ¶ added in v0.0.17
type NodeFailureStats struct {
NodeID string `json:"node_id"`
FailureCount int64 `json:"failure_count"`
FailureRate float64 `json:"failure_rate"`
LastFailure time.Time `json:"last_failure"`
}
NodeFailureStats represents failure statistics for a node
type NodeRateLimit ¶ added in v0.0.16
type NodeRateLimit struct {
RequestsPerSecond float64 `json:"requests_per_second"`
Burst int `json:"burst"`
}
NodeRateLimit holds rate limit settings for a specific node
type NodeRetryManager ¶ added in v0.0.16
type NodeRetryManager struct {
// contains filtered or unexported fields
}
NodeRetryManager handles retry logic for individual nodes
func NewNodeRetryManager ¶ added in v0.0.16
func NewNodeRetryManager(config *RetryConfig, logger logger.Logger) *NodeRetryManager
NewNodeRetryManager creates a new retry manager
func (*NodeRetryManager) GetAttempts ¶ added in v0.0.16
func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int
GetAttempts returns the number of attempts for a task/node combination
func (*NodeRetryManager) GetRetryDelay ¶ added in v0.0.16
func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration
GetRetryDelay calculates the delay before the next retry
func (*NodeRetryManager) RecordAttempt ¶ added in v0.0.16
func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)
RecordAttempt records a retry attempt
func (*NodeRetryManager) Reset ¶ added in v0.0.16
func (rm *NodeRetryManager) Reset(taskID, nodeID string)
Reset clears retry attempts for a task/node combination
func (*NodeRetryManager) ResetTask ¶ added in v0.0.16
func (rm *NodeRetryManager) ResetTask(taskID string)
ResetTask clears all retry attempts for a task
func (*NodeRetryManager) SetGlobalConfig ¶ added in v0.0.17
func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)
SetGlobalConfig sets the global retry configuration
func (*NodeRetryManager) SetNodeConfig ¶ added in v0.0.17
func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)
SetNodeConfig sets retry configuration for a specific node
func (*NodeRetryManager) ShouldRetry ¶ added in v0.0.16
func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool
ShouldRetry determines if a failed node should be retried
type NodeStats ¶ added in v0.0.16
type NodeStats struct {
ExecutionCount int64
SuccessCount int64
FailureCount int64
TotalDuration time.Duration
AverageDuration time.Duration
MinDuration time.Duration
MaxDuration time.Duration
LastExecuted time.Time
LastSuccess time.Time
LastFailure time.Time
CurrentlyRunning int64
}
NodeStats holds statistics for individual nodes
type Operation ¶
type Operation struct {
ID string `json:"id"`
Key string `json:"key"`
Payload Payload
RequiredFields []string `json:"required_fields"`
OptionalFields []string `json:"optional_fields"`
GeneratedFields []string `json:"generated_fields"`
Type NodeType `json:"type"`
Tags []string `json:"tags"`
}
func (*Operation) GetMappedData ¶ added in v0.0.10
func (*Operation) ProcessTask ¶
type Operations ¶
type PerformanceOptimizer ¶ added in v0.0.16
type PerformanceOptimizer struct {
// contains filtered or unexported fields
}
PerformanceOptimizer optimizes DAG performance based on metrics
func NewPerformanceOptimizer ¶ added in v0.0.16
func NewPerformanceOptimizer(dag *DAG, monitor *Monitor, config *ConfigManager, logger logger.Logger) *PerformanceOptimizer
NewPerformanceOptimizer creates a new performance optimizer
func (*PerformanceOptimizer) OptimizePerformance ¶ added in v0.0.16
func (po *PerformanceOptimizer) OptimizePerformance() error
OptimizePerformance analyzes metrics and adjusts configuration
type Provider ¶
type Provider struct {
Mapping map[string]any `json:"mapping"`
UpdateMapping map[string]any `json:"update_mapping"`
InsertMapping map[string]any `json:"insert_mapping"`
Defaults map[string]any `json:"defaults"`
ProviderType string `json:"provider_type"`
Database string `json:"database"`
Source string `json:"source"`
Query string `json:"query"`
}
type RateLimitConfig ¶ added in v0.0.16
type RateLimitConfig struct {
Enabled bool `json:"enabled"`
GlobalLimit float64 `json:"global_limit"`
GlobalBurst int `json:"global_burst"`
NodeLimits map[string]NodeRateLimit `json:"node_limits"`
}
RateLimitConfig holds rate limiting configuration
type RateLimiter ¶ added in v0.0.16
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter provides rate limiting for DAG operations
func NewRateLimiter ¶ added in v0.0.16
func NewRateLimiter(logger logger.Logger) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶ added in v0.0.16
func (rl *RateLimiter) Allow(nodeID string) bool
Allow checks if the request is allowed for the given node
func (*RateLimiter) SetNodeLimit ¶ added in v0.0.16
func (rl *RateLimiter) SetNodeLimit(nodeID string, requestsPerSecond float64, burst int)
SetNodeLimit sets rate limit for a specific node
type RetryConfig ¶ added in v0.0.16
type RetryConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
Jitter bool
RetryCondition func(err error) bool
}
RetryConfig defines retry behavior for failed nodes
func DefaultRetryConfig ¶ added in v0.0.16
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a sensible default retry configuration
type RetryableProcessor ¶ added in v0.0.16
type RetryableProcessor struct {
// contains filtered or unexported fields
}
RetryableProcessor wraps a processor with retry logic
func NewRetryableProcessor ¶ added in v0.0.16
func NewRetryableProcessor(processor mq.Processor, config *RetryConfig, logger logger.Logger) *RetryableProcessor
NewRetryableProcessor creates a processor with retry capabilities
func (*RetryableProcessor) Close ¶ added in v0.0.16
func (rp *RetryableProcessor) Close() error
Close closes the processor
func (*RetryableProcessor) Consume ¶ added in v0.0.16
func (rp *RetryableProcessor) Consume(ctx context.Context) error
Consume starts consuming messages
func (*RetryableProcessor) GetKey ¶ added in v0.0.16
func (rp *RetryableProcessor) GetKey() string
GetKey returns the processor key
func (*RetryableProcessor) GetType ¶ added in v0.0.16
func (rp *RetryableProcessor) GetType() string
GetType returns the processor type
func (*RetryableProcessor) Pause ¶ added in v0.0.16
func (rp *RetryableProcessor) Pause(ctx context.Context) error
Pause pauses the processor
func (*RetryableProcessor) ProcessTask ¶ added in v0.0.16
ProcessTask processes a task with retry logic
func (*RetryableProcessor) Resume ¶ added in v0.0.16
func (rp *RetryableProcessor) Resume(ctx context.Context) error
Resume resumes the processor
func (*RetryableProcessor) SetKey ¶ added in v0.0.16
func (rp *RetryableProcessor) SetKey(key string)
SetKey sets the processor key
type RollbackHandler ¶ added in v0.0.16
type RollbackHandler interface {
Rollback(operation TransactionOperation) error
}
RollbackHandler defines how to rollback operations
type SavePoint ¶ added in v0.0.17
type SavePoint struct {
ID string `json:"id"`
Name string `json:"name"`
Timestamp time.Time `json:"timestamp"`
State map[string]interface{} `json:"state"`
}
SavePoint represents a save point in a transaction
type SimpleHTTPClient ¶ added in v0.0.16
type SimpleHTTPClient struct {
// contains filtered or unexported fields
}
SimpleHTTPClient implements HTTPClient interface for webhook manager
func NewSimpleHTTPClient ¶ added in v0.0.16
func NewSimpleHTTPClient(timeout time.Duration) *SimpleHTTPClient
NewSimpleHTTPClient creates a new simple HTTP client
type TaskError ¶ added in v0.0.10
TaskError is used by node processors to indicate whether an error is recoverable.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
func NewTaskManager ¶
func (*TaskManager) Pause ¶ added in v0.0.10
func (tm *TaskManager) Pause()
func (*TaskManager) ProcessTask ¶ added in v0.0.2
func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)
func (*TaskManager) Resume ¶ added in v0.0.10
func (tm *TaskManager) Resume()
func (*TaskManager) Stop ¶ added in v0.0.2
func (tm *TaskManager) Stop()
Stop gracefully stops the task manager
type TaskManagerConfig ¶ added in v0.0.10
type TaskMetrics ¶ added in v0.0.11
type TaskMetrics struct {
NotStarted int
Queued int
Cancelled int
Completed int
Failed int
// contains filtered or unexported fields
}
New task metrics type.
type TaskState ¶ added in v0.0.2
type TaskState struct {
UpdatedAt time.Time
NodeID string
Status mq.Status
Result mq.Result
// contains filtered or unexported fields
}
TaskState holds state and intermediate results for a given task (identified by a node ID).
type Transaction ¶ added in v0.0.16
type Transaction struct {
ID string `json:"id"`
TaskID string `json:"task_id"`
Status TransactionStatus `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`
Operations []TransactionOperation `json:"operations"`
SavePoints []SavePoint `json:"save_points"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Transaction represents a transactional DAG execution
type TransactionManager ¶ added in v0.0.16
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager handles transaction-like operations for DAG execution
func NewTransactionManager ¶ added in v0.0.16
func NewTransactionManager(dag *DAG, logger logger.Logger) *TransactionManager
NewTransactionManager creates a new transaction manager
func (*TransactionManager) AddOperation ¶ added in v0.0.17
func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error
AddOperation adds an operation to a transaction
func (*TransactionManager) AddSavePoint ¶ added in v0.0.16
func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error
AddSavePoint adds a save point to the transaction
func (*TransactionManager) BeginTransaction ¶ added in v0.0.16
func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction
BeginTransaction starts a new transaction
func (*TransactionManager) CommitTransaction ¶ added in v0.0.16
func (tm *TransactionManager) CommitTransaction(txID string) error
CommitTransaction commits a transaction
func (*TransactionManager) GetTransaction ¶ added in v0.0.17
func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)
GetTransaction retrieves a transaction by ID
func (*TransactionManager) RollbackTransaction ¶ added in v0.0.16
func (tm *TransactionManager) RollbackTransaction(txID string) error
RollbackTransaction rolls back a transaction
type TransactionOperation ¶ added in v0.0.17
type TransactionOperation struct {
ID string `json:"id"`
Type string `json:"type"`
NodeID string `json:"node_id"`
Data map[string]interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
RollbackHandler RollbackHandler `json:"-"`
}
TransactionOperation represents an operation within a transaction
type TransactionStatus ¶ added in v0.0.16
type TransactionStatus string
TransactionStatus represents the status of a transaction
const ( TransactionStatusStarted TransactionStatus = "started" TransactionStatusCommitted TransactionStatus = "committed" TransactionStatusRolledBack TransactionStatus = "rolled_back" TransactionStatusFailed TransactionStatus = "failed" )
type WebSocketHandler ¶ added in v0.0.16
type WebSocketHandler struct {
// contains filtered or unexported fields
}
WebSocketHandler provides real-time monitoring via WebSocket
func NewWebSocketHandler ¶ added in v0.0.16
func NewWebSocketHandler(dag *DAG) *WebSocketHandler
NewWebSocketHandler creates a new WebSocket handler
func (*WebSocketHandler) HandleWebSocket ¶ added in v0.0.16
func (h *WebSocketHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket handles WebSocket connections for real-time monitoring
type WebhookConfig ¶ added in v0.0.16
type WebhookConfig struct {
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Method string `json:"method"`
RetryCount int `json:"retry_count"`
Timeout time.Duration `json:"timeout"`
Events []string `json:"events"`
}
WebhookConfig defines webhook configuration
type WebhookEvent ¶ added in v0.0.16
type WebhookEvent struct {
Type string `json:"type"`
TaskID string `json:"task_id"`
NodeID string `json:"node_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
}
WebhookEvent represents an event to send via webhook
type WebhookManager ¶ added in v0.0.16
type WebhookManager struct {
// contains filtered or unexported fields
}
WebhookManager handles webhook notifications
func NewWebhookManager ¶ added in v0.0.16
func NewWebhookManager(httpClient HTTPClient, logger logger.Logger) *WebhookManager
NewWebhookManager creates a new webhook manager
func (*WebhookManager) AddWebhook ¶ added in v0.0.16
func (wm *WebhookManager) AddWebhook(event string, config WebhookConfig)
AddWebhook adds a webhook configuration
func (*WebhookManager) TriggerWebhook ¶ added in v0.0.16
func (wm *WebhookManager) TriggerWebhook(event WebhookEvent)
TriggerWebhook sends webhook notifications for an event