subscriber

package
v0.0.50 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 22 Imported by: 12

Documentation

Index

Constants

Variables

View Source
// PipelineStatusNames maps each PipelineStatus value to a human-readable label.
// The keys are written as raw integers; they line up with the iota-assigned
// constants PIPELINE_STATUS_OPEN (0), PIPELINE_STATUS_HALF_OPEN (1) and
// PIPELINE_STATUS_CLOSE (2).
var PipelineStatusNames = map[PipelineStatus]string{
	0: "Open",
	1: "Half Open",
	2: "Close",
}

Functions

This section is empty.

Types

type CollectionSnapshot added in v0.0.13

// CollectionSnapshot exposes no exported fields. No constructor or method for
// it appears in this listing.
// NOTE(review): its role is not visible here — confirm against the source.
type CollectionSnapshot struct {
	// contains filtered or unexported fields
}

type DataEvent added in v0.0.40

// DataEvent carries one event: the ID of the pipeline it is associated with,
// a sequence number, the raw bytes, and a Record payload.
type DataEvent struct {
	// PipelineID is the numeric ID of the associated pipeline.
	PipelineID uint64
	// Sequence is the event's sequence number.
	Sequence   uint64
	// RawData holds the event as bytes.
	// NOTE(review): presumably the undecoded form of Payload — confirm.
	RawData    []byte
	// NOTE(review): commented-out Payload declaration using the Projection
	// type; presumably superseded by the Record field below — confirm.
	//	Payload    *gravity_sdk_types_projection.Projection
	// Payload is the event as a Record.
	Payload *gravity_sdk_types_record.Record
}

type EventHandler added in v0.0.40

// EventHandler exposes no exported fields. It is created with NewEventHandler,
// which takes a *Subscriber, and offers ProcessEvent(data []byte) error.
type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler added in v0.0.40

func NewEventHandler(subscriber *Subscriber) *EventHandler

func (*EventHandler) ProcessEvent added in v0.0.40

func (eh *EventHandler) ProcessEvent(data []byte) error

type InitialLoadOptions added in v0.0.10

// InitialLoadOptions groups the initial-load settings. It is embedded by value
// in Options as the InitialLoad field.
// NOTE(review): the accepted Mode values and the exact meaning of OmittedCount
// are not visible in this listing — confirm against the source.
type InitialLoadOptions struct {
	// Enabled toggles initial load.
	Enabled      bool
	// Mode is a free-form string selecting the initial-load mode.
	Mode         string
	// OmittedCount is an unsigned counter; see the NOTE above.
	OmittedCount uint64
}

type Message

// Message is the value passed to a MessageHandler (func(*Message)). It is
// built with NewMessage and has an Ack method.
type Message struct {
	// Pipeline is the pipeline associated with this message.
	Pipeline     *Pipeline
	// Subscription is the subscription associated with this message.
	Subscription *Subscription
	// Type is either MESSAGE_TYPE_EVENT or MESSAGE_TYPE_SNAPSHOT.
	Type         MessageType
	// Payload is untyped.
	// NOTE(review): presumably *DataEvent or *SnapshotEvent depending on
	// Type — confirm against the producer.
	Payload      interface{}
	// Callback is a function taking the message itself. NewMessage does not
	// accept it as an argument.
	// NOTE(review): when it is invoked is not visible here — confirm.
	Callback     func(*Message)
}

func NewMessage added in v0.0.40

func NewMessage(pipeline *Pipeline, sub *Subscription, msgType MessageType, payload interface{}) *Message

func (*Message) Ack

func (msg *Message) Ack()

type MessageHandler

type MessageHandler func(*Message)

type MessageType added in v0.0.40

// MessageType tags a Message with the kind of content it carries.
type MessageType int32
// Values are assigned by iota, so MESSAGE_TYPE_EVENT is 0 and
// MESSAGE_TYPE_SNAPSHOT is 1. Reordering the list changes the numeric values.
const (
	MESSAGE_TYPE_EVENT MessageType = iota
	MESSAGE_TYPE_SNAPSHOT
)

