push

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2020 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PublisherNotPresent there are no publishers sending data
	PublisherNotPresent = iota
	// PublisherAvailable there are publishers running
	PublisherAvailable
)
View Source
const (
	// Ready represents ready
	Ready = iota
	// PushIsDown there are no publishers for streaming
	PushIsDown
	// PushIsUp there are publishers presents
	PushIsUp
	// BackoffAuth backoff is running for authentication
	BackoffAuth
	// BackoffSSE backoff is running for connecting to stream
	BackoffSSE
	// TokenExpiration flag to restart push services
	TokenExpiration
	// StreamingPaused flag for pausing streaming
	StreamingPaused
	// StreamingResumed flag for resuming streaming
	StreamingResumed
	// StreamingDisabled flag for disabling streaming
	StreamingDisabled
	// Reconnect flag to reconnect
	Reconnect
	// NonRetriableError represents an error that will force switching to polling
	NonRetriableError
)

Variables

This section is empty.

Functions

This section is empty.

Types

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

EventHandler struct

func NewEventHandler

func NewEventHandler(keeper *Keeper, parser *NotificationParser, processor *Processor, logger logging.LoggerInterface) *EventHandler

NewEventHandler builds new EventHandler

func (*EventHandler) HandleIncomingMessage

func (e *EventHandler) HandleIncomingMessage(event map[string]interface{})

HandleIncomingMessage handles incoming message from streaming

type IncomingEvent

type IncomingEvent struct {
	// contains filtered or unexported fields
}

IncomingEvent struct to process every kind of notification that comes from streaming

func (*IncomingEvent) String added in v2.0.1

func (i *IncomingEvent) String() string

type Keeper

type Keeper struct {
	// contains filtered or unexported fields
}

Keeper struct

func NewKeeper

func NewKeeper(publishers chan int) *Keeper

NewKeeper creates new keeper

func (*Keeper) LastNotification

func (k *Keeper) LastNotification() (string, int64)

LastNotification return the latest notification saved

func (*Keeper) Publishers

func (k *Keeper) Publishers(manager string) *int

Publishers returns the quantity of publishers for a particular manager

func (*Keeper) UpdateLastNotification

func (k *Keeper) UpdateLastNotification(manager string, timestamp int64)

UpdateLastNotification updates last message received

func (*Keeper) UpdateManagers

func (k *Keeper) UpdateManagers(manager string, publishers int)

UpdateManagers updates current manager count

type Manager

type Manager interface {
	Start()
	Stop()
	StartWorkers()
	StopWorkers()
	IsRunning() bool
}

Manager interface for Push Manager

func NewPushManager

func NewPushManager(
	logger logging.LoggerInterface,
	synchronizeSegmentHandler func(segmentName string, till *int64) error,
	synchronizeSplitsHandler func(till *int64) error,
	splitStorage storage.SplitStorage,
	config *conf.AdvancedConfig,
	managerStatus chan int,
	authClient service.AuthClient,
) (Manager, error)

NewPushManager creates new PushManager

type Metrics

type Metrics struct {
	Publishers int `json:"publishers"`
}

Metrics dto

type NotificationParser

type NotificationParser struct {
	// contains filtered or unexported fields
}

NotificationParser struct

func NewNotificationParser

func NewNotificationParser(logger logging.LoggerInterface) *NotificationParser

NewNotificationParser creates notifcation parser

func (*NotificationParser) Parse

func (n *NotificationParser) Parse(event map[string]interface{}) IncomingEvent

Parse parses incoming event from streaming

type Occupancy

type Occupancy struct {
	Data Metrics `json:"metrics"`
}

Occupancy dto

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor struct for notification processor

func NewProcessor

func NewProcessor(segmentQueue chan dtos.SegmentChangeNotification, splitQueue chan dtos.SplitChangeNotification, splitStorage storage.SplitStorageProducer, logger logging.LoggerInterface, controlStatus chan int) (*Processor, error)

NewProcessor creates new processor

func (*Processor) Process

func (p *Processor) Process(i dtos.IncomingNotification) error

Process takes an incoming notification and generates appropriate notifications for it.

type PushManager

type PushManager struct {
	// contains filtered or unexported fields
}

PushManager struct for managing push services

func (*PushManager) IsRunning

func (p *PushManager) IsRunning() bool

IsRunning returns true if the services are running

func (*PushManager) Start

func (p *PushManager) Start()

Start push services

func (*PushManager) StartWorkers

func (p *PushManager) StartWorkers()

StartWorkers starts workers

func (*PushManager) Stop

func (p *PushManager) Stop()

Stop push services

func (*PushManager) StopWorkers

func (p *PushManager) StopWorkers()

StopWorkers stops workers

type SegmentUpdateWorker

type SegmentUpdateWorker struct {
	// contains filtered or unexported fields
}

SegmentUpdateWorker struct

func NewSegmentUpdateWorker

func NewSegmentUpdateWorker(segmentQueue chan dtos.SegmentChangeNotification, handler func(segmentName string, till *int64) error, logger logging.LoggerInterface) (*SegmentUpdateWorker, error)

NewSegmentUpdateWorker creates SegmentUpdateWorker

func (*SegmentUpdateWorker) IsRunning

func (s *SegmentUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SegmentUpdateWorker) Start

func (s *SegmentUpdateWorker) Start()

Start starts worker

func (*SegmentUpdateWorker) Stop

func (s *SegmentUpdateWorker) Stop()

Stop stops worker

type SplitUpdateWorker

type SplitUpdateWorker struct {
	// contains filtered or unexported fields
}

SplitUpdateWorker struct

func NewSplitUpdateWorker

func NewSplitUpdateWorker(splitQueue chan dtos.SplitChangeNotification, handler func(till *int64) error, logger logging.LoggerInterface) (*SplitUpdateWorker, error)

NewSplitUpdateWorker creates SplitUpdateWorker

func (*SplitUpdateWorker) IsRunning

func (s *SplitUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SplitUpdateWorker) Start

func (s *SplitUpdateWorker) Start()

Start starts worker

func (*SplitUpdateWorker) Stop

func (s *SplitUpdateWorker) Stop()

Stop stops worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL