Documentation
¶
Index ¶
- Constants
- Variables
- type CollectionSnapshot
- type DataEvent
- type EventHandler
- type InitialLoadOptions
- type Message
- type MessageHandler
- type MessageType
- type Options
- type Pipeline
- type PipelineState
- type PipelineStatus
- type Runner
- type Snapshot
- type SnapshotEvent
- type StateStore
- type Subscriber
- func (sub *Subscriber) AddAllPipelines() error
- func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error
- func (sub *Subscriber) Connect(host string, options *core.Options) error
- func (sub *Subscriber) Disconnect()
- func (sub *Subscriber) GetCollectionInfo(collection string) []string
- func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)
- func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline
- func (sub *Subscriber) GetPipelineCount() (uint64, error)
- func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, ...) error
- func (sub *Subscriber) ReleasePipeline(pipelineID uint64)
- func (sub *Subscriber) SetEventHandler(cb MessageHandler)
- func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)
- func (sub *Subscriber) Start()
- func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error
- func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error
- type Subscription
Constants ¶
View Source
const ( SubscriberType_Transmitter subscriber_manager_pb.SubscriberType = subscriber_manager_pb.SubscriberType_TRANSMITTER SubscriberType_Exporter subscriber_manager_pb.SubscriberType = subscriber_manager_pb.SubscriberType_EXPORTER )
Variables ¶
View Source
// PipelineStatusNames maps each PipelineStatus value to a human-readable
// label. The literal keys 0, 1 and 2 correspond, in order, to the iota-assigned
// constants PIPELINE_STATUS_OPEN, PIPELINE_STATUS_HALF_OPEN and
// PIPELINE_STATUS_CLOSE.
var PipelineStatusNames = map[PipelineStatus]string{
0: "Open", // PIPELINE_STATUS_OPEN
1: "Half Open", // PIPELINE_STATUS_HALF_OPEN
2: "Close", // PIPELINE_STATUS_CLOSE
}
Functions ¶
This section is empty.
Types ¶
type CollectionSnapshot ¶ added in v0.0.13
type CollectionSnapshot struct {
// contains filtered or unexported fields
}
type DataEvent ¶ added in v0.0.40
// DataEvent groups a pipeline ID, a sequence number, a raw byte slice and a
// Record payload pointer.
// NOTE(review): the per-field meanings below are inferred from the field
// names only; confirm against the code that populates this struct.
type DataEvent struct {
// PipelineID is presumably the ID of the pipeline the event arrived on.
PipelineID uint64
// Sequence is presumably the event's sequence number within that pipeline.
Sequence uint64
// RawData is presumably the undecoded bytes the event was built from.
RawData []byte
// Alternative Projection-typed declaration of Payload, currently commented out:
// Payload *gravity_sdk_types_projection.Projection
// Payload is a pointer to a gravity_sdk_types_record.Record.
Payload *gravity_sdk_types_record.Record
}
type EventHandler ¶ added in v0.0.40
type EventHandler struct {
// contains filtered or unexported fields
}
func NewEventHandler ¶ added in v0.0.40
func NewEventHandler(subscriber *Subscriber) *EventHandler
func (*EventHandler) ProcessEvent ¶ added in v0.0.40
func (eh *EventHandler) ProcessEvent(data []byte) error
type InitialLoadOptions ¶ added in v0.0.10
type Message ¶
// Message is the value passed to a MessageHandler. It bundles a pipeline, a
// subscription, a MessageType discriminator, an untyped payload and a callback.
// NOTE(review): NewMessage accepts a pipeline, subscription, type and payload
// but no callback; how Callback gets set is not visible here — confirm.
type Message struct {
// Pipeline is the pipeline associated with this message.
Pipeline *Pipeline
// Subscription is the subscription associated with this message.
Subscription *Subscription
// Type is one of the MessageType constants (MESSAGE_TYPE_EVENT or
// MESSAGE_TYPE_SNAPSHOT).
Type MessageType
// Payload is untyped; presumably a *DataEvent or *SnapshotEvent depending
// on Type — TODO confirm.
Payload interface{}
// Callback is a function taking the message itself; when and by whom it is
// invoked is not visible here.
Callback func(*Message)
}
func NewMessage ¶ added in v0.0.40
func NewMessage(pipeline *Pipeline, sub *Subscription, msgType MessageType, payload interface{}) *Message
type MessageHandler ¶
// MessageHandler is the callback signature accepted by
// Subscriber.SetEventHandler and Subscriber.SetSnapshotHandler; it receives a
// single *Message and returns nothing.
type MessageHandler func(*Message)
type MessageType ¶ added in v0.0.40
// MessageType is the int32-based type of the Message.Type field.
type MessageType int32

// MessageType values, numbered from zero by iota.
const (
	// MESSAGE_TYPE_EVENT is the zero value of MessageType (0).
	MESSAGE_TYPE_EVENT MessageType = iota
	// MESSAGE_TYPE_SNAPSHOT has the value 1.
	MESSAGE_TYPE_SNAPSHOT
)
type Options ¶
// Options is the configuration struct accepted by NewSubscriber and
// NewSubscriberWithClient; NewOptions returns a pointer to one.
// NOTE(review): default values and the exact effect of each field are not
// visible here; the per-field notes are inferred from names and types only.
type Options struct {
// Endpoint is a string; presumably the name of the endpoint to subscribe to.
Endpoint string
// Domain is a string; presumably the domain the subscriber operates in.
Domain string
// Key points to the keyring.KeyInfo used by the subscriber.
Key *keyring.KeyInfo
// WorkerCount is presumably the number of worker goroutines — TODO confirm.
WorkerCount int
// BufferSize is presumably the capacity of internal message buffers.
BufferSize int
// ChunkSize is an int; its unit is not visible here.
ChunkSize int
// Verbose is presumably a switch for extra logging.
Verbose bool
// StateStore supplies stored pipeline state through the StateStore interface.
StateStore StateStore
// InitialLoad holds the InitialLoadOptions value, embedded by value.
InitialLoad InitialLoadOptions
}
func NewOptions ¶
func NewOptions() *Options
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline(subscriber *Subscriber, id uint64, lastSeq uint64) *Pipeline
func (*Pipeline) Initialize ¶ added in v0.0.40
func (*Pipeline) SetUpdatedSequence ¶ added in v0.0.40
func (*Pipeline) UpdateLastSequence ¶
type PipelineState ¶
type PipelineStatus ¶ added in v0.0.40
// PipelineStatus is the int32-based key type of PipelineStatusNames.
type PipelineStatus int32

// PipelineStatus values, numbered from zero by iota. PipelineStatusNames is
// keyed by these numeric values, so their order must not change.
const (
	// PIPELINE_STATUS_OPEN is the zero value of PipelineStatus (0).
	PIPELINE_STATUS_OPEN PipelineStatus = iota
	// PIPELINE_STATUS_HALF_OPEN has the value 1.
	PIPELINE_STATUS_HALF_OPEN
	// PIPELINE_STATUS_CLOSE has the value 2.
	PIPELINE_STATUS_CLOSE
)
type Runner ¶ added in v0.0.40
type Runner struct {
// contains filtered or unexported fields
}
func (*Runner) AddPipeline ¶ added in v0.0.40
type Snapshot ¶ added in v0.0.10
type Snapshot struct {
// contains filtered or unexported fields
}
func NewSnapshot ¶ added in v0.0.10
type SnapshotEvent ¶ added in v0.0.40
// SnapshotEvent groups a pipeline ID, a collection name, a raw byte slice and
// a SnapshotRecord payload pointer.
// NOTE(review): the per-field meanings below are inferred from the field
// names only; confirm against the code that populates this struct.
type SnapshotEvent struct {
// PipelineID is presumably the ID of the pipeline the snapshot belongs to.
PipelineID uint64
// Collection is presumably the name of the collection being snapshotted.
Collection string
// RawData is presumably the undecoded bytes the event was built from.
RawData []byte
// Payload is a pointer to a gravity_sdk_types_snapshot_record.SnapshotRecord.
Payload *gravity_sdk_types_snapshot_record.SnapshotRecord
}
type StateStore ¶
// StateStore is the interface held in Options.StateStore. It exposes
// per-pipeline state lookup and a listing of pipeline IDs.
type StateStore interface {
// GetPipelineState returns the PipelineState for the given uint64 key
// (presumably a pipeline ID — confirm), or an error.
GetPipelineState(uint64) (PipelineState, error)
// GetPipelines returns a slice of uint64 values, presumably the IDs of the
// pipelines known to the store.
GetPipelines() []uint64
}
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(options *Options) *Subscriber
func NewSubscriberWithClient ¶
func NewSubscriberWithClient(client *core.Client, options *Options) *Subscriber
func (*Subscriber) AddAllPipelines ¶
func (sub *Subscriber) AddAllPipelines() error
func (*Subscriber) AddPipeline ¶
func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error
func (*Subscriber) Connect ¶
func (sub *Subscriber) Connect(host string, options *core.Options) error
func (*Subscriber) Disconnect ¶
func (sub *Subscriber) Disconnect()
func (*Subscriber) GetCollectionInfo ¶
func (sub *Subscriber) GetCollectionInfo(collection string) []string
func (*Subscriber) GetEndpoint ¶ added in v0.0.18
func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)
func (*Subscriber) GetPipeline ¶ added in v0.0.13
func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline
func (*Subscriber) GetPipelineCount ¶
func (sub *Subscriber) GetPipelineCount() (uint64, error)
func (*Subscriber) Register ¶
func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, subscriberID string, name string) error
func (*Subscriber) ReleasePipeline ¶ added in v0.0.13
func (sub *Subscriber) ReleasePipeline(pipelineID uint64)
// AwakePipeline passes pipelineID to the Awake method of the subscriber's
// scheduler. When the subscriber has no scheduler set, the call does nothing.
func (sub *Subscriber) AwakePipeline(pipelineID uint64) {
	if scheduler := sub.scheduler; scheduler != nil {
		scheduler.Awake(pipelineID)
	}
}
func (*Subscriber) SetEventHandler ¶ added in v0.0.13
func (sub *Subscriber) SetEventHandler(cb MessageHandler)
func (*Subscriber) SetSnapshotHandler ¶ added in v0.0.13
func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)
func (*Subscriber) Start ¶ added in v0.0.13
func (sub *Subscriber) Start()
func (*Subscriber) SubscribeToCollections ¶
func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error
func (*Subscriber) SubscribeToPipelines ¶ added in v0.0.20
func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
// projectionPool is a sync.Pool of *gravity_sdk_types_projection.Projection.
// Its New hook supplies a pointer to a freshly allocated, zero-valued
// Projection whenever Get finds the pool empty.
var projectionPool = sync.Pool{
	New: func() interface{} {
		fresh := &gravity_sdk_types_projection.Projection{}
		return fresh
	},
}
func NewSubscription ¶
func NewSubscription(subscriber *Subscriber, bufferSize int) *Subscription
func (*Subscription) Push ¶ added in v0.0.40
func (s *Subscription) Push(msg *Message)
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.