Documentation ¶
Index ¶
- func RetryUntilSuccessfulWatcherCreation(js *jsclient.JetStreamContext, bucketName string, infiniteLoop bool, ...) nats.KeyWatcher
- type Edge
- type Fetcher
- type FromVertexer
- type OffsetTimeline
- func (t *OffsetTimeline) Capacity() int
- func (t *OffsetTimeline) Dump() string
- func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
- func (t *OffsetTimeline) GetEventtimeFromInt64(inputOffsetInt64 int64) int64
- func (t *OffsetTimeline) GetHeadOffset() int64
- func (t *OffsetTimeline) GetOffset(eventTime int64) int64
- func (t *OffsetTimeline) GetTailOffset() int64
- func (t *OffsetTimeline) Put(node OffsetWatermark)
- type OffsetWatermark
- type ProcessorHeartbeat
- type ProcessorToFetch
- type VertexOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RetryUntilSuccessfulWatcherCreation ¶
func RetryUntilSuccessfulWatcherCreation(js *jsclient.JetStreamContext, bucketName string, infiniteLoop bool, log *zap.SugaredLogger) nats.KeyWatcher
RetryUntilSuccessfulWatcherCreation creates a watcher. If infiniteLoop is set to true, it keeps retrying until the watcher is created. TODO: use `wait.ExponentialBackoffWithContext`
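The retry behavior described above, together with the backoff mentioned in the TODO, can be sketched without any NATS dependency. This is a minimal illustration, not the package's implementation: `retryUntilSuccess`, `errNotReady`, the string stand-in for the watcher, and the choice to return the error when `infiniteLoop` is false are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical error used to simulate a failed creation.
var errNotReady = errors.New("bucket not ready")

// retryUntilSuccess is a hypothetical helper, not part of this package.
// It calls create until it succeeds, sleeping between attempts and doubling
// the delay up to maxDelay. When infiniteLoop is false it gives up after the
// first failure and returns that error (an assumption for this sketch).
func retryUntilSuccess(create func() (string, error), infiniteLoop bool, initial, maxDelay time.Duration) (string, error) {
	delay := initial
	for {
		v, err := create()
		if err == nil {
			return v, nil
		}
		if !infiniteLoop {
			return "", err
		}
		time.Sleep(delay)
		// Exponential backoff: double the delay, capped at maxDelay.
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

func main() {
	attempts := 0
	// flakyCreate stands in for watcher creation; it fails twice, then succeeds.
	flakyCreate := func() (string, error) {
		attempts++
		if attempts < 3 {
			return "", errNotReady
		}
		return "watcher", nil
	}
	w, err := retryUntilSuccess(flakyCreate, true, time.Millisecond, 10*time.Millisecond)
	fmt.Println(w, err, attempts)
}
```

In real code the loop would also need to honor context cancellation, which this sketch leaves out.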
Types ¶
type Edge ¶ added in v0.5.3
type Edge struct {
// contains filtered or unexported fields
}
Edge is the edge relation between two vertices.
func NewEdgeBuffer ¶
func NewEdgeBuffer(ctx context.Context, edgeName string, fromV FromVertexer) *Edge
NewEdgeBuffer returns a new Edge. FromVertex has the details about the processors responsible for writing to this edge.
func (*Edge) GetHeadWatermark ¶ added in v0.5.4
GetHeadWatermark returns the watermark using the HeadOffset (latest offset). This can be used to show the watermark progression for a vertex when not consuming the messages directly (e.g., UX, tests).
type Fetcher ¶
type Fetcher interface {
// GetWatermark returns the in-order, monotonically increasing watermark of the edge connected to Vn-1.
GetWatermark(offset isb.Offset) processor.Watermark
// GetHeadWatermark returns the latest watermark based on the head offset
GetHeadWatermark() processor.Watermark
}
Fetcher fetches watermark data from Vn-1 vertex.
type FromVertexer ¶
type FromVertexer interface {
// GetAllProcessors fetches all the processors from Vn-1 vertex. Processors could be pods or, when the vertex is a
// source vertex, partitions if the source is Kafka.
GetAllProcessors() map[string]*ProcessorToFetch
// GetProcessor gets a processor.
GetProcessor(processor string) *ProcessorToFetch
// DeleteProcessor deletes a processor.
DeleteProcessor(processor string)
}
FromVertexer defines an interface which builds the view of the Vn-1th vertex from the point of view of the Vn-th vertex.
func NewFromVertex ¶
func NewFromVertex(ctx context.Context, hbWatcher store.WatermarkKVWatcher, otWatcher store.WatermarkKVWatcher, inputOpts ...VertexOption) FromVertexer
NewFromVertex returns `FromVertex`
type OffsetTimeline ¶
type OffsetTimeline struct {
// contains filtered or unexported fields
}
OffsetTimeline stores the event-time-to-offset records. The list is sorted by event time from highest to lowest.
func NewOffsetTimeline ¶
func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline
NewOffsetTimeline returns OffsetTimeline.
func (*OffsetTimeline) Capacity ¶
func (t *OffsetTimeline) Capacity() int
Capacity returns the capacity of the OffsetTimeline list.
func (*OffsetTimeline) Dump ¶
func (t *OffsetTimeline) Dump() string
Dump dumps the in-memory representation of the OffsetTimeline. The output can get very ugly if the list is large, e.g. more than 100 elements. The list is expected to hold 10K+ elements (there are 86400 seconds in a day).
func (*OffsetTimeline) GetEventTime ¶
func (t *OffsetTimeline) GetEventTime(inputOffset isb.Offset) int64
GetEventTime returns the event time for the given offset. TODO(jyu6): would making Watermark an interface make it easier to get a Watermark and return an Offset?
func (*OffsetTimeline) GetEventtimeFromInt64 ¶ added in v0.5.4
func (t *OffsetTimeline) GetEventtimeFromInt64(inputOffsetInt64 int64) int64
func (*OffsetTimeline) GetHeadOffset ¶
func (t *OffsetTimeline) GetHeadOffset() int64
GetHeadOffset returns the head offset, that is, the most recent offset, which has the highest Watermark.
func (*OffsetTimeline) GetOffset ¶
func (t *OffsetTimeline) GetOffset(eventTime int64) int64
GetOffset returns the offset for the given event time. TODO(jyu6): would making Watermark an interface make it easier to pass an Offset and return a Watermark?
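One plausible way to answer both lookups (event time for an offset, offset for an event time) on a list sorted by event time from highest to lowest is a linear scan from the head. The sketch below is an assumption about the lookup rule, not the package's code: `entry`, `offsetFor`, `eventTimeFor`, and the -1 "not found" value are hypothetical.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for OffsetWatermark.
type entry struct {
	watermark int64 // event time
	offset    int64
}

// offsetFor scans from the head (highest event time) and returns the offset
// of the first entry whose event time is at or before eventTime.
// It returns -1 when every entry is newer than eventTime.
func offsetFor(entries []entry, eventTime int64) int64 {
	for _, e := range entries {
		if e.watermark <= eventTime {
			return e.offset
		}
	}
	return -1
}

// eventTimeFor scans from the head and returns the event time of the first
// entry whose offset is at or before offset, or -1 when there is none.
func eventTimeFor(entries []entry, offset int64) int64 {
	for _, e := range entries {
		if e.offset <= offset {
			return e.watermark
		}
	}
	return -1
}

func main() {
	// Sorted by event time, highest first.
	entries := []entry{{30, 300}, {20, 200}, {10, 100}}
	fmt.Println(offsetFor(entries, 25))     // 200
	fmt.Println(offsetFor(entries, 5))      // -1
	fmt.Println(eventTimeFor(entries, 250)) // 20
}
```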
func (*OffsetTimeline) GetTailOffset ¶
func (t *OffsetTimeline) GetTailOffset() int64
GetTailOffset returns the smallest offset with the smallest watermark.
func (*OffsetTimeline) Put ¶
func (t *OffsetTimeline) Put(node OffsetWatermark)
Put inserts the OffsetWatermark into the list. It ensures that the list remains sorted after the insert.
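A sorted insert into a bounded timeline might look like the sketch below. It uses a slice rather than whatever container the package uses, and the names (`entry`, `timeline`, `put`, `headOffset`, `tailOffset`), the handling of duplicate event times, and the eviction of the oldest entry when the timeline is full are assumptions for illustration only.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for OffsetWatermark.
type entry struct {
	watermark int64 // event time
	offset    int64
}

// timeline keeps at most capacity entries sorted by event time, highest first.
type timeline struct {
	capacity int
	entries  []entry
}

// put inserts e at its sorted position. If an entry with the same event time
// already exists, only the larger offset is kept. When the timeline is over
// capacity, the tail (oldest) entry is dropped.
func (t *timeline) put(e entry) {
	i := 0
	for i < len(t.entries) && t.entries[i].watermark > e.watermark {
		i++
	}
	if i < len(t.entries) && t.entries[i].watermark == e.watermark {
		if e.offset > t.entries[i].offset {
			t.entries[i].offset = e.offset
		}
		return
	}
	// Grow by one, shift the tail right, and place e at index i.
	t.entries = append(t.entries, entry{})
	copy(t.entries[i+1:], t.entries[i:])
	t.entries[i] = e
	if len(t.entries) > t.capacity {
		t.entries = t.entries[:t.capacity]
	}
}

// headOffset returns the offset at the head, or -1 when empty.
func (t *timeline) headOffset() int64 {
	if len(t.entries) == 0 {
		return -1
	}
	return t.entries[0].offset
}

// tailOffset returns the offset at the tail, or -1 when empty.
func (t *timeline) tailOffset() int64 {
	if len(t.entries) == 0 {
		return -1
	}
	return t.entries[len(t.entries)-1].offset
}

func main() {
	t := &timeline{capacity: 3}
	for _, e := range []entry{{10, 100}, {30, 300}, {20, 200}, {20, 250}, {40, 400}} {
		t.put(e)
	}
	fmt.Println(t.entries, t.headOffset(), t.tailOffset())
}
```

A slice keeps the sketch short; a linked list avoids shifting elements on insert, at the cost of a slower scan.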
type OffsetWatermark ¶
type OffsetWatermark struct {
// contains filtered or unexported fields
}
OffsetWatermark stores the maximum offset for the given event time. It uses the basic data type int64 to compare the values.
type ProcessorHeartbeat ¶
type ProcessorHeartbeat struct {
// contains filtered or unexported fields
}
ProcessorHeartbeat has details about each processor heartbeat. This information is populated by watching the Vn-1th vertex's processors. It stores only the latest heartbeat value.
func NewProcessorHeartbeat ¶
func NewProcessorHeartbeat() *ProcessorHeartbeat
NewProcessorHeartbeat returns ProcessorHeartbeat.
func (*ProcessorHeartbeat) Delete ¶
func (hb *ProcessorHeartbeat) Delete(key string)
Delete deletes a processor from the ProcessorHeartbeat table.
func (*ProcessorHeartbeat) Get ¶
func (hb *ProcessorHeartbeat) Get(key string) int64
Get gets the heartbeat for a given processor.
func (*ProcessorHeartbeat) GetAll ¶
func (hb *ProcessorHeartbeat) GetAll() map[string]int64
GetAll returns all the heartbeat entries in the heartbeat table.
func (*ProcessorHeartbeat) Put ¶
func (hb *ProcessorHeartbeat) Put(key string, value int64)
Put inserts a heartbeat entry for a given processor key and value.
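The four ProcessorHeartbeat methods suggest a concurrency-safe map from processor key to latest heartbeat. The following is a minimal sketch of such a table, not the package's implementation: `heartbeatTable` and its methods are hypothetical, and returning 0 for an unknown key and a copy from `getAll` are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// heartbeatTable is a hypothetical stand-in for ProcessorHeartbeat.
// It stores only the latest heartbeat value per processor key.
type heartbeatTable struct {
	mu   sync.RWMutex
	beat map[string]int64
}

func newHeartbeatTable() *heartbeatTable {
	return &heartbeatTable{beat: make(map[string]int64)}
}

// put overwrites any previous heartbeat for key.
func (h *heartbeatTable) put(key string, value int64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.beat[key] = value
}

// get returns the heartbeat for key, or 0 if the key is unknown.
func (h *heartbeatTable) get(key string) int64 {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.beat[key]
}

// getAll returns a copy so callers cannot mutate the table without the lock.
func (h *heartbeatTable) getAll() map[string]int64 {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make(map[string]int64, len(h.beat))
	for k, v := range h.beat {
		out[k] = v
	}
	return out
}

// remove deletes the entry for key, if present.
func (h *heartbeatTable) remove(key string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.beat, key)
}

func main() {
	h := newHeartbeatTable()
	h.put("pod-0", 100)
	h.put("pod-1", 105)
	h.put("pod-0", 110) // a newer heartbeat replaces the old one
	h.remove("pod-1")
	fmt.Println(h.get("pod-0"), h.getAll())
}
```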
type ProcessorToFetch ¶ added in v0.5.3
type ProcessorToFetch struct {
// contains filtered or unexported fields
}
ProcessorToFetch is the smallest unit of entity (from which we fetch data) that does in-order processing or contains in-order data.
func NewProcessorToFetch ¶ added in v0.5.3
func NewProcessorToFetch(ctx context.Context, processor processor.ProcessorEntitier, capacity int, watcher store.WatermarkKVWatcher) *ProcessorToFetch
NewProcessorToFetch creates ProcessorToFetch.
func (*ProcessorToFetch) IsActive ¶ added in v0.5.3
func (p *ProcessorToFetch) IsActive() bool
IsActive returns whether a processor is active.
func (*ProcessorToFetch) IsDeleted ¶ added in v0.5.3
func (p *ProcessorToFetch) IsDeleted() bool
IsDeleted returns whether a processor has been deleted.
func (*ProcessorToFetch) IsInactive ¶ added in v0.5.3
func (p *ProcessorToFetch) IsInactive() bool
IsInactive returns whether a processor is inactive (no heartbeats of any sort).
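The three predicates read naturally as checks against a single status field. A minimal sketch under that assumption follows; `status`, `proc`, and `setStatus` are hypothetical names, and the real type's fields are unexported and may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// status is a hypothetical lifecycle state for a processor.
type status int

const (
	active status = iota
	inactive
	deleted
)

func (s status) String() string {
	switch s {
	case active:
		return "active"
	case inactive:
		return "inactive"
	case deleted:
		return "deleted"
	}
	return "unknown"
}

// proc is a hypothetical stand-in for ProcessorToFetch.
type proc struct {
	name string
	mu   sync.RWMutex
	st   status
}

func (p *proc) setStatus(s status) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.st = s
}

// is reports whether the processor is currently in state s.
func (p *proc) is(s status) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.st == s
}

func (p *proc) isActive() bool   { return p.is(active) }
func (p *proc) isInactive() bool { return p.is(inactive) }
func (p *proc) isDeleted() bool  { return p.is(deleted) }

// String makes proc print as "<name> status:<state>".
func (p *proc) String() string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return fmt.Sprintf("%s status:%v", p.name, p.st)
}

func main() {
	p := &proc{name: "pod-0"}
	fmt.Println(p) // pod-0 status:active
	p.setStatus(inactive)
	fmt.Println(p.isActive(), p.isInactive(), p.isDeleted())
}
```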
func (*ProcessorToFetch) String ¶ added in v0.5.3
func (p *ProcessorToFetch) String() string
type VertexOption ¶
type VertexOption func(*vertexOptions)
VertexOption sets options for FromVertex.
func WithPodHeartbeatRate ¶
func WithPodHeartbeatRate(rate int64) VertexOption
WithPodHeartbeatRate sets the heartbeat rate in seconds.
func WithRefreshingProcessorsRate ¶
func WithRefreshingProcessorsRate(rate int64) VertexOption
WithRefreshingProcessorsRate sets the processor refreshing rate in seconds.
func WithSeparateOTBuckets ¶
func WithSeparateOTBuckets(separate bool) VertexOption
WithSeparateOTBuckets creates a different bucket for maintaining each processor offset-timeline.
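VertexOption and the three With... constructors follow Go's functional options pattern. The self-contained sketch below mirrors that shape with lowercase, hypothetical names; the field names and the default values (5 seconds, no separate buckets) are assumptions, not the package's actual defaults.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the unexported vertexOptions.
type options struct {
	podHeartbeatRate         int64 // seconds
	refreshingProcessorsRate int64 // seconds
	separateOTBuckets        bool
}

// option mutates options, in the same way VertexOption mutates vertexOptions.
type option func(*options)

func withPodHeartbeatRate(rate int64) option {
	return func(o *options) { o.podHeartbeatRate = rate }
}

func withRefreshingProcessorsRate(rate int64) option {
	return func(o *options) { o.refreshingProcessorsRate = rate }
}

func withSeparateOTBuckets(separate bool) option {
	return func(o *options) { o.separateOTBuckets = separate }
}

// buildOptions starts from assumed defaults and applies each option in order,
// so later options override earlier ones.
func buildOptions(opts ...option) *options {
	o := &options{podHeartbeatRate: 5, refreshingProcessorsRate: 5}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := buildOptions(withPodHeartbeatRate(10), withSeparateOTBuckets(true))
	fmt.Printf("%+v\n", *o)
}
```

A constructor such as NewFromVertex can accept these as a variadic parameter, which lets callers omit every option and still get working defaults.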