Documentation ¶
Overview ¶
context_keys.go
Index ¶
- Constants
- func AddDAG(key string, handler *DAG)
- func AddHandler(key string, handler func(string) mq.Processor)
- func AvailableDAG() []string
- func AvailableHandlers() []string
- func ClearDAG()
- func GetHandler(key string) func(string) mq.Processor
- func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
- func Header(c context.Context, headerKey string) (val map[string]any, exists bool)
- func HeaderVal(c context.Context, headerKey string, key string) (val any)
- func WsEvents(s *sio.Server)
- type Condition
- type ConditionProcessor
- type DAG
- func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG
- func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG
- func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error
- func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG
- func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AssignTopic(topic string)
- func (tm *DAG) BaseURI() string
- func (tm *DAG) CancelTask(taskID string) error
- func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)
- func (tm *DAG) Close() error
- func (tm *DAG) Consume(ctx context.Context) error
- func (tm *DAG) ExportDOT(direction ...Direction) string
- func (tm *DAG) GetKey() string
- func (tm *DAG) GetLastNodes() ([]*Node, error)
- func (tm *DAG) GetNextNodes(key string) ([]*Node, error)
- func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error)
- func (tm *DAG) GetReport() string
- func (tm *DAG) GetStartNode() string
- func (tm *DAG) GetStatus() map[string]interface{}
- func (d *DAG) GetTaskMetrics() TaskMetrics
- func (tm *DAG) GetType() string
- func (tm *DAG) Handlers(app any, prefix string)
- func (tm *DAG) IsLastNode(key string) (bool, error)
- func (tm *DAG) IsReady() bool
- func (tm *DAG) Logger() logger.Logger
- func (tm *DAG) Pause(_ context.Context) error
- func (tm *DAG) PauseConsumer(ctx context.Context, id string)
- func (tm *DAG) PrintGraph()
- func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result
- func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) ReportNodeResult(callback func(mq.Result))
- func (tm *DAG) Reset()
- func (tm *DAG) Resume(_ context.Context) error
- func (tm *DAG) ResumeConsumer(ctx context.Context, id string)
- func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error
- func (tm *DAG) SavePNG(pngFile string) error
- func (tm *DAG) SaveSVG(svgFile string) error
- func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result
- func (tm *DAG) SetKey(key string)
- func (tm *DAG) SetNotifyResponse(callback mq.Callback)
- func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
- func (tm *DAG) SetPreProcessHook(...)
- func (tm *DAG) SetStartNode(node string)
- func (tm *DAG) SetupWS() *sio.Server
- func (tm *DAG) Start(ctx context.Context, addr string) error
- func (tm *DAG) Stop(ctx context.Context) error
- func (tm *DAG) TopologicalSort() (stack []string)
- func (tm *DAG) Validate() error
- type Debugger
- type Direction
- type Edge
- type EdgeType
- type List
- type Node
- type NodeType
- type Operation
- func (e *Operation) Close() error
- func (e *Operation) Consume(_ context.Context) error
- func (e *Operation) Debug(ctx context.Context, task *mq.Task)
- func (e *Operation) GetKey() string
- func (e *Operation) GetMappedData(ctx context.Context, task *mq.Task) map[string]any
- func (e *Operation) GetTags() []string
- func (e *Operation) GetType() string
- func (e *Operation) Pause(_ context.Context) error
- func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result
- func (e *Operation) Resume(_ context.Context) error
- func (e *Operation) SetConfig(payload Payload)
- func (e *Operation) SetKey(key string)
- func (e *Operation) SetTags(tag ...string)
- func (e *Operation) Stop(_ context.Context) error
- func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)
- type Operations
- type Payload
- type Processor
- type Provider
- type TaskError
- type TaskManager
- type TaskManagerConfig
- type TaskMetrics
- type TaskState
Constants ¶
View Source
const (
	Delimiter          = "___"
	ContextIndex       = "index"
	DefaultChannelSize = 1000
	RetryInterval      = 5 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func AvailableDAG ¶
func AvailableDAG() []string
func AvailableHandlers ¶
func AvailableHandlers() []string
Types ¶
type ConditionProcessor ¶
type DAG ¶
type DAG struct {
Error error
Notifier *sio.Server
// Hooks run around node processing (see SetPreProcessHook and SetPostProcessHook):
PreProcessHook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context
PostProcessHook func(ctx context.Context, node *Node, taskID string, result mq.Result)
// contains filtered or unexported fields
}
func (*DAG) AddCondition ¶
func (*DAG) AddDAGNode ¶
func (*DAG) AddDeferredNode ¶
func (*DAG) AssignTopic ¶
func (*DAG) CancelTask ¶ added in v0.0.11
CancelTask cancels a running task.
func (*DAG) ClassifyEdges ¶
func (*DAG) GetLastNodes ¶ added in v0.0.2
func (*DAG) GetPreviousNodes ¶ added in v0.0.2
func (*DAG) GetStartNode ¶
func (*DAG) GetStatus ¶ added in v0.0.11
GetStatus returns a summary of the DAG including node and task counts.
func (*DAG) GetTaskMetrics ¶ added in v0.0.11
func (d *DAG) GetTaskMetrics() TaskMetrics
GetTaskMetrics returns the task metrics.
func (*DAG) PrintGraph ¶
func (tm *DAG) PrintGraph()
func (*DAG) ProcessTaskNew ¶ added in v0.0.10
func (*DAG) ReportNodeResult ¶
func (*DAG) ScheduleTask ¶
func (*DAG) SetNotifyResponse ¶
func (*DAG) SetPostProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPostProcessHook(hook func(ctx context.Context, node *Node, taskID string, result mq.Result))
SetPostProcessHook configures a function to be called after each node is processed.
func (*DAG) SetPreProcessHook ¶ added in v0.0.11
func (tm *DAG) SetPreProcessHook(hook func(ctx context.Context, node *Node, taskID string, payload json.RawMessage) context.Context)
SetPreProcessHook configures a function to be called before each node is processed.
func (*DAG) SetStartNode ¶
func (*DAG) TopologicalSort ¶
type Node ¶
type Node struct {
Label string
ID string
Edges []Edge
NodeType NodeType
Timeout time.Duration // maximum processing duration for the node (see SetTimeout)
// contains filtered or unexported fields
}
func (*Node) SetTimeout ¶ added in v0.0.11
SetTimeout allows setting a maximum processing duration for the node.
type Operation ¶
type Operation struct {
ID string `json:"id"`
Key string `json:"key"`
Payload Payload
RequiredFields []string `json:"required_fields"`
OptionalFields []string `json:"optional_fields"`
GeneratedFields []string `json:"generated_fields"`
Type NodeType `json:"type"`
Tags []string `json:"tags"`
}
func (*Operation) GetMappedData ¶ added in v0.0.10
func (*Operation) ProcessTask ¶
type Operations ¶
type Provider ¶
type Provider struct {
Mapping map[string]any `json:"mapping"`
UpdateMapping map[string]any `json:"update_mapping"`
InsertMapping map[string]any `json:"insert_mapping"`
Defaults map[string]any `json:"defaults"`
ProviderType string `json:"provider_type"`
Database string `json:"database"`
Source string `json:"source"`
Query string `json:"query"`
}
type TaskError ¶ added in v0.0.10
TaskError is used by node processors to indicate whether an error is recoverable.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
func NewTaskManager ¶
func (*TaskManager) Pause ¶ added in v0.0.10
func (tm *TaskManager) Pause()
func (*TaskManager) ProcessTask ¶ added in v0.0.2
func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)
func (*TaskManager) Resume ¶ added in v0.0.10
func (tm *TaskManager) Resume()
func (*TaskManager) Stop ¶ added in v0.0.2
func (tm *TaskManager) Stop()
type TaskManagerConfig ¶ added in v0.0.10
Click to show internal directories.
Click to hide internal directories.