store

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RedisLivePrefix = "streamdal_live"
	RedisLiveFormat = "streamdal_live:%s:%s:%s" // K: $session_id:$node_name:$audience

	RedisAudiencePrefix    = "streamdal_audience"
	RedisAudienceKeyFormat = "streamdal_audience:%s" // K: $audience V: NONE

	// RedisRegisterFormat key resides in the RedisLivePrefix
	RedisRegisterFormat = "streamdal_live:%s:%s:register" // K: $session_id:$node_name:register; V: NONE

	RedisPipelinePrefix    = "streamdal_pipeline"
	RedisPipelineKeyFormat = "streamdal_pipeline:%s" // K: $pipeline_id V: serialized protos.Pipeline

	RedisConfigPrefix    = "streamdal_config"
	RedisConfigKeyFormat = "streamdal_config:%s:%s" // K: $audience V: $pipeline_id (string)

	RedisPausedPrefix    = "streamdal_paused"
	RedisPausedKeyFormat = "streamdal_paused:%s:%s" // K: $pipeline_id:$audience V: NONE

	RedisNotificationConfigPrefix    = "streamdal_notification_config"
	RedisNotificationConfigKeyFormat = "streamdal_notification_config:%s" // K: $config_id V: serialized protos.NotificationConfig

	RedisNotificationAssocPrefix = "streamdal_notification_assoc"
	RedisNotificationAssocFormat = "streamdal_notification_assoc:%s:%s" // K: $pipeline_id:$config_id V: NONE

	RedisSchemaPrefix = "streamdal_schema"
	RedisSchemaFormat = "streamdal_schema:%s" // K: $audience V: serialized protos.Schema

	RedisActiveTailPrefix    = "streamdal_tail"
	RedisActiveTailKeyFormat = "streamdal_tail:%s:%s" // K: $service_name:$tail_request_id V: serialized protos.TailRequest

	// RedisActiveTailTTL is the TTL for the active tail key. While this key
	// should be automatically cleaned up when the frontend stops a Tail() request,
	// this TTL is a safety mechanism to ensure we do not leave orphaned tails.
	RedisActiveTailTTL = 10 * time.Second
)
View Source
const (
	// RedisKeyWatchPrefix is the key under which redis publishes key events.
	// The format is  __keyspace@{$database_number}__
	// We're always defaulting to db 0, so we can use this prefix to watch for key changes
	// See https://redis.io/docs/manual/keyspace-notifications/
	RedisKeyWatchPrefix = "__keyspace@0__:"

	// StreamdalIDKey is a unique ID for this streamdal server cluster
	// Each cluster will get a unique UUID. This is used to track the number of
	// installs for telemetry and is completely random for anonymization purposes.
	StreamdalIDKey = "streamdal_id"
)

Variables

View Source
var (
	ErrPipelineNotFound = errors.New("pipeline not found")
	ErrConfigNotFound   = errors.New("config not found")
)

Functions

func RedisAudienceKey

func RedisAudienceKey(audience string) string

func RedisConfigKey

func RedisConfigKey(audience *protos.Audience, pipelineID string) string

func RedisLiveKey

func RedisLiveKey(session, node, audience string) string

func RedisNotificationAssocKey

func RedisNotificationAssocKey(pipelineID, configID string) string

func RedisNotificationConfigKey

func RedisNotificationConfigKey(configID string) string

func RedisPausedKey

func RedisPausedKey(audience, pipelineID string) string

func RedisPipelineKey

func RedisPipelineKey(pipelineID string) string

func RedisRegisterKey

func RedisRegisterKey(session, node string) string

func RedisSchemaKey

func RedisSchemaKey(audience string) string

Types

type IStore

type IStore interface {
	AddRegistration(ctx context.Context, req *protos.RegisterRequest) error
	DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error
	GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)
	GetPipeline(ctx context.Context, pipelineID string) (*protos.Pipeline, error)
	GetConfig(ctx context.Context) (map[*protos.Audience][]string, error) // v: pipeline_id
	GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)
	GetLive(ctx context.Context) ([]*types.LiveEntry, error)
	GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)
	CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error
	DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
	DeletePipeline(ctx context.Context, pipelineID string) error
	UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error
	AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error
	DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error
	PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error
	ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error
	IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetAudiences(ctx context.Context) ([]*protos.Audience, error)
	IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)
	GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)
	GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)
	GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)
	CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error
	UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error
	DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error
	AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error
	DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error
	GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)
	GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)
	GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)
	AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error) // Returns key that tail req is stored under

	// GetAudiencesBySessionID returns all audiences for a given session id
	// This is needed to inject an inferschema pipeline for each announced audience
	// to the session via a goroutine in internal.Register()
	GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

	GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

	// WatchKeys will watch for key changes for given key pattern; every time key
	// is updated, it will send a message on the return channel.
	// WatchKeys launches a dedicated goroutine for the watch - it can be stopped
	// via the provided context.
	WatchKeys(quitCtx context.Context, key string) chan struct{}

	AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

	GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

	// GetStreamdalID returns the unique ID for this cluster.
	// If an ID has not been set yet, a new one is generated and returned
	GetStreamdalID(ctx context.Context) (string, error)
}

type Options

type Options struct {
	Encryption   encryption.IEncryption
	RedisBackend *redis.Client
	ShutdownCtx  context.Context
	NodeName     string
	SessionTTL   time.Duration
}

type PipelineUsage

type PipelineUsage struct {
	PipelineId string
	Active     bool
	NodeName   string
	SessionId  string
	Audience   *protos.Audience
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*Store, error)

func (*Store) AddActiveTailRequest

func (s *Store) AddActiveTailRequest(ctx context.Context, req *protos.TailRequest) (string, error)

func (*Store) AddAudience

func (s *Store) AddAudience(ctx context.Context, req *protos.NewAudienceRequest) error

func (*Store) AddRegistration

func (s *Store) AddRegistration(ctx context.Context, req *protos.RegisterRequest) error

func (*Store) AddSchema

func (s *Store) AddSchema(ctx context.Context, req *protos.SendSchemaRequest) error

func (*Store) AttachNotificationConfig

func (s *Store) AttachNotificationConfig(ctx context.Context, req *protos.AttachNotificationRequest) error

func (*Store) AttachPipeline

func (s *Store) AttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error

func (*Store) CreateNotificationConfig

func (s *Store) CreateNotificationConfig(ctx context.Context, req *protos.CreateNotificationRequest) error

func (*Store) CreatePipeline

func (s *Store) CreatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) DeleteAudience

func (s *Store) DeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error

func (*Store) DeleteNotificationConfig

func (s *Store) DeleteNotificationConfig(ctx context.Context, req *protos.DeleteNotificationRequest) error

func (*Store) DeletePipeline

func (s *Store) DeletePipeline(ctx context.Context, pipelineId string) error

func (*Store) DeleteRegistration

func (s *Store) DeleteRegistration(ctx context.Context, req *protos.DeregisterRequest) error

func (*Store) DetachNotificationConfig

func (s *Store) DetachNotificationConfig(ctx context.Context, req *protos.DetachNotificationRequest) error

func (*Store) DetachPipeline

func (s *Store) DetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error

func (*Store) GetActivePipelineUsage

func (s *Store) GetActivePipelineUsage(ctx context.Context, pipelineID string) ([]*PipelineUsage, error)

GetActivePipelineUsage gets *ACTIVE* pipeline usage on the CURRENT node NOTE: This method is a helper for GetPipelineUsage().

func (*Store) GetActiveTailCommandsByService

func (s *Store) GetActiveTailCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetAttachCommandsByService

func (s *Store) GetAttachCommandsByService(ctx context.Context, serviceName string) ([]*protos.Command, error)

func (*Store) GetAudiences

func (s *Store) GetAudiences(ctx context.Context) ([]*protos.Audience, error)

func (*Store) GetAudiencesByService

func (s *Store) GetAudiencesByService(ctx context.Context, serviceName string) ([]*protos.Audience, error)

func (*Store) GetAudiencesBySessionID

func (s *Store) GetAudiencesBySessionID(ctx context.Context, sessionID string) ([]*protos.Audience, error)

func (*Store) GetConfig

func (s *Store) GetConfig(ctx context.Context) (map[*protos.Audience][]string, error)

func (*Store) GetConfigByAudience

func (s *Store) GetConfigByAudience(ctx context.Context, audience *protos.Audience) ([]string, error)

GetConfigByAudience returns a list of pipeline IDs attached to given audience

func (*Store) GetLive

func (s *Store) GetLive(ctx context.Context) ([]*types.LiveEntry, error)

func (*Store) GetNotificationConfig

func (s *Store) GetNotificationConfig(ctx context.Context, req *protos.GetNotificationRequest) (*protos.NotificationConfig, error)

func (*Store) GetNotificationConfigs

func (s *Store) GetNotificationConfigs(ctx context.Context) (map[string]*protos.NotificationConfig, error)

func (*Store) GetNotificationConfigsByPipeline

func (s *Store) GetNotificationConfigsByPipeline(ctx context.Context, pipelineID string) ([]*protos.NotificationConfig, error)

func (*Store) GetPaused

func (s *Store) GetPaused(ctx context.Context) (map[string]*types.PausedEntry, error)

func (*Store) GetPipeline

func (s *Store) GetPipeline(ctx context.Context, pipelineId string) (*protos.Pipeline, error)

func (*Store) GetPipelineUsage

func (s *Store) GetPipelineUsage(ctx context.Context) ([]*PipelineUsage, error)

GetPipelineUsage gets usage across the entire cluster

func (*Store) GetPipelines

func (s *Store) GetPipelines(ctx context.Context) (map[string]*protos.Pipeline, error)

func (*Store) GetSchema

func (s *Store) GetSchema(ctx context.Context, aud *protos.Audience) (*protos.Schema, error)

func (*Store) GetStreamdalID

func (s *Store) GetStreamdalID(ctx context.Context) (string, error)

func (*Store) IsPaused

func (s *Store) IsPaused(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)

IsPaused returns if pipeline is paused and if it exists

func (*Store) IsPipelineAttached

func (s *Store) IsPipelineAttached(ctx context.Context, audience *protos.Audience, pipelineID string) (bool, error)

func (*Store) PausePipeline

func (s *Store) PausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error

func (*Store) ResumePipeline

func (s *Store) ResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error

func (*Store) UpdateNotificationConfig

func (s *Store) UpdateNotificationConfig(ctx context.Context, req *protos.UpdateNotificationRequest) error

func (*Store) UpdatePipeline

func (s *Store) UpdatePipeline(ctx context.Context, pipeline *protos.Pipeline) error

func (*Store) WatchKeys

func (s *Store) WatchKeys(quitCtx context.Context, key string) chan struct{}

WatchKeys will watch for key changes for given key pattern; every time key is updated, it will send a message on the return channel.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL