Documentation
¶
Index ¶
- type Barrier
- type BarrierAligner
- type BarrierHandler
- type BarrierTracker
- type BufferOrEvent
- type Coordinator
- func (c *Coordinator) Activate() error
- func (c *Coordinator) ActiveForceSaveState()
- func (c *Coordinator) Deactivate() error
- func (c *Coordinator) FinishForceSaveState()
- func (c *Coordinator) ForceSaveState() (chan any, error)
- func (c *Coordinator) GetCompleteCount() int
- func (c *Coordinator) GetLatest() int64
- func (c *Coordinator) IsActivated() bool
- type Message
- type NonSinkTask
- type NonSourceTask
- type Responder
- type ResponderExecutor
- type Signal
- type SinkTask
- type SourceSubTopoTask
- type StreamCheckpointContext
- type StreamTask
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BarrierAligner ¶
type BarrierAligner struct {
// contains filtered or unexported fields
}
For qos 2, block an input until all barriers are received
func NewBarrierAligner ¶
func NewBarrierAligner(responder Responder, inputCount int) *BarrierAligner
func (*BarrierAligner) Process ¶
func (h *BarrierAligner) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierAligner) SetOutput ¶
func (h *BarrierAligner) SetOutput(output chan<- *BufferOrEvent)
type BarrierHandler ¶
type BarrierHandler interface {
Process(data *BufferOrEvent, ctx api.StreamContext) bool // If data is barrier return true, else return false
SetOutput(chan<- *BufferOrEvent) // It is using for block a channel
}
type BarrierTracker ¶
type BarrierTracker struct {
// contains filtered or unexported fields
}
For qos 1, simple track barriers
func NewBarrierTracker ¶
func NewBarrierTracker(responder Responder, inputCount int) *BarrierTracker
func (*BarrierTracker) Process ¶
func (h *BarrierTracker) Process(data *BufferOrEvent, ctx api.StreamContext) bool
func (*BarrierTracker) SetOutput ¶
func (h *BarrierTracker) SetOutput(_ chan<- *BufferOrEvent)
type BufferOrEvent ¶
type BufferOrEvent struct {
Data interface{}
Channel string
}
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(ruleId string, sources []StreamTask, operators []NonSourceTask, sinks []SinkTask, qos def.Qos, store api.Store, interval time.Duration, ctx api.StreamContext) *Coordinator
func (*Coordinator) Activate ¶
func (c *Coordinator) Activate() error
func (*Coordinator) ActiveForceSaveState ¶ added in v2.0.7
func (c *Coordinator) ActiveForceSaveState()
func (*Coordinator) Deactivate ¶
func (c *Coordinator) Deactivate() error
func (*Coordinator) FinishForceSaveState ¶ added in v2.0.7
func (c *Coordinator) FinishForceSaveState()
func (*Coordinator) ForceSaveState ¶ added in v2.0.7
func (c *Coordinator) ForceSaveState() (chan any, error)
func (*Coordinator) GetLatest ¶
func (c *Coordinator) GetLatest() int64
func (*Coordinator) IsActivated ¶
func (c *Coordinator) IsActivated() bool
type NonSinkTask ¶
type NonSinkTask interface {
Broadcast(data any)
}
type NonSourceTask ¶
type NonSourceTask interface {
StreamTask
GetInputCount() int
AddInputCount()
SetBarrierHandler(BarrierHandler)
}
type ResponderExecutor ¶
type ResponderExecutor struct {
// contains filtered or unexported fields
}
func NewResponderExecutor ¶
func NewResponderExecutor(responder chan<- *Signal, task StreamTask) *ResponderExecutor
func (*ResponderExecutor) GetName ¶
func (re *ResponderExecutor) GetName() string
func (*ResponderExecutor) TriggerCheckpoint ¶
func (re *ResponderExecutor) TriggerCheckpoint(checkpointId int64) error
type SinkTask ¶
type SinkTask interface {
NonSourceTask
}
type SourceSubTopoTask ¶
type SourceSubTopoTask interface {
EnableCheckpoint(sources *[]StreamTask, ops *[]NonSourceTask)
}
type StreamCheckpointContext ¶
type StreamTask ¶
type StreamTask interface {
GetName() string
GetStreamContext() api.StreamContext
SetQos(qos def.Qos)
}
Click to show internal directories.
Click to hide internal directories.