Documentation
¶
Overview ¶
context_keys.go
Index ¶
- Constants
- func AddDAG(key string, handler *DAG)
- func AddHandler(key string, handler func(string) mq.Processor)
- func AvailableDAG() []string
- func ClearDAG()
- func GetHandler(key string) func(string) mq.Processor
- func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
- func Header(c context.Context, headerKey string) (val map[string]any, exists bool)
- func HeaderVal(c context.Context, headerKey string, key string) (val any)
- func WsEvents(s *sio.Server)
- type ActivityAlertHandler
- type ActivityEntry
- type ActivityFilter
- type ActivityHook
- type ActivityLevel
- type ActivityLogger
- func (al *ActivityLogger) AddHook(hook ActivityHook)
- func (al *ActivityLogger) Flush() error
- func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
- func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)
- func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, ...)
- func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, ...)
- func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)
- func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, ...)
- func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)
- func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, ...)
- func (al *ActivityLogger) RemoveHook(hook ActivityHook)
- func (al *ActivityLogger) Stop()
- type ActivityLoggerConfig
- type ActivityPersistence
- type ActivityStats
- type ActivityTimeRange
- type ActivityType
- type Alert
- type AlertHandler
- type AlertSeverity
- type AlertThresholds
- type AlertType
- type AlertWebhookHandler
- type BatchProcessor
- type CacheConfig
- type CacheEntry
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CleanupManager
- type Condition
- type ConditionProcessor
- type ConfigManager
- type ConfigWatcher
- type DAG
- func (tm *DAG) AddActivityHook(hook ActivityHook)
- func (tm *DAG) AddAlertHandler(handler AlertHandler)
- func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG
- func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)
- func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG
- func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error
- func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG
- func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AddWebhook(event string, config WebhookConfig)
- func (tm *DAG) AssignTopic(topic string)
- func (tm *DAG) BaseURI() string
- func (tm *DAG) BeginTransaction(taskID string) *Transaction
- func (tm *DAG) CancelTask(taskID string) error
- func (tm *DAG) CheckRateLimit(nodeID string) bool
- func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)
- func (tm *DAG) Clone() *DAG
- func (tm *DAG) Close() error
- func (tm *DAG) CommitTransaction(txID string) error
- func (tm *DAG) Consume(ctx context.Context) error
- func (d *DAG) ContainsPageNodes() bool
- func (tm *DAG) Export() map[string]interface{}
- func (tm *DAG) ExportDOT(direction ...Direction) string
- func (tm *DAG) FlushActivityLogs() error
- func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
- func (tm *DAG) GetActivityLogger() *ActivityLogger
- func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)
- func (tm *DAG) GetAllNodes() map[string]*Node
- func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState
- func (tm *DAG) GetConfiguration() *DAGConfig
- func (tm *DAG) GetCriticalPath() ([]string, error)
- func (tm *DAG) GetDAGStatistics() map[string]interface{}
- func (tm *DAG) GetEdgeCount() int
- func (tm *DAG) GetKey() string
- func (tm *DAG) GetLastNodes() ([]*Node, error)
- func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics
- func (tm *DAG) GetNextNodes(nodeID string) ([]*Node, error)
- func (tm *DAG) GetNodeByID(nodeID string) (*Node, error)
- func (tm *DAG) GetNodeCount() int
- func (tm *DAG) GetNodeStats(nodeID string) *NodeStats
- func (tm *DAG) GetPreviousNodes(nodeID string) ([]*Node, error)
- func (tm *DAG) GetReport() string
- func (tm *DAG) GetStartNode() string
- func (tm *DAG) GetStatus() map[string]interface{}
- func (d *DAG) GetTaskMetrics() TaskMetrics
- func (tm *DAG) GetTopologicalOrder() ([]string, error)
- func (tm *DAG) GetTransaction(txID string) (*Transaction, error)
- func (tm *DAG) GetType() string
- func (tm *DAG) Handlers(app any, prefix string)
- func (d *DAG) HasPageNode() bool
- func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)
- func (tm *DAG) IsLastNode(nodeID string) (bool, error)
- func (tm *DAG) IsReady() bool
- func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, ...)
- func (tm *DAG) Logger() logger.Logger
- func (tm *DAG) OptimizePerformance() error
- func (tm *DAG) Pause(_ context.Context) error
- func (tm *DAG) PauseConsumer(ctx context.Context, id string)
- func (tm *DAG) PrintGraph()
- func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result
- func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) RemoveNode(nodeID string) error
- func (tm *DAG) ReportNodeResult(callback func(mq.Result))
- func (tm *DAG) Reset()
- func (tm *DAG) Resume(_ context.Context) error
- func (tm *DAG) ResumeConsumer(ctx context.Context, id string)
- func (tm *DAG) RollbackTransaction(txID string) error
- func (tm *DAG) SVGViewerHTML(svgContent string) string
- func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error
- func (tm *DAG) SavePNG(pngFile string) error
- func (tm *DAG) SaveSVG(svgFile string) error
- func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result
- func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)
- func (tm *DAG) SetBatchProcessingEnabled(enabled bool)
- func (tm *DAG) SetKey(key string)
- func (tm *DAG) SetNotifyResponse(callback mq.Callback)
- func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
- func (tm *DAG) SetPreProcessHook(...)
- func (tm *DAG) SetRateLimit(nodeID string, requestsPerSecond float64, burst int)
- func (tm *DAG) SetRetryConfig(config *RetryConfig)
- func (tm *DAG) SetStartNode(node string)
- func (tm *DAG) SetWebhookManager(manager *WebhookManager)
- func (tm *DAG) SetupWS() *sio.Server
- func (tm *DAG) Start(ctx context.Context, addr string) error
- func (tm *DAG) StartCleanup(ctx context.Context)
- func (tm *DAG) StartMonitoring(ctx context.Context)
- func (tm *DAG) Stop(ctx context.Context) error
- func (tm *DAG) StopCleanup()
- func (tm *DAG) StopEnhanced(ctx context.Context) error
- func (tm *DAG) StopMonitoring()
- func (tm *DAG) TopologicalSort() (stack []string)
- func (tm *DAG) UpdateConfiguration(config *DAGConfig) error
- func (tm *DAG) Validate() error
- func (tm *DAG) ValidateDAG() error
- type DAGCache
- type DAGConfig
- type DAGValidator
- type Debugger
- type Direction
- type Edge
- type EdgeLevel
- type EdgeStyleConfig
- type EdgeType
- type EnhancedAPIHandler
- type HTTPClient
- type Handler
- type List
- type MemoryActivityPersistence
- func (mp *MemoryActivityPersistence) Close() error
- func (mp *MemoryActivityPersistence) GetStats(filter ActivityFilter) (ActivityStats, error)
- func (mp *MemoryActivityPersistence) Query(filter ActivityFilter) ([]ActivityEntry, error)
- func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error
- type Monitor
- type MonitoringMetrics
- func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats
- func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics
- func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)
- func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)
- func (m *MonitoringMetrics) RecordNodeStart(nodeID string)
- func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)
- func (m *MonitoringMetrics) RecordTaskStart(taskID string)
- type Node
- type NodeFailureStats
- type NodeRateLimit
- type NodeRetryManager
- func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int
- func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration
- func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)
- func (rm *NodeRetryManager) Reset(taskID, nodeID string)
- func (rm *NodeRetryManager) ResetTask(taskID string)
- func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)
- func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)
- func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool
- type NodeStats
- type NodeType
- type Operation
- func (e *Operation) Close() error
- func (e *Operation) Consume(_ context.Context) error
- func (e *Operation) Debug(ctx context.Context, task *mq.Task)
- func (e *Operation) GetKey() string
- func (e *Operation) GetMappedData(ctx context.Context, task *mq.Task) map[string]any
- func (e *Operation) GetTags() []string
- func (e *Operation) GetType() string
- func (e *Operation) Pause(_ context.Context) error
- func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result
- func (e *Operation) Resume(_ context.Context) error
- func (e *Operation) SetConfig(payload Payload)
- func (e *Operation) SetKey(key string)
- func (e *Operation) SetTags(tag ...string)
- func (e *Operation) Stop(_ context.Context) error
- func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)
- type Operations
- type Payload
- type PerformanceOptimizer
- type Processor
- type Provider
- type RateLimitConfig
- type RateLimiter
- type RetryConfig
- type RetryableProcessor
- func (rp *RetryableProcessor) Close() error
- func (rp *RetryableProcessor) Consume(ctx context.Context) error
- func (rp *RetryableProcessor) GetKey() string
- func (rp *RetryableProcessor) GetType() string
- func (rp *RetryableProcessor) Pause(ctx context.Context) error
- func (rp *RetryableProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (rp *RetryableProcessor) Resume(ctx context.Context) error
- func (rp *RetryableProcessor) SetKey(key string)
- func (rp *RetryableProcessor) Stop(ctx context.Context) error
- type RollbackHandler
- type SavePoint
- type SimpleHTTPClient
- type TaskError
- type TaskManager
- type TaskManagerConfig
- type TaskMetrics
- type TaskState
- type Transaction
- type TransactionManager
- func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error
- func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error
- func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction
- func (tm *TransactionManager) CommitTransaction(txID string) error
- func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)
- func (tm *TransactionManager) RollbackTransaction(txID string) error
- type TransactionOperation
- type TransactionStatus
- type WebSocketHandler
- type WebhookConfig
- type WebhookEvent
- type WebhookManager
Constants ¶
const (
Delimiter = "___"
ContextIndex = "index"
DefaultChannelSize = 1000
RetryInterval = 5 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func AvailableDAG ¶
func AvailableDAG() []string
Types ¶
type ActivityAlertHandler ¶ added in v0.0.17
type ActivityAlertHandler struct {
// contains filtered or unexported fields
}
ActivityAlertHandler handles alerts by logging them as activities
func (*ActivityAlertHandler) HandleAlert ¶ added in v0.0.17
func (h *ActivityAlertHandler) HandleAlert(alert Alert) error
type ActivityEntry ¶ added in v0.0.17
type ActivityEntry struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
DAGName string `json:"dag_name"`
Level ActivityLevel `json:"level"`
Type ActivityType `json:"type"`
Message string `json:"message"`
TaskID string `json:"task_id,omitempty"`
NodeID string `json:"node_id,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
Success *bool `json:"success,omitempty"`
Error string `json:"error,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
ContextData map[string]interface{} `json:"context_data,omitempty"`
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
}
ActivityEntry represents a single activity log entry
type ActivityFilter ¶ added in v0.0.17
type ActivityFilter struct {
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
Levels []ActivityLevel `json:"levels,omitempty"`
Types []ActivityType `json:"types,omitempty"`
TaskIDs []string `json:"task_ids,omitempty"`
NodeIDs []string `json:"node_ids,omitempty"`
UserIDs []string `json:"user_ids,omitempty"`
SuccessOnly *bool `json:"success_only,omitempty"`
FailuresOnly *bool `json:"failures_only,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
SortBy string `json:"sort_by,omitempty"` // timestamp, level, type
SortOrder string `json:"sort_order,omitempty"` // asc, desc
}
ActivityFilter provides filtering options for activity queries
type ActivityHook ¶ added in v0.0.17
type ActivityHook interface {
OnActivity(entry ActivityEntry) error
}
ActivityHook allows custom processing of activity entries
type ActivityLevel ¶ added in v0.0.17
type ActivityLevel string
ActivityLevel represents the severity level of an activity
const (
ActivityLevelDebug ActivityLevel = "debug"
ActivityLevelInfo ActivityLevel = "info"
ActivityLevelWarn ActivityLevel = "warn"
ActivityLevelError ActivityLevel = "error"
ActivityLevelFatal ActivityLevel = "fatal"
)
type ActivityLogger ¶ added in v0.0.17
type ActivityLogger struct {
// contains filtered or unexported fields
}
ActivityLogger provides comprehensive activity logging for DAG operations
func NewActivityLogger ¶ added in v0.0.17
func NewActivityLogger(dagName string, config ActivityLoggerConfig, persistence ActivityPersistence, logger logger.Logger) *ActivityLogger
NewActivityLogger creates a new activity logger
func (*ActivityLogger) AddHook ¶ added in v0.0.17
func (al *ActivityLogger) AddHook(hook ActivityHook)
AddHook adds an activity hook
func (*ActivityLogger) Flush ¶ added in v0.0.17
func (al *ActivityLogger) Flush() error
Flush flushes the buffer to persistence
func (*ActivityLogger) GetActivities ¶ added in v0.0.17
func (al *ActivityLogger) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
GetActivities retrieves activities based on filter
func (*ActivityLogger) GetStats ¶ added in v0.0.17
func (al *ActivityLogger) GetStats(filter ActivityFilter) (ActivityStats, error)
GetStats returns activity statistics
func (*ActivityLogger) Log ¶ added in v0.0.17
func (al *ActivityLogger) Log(level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
Log logs an activity entry
func (*ActivityLogger) LogNodeExecution ¶ added in v0.0.17
func (al *ActivityLogger) LogNodeExecution(ctx context.Context, taskID string, nodeID string, result mq.Result, duration time.Duration)
LogNodeExecution logs node execution details
func (*ActivityLogger) LogTaskComplete ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskComplete(ctx context.Context, taskID string, nodeID string, duration time.Duration)
LogTaskComplete logs task completion activity
func (*ActivityLogger) LogTaskFail ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskFail(ctx context.Context, taskID string, nodeID string, err error, duration time.Duration)
LogTaskFail logs task failure activity
func (*ActivityLogger) LogTaskStart ¶ added in v0.0.17
func (al *ActivityLogger) LogTaskStart(ctx context.Context, taskID string, nodeID string)
LogTaskStart logs task start activity
func (*ActivityLogger) LogWithContext ¶ added in v0.0.17
func (al *ActivityLogger) LogWithContext(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
LogWithContext logs an activity entry with context information
func (*ActivityLogger) RemoveHook ¶ added in v0.0.17
func (al *ActivityLogger) RemoveHook(hook ActivityHook)
RemoveHook removes an activity hook
func (*ActivityLogger) Stop ¶ added in v0.0.17
func (al *ActivityLogger) Stop()
Stop stops the activity logger
type ActivityLoggerConfig ¶ added in v0.0.17
type ActivityLoggerConfig struct {
BufferSize int `json:"buffer_size"`
FlushInterval time.Duration `json:"flush_interval"`
MaxRetries int `json:"max_retries"`
EnableHooks bool `json:"enable_hooks"`
EnableCompression bool `json:"enable_compression"`
MaxEntryAge time.Duration `json:"max_entry_age"`
AsyncMode bool `json:"async_mode"`
}
ActivityLoggerConfig configures the activity logger
func DefaultActivityLoggerConfig ¶ added in v0.0.17
func DefaultActivityLoggerConfig() ActivityLoggerConfig
DefaultActivityLoggerConfig returns default configuration
type ActivityPersistence ¶ added in v0.0.17
type ActivityPersistence interface {
Store(entries []ActivityEntry) error
Query(filter ActivityFilter) ([]ActivityEntry, error)
GetStats(filter ActivityFilter) (ActivityStats, error)
Close() error
}
ActivityPersistence defines the interface for persisting activities
type ActivityStats ¶ added in v0.0.17
type ActivityStats struct {
TotalActivities int64 `json:"total_activities"`
ActivitiesByLevel map[ActivityLevel]int64 `json:"activities_by_level"`
ActivitiesByType map[ActivityType]int64 `json:"activities_by_type"`
ActivitiesByNode map[string]int64 `json:"activities_by_node"`
ActivitiesByTask map[string]int64 `json:"activities_by_task"`
SuccessRate float64 `json:"success_rate"`
FailureRate float64 `json:"failure_rate"`
AverageDuration time.Duration `json:"average_duration"`
PeakActivitiesPerMin int64 `json:"peak_activities_per_minute"`
TimeRange ActivityTimeRange `json:"time_range"`
RecentErrors []ActivityEntry `json:"recent_errors"`
TopFailingNodes []NodeFailureStats `json:"top_failing_nodes"`
HourlyDistribution map[string]int64 `json:"hourly_distribution"`
}
ActivityStats provides statistics about activities
type ActivityTimeRange ¶ added in v0.0.17
ActivityTimeRange represents a time range for activities
type ActivityType ¶ added in v0.0.17
type ActivityType string
ActivityType represents the type of activity
const (
ActivityTypeTaskStart ActivityType = "task_start"
ActivityTypeTaskComplete ActivityType = "task_complete"
ActivityTypeTaskFail ActivityType = "task_fail"
ActivityTypeTaskCancel ActivityType = "task_cancel"
ActivityTypeNodeStart ActivityType = "node_start"
ActivityTypeNodeComplete ActivityType = "node_complete"
ActivityTypeNodeFail ActivityType = "node_fail"
ActivityTypeNodeTimeout ActivityType = "node_timeout"
ActivityTypeValidation ActivityType = "validation"
ActivityTypeConfiguration ActivityType = "configuration"
ActivityTypeAlert ActivityType = "alert"
ActivityTypeCleanup ActivityType = "cleanup"
ActivityTypeTransaction ActivityType = "transaction"
ActivityTypeRetry ActivityType = "retry"
ActivityTypeCircuitBreaker ActivityType = "circuit_breaker"
ActivityTypeWebhook ActivityType = "webhook"
ActivityTypeCustom ActivityType = "custom"
)
type Alert ¶ added in v0.0.16
type Alert struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Severity AlertSeverity `json:"severity"`
Type AlertType `json:"type"`
Message string `json:"message"`
Details map[string]interface{} `json:"details"`
NodeID string `json:"node_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
Threshold interface{} `json:"threshold,omitempty"`
ActualValue interface{} `json:"actual_value,omitempty"`
}
Alert represents a monitoring alert
type AlertHandler ¶ added in v0.0.16
AlertHandler defines interface for handling alerts
type AlertSeverity ¶ added in v0.0.17
type AlertSeverity string
const (
AlertSeverityInfo AlertSeverity = "info"
AlertSeverityWarning AlertSeverity = "warning"
AlertSeverityCritical AlertSeverity = "critical"
)
type AlertThresholds ¶ added in v0.0.16
type AlertThresholds struct {
MaxFailureRate float64 `json:"max_failure_rate"`
MaxExecutionTime time.Duration `json:"max_execution_time"`
MaxTasksInProgress int64 `json:"max_tasks_in_progress"`
MinSuccessRate float64 `json:"min_success_rate"`
MaxNodeFailures int64 `json:"max_node_failures"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
}
AlertThresholds defines thresholds for alerting
type AlertType ¶ added in v0.0.17
type AlertType string
const (
AlertTypeFailureRate AlertType = "failure_rate"
AlertTypeExecutionTime AlertType = "execution_time"
AlertTypeTaskLoad AlertType = "task_load"
AlertTypeNodeFailures AlertType = "node_failures"
AlertTypeCircuitBreaker AlertType = "circuit_breaker"
AlertTypeHealthCheck AlertType = "health_check"
)
type AlertWebhookHandler ¶ added in v0.0.16
type AlertWebhookHandler struct {
// contains filtered or unexported fields
}
AlertWebhookHandler handles webhook alerts
func NewAlertWebhookHandler ¶ added in v0.0.16
func NewAlertWebhookHandler(logger logger.Logger) *AlertWebhookHandler
NewAlertWebhookHandler creates a new alert webhook handler
func (*AlertWebhookHandler) HandleAlert ¶ added in v0.0.16
func (h *AlertWebhookHandler) HandleAlert(alert Alert) error
HandleAlert implements the AlertHandler interface
type BatchProcessor ¶ added in v0.0.16
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor handles batch processing of tasks
func NewBatchProcessor ¶ added in v0.0.16
func NewBatchProcessor(dag *DAG, batchSize int, batchTimeout time.Duration, logger logger.Logger) *BatchProcessor
NewBatchProcessor creates a new batch processor
func (*BatchProcessor) AddTask ¶ added in v0.0.16
func (bp *BatchProcessor) AddTask(task *mq.Task) error
AddTask adds a task to the batch
func (*BatchProcessor) SetProcessFunc ¶ added in v0.0.16
func (bp *BatchProcessor) SetProcessFunc(fn func([]*mq.Task) error)
SetProcessFunc sets the function to process batches
func (*BatchProcessor) Stop ¶ added in v0.0.16
func (bp *BatchProcessor) Stop()
Stop stops the batch processor
type CacheConfig ¶ added in v0.0.16
type CacheConfig struct {
Enabled bool `json:"enabled"`
TTL time.Duration `json:"ttl"`
MaxSize int `json:"max_size"`
}
CacheConfig holds cache configuration
type CacheEntry ¶ added in v0.0.16
type CacheEntry struct {
Value interface{}
ExpiresAt time.Time
AccessCount int64
LastAccess time.Time
}
CacheEntry represents a cached item
type CircuitBreaker ¶ added in v0.0.16
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements circuit breaker pattern for nodes
func NewCircuitBreaker ¶ added in v0.0.16
func NewCircuitBreaker(config *CircuitBreakerConfig, logger logger.Logger) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) Execute ¶ added in v0.0.16
func (cb *CircuitBreaker) Execute(fn func() error) error
Execute executes a function with circuit breaker protection
func (*CircuitBreaker) GetState ¶ added in v0.0.16
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current circuit breaker state
func (*CircuitBreaker) Reset ¶ added in v0.0.16
func (cb *CircuitBreaker) Reset()
Reset manually resets the circuit breaker
type CircuitBreakerConfig ¶ added in v0.0.16
type CircuitBreakerConfig struct {
FailureThreshold int
ResetTimeout time.Duration
HalfOpenMaxCalls int
}
CircuitBreakerConfig defines circuit breaker behavior
type CircuitBreakerState ¶ added in v0.0.16
type CircuitBreakerState int
CircuitBreakerState is the state of a circuit breaker in the circuit breaker implementation: closed, open, or half-open.
const (
CircuitClosed CircuitBreakerState = iota
CircuitOpen
CircuitHalfOpen
)
type CleanupManager ¶ added in v0.0.16
type CleanupManager struct {
// contains filtered or unexported fields
}
CleanupManager handles cleanup of completed tasks and resources
func NewCleanupManager ¶ added in v0.0.16
func NewCleanupManager(dag *DAG, cleanupInterval, retentionPeriod time.Duration, maxEntries int, logger logger.Logger) *CleanupManager
NewCleanupManager creates a new cleanup manager
func (*CleanupManager) Start ¶ added in v0.0.16
func (cm *CleanupManager) Start(ctx context.Context)
Start begins the cleanup routine
func (*CleanupManager) Stop ¶ added in v0.0.16
func (cm *CleanupManager) Stop()
Stop stops the cleanup routine
type ConditionProcessor ¶
type ConfigManager ¶ added in v0.0.16
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager handles dynamic DAG configuration
func NewConfigManager ¶ added in v0.0.16
func NewConfigManager(logger logger.Logger) *ConfigManager
NewConfigManager creates a new configuration manager
func (*ConfigManager) AddWatcher ¶ added in v0.0.16
func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
AddWatcher adds a configuration watcher
func (*ConfigManager) GetConfig ¶ added in v0.0.16
func (cm *ConfigManager) GetConfig() *DAGConfig
GetConfig returns a copy of the current configuration
func (*ConfigManager) UpdateConfig ¶ added in v0.0.16
func (cm *ConfigManager) UpdateConfig(newConfig *DAGConfig) error
UpdateConfig updates the configuration
func (*ConfigManager) UpdateConfiguration ¶ added in v0.0.17
func (cm *ConfigManager) UpdateConfiguration(config *DAGConfig) error
UpdateConfiguration updates the DAG configuration (alias for UpdateConfig)
type ConfigWatcher ¶ added in v0.0.16
ConfigWatcher interface for configuration change notifications
type DAG ¶
type DAG struct {
Error error
Notifier *sio.Server
// New hook fields:
PreProcessHook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context
PostProcessHook func(ctx context.Context, node *Node, taskID string, result mq.Result)
// contains filtered or unexported fields
}
func (*DAG) AddActivityHook ¶ added in v0.0.17
func (tm *DAG) AddActivityHook(hook ActivityHook)
AddActivityHook adds an activity hook
func (*DAG) AddAlertHandler ¶ added in v0.0.16
func (tm *DAG) AddAlertHandler(handler AlertHandler)
AddAlertHandler adds an alert handler
func (*DAG) AddCondition ¶
func (*DAG) AddConfigWatcher ¶ added in v0.0.17
func (tm *DAG) AddConfigWatcher(watcher ConfigWatcher)
AddConfigWatcher adds a configuration change watcher
func (*DAG) AddDAGNode ¶
func (*DAG) AddDeferredNode ¶
func (*DAG) AddNodeWithRetry ¶ added in v0.0.16
func (tm *DAG) AddNodeWithRetry(nodeType NodeType, name, nodeID string, handler mq.Processor, retryConfig *RetryConfig, startNode ...bool) *DAG
AddNodeWithRetry adds a node with specific retry configuration
func (*DAG) AddWebhook ¶ added in v0.0.17
func (tm *DAG) AddWebhook(event string, config WebhookConfig)
AddWebhook adds a webhook configuration
func (*DAG) AssignTopic ¶
func (*DAG) BeginTransaction ¶ added in v0.0.16
func (tm *DAG) BeginTransaction(taskID string) *Transaction
BeginTransaction starts a new transaction
func (*DAG) CancelTask ¶ added in v0.0.11
CancelTask cancels a running task.
func (*DAG) CheckRateLimit ¶ added in v0.0.17
CheckRateLimit checks if request is allowed for a node
func (*DAG) ClassifyEdges ¶
func (*DAG) CommitTransaction ¶ added in v0.0.16
CommitTransaction commits a transaction
func (*DAG) ContainsPageNodes ¶ added in v0.0.18
ContainsPageNodes iterates through all nodes to check if any are Page nodes. This method provides an alternative way to check for Page nodes by examining the actual nodes rather than relying on the cached hasPageNode field.
func (*DAG) FlushActivityLogs ¶ added in v0.0.17
FlushActivityLogs flushes activity logs to persistence
func (*DAG) GetActivities ¶ added in v0.0.17
func (tm *DAG) GetActivities(filter ActivityFilter) ([]ActivityEntry, error)
GetActivities retrieves activities based on filter
func (*DAG) GetActivityLogger ¶ added in v0.0.17
func (tm *DAG) GetActivityLogger() *ActivityLogger
GetActivityLogger returns the activity logger instance
func (*DAG) GetActivityStats ¶ added in v0.0.17
func (tm *DAG) GetActivityStats(filter ActivityFilter) (ActivityStats, error)
GetActivityStats returns activity statistics
func (*DAG) GetAllNodes ¶ added in v0.0.17
GetAllNodes returns all nodes in the DAG
func (*DAG) GetCircuitBreakerStatus ¶ added in v0.0.17
func (tm *DAG) GetCircuitBreakerStatus(nodeID string) CircuitBreakerState
GetCircuitBreakerStatus returns circuit breaker status for a node
func (*DAG) GetConfiguration ¶ added in v0.0.16
GetConfiguration returns current DAG configuration
func (*DAG) GetCriticalPath ¶ added in v0.0.16
GetCriticalPath returns the critical path of the DAG
func (*DAG) GetDAGStatistics ¶ added in v0.0.16
GetDAGStatistics returns comprehensive DAG statistics
func (*DAG) GetEdgeCount ¶ added in v0.0.17
GetEdgeCount returns the total number of edges
func (*DAG) GetLastNodes ¶ added in v0.0.2
func (*DAG) GetMonitoringMetrics ¶ added in v0.0.16
func (tm *DAG) GetMonitoringMetrics() *MonitoringMetrics
GetMonitoringMetrics returns current monitoring metrics
func (*DAG) GetNextNodes ¶ added in v0.0.2
GetNextNodes returns the next nodes for a given node
func (*DAG) GetNodeByID ¶ added in v0.0.17
GetNodeByID returns a node by its ID
func (*DAG) GetNodeCount ¶ added in v0.0.17
GetNodeCount returns the total number of nodes
func (*DAG) GetNodeStats ¶ added in v0.0.16
GetNodeStats returns statistics for a specific node
func (*DAG) GetPreviousNodes ¶ added in v0.0.2
GetPreviousNodes returns the previous nodes for a given node
func (*DAG) GetStartNode ¶
func (*DAG) GetStatus ¶ added in v0.0.11
GetStatus returns a summary of the DAG including node and task counts.
func (*DAG) GetTaskMetrics ¶ added in v0.0.11
func (d *DAG) GetTaskMetrics() TaskMetrics
GetTaskMetrics returns the task metrics.
func (*DAG) GetTopologicalOrder ¶ added in v0.0.16
GetTopologicalOrder returns nodes in topological order
func (*DAG) GetTransaction ¶ added in v0.0.17
func (tm *DAG) GetTransaction(txID string) (*Transaction, error)
GetTransaction retrieves transaction details
func (*DAG) HasPageNode ¶ added in v0.0.18
HasPageNode checks if the DAG contains any Page nodes
func (*DAG) InitializeActivityLogger ¶ added in v0.0.17
func (tm *DAG) InitializeActivityLogger(config ActivityLoggerConfig, persistence ActivityPersistence)
InitializeActivityLogger initializes the activity logger for the DAG
func (*DAG) IsLastNode ¶ added in v0.0.2
IsLastNode checks if a node is the last node in the DAG
func (*DAG) LogActivity ¶ added in v0.0.17
func (tm *DAG) LogActivity(ctx context.Context, level ActivityLevel, activityType ActivityType, message string, details map[string]interface{})
LogActivity logs an activity entry
func (*DAG) OptimizePerformance ¶ added in v0.0.16
OptimizePerformance triggers performance optimization
func (*DAG) PrintGraph ¶
func (tm *DAG) PrintGraph()
func (*DAG) ProcessTaskNew ¶ added in v0.0.10
func (*DAG) RemoveNode ¶ added in v0.0.16
RemoveNode removes the node with the given nodeID and adjusts the edges. For example, if A -> B and B -> C exist and B is removed, a new edge A -> C is created.
func (*DAG) ReportNodeResult ¶
func (*DAG) RollbackTransaction ¶ added in v0.0.16
RollbackTransaction rolls back a transaction
func (*DAG) SVGViewerHTML ¶ added in v0.0.18
SVGViewerHTML creates the HTML with advanced SVG zoom and pan functionality
func (*DAG) ScheduleTask ¶
func (*DAG) SetAlertThresholds ¶ added in v0.0.16
func (tm *DAG) SetAlertThresholds(thresholds *AlertThresholds)
SetAlertThresholds configures alert thresholds
func (*DAG) SetBatchProcessingEnabled ¶ added in v0.0.17
SetBatchProcessingEnabled enables or disables batch processing
func (*DAG) SetNotifyResponse ¶
func (*DAG) SetPostProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
SetPostProcessHook configures a function to be called after each node is processed.
func (*DAG) SetPreProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPreProcessHook(hook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context)
SetPreProcessHook configures a function to be called before each node is processed.
func (*DAG) SetRateLimit ¶ added in v0.0.16
SetRateLimit sets rate limit for a specific node
func (*DAG) SetRetryConfig ¶ added in v0.0.16
func (tm *DAG) SetRetryConfig(config *RetryConfig)
SetRetryConfig sets the retry configuration
func (*DAG) SetStartNode ¶
func (*DAG) SetWebhookManager ¶ added in v0.0.16
func (tm *DAG) SetWebhookManager(manager *WebhookManager)
SetWebhookManager sets the webhook manager
func (*DAG) StartCleanup ¶ added in v0.0.17
StartCleanup starts the cleanup manager
func (*DAG) StartMonitoring ¶ added in v0.0.16
StartMonitoring starts the monitoring system
func (*DAG) StopCleanup ¶ added in v0.0.17
func (tm *DAG) StopCleanup()
StopCleanup stops the cleanup manager
func (*DAG) StopEnhanced ¶ added in v0.0.17
StopEnhanced is an enhanced Stop method that performs proper cleanup.
func (*DAG) StopMonitoring ¶ added in v0.0.16
func (tm *DAG) StopMonitoring()
StopMonitoring stops the monitoring system
func (*DAG) TopologicalSort ¶
func (*DAG) UpdateConfiguration ¶ added in v0.0.16
UpdateConfiguration updates the DAG configuration
func (*DAG) ValidateDAG ¶ added in v0.0.16
ValidateDAG validates the DAG structure using the enhanced validator
type DAGCache ¶ added in v0.0.16
type DAGCache struct {
// contains filtered or unexported fields
}
DAGCache provides caching capabilities for DAG operations
func NewDAGCache ¶ added in v0.0.16
NewDAGCache creates a new DAG cache
func (*DAGCache) GetNodeResult ¶ added in v0.0.16
GetNodeResult retrieves a cached node result
func (*DAGCache) SetNodeResult ¶ added in v0.0.16
SetNodeResult caches a node result
type DAGConfig ¶ added in v0.0.16
type DAGConfig struct {
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
TaskTimeout time.Duration `json:"task_timeout"`
NodeTimeout time.Duration `json:"node_timeout"`
RetryConfig *RetryConfig `json:"retry_config"`
CacheConfig *CacheConfig `json:"cache_config"`
RateLimitConfig *RateLimitConfig `json:"rate_limit_config"`
MonitoringEnabled bool `json:"monitoring_enabled"`
AlertingEnabled bool `json:"alerting_enabled"`
CleanupInterval time.Duration `json:"cleanup_interval"`
TransactionTimeout time.Duration `json:"transaction_timeout"`
BatchProcessingEnabled bool `json:"batch_processing_enabled"`
BatchSize int `json:"batch_size"`
BatchTimeout time.Duration `json:"batch_timeout"`
}
DAGConfig holds dynamic configuration for DAG
func DefaultDAGConfig ¶ added in v0.0.16
func DefaultDAGConfig() *DAGConfig
DefaultDAGConfig returns default DAG configuration
type DAGValidator ¶ added in v0.0.16
type DAGValidator struct {
// contains filtered or unexported fields
}
DAGValidator provides validation capabilities for DAG structure
func NewDAGValidator ¶ added in v0.0.16
func NewDAGValidator(dag *DAG) *DAGValidator
NewDAGValidator creates a new DAG validator
func (*DAGValidator) GetCriticalPath ¶ added in v0.0.16
func (v *DAGValidator) GetCriticalPath() ([]string, error)
GetCriticalPath finds the longest path in the DAG
func (*DAGValidator) GetNodeStatistics ¶ added in v0.0.16
func (v *DAGValidator) GetNodeStatistics() map[string]interface{}
GetNodeStatistics returns DAG statistics
func (*DAGValidator) GetTopologicalOrder ¶ added in v0.0.16
func (v *DAGValidator) GetTopologicalOrder() ([]string, error)
GetTopologicalOrder returns nodes in topological order
func (*DAGValidator) ValidateStructure ¶ added in v0.0.16
func (v *DAGValidator) ValidateStructure() error
ValidateStructure performs comprehensive DAG structure validation
type EdgeStyleConfig ¶ added in v0.0.17
type EdgeStyleConfig struct {
Color string
Style string
PenWidth string
ArrowSize string
FontSize string
}
EdgeStyleConfig contains styling information for edges
type EnhancedAPIHandler ¶ added in v0.0.16
type EnhancedAPIHandler struct {
// contains filtered or unexported fields
}
EnhancedAPIHandler provides enhanced API endpoints for DAG management
func NewEnhancedAPIHandler ¶ added in v0.0.16
func NewEnhancedAPIHandler(dag *DAG) *EnhancedAPIHandler
NewEnhancedAPIHandler creates a new enhanced API handler
func (*EnhancedAPIHandler) RegisterRoutes ¶ added in v0.0.16
func (h *EnhancedAPIHandler) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers all enhanced API routes
type HTTPClient ¶ added in v0.0.16
type HTTPClient interface {
Post(url string, contentType string, body []byte, headers map[string]string) error
}
HTTPClient interface for HTTP requests
type Handler ¶ added in v0.0.14
func AvailableHandlers ¶
func AvailableHandlers() []Handler
type MemoryActivityPersistence ¶ added in v0.0.17
type MemoryActivityPersistence struct {
// contains filtered or unexported fields
}
MemoryActivityPersistence provides in-memory activity persistence for testing
func NewMemoryActivityPersistence ¶ added in v0.0.17
func NewMemoryActivityPersistence() *MemoryActivityPersistence
NewMemoryActivityPersistence creates a new in-memory persistence
func (*MemoryActivityPersistence) Close ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Close() error
Close closes the persistence
func (*MemoryActivityPersistence) GetStats ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) GetStats(filter ActivityFilter) (ActivityStats, error)
GetStats returns statistics for the filtered entries
func (*MemoryActivityPersistence) Query ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Query(filter ActivityFilter) ([]ActivityEntry, error)
Query queries activity entries with filter
func (*MemoryActivityPersistence) Store ¶ added in v0.0.17
func (mp *MemoryActivityPersistence) Store(entries []ActivityEntry) error
Store stores activity entries in memory
type Monitor ¶ added in v0.0.16
type Monitor struct {
// contains filtered or unexported fields
}
Monitor provides comprehensive monitoring capabilities for DAG
func NewMonitor ¶ added in v0.0.16
NewMonitor creates a new DAG monitor
func (*Monitor) AddAlertHandler ¶ added in v0.0.16
func (m *Monitor) AddAlertHandler(handler AlertHandler)
AddAlertHandler adds an alert handler
func (*Monitor) GetMetrics ¶ added in v0.0.16
func (m *Monitor) GetMetrics() *MonitoringMetrics
GetMetrics returns current metrics
func (*Monitor) SetAlertThresholds ¶ added in v0.0.16
func (m *Monitor) SetAlertThresholds(thresholds *AlertThresholds)
SetAlertThresholds updates alert thresholds
type MonitoringMetrics ¶ added in v0.0.16
type MonitoringMetrics struct {
TasksTotal int64
TasksCompleted int64
TasksFailed int64
TasksCancelled int64
TasksInProgress int64
NodesExecuted map[string]int64
NodeExecutionTimes map[string][]time.Duration
NodeFailures map[string]int64
AverageExecutionTime time.Duration
TotalExecutionTime time.Duration
StartTime time.Time
LastTaskCompletedAt time.Time
ActiveTasks map[string]time.Time
NodeProcessingStats map[string]*NodeStats
// contains filtered or unexported fields
}
MonitoringMetrics holds comprehensive metrics for DAG monitoring
func NewMonitoringMetrics ¶ added in v0.0.16
func NewMonitoringMetrics() *MonitoringMetrics
NewMonitoringMetrics creates a new metrics instance
func (*MonitoringMetrics) GetNodeStats ¶ added in v0.0.16
func (m *MonitoringMetrics) GetNodeStats(nodeID string) *NodeStats
GetNodeStats returns statistics for a specific node
func (*MonitoringMetrics) GetSnapshot ¶ added in v0.0.16
func (m *MonitoringMetrics) GetSnapshot() *MonitoringMetrics
GetSnapshot returns a snapshot of current metrics
func (*MonitoringMetrics) RecordNodeEnd ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeEnd(nodeID string)
RecordNodeEnd records when a node finishes processing
func (*MonitoringMetrics) RecordNodeExecution ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeExecution(nodeID string, duration time.Duration, success bool)
RecordNodeExecution records node execution metrics
func (*MonitoringMetrics) RecordNodeStart ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordNodeStart(nodeID string)
RecordNodeStart records when a node starts processing
func (*MonitoringMetrics) RecordTaskCompletion ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordTaskCompletion(taskID string, status mq.Status)
RecordTaskCompletion records task completion
func (*MonitoringMetrics) RecordTaskStart ¶ added in v0.0.16
func (m *MonitoringMetrics) RecordTaskStart(taskID string)
RecordTaskStart records the start of a task
type Node ¶
type Node struct {
Label string
ID string
Edges []Edge
NodeType NodeType
Timeout time.Duration // node-level timeout
// contains filtered or unexported fields
}
func (*Node) SetTimeout ¶ added in v0.0.11
SetTimeout allows setting a maximum processing duration for the node.
type NodeFailureStats ¶ added in v0.0.17
type NodeFailureStats struct {
NodeID string `json:"node_id"`
FailureCount int64 `json:"failure_count"`
FailureRate float64 `json:"failure_rate"`
LastFailure time.Time `json:"last_failure"`
}
NodeFailureStats represents failure statistics for a node
type NodeRateLimit ¶ added in v0.0.16
type NodeRateLimit struct {
RequestsPerSecond float64 `json:"requests_per_second"`
Burst int `json:"burst"`
}
NodeRateLimit holds rate limit settings for a specific node
type NodeRetryManager ¶ added in v0.0.16
type NodeRetryManager struct {
// contains filtered or unexported fields
}
NodeRetryManager handles retry logic for individual nodes
func NewNodeRetryManager ¶ added in v0.0.16
func NewNodeRetryManager(config *RetryConfig, logger logger.Logger) *NodeRetryManager
NewNodeRetryManager creates a new retry manager
func (*NodeRetryManager) GetAttempts ¶ added in v0.0.16
func (rm *NodeRetryManager) GetAttempts(taskID, nodeID string) int
GetAttempts returns the number of attempts for a task/node combination
func (*NodeRetryManager) GetRetryDelay ¶ added in v0.0.16
func (rm *NodeRetryManager) GetRetryDelay(taskID, nodeID string) time.Duration
GetRetryDelay calculates the delay before the next retry
func (*NodeRetryManager) RecordAttempt ¶ added in v0.0.16
func (rm *NodeRetryManager) RecordAttempt(taskID, nodeID string)
RecordAttempt records a retry attempt
func (*NodeRetryManager) Reset ¶ added in v0.0.16
func (rm *NodeRetryManager) Reset(taskID, nodeID string)
Reset clears retry attempts for a task/node combination
func (*NodeRetryManager) ResetTask ¶ added in v0.0.16
func (rm *NodeRetryManager) ResetTask(taskID string)
ResetTask clears all retry attempts for a task
func (*NodeRetryManager) SetGlobalConfig ¶ added in v0.0.17
func (rm *NodeRetryManager) SetGlobalConfig(config *RetryConfig)
SetGlobalConfig sets the global retry configuration
func (*NodeRetryManager) SetNodeConfig ¶ added in v0.0.17
func (rm *NodeRetryManager) SetNodeConfig(nodeID string, config *RetryConfig)
SetNodeConfig sets retry configuration for a specific node
func (*NodeRetryManager) ShouldRetry ¶ added in v0.0.16
func (rm *NodeRetryManager) ShouldRetry(taskID, nodeID string, err error) bool
ShouldRetry determines if a failed node should be retried
type NodeStats ¶ added in v0.0.16
type NodeStats struct {
ExecutionCount int64
SuccessCount int64
FailureCount int64
TotalDuration time.Duration
AverageDuration time.Duration
MinDuration time.Duration
MaxDuration time.Duration
LastExecuted time.Time
LastSuccess time.Time
LastFailure time.Time
CurrentlyRunning int64
}
NodeStats holds statistics for individual nodes
type Operation ¶
type Operation struct {
ID string `json:"id"`
Key string `json:"key"`
Payload Payload
RequiredFields []string `json:"required_fields"`
OptionalFields []string `json:"optional_fields"`
GeneratedFields []string `json:"generated_fields"`
Type NodeType `json:"type"`
Tags []string `json:"tags"`
}
func (*Operation) GetMappedData ¶ added in v0.0.10
func (*Operation) ProcessTask ¶
type Operations ¶
type PerformanceOptimizer ¶ added in v0.0.16
type PerformanceOptimizer struct {
// contains filtered or unexported fields
}
PerformanceOptimizer optimizes DAG performance based on metrics
func NewPerformanceOptimizer ¶ added in v0.0.16
func NewPerformanceOptimizer(dag *DAG, monitor *Monitor, config *ConfigManager, logger logger.Logger) *PerformanceOptimizer
NewPerformanceOptimizer creates a new performance optimizer
func (*PerformanceOptimizer) OptimizePerformance ¶ added in v0.0.16
func (po *PerformanceOptimizer) OptimizePerformance() error
OptimizePerformance analyzes metrics and adjusts configuration
type Provider ¶
type Provider struct {
Mapping map[string]any `json:"mapping"`
UpdateMapping map[string]any `json:"update_mapping"`
InsertMapping map[string]any `json:"insert_mapping"`
Defaults map[string]any `json:"defaults"`
ProviderType string `json:"provider_type"`
Database string `json:"database"`
Source string `json:"source"`
Query string `json:"query"`
}
type RateLimitConfig ¶ added in v0.0.16
type RateLimitConfig struct {
Enabled bool `json:"enabled"`
GlobalLimit float64 `json:"global_limit"`
GlobalBurst int `json:"global_burst"`
NodeLimits map[string]NodeRateLimit `json:"node_limits"`
}
RateLimitConfig holds rate limiting configuration
type RateLimiter ¶ added in v0.0.16
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter provides rate limiting for DAG operations
func NewRateLimiter ¶ added in v0.0.16
func NewRateLimiter(logger logger.Logger) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶ added in v0.0.16
func (rl *RateLimiter) Allow(nodeID string) bool
Allow checks if the request is allowed for the given node
func (*RateLimiter) SetNodeLimit ¶ added in v0.0.16
func (rl *RateLimiter) SetNodeLimit(nodeID string, requestsPerSecond float64, burst int)
SetNodeLimit sets rate limit for a specific node
type RetryConfig ¶ added in v0.0.16
type RetryConfig struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
Jitter bool
RetryCondition func(err error) bool
}
RetryConfig defines retry behavior for failed nodes
func DefaultRetryConfig ¶ added in v0.0.16
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a sensible default retry configuration
type RetryableProcessor ¶ added in v0.0.16
type RetryableProcessor struct {
// contains filtered or unexported fields
}
RetryableProcessor wraps a processor with retry logic
func NewRetryableProcessor ¶ added in v0.0.16
func NewRetryableProcessor(processor mq.Processor, config *RetryConfig, logger logger.Logger) *RetryableProcessor
NewRetryableProcessor creates a processor with retry capabilities
func (*RetryableProcessor) Close ¶ added in v0.0.16
func (rp *RetryableProcessor) Close() error
Close closes the processor
func (*RetryableProcessor) Consume ¶ added in v0.0.16
func (rp *RetryableProcessor) Consume(ctx context.Context) error
Consume starts consuming messages
func (*RetryableProcessor) GetKey ¶ added in v0.0.16
func (rp *RetryableProcessor) GetKey() string
GetKey returns the processor key
func (*RetryableProcessor) GetType ¶ added in v0.0.16
func (rp *RetryableProcessor) GetType() string
GetType returns the processor type
func (*RetryableProcessor) Pause ¶ added in v0.0.16
func (rp *RetryableProcessor) Pause(ctx context.Context) error
Pause pauses the processor
func (*RetryableProcessor) ProcessTask ¶ added in v0.0.16
ProcessTask processes a task with retry logic
func (*RetryableProcessor) Resume ¶ added in v0.0.16
func (rp *RetryableProcessor) Resume(ctx context.Context) error
Resume resumes the processor
func (*RetryableProcessor) SetKey ¶ added in v0.0.16
func (rp *RetryableProcessor) SetKey(key string)
SetKey sets the processor key
type RollbackHandler ¶ added in v0.0.16
type RollbackHandler interface {
Rollback(operation TransactionOperation) error
}
RollbackHandler defines how to rollback operations
type SavePoint ¶ added in v0.0.17
type SavePoint struct {
ID string `json:"id"`
Name string `json:"name"`
Timestamp time.Time `json:"timestamp"`
State map[string]interface{} `json:"state"`
}
SavePoint represents a save point in a transaction
type SimpleHTTPClient ¶ added in v0.0.16
type SimpleHTTPClient struct {
// contains filtered or unexported fields
}
SimpleHTTPClient implements HTTPClient interface for webhook manager
func NewSimpleHTTPClient ¶ added in v0.0.16
func NewSimpleHTTPClient(timeout time.Duration) *SimpleHTTPClient
NewSimpleHTTPClient creates a new simple HTTP client
type TaskError ¶ added in v0.0.10
TaskError is used by node processors to indicate whether an error is recoverable.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
func NewTaskManager ¶
func (*TaskManager) Pause ¶ added in v0.0.10
func (tm *TaskManager) Pause()
func (*TaskManager) ProcessTask ¶ added in v0.0.2
func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)
func (*TaskManager) Resume ¶ added in v0.0.10
func (tm *TaskManager) Resume()
func (*TaskManager) Stop ¶ added in v0.0.2
func (tm *TaskManager) Stop()
Stop gracefully stops the task manager
type TaskManagerConfig ¶ added in v0.0.10
type TaskMetrics ¶ added in v0.0.11
type TaskMetrics struct {
NotStarted int
Queued int
Cancelled int
Completed int
Failed int
// contains filtered or unexported fields
}
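TaskMetrics is the task metrics type; its exported integer fields are NotStarted, Queued, Cancelled, Completed, and Failed.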
type TaskState ¶ added in v0.0.2
type TaskState struct {
UpdatedAt time.Time
NodeID string
Status mq.Status
Result mq.Result
// contains filtered or unexported fields
}
TaskState holds state and intermediate results for a given task (identified by a node ID).
type Transaction ¶ added in v0.0.16
type Transaction struct {
ID string `json:"id"`
TaskID string `json:"task_id"`
Status TransactionStatus `json:"status"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`
Operations []TransactionOperation `json:"operations"`
SavePoints []SavePoint `json:"save_points"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Transaction represents a transactional DAG execution
type TransactionManager ¶ added in v0.0.16
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager handles transaction-like operations for DAG execution
func NewTransactionManager ¶ added in v0.0.16
func NewTransactionManager(dag *DAG, logger logger.Logger) *TransactionManager
NewTransactionManager creates a new transaction manager
func (*TransactionManager) AddOperation ¶ added in v0.0.17
func (tm *TransactionManager) AddOperation(txID string, operation TransactionOperation) error
AddOperation adds an operation to a transaction
func (*TransactionManager) AddSavePoint ¶ added in v0.0.16
func (tm *TransactionManager) AddSavePoint(txID, name string, state map[string]interface{}) error
AddSavePoint adds a save point to the transaction
func (*TransactionManager) BeginTransaction ¶ added in v0.0.16
func (tm *TransactionManager) BeginTransaction(taskID string) *Transaction
BeginTransaction starts a new transaction
func (*TransactionManager) CommitTransaction ¶ added in v0.0.16
func (tm *TransactionManager) CommitTransaction(txID string) error
CommitTransaction commits a transaction
func (*TransactionManager) GetTransaction ¶ added in v0.0.17
func (tm *TransactionManager) GetTransaction(txID string) (*Transaction, error)
GetTransaction retrieves a transaction by ID
func (*TransactionManager) RollbackTransaction ¶ added in v0.0.16
func (tm *TransactionManager) RollbackTransaction(txID string) error
RollbackTransaction rolls back a transaction
type TransactionOperation ¶ added in v0.0.17
type TransactionOperation struct {
ID string `json:"id"`
Type string `json:"type"`
NodeID string `json:"node_id"`
Data map[string]interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
RollbackHandler RollbackHandler `json:"-"`
}
TransactionOperation represents an operation within a transaction
type TransactionStatus ¶ added in v0.0.16
type TransactionStatus string
TransactionStatus represents the status of a transaction
const (
TransactionStatusStarted TransactionStatus = "started"
TransactionStatusCommitted TransactionStatus = "committed"
TransactionStatusRolledBack TransactionStatus = "rolled_back"
TransactionStatusFailed TransactionStatus = "failed"
)
type WebSocketHandler ¶ added in v0.0.16
type WebSocketHandler struct {
// contains filtered or unexported fields
}
WebSocketHandler provides real-time monitoring via WebSocket
func NewWebSocketHandler ¶ added in v0.0.16
func NewWebSocketHandler(dag *DAG) *WebSocketHandler
NewWebSocketHandler creates a new WebSocket handler
func (*WebSocketHandler) HandleWebSocket ¶ added in v0.0.16
func (h *WebSocketHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket handles WebSocket connections for real-time monitoring
type WebhookConfig ¶ added in v0.0.16
type WebhookConfig struct {
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Method string `json:"method"`
RetryCount int `json:"retry_count"`
Timeout time.Duration `json:"timeout"`
Events []string `json:"events"`
}
WebhookConfig defines webhook configuration
type WebhookEvent ¶ added in v0.0.16
type WebhookEvent struct {
Type string `json:"type"`
TaskID string `json:"task_id"`
NodeID string `json:"node_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
}
WebhookEvent represents an event to send via webhook
type WebhookManager ¶ added in v0.0.16
type WebhookManager struct {
// contains filtered or unexported fields
}
WebhookManager handles webhook notifications
func NewWebhookManager ¶ added in v0.0.16
func NewWebhookManager(httpClient HTTPClient, logger logger.Logger) *WebhookManager
NewWebhookManager creates a new webhook manager
func (*WebhookManager) AddWebhook ¶ added in v0.0.16
func (wm *WebhookManager) AddWebhook(event string, config WebhookConfig)
AddWebhook adds a webhook configuration
func (*WebhookManager) TriggerWebhook ¶ added in v0.0.16
func (wm *WebhookManager) TriggerWebhook(event WebhookEvent)
TriggerWebhook sends webhook notifications for an event