health

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const ConditionActive data.ConditionType = "Active"

Variables

View Source
var (
	ErrStreamNotFound = errors.New("stream not found")
	ErrEventNotFound  = errors.New("event not found")
)

Functions

This section is empty.

Types

type Core

type Core struct {
	// contains filtered or unexported fields
}

func NewCore

func NewCore(opts CoreOptions, reducer Reducer) (*Core, error)

func (*Core) Close added in v0.7.4

func (c *Core) Close() error

func (*Core) GetPastEvents added in v0.1.0

func (c *Core) GetPastEvents(manifestID string, from, to *time.Time) ([]data.Event, error)

func (*Core) GetStatus added in v0.1.0

func (c *Core) GetStatus(manifestID string) (*data.HealthStatus, error)

func (*Core) HandleMessage

func (c *Core) HandleMessage(msg event.StreamMessage)

func (*Core) IsHealthy added in v0.1.0

func (c *Core) IsHealthy() bool

func (*Core) Start

func (c *Core) Start(ctx context.Context) error

func (*Core) SubscribeEvents added in v0.1.0

func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID *uuid.UUID, from *time.Time) ([]data.Event, <-chan data.Event, error)

func (*Core) WaitActive added in v0.8.0

func (c *Core) WaitActive(ctx context.Context, manifestID string) error

type CoreOptions

type CoreOptions struct {
	StreamUri, AMQPUri string
	Streaming          StreamingOptions
	StartTimeOffset    time.Duration
	MemoryRecordsTtl   time.Duration
}

type Record

type Record struct {
	ID         string
	Conditions []data.ConditionType

	sync.RWMutex

	PastEvents []data.Event
	EventsByID map[uuid.UUID]data.Event
	EventSubs  []chan<- data.Event

	ReducerState interface{}
	LastStatus   *data.HealthStatus
	// contains filtered or unexported fields
}

func NewRecord

func NewRecord(id string, conditionTypes []data.ConditionType) *Record

func (*Record) FlagInitialized added in v0.8.0

func (r *Record) FlagInitialized()

FlagInitialized will flag the record as initialized. It is meant to be called after the first event is processed, meaning the record is not empty anymore.

This is used to allow waiting until a stream is started by creating its record in an uninitialized state first and calling `WaitInitialized`. The initialization flag is simply a channel that is closed, which will unblock all goroutines waiting to receive from it (`WaitInitialized`).

func (*Record) IsInitialized added in v0.8.0

func (r *Record) IsInitialized() bool

func (*Record) SubscribeLocked added in v0.1.0

func (r *Record) SubscribeLocked(ctx context.Context, subs chan data.Event) chan data.Event

func (*Record) WaitInitialized added in v0.8.0

func (r *Record) WaitInitialized(ctx context.Context) error

type RecordStorage

type RecordStorage struct {
	SizeGauge prometheus.Gauge
	// contains filtered or unexported fields
}

func (*RecordStorage) Get

func (s *RecordStorage) Get(id string) (*Record, bool)

func (*RecordStorage) GetOrCreate

func (s *RecordStorage) GetOrCreate(id string, conditions []data.ConditionType) *Record

func (*RecordStorage) StartCleanupLoop added in v0.1.0

func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)

type Reducer

type Reducer interface {
	Bindings() []event.BindingArgs
	Conditions() []data.ConditionType
	Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})
}

type ReducerFunc

type ReducerFunc func(*data.HealthStatus, interface{}, data.Event) (*data.HealthStatus, interface{})

func (ReducerFunc) Bindings

func (f ReducerFunc) Bindings() []event.BindingArgs

func (ReducerFunc) Conditions

func (f ReducerFunc) Conditions() []data.ConditionType

func (ReducerFunc) Reduce

func (f ReducerFunc) Reduce(current *data.HealthStatus, state interface{}, evt data.Event) (*data.HealthStatus, interface{})

type StreamingOptions

type StreamingOptions struct {
	Stream, ConsumerName string

	event.RawStreamOptions

	// EventFlowSilenceTolerance determines the amount of time to tolerate zero
	// messages in the stream before giving an error on the service healthcheck.
	// This is a workaround for a bug in the rabbitmq streams client in which it
	// freezes when the stream has leader election issues in a clustered setup.
	EventFlowSilenceTolerance time.Duration
}

Purposedly made of built-in types only to bind directly to cli flags.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL