Documentation
¶
Index ¶
- Constants
- Variables
- type Core
- func (c *Core) Close() error
- func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)
- func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)
- func (c *Core) HandleMessage(msg event.StreamMessage)
- func (c *Core) IsHealthy() bool
- func (c *Core) Start(ctx context.Context) error
- func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)
- func (c *Core) WaitActive(ctx context.Context, manifestID string) error
- type CoreOptions
- type Record
- type RecordStorage
- type Reducer
- type ReducerFunc
- type StreamingOptions
Constants ¶
View Source
const ConditionActive data.ConditionType = "Active"
Variables ¶
View Source
var ( ErrStreamNotFound = errors.New("stream not found") ErrEventNotFound = errors.New("event not found") )
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
func (*Core) GetPastEvents ¶ added in v0.1.0
func (*Core) GetStatus ¶ added in v0.1.0
func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)
func (*Core) HandleMessage ¶
func (c *Core) HandleMessage(msg event.StreamMessage)
func (*Core) SubscribeEvents ¶ added in v0.1.0
type CoreOptions ¶
type Record ¶
type Record struct {
ID string
Conditions []data.ConditionType
sync.RWMutex
PastEvents []data.Event
EventsByID map[uuid.UUID]data.Event
EventSubs []chan<- data.Event
ReducerState interface{}
LastStatus *data.HealthStatus
// contains filtered or unexported fields
}
func (*Record) FlagInitialized ¶ added in v0.8.0
func (r *Record) FlagInitialized()
FlagInitialized will flag the record as initialized. It is meant to be called after the first event is processed, meaning the record is not empty anymore.
This allows callers to wait until a stream is started: its record is first created in an uninitialized state and the caller then calls `WaitInitialized`. The initialization flag is simply a channel that gets closed to signal initialization, which unblocks all goroutines waiting to receive from it (`WaitInitialized`).
func (*Record) IsInitialized ¶ added in v0.8.0
func (*Record) SubscribeLocked ¶ added in v0.1.0
type RecordStorage ¶
type RecordStorage struct {
SizeGauge prometheus.Gauge
// contains filtered or unexported fields
}
func (*RecordStorage) GetOrCreate ¶
func (s *RecordStorage) GetOrCreate(id string, conditions []data.ConditionType) *Record
func (*RecordStorage) StartCleanupLoop ¶ added in v0.1.0
func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
type Reducer ¶
type Reducer interface {
Bindings() []event.BindingArgs
Conditions() []data.ConditionType
Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})
}
type ReducerFunc ¶
type ReducerFunc func(*data.HealthStatus, interface{}, data.Event) (*data.HealthStatus, interface{})
func (ReducerFunc) Bindings ¶
func (f ReducerFunc) Bindings() []event.BindingArgs
func (ReducerFunc) Conditions ¶
func (f ReducerFunc) Conditions() []data.ConditionType
func (ReducerFunc) Reduce ¶
func (f ReducerFunc) Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})
type StreamingOptions ¶
type StreamingOptions struct {
Stream, ConsumerName string
event.RawStreamOptions
// EventFlowSilenceTolerance determines the amount of time to tolerate zero
// messages in the stream before giving an error on the service healthcheck.
// This is a workaround for a bug in the rabbitmq streams client in which it
// freezes when the stream has leader election issues in a clustered setup.
EventFlowSilenceTolerance time.Duration
}
Purposely made of built-in types only, so that it can bind directly to CLI flags.
Click to show internal directories.
Click to hide internal directories.