type Options

// Options is the configuration accepted by NewSubscriber and
// NewSubscriberWithClient. NewOptions returns a pointer to one.
// NOTE(review): the defaults applied by NewOptions and the units of the
// integer fields are not visible in this listing — confirm against the source.
type Options struct {
	Endpoint    string
	Domain      string
	// Key is a keyring.KeyInfo pointer.
	Key         *keyring.KeyInfo
	WorkerCount int
	BufferSize  int
	ChunkSize   int
	Verbose     bool
	// StateStore supplies per-pipeline state through the StateStore interface.
	StateStore  StateStore
	// InitialLoad holds the initial-load settings by value.
	InitialLoad InitialLoadOptions
}

func NewOptions

func NewOptions() *Options

type Pipeline

// Pipeline exposes no exported fields. It is created with
// NewPipeline(subscriber, id, lastSeq) and offers the methods Awake, Fail,
// Initialize, SetUpdatedSequence, Succeed and UpdateLastSequence.
type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(subscriber *Subscriber, id uint64, lastSeq uint64) *Pipeline

func (*Pipeline) Awake added in v0.0.13

func (pipeline *Pipeline) Awake() error

func (*Pipeline) Fail added in v0.0.40

func (pipeline *Pipeline) Fail()

func (*Pipeline) Initialize added in v0.0.40

func (pipeline *Pipeline) Initialize() error

func (*Pipeline) SetUpdatedSequence added in v0.0.40

func (pipeline *Pipeline) SetUpdatedSequence(updatedSeq uint64) error

func (*Pipeline) Succeed added in v0.0.40

func (pipeline *Pipeline) Succeed()

func (*Pipeline) UpdateLastSequence

func (pipeline *Pipeline) UpdateLastSequence(sequence uint64)

type PipelineState

// PipelineState is the per-pipeline state object returned by
// StateStore.GetPipelineState.
type PipelineState interface {
	// GetLastSequence returns the stored sequence number.
	GetLastSequence() uint64
	// UpdateLastSequence stores a new sequence number and may fail.
	UpdateLastSequence(uint64) error
	// Flush may fail.
	// NOTE(review): presumably persists pending state — confirm with an
	// implementation.
	Flush() error
}

type PipelineStatus added in v0.0.40

// PipelineStatus is the status code of a pipeline. PipelineStatusNames holds
// the matching display labels ("Open", "Half Open", "Close").
type PipelineStatus int32
// Values are assigned by iota: PIPELINE_STATUS_OPEN is 0,
// PIPELINE_STATUS_HALF_OPEN is 1 and PIPELINE_STATUS_CLOSE is 2.
// PipelineStatusNames is keyed by these raw numbers, so the order here must
// stay in sync with that map.
const (
	PIPELINE_STATUS_OPEN PipelineStatus = iota
	PIPELINE_STATUS_HALF_OPEN
	PIPELINE_STATUS_CLOSE
)

type Runner added in v0.0.40

// Runner exposes no exported fields. It is created with NewRunner and offers
// AddPipeline, Awake(pipelineID), Start and Stop.
type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner added in v0.0.40

func NewRunner() *Runner

func (*Runner) AddPipeline added in v0.0.40

func (runner *Runner) AddPipeline(pipeline *Pipeline)

func (*Runner) Awake added in v0.0.40

func (runner *Runner) Awake(pipelineID uint64)

func (*Runner) Start added in v0.0.40

func (runner *Runner) Start()

func (*Runner) Stop added in v0.0.40

func (runner *Runner) Stop()

type Snapshot added in v0.0.10

// Snapshot exposes no exported fields. It is created with NewSnapshot, which
// takes a *Pipeline, and offers Create, Pull and Close.
type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot added in v0.0.10

func NewSnapshot(pipeline *Pipeline) *Snapshot

func (*Snapshot) Close added in v0.0.13

func (snapshot *Snapshot) Close() error

func (*Snapshot) Create added in v0.0.10

func (snapshot *Snapshot) Create() error

func (*Snapshot) Pull added in v0.0.10

func (snapshot *Snapshot) Pull() (string, [][]byte, error)

func (snapshot *Snapshot) Pull() ([]*gravity_sdk_types_snapshot_record.SnapshotRecord, error) {

type SnapshotEvent added in v0.0.40

// SnapshotEvent carries one snapshot record together with the ID of its
// pipeline and a collection name.
type SnapshotEvent struct {
	// PipelineID is the numeric ID of the associated pipeline.
	PipelineID uint64
	// Collection is the collection name.
	Collection string
	// RawData holds the record as bytes.
	// NOTE(review): presumably the undecoded form of Payload — confirm.
	RawData    []byte
	// Payload is the record as a SnapshotRecord.
	Payload    *gravity_sdk_types_snapshot_record.SnapshotRecord
}

type StateStore

// StateStore is the interface accepted by Options.StateStore. It hands out
// PipelineState values.
type StateStore interface {
	// GetPipelineState returns the PipelineState for the given number, or an
	// error.
	// NOTE(review): the argument is presumably a pipeline ID — confirm.
	GetPipelineState(uint64) (PipelineState, error)
	// GetPipelines returns a list of numbers.
	// NOTE(review): presumably the IDs of the pipelines known to the store —
	// confirm.
	GetPipelines() []uint64
}

type Subscriber

// Subscriber exposes no exported fields. It is created with
// NewSubscriber(options) or NewSubscriberWithClient(client, options); its
// behaviour is reached through the methods listed for *Subscriber.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(options *Options) *Subscriber

func NewSubscriberWithClient

func NewSubscriberWithClient(client *core.Client, options *Options) *Subscriber

func (*Subscriber) AddAllPipelines

func (sub *Subscriber) AddAllPipelines() error

func (*Subscriber) AddPipeline

func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error

func (*Subscriber) Connect

func (sub *Subscriber) Connect(host string, options *core.Options) error

func (*Subscriber) Disconnect

func (sub *Subscriber) Disconnect()

func (*Subscriber) GetCollectionInfo

func (sub *Subscriber) GetCollectionInfo(collection string) []string

func (*Subscriber) GetEndpoint added in v0.0.18

func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)

func (*Subscriber) GetPipeline added in v0.0.13

func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline

func (*Subscriber) GetPipelineCount

func (sub *Subscriber) GetPipelineCount() (uint64, error)

func (*Subscriber) Register

func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, subscriberID string, name string) error

func (*Subscriber) ReleasePipeline added in v0.0.13

func (sub *Subscriber) ReleasePipeline(pipelineID uint64)

// AwakePipeline forwards pipelineID to the scheduler's Awake method.
// It does nothing when sub.scheduler is nil.
func (sub *Subscriber) AwakePipeline(pipelineID uint64) {
	if sub.scheduler != nil {
		sub.scheduler.Awake(pipelineID)
	}
}

func (*Subscriber) SetEventHandler added in v0.0.13

func (sub *Subscriber) SetEventHandler(cb MessageHandler)

func (*Subscriber) SetSnapshotHandler added in v0.0.13

func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)

func (*Subscriber) Start added in v0.0.13

func (sub *Subscriber) Start()

func (*Subscriber) SubscribeToCollections

func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error

func (*Subscriber) SubscribeToPipelines added in v0.0.20

func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error

type Subscription

// Subscription exposes no exported fields. It is created with
// NewSubscription(subscriber, bufferSize) and offers Push and Unsubscribe.
type Subscription struct {
	// contains filtered or unexported fields
}
// projectionPool is a sync.Pool of *Projection values. When the pool has
// nothing to hand out, New allocates a fresh zero-valued Projection.
var projectionPool = sync.Pool{
	New: func() interface{} {
		return new(gravity_sdk_types_projection.Projection)
	},
}

func NewSubscription

func NewSubscription(subscriber *Subscriber, bufferSize int) *Subscription

func (*Subscription) Push added in v0.0.40

func (s *Subscription) Push(msg *Message)

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL