eventstreams

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

README

Event streams

This package provides a utility for exposing an event stream over WebSockets and Webhooks.

  • Connectivity:
    • WebSockets support for inbound connections
    • Webhooks support for outbound connections
  • Reliability:
    • Workload managed mode: at-least-once delivery
    • Broadcast mode: at-most-once delivery
    • Batching for performance
    • Checkpointing for the at-least-once delivery assurance
  • Convenience for packaging into apps:
    • Plug-in persistence (including allowing you to have multiple streams with CRUD.Scoped())
    • Out-of-the-box CRUD on event streams, using DB backed storage
    • Server-side topicFilter event filtering (regular expression)
  • Semi-opinionated:
    • How batches are spelled
    • How WebSocket flow control payloads are spelled (start,ack,nack,batch)
  • Flexibility:
    • Bring your own message payload (note topic and sequenceId always added)
    • Bring your own configuration type (must implement DB Scan & Value functions)

Example

A simple in-memory command line pub/sub example is provided:

Documentation

Index

Constants

View Source
// Configuration key name strings, grouped by the area their identifiers name
// (TLS config, checkpoints, private IP handling, webhooks, WebSockets, stream defaults).
// NOTE(review): the config section each key is registered under is not visible
// here — confirm against InitConfig.
const (
	ConfigTLSConfigName = "name"

	// These two values match the JSON keys of the CheckpointsTuningConfig fields.
	ConfigCheckpointsAsynchronous            = "asynchronous"
	ConfigCheckpointsUnmatchedEventThreshold = "unmatchedEventThreshold"

	ConfigDisablePrivateIPs = "disablePrivateIPs"

	ConfigWebhooksDefaultTLSConfig = "tlsConfigName"

	ConfigWebSocketsDistributionMode = "distributionMode"

	// These five values match the JSON keys of the first five EventStreamDefaults fields.
	ConfigDefaultsErrorHandling     = "errorHandling"
	ConfigDefaultsBatchSize         = "batchSize"
	ConfigDefaultsBatchTimeout      = "batchTimeout"
	ConfigDefaultsRetryTimeout      = "retryTimeout"
	ConfigDefaultsBlockedRetryDelay = "blockedRetryDelay"
)
View Source
// MessageTypeEventBatch is the message type string "event_batch". The
// EventBatch.Type field is documented as always carrying this value.
const MessageTypeEventBatch = "event_batch"

Variables

View Source
// Event stream type values, created with fftypes.FFEnumValue under the
// "estype" enum name: one for webhook streams and one for WebSocket streams.
var (
	EventStreamTypeWebhook   = fftypes.FFEnumValue("estype", "webhook")
	EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket")
)
View Source
// Error handling type values, created with fftypes.FFEnumValue under the
// "ehtype" enum name.
// NOTE(review): presumably "block" halts the stream on a failed batch and
// "skip" moves past it — confirm against the dispatch implementation.
var (
	ErrorHandlingTypeBlock = fftypes.FFEnumValue("ehtype", "block")
	ErrorHandlingTypeSkip  = fftypes.FFEnumValue("ehtype", "skip")
)
View Source
// Dispatch status values, created with fftypes.FFEnumValue under the
// "edstatus" enum name. EventStreamStatistics.LastDispatchStatus has the
// DispatchStatus type these values belong to.
var (
	DispatchStatusDispatching = fftypes.FFEnumValue("edstatus", "dispatching")
	DispatchStatusRetrying    = fftypes.FFEnumValue("edstatus", "retrying")
	DispatchStatusBlocked     = fftypes.FFEnumValue("edstatus", "blocked")
	DispatchStatusComplete    = fftypes.FFEnumValue("edstatus", "complete")
	DispatchStatusSkipped     = fftypes.FFEnumValue("edstatus", "skipped")
)
View Source
// Event stream status values, created with fftypes.FFEnumValue under the
// "esstatus" enum name. The last three are marked as not persisted, leaving
// started, stopped and deleted as the values without that marker.
var (
	EventStreamStatusStarted         = fftypes.FFEnumValue("esstatus", "started")
	EventStreamStatusStopped         = fftypes.FFEnumValue("esstatus", "stopped")
	EventStreamStatusDeleted         = fftypes.FFEnumValue("esstatus", "deleted")
	EventStreamStatusStopping        = fftypes.FFEnumValue("esstatus", "stopping")         // not persisted
	EventStreamStatusStoppingDeleted = fftypes.FFEnumValue("esstatus", "stopping_deleted") // not persisted
	EventStreamStatusUnknown         = fftypes.FFEnumValue("esstatus", "unknown")          // not persisted
)
View Source
// WebSocket distribution mode values, created with fftypes.FFEnumValue under
// the "distmode" enum name.
// NOTE(review): the README describes a broadcast mode (at-most-once) and a
// workload managed mode (at-least-once); presumably "load_balance" is the
// latter — confirm.
var (
	DistributionModeBroadcast   = fftypes.FFEnumValue("distmode", "broadcast")
	DistributionModeLoadBalance = fftypes.FFEnumValue("distmode", "load_balance")
)
View Source
// CheckpointFilters declares the query fields for checkpoints, keyed by
// lower-case field name: two string fields (id, sequenceid) and two time
// fields (created, updated).
var CheckpointFilters = &ffapi.QueryFields{
	"id":         &ffapi.StringField{},
	"created":    &ffapi.TimeField{},
	"updated":    &ffapi.TimeField{},
	"sequenceid": &ffapi.StringField{},
}
View Source
// CheckpointsConfig is a package-level config.Section.
// NOTE(review): presumably assigned by InitConfig and holding the checkpoint keys — confirm.
var CheckpointsConfig config.Section
View Source
// DefaultsConfig is a package-level config.Section.
// NOTE(review): presumably assigned by InitConfig and holding the stream default keys — confirm.
var DefaultsConfig config.Section
View Source
// EventStreamFilters declares the query fields for event streams, keyed by
// lower-case field name: created and updated are time fields, and every other
// entry is a string field.
var EventStreamFilters = &ffapi.QueryFields{
	"id":          &ffapi.StringField{},
	"created":     &ffapi.TimeField{},
	"updated":     &ffapi.TimeField{},
	"name":        &ffapi.StringField{},
	"status":      &ffapi.StringField{},
	"type":        &ffapi.StringField{},
	"topicfilter": &ffapi.StringField{},
}
View Source
// RetrySection is a package-level config.Section.
// NOTE(review): presumably assigned by InitConfig and holding the retry settings — confirm.
var RetrySection config.Section
View Source
// RootConfig is a package-level config.Section.
// NOTE(review): presumably the section passed to InitConfig — confirm.
var RootConfig config.Section
View Source
// WebSocketsDefaultsConfig is a package-level config.Section.
// NOTE(review): presumably assigned by InitConfig and holding the WebSocket default keys — confirm.
var WebSocketsDefaultsConfig config.Section
View Source
// WebhookDefaultsConfig is a package-level config.Section.
// NOTE(review): presumably assigned by InitConfig and holding the webhook default keys — confirm.
var WebhookDefaultsConfig config.Section

Functions

func InitConfig

func InitConfig(conf config.Section)

Due to how arrays work currently in the config system, this can only be initialized in one section for the whole process.

Types

type CheckpointsTuningConfig

// CheckpointsTuningConfig holds the checkpoint settings embedded in Config as
// its Checkpoints field. The JSON keys match the values of the
// ConfigCheckpointsAsynchronous and ConfigCheckpointsUnmatchedEventThreshold
// constants.
type CheckpointsTuningConfig struct {
	// NOTE(review): presumably selects asynchronous checkpoint writes — confirm.
	Asynchronous            bool  `ffstruct:"CheckpointsConfig" json:"asynchronous"`
	// NOTE(review): units and exact trigger semantics are not visible here — confirm.
	UnmatchedEventThreshold int64 `ffstruct:"CheckpointsConfig" json:"unmatchedEventThreshold"`
}

type Config

// Config is the configuration passed to NewEventStreamManager. It can be
// produced by GenerateConfig or built programmatically.
type Config struct {
	// TLS configurations keyed by string; WebhookConfig.TLSConfigName is
	// presumably a key into this map — confirm.
	TLSConfigs        map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
	Retry             *retry.Retry             `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
	// NOTE(review): the JSON key here is "disabledPrivateIPs" while the
	// ConfigDisablePrivateIPs constant is "disablePrivateIPs" — confirm whether
	// the difference is intentional before relying on either spelling.
	DisablePrivateIPs bool                     `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
	Checkpoints       CheckpointsTuningConfig  `ffstruct:"EventStreamConfig" json:"checkpoints"`
	// Defaults is a struct value, so with encoding/json the omitempty option
	// never omits it.
	Defaults          EventStreamDefaults      `ffstruct:"EventStreamConfig" json:"defaults,omitempty"`
}

func GenerateConfig

func GenerateConfig(ctx context.Context) *Config

Optional function to generate config directly from YAML configuration using the config package. You can also generate the configuration programmatically.

type ConfigWebhookDefaults

// ConfigWebhookDefaults is the webhook part of EventStreamDefaults. It adds no
// fields of its own and only embeds ffresty.HTTPConfig.
type ConfigWebhookDefaults struct {
	ffresty.HTTPConfig
}

type ConfigWebsocketDefaults

// ConfigWebsocketDefaults is the WebSocket part of EventStreamDefaults.
type ConfigWebsocketDefaults struct {
	// JSON key matches the ConfigWebSocketsDistributionMode constant value.
	// NOTE(review): presumably applied when a stream's
	// WebSocketConfig.DistributionMode is nil — confirm.
	DefaultDistributionMode DistributionMode `ffstruct:"EventStreamConfig" json:"distributionMode"`
}

type DBSerializable

// DBSerializable combines sql.Scanner and driver.Valuer, i.e. the Scan and
// Value methods the README requires of a bring-your-own configuration type.
type DBSerializable interface {
	sql.Scanner
	driver.Valuer
}

Lets us check that the config serializes

type Deliver

// Deliver is the callback type passed to Runtime.Run for handing over a slice
// of events. It returns a SourceInstruction; per the Runtime documentation it
// blocks while the stream is blocked and returns Exit when the stream stops.
type Deliver[DT any] func(events []*Event[DT]) SourceInstruction

type DispatchStatus

// DispatchStatus is an alias for fftypes.FFEnum; the DispatchStatus* variables
// hold its "edstatus" values.
type DispatchStatus = fftypes.FFEnum

type DistributionMode

// DistributionMode is an alias for fftypes.FFEnum; the DistributionMode*
// variables hold its "distmode" values.
type DistributionMode = fftypes.FFEnum

type ErrorHandlingType

// ErrorHandlingType is an alias for fftypes.FFEnum; the ErrorHandlingType*
// variables hold its "ehtype" values.
type ErrorHandlingType = fftypes.FFEnum

type Event

// Event is a single event: the embedded EventCommon fields (topic and
// sequenceId) plus a caller-defined payload. The type has custom MarshalJSON
// and UnmarshalJSON methods, and Data itself is excluded from the default
// JSON encoding by its `json:"-"` tag.
type Event[DataType any] struct {
	EventCommon
	// Data can be anything to deliver for the event - must be JSON marshalable.
	// Will be flattened into the struct.
	// Can define topic and/or sequenceId, but these will be overridden with EventCommon strings in the JSON serialization.
	Data *DataType `json:"-"`
}

func (Event[DataType]) MarshalJSON

func (e Event[DataType]) MarshalJSON() ([]byte, error)

func (*Event[DataType]) UnmarshalJSON

func (e *Event[DataType]) UnmarshalJSON(b []byte) error

type EventBatch

// EventBatch is the batch of events passed to
// EventBatchDispatcher.AttemptDispatch, identified by stream ID and batch number.
type EventBatch[DataType any] struct {
	Type        string             `json:"type"`        // always MessageTypeEventBatch (for consistent WebSocket flow control)
	StreamID    string             `json:"stream"`      // the ID of the event stream for this event
	BatchNumber int64              `json:"batchNumber"` // should be provided back in the ack
	Events      []*Event[DataType] `json:"events"`      // an array of events allows efficient batch acknowledgment
}

type EventBatchDispatcher

// EventBatchDispatcher is the single-method interface for dispatching one
// batch of events.
type EventBatchDispatcher[DT any] interface {
	// AttemptDispatch tries to dispatch the batch and returns an error on failure.
	// NOTE(review): attempt is presumably the retry counter for this batch;
	// whether it starts at 0 or 1 is not visible here — confirm.
	AttemptDispatch(ctx context.Context, attempt int, events *EventBatch[DT]) error
}

type EventCommon

// EventCommon holds the two string fields present on every event; it is
// embedded in Event.
type EventCommon struct {
	Topic      string `json:"topic,omitempty"` // describes the sub-stream of events (optional) allowing server-side event filtering (regexp)
	SequenceID string `json:"sequenceId"`      // deterministic ID for the event, that must be alpha-numerically orderable within the stream (numbers must be left-padded hex/decimal strings for ordering)
}

type EventStreamActions

// EventStreamActions is the set of operations on an individual event stream:
// starting it, stopping it, and reading its status.
type EventStreamActions[CT any] interface {
	// Start starts the stream, returning an error on failure.
	Start(ctx context.Context) error
	// Stop stops the stream, returning an error on failure.
	Stop(ctx context.Context) error
	// Status returns the stream together with its status.
	Status(ctx context.Context) *EventStreamWithStatus[CT]
}

type EventStreamCheckpoint

// EventStreamCheckpoint is the checkpoint record stored through
// Persistence.Checkpoints. All fields are pointers, and only SequenceID is
// omitted from JSON when nil.
type EventStreamCheckpoint struct {
	// NOTE(review): presumably the ID of the event stream the checkpoint belongs to — confirm.
	ID         *string         `ffstruct:"EventStreamCheckpoint" json:"id"`
	Created    *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"created"`
	Updated    *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"updated"`
	SequenceID *string         `ffstruct:"EventStreamCheckpoint" json:"sequenceId,omitempty"`
}

func (*EventStreamCheckpoint) GetID added in v1.4.1

func (esc *EventStreamCheckpoint) GetID() string

func (*EventStreamCheckpoint) SetCreated added in v1.4.1

func (esc *EventStreamCheckpoint) SetCreated(t *fftypes.FFTime)

func (*EventStreamCheckpoint) SetUpdated added in v1.4.1

func (esc *EventStreamCheckpoint) SetUpdated(t *fftypes.FFTime)

type EventStreamDefaults

// EventStreamDefaults is the Defaults section of Config. Its first five fields
// have the same names as the per-stream pointer fields on EventStreamSpec, and
// their JSON keys match the ConfigDefaults* constant values.
// NOTE(review): presumably each value is used when the matching
// EventStreamSpec field is nil — confirm.
type EventStreamDefaults struct {
	ErrorHandling     ErrorHandlingType       `ffstruct:"EventStreamDefaults" json:"errorHandling"`
	BatchSize         int                     `ffstruct:"EventStreamDefaults" json:"batchSize"`
	BatchTimeout      fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"batchTimeout"`
	RetryTimeout      fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"retryTimeout"`
	BlockedRetryDelay fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"blockedRetryDelay"`
	// Both of the following are struct values, so with encoding/json the
	// omitempty option never omits them.
	WebSocketDefaults ConfigWebsocketDefaults `ffstruct:"EventStreamDefaults" json:"webSockets,omitempty"`
	WebhookDefaults   ConfigWebhookDefaults   `ffstruct:"EventStreamDefaults" json:"webhooks,omitempty"`
}

type EventStreamSpec

// EventStreamSpec is the definition of an event stream, parameterized by the
// caller's own configuration type CT. It is the record type stored through
// Persistence.EventStreams and the input to Manager.UpsertStream. Every field
// is a pointer, so an unset value is distinguishable from a zero value.
type EventStreamSpec[CT any] struct {
	ID                *string            `ffstruct:"eventstream" json:"id"`
	Created           *fftypes.FFTime    `ffstruct:"eventstream" json:"created"`
	Updated           *fftypes.FFTime    `ffstruct:"eventstream" json:"updated"`
	Name              *string            `ffstruct:"eventstream" json:"name,omitempty"`
	Status            *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"`
	Type              *EventStreamType   `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"`
	// NOTE(review): the two fields below are plain strings but carry the same
	// ffenum:"estype" tag as Type, which looks like a copy-paste — confirm
	// whether the tag should be removed. The JSON key "initialSequenceID" also
	// uses different casing from the "sequenceId" key used by EventCommon and
	// EventStreamCheckpoint.
	InitialSequenceID *string            `ffstruct:"eventstream" json:"initialSequenceID,omitempty" ffenum:"estype"`
	TopicFilter       *string            `ffstruct:"eventstream" json:"topicFilter,omitempty" ffenum:"estype"`
	// Caller-defined configuration, checked by Runtime.Validate.
	Config            *CT                `ffstruct:"eventstream" json:"config,omitempty"`

	// Per-stream settings; these five have the same names as the value fields
	// on EventStreamDefaults.
	ErrorHandling     *ErrorHandlingType  `ffstruct:"eventstream" json:"errorHandling"`
	BatchSize         *int                `ffstruct:"eventstream" json:"batchSize"`
	BatchTimeout      *fftypes.FFDuration `ffstruct:"eventstream" json:"batchTimeout"`
	RetryTimeout      *fftypes.FFDuration `ffstruct:"eventstream" json:"retryTimeout"`
	BlockedRetryDelay *fftypes.FFDuration `ffstruct:"eventstream" json:"blockedRetryDelay"`

	// Transport-specific settings.
	// NOTE(review): presumably only the one matching Type is used — confirm.
	Webhook   *WebhookConfig   `ffstruct:"eventstream" json:"webhook,omitempty"`
	WebSocket *WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"`
	// contains filtered or unexported fields
}

func (*EventStreamSpec[CT]) GetID

func (esc *EventStreamSpec[CT]) GetID() string

func (*EventStreamSpec[CT]) SetCreated

func (esc *EventStreamSpec[CT]) SetCreated(t *fftypes.FFTime)

func (*EventStreamSpec[CT]) SetUpdated

func (esc *EventStreamSpec[CT]) SetUpdated(t *fftypes.FFTime)

type EventStreamStatistics

// EventStreamStatistics is the optional statistics block attached to
// EventStreamWithStatus, covering the most recent dispatch and the sequence
// positions of the stream.
type EventStreamStatistics struct {
	StartTime            *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"startTime"`
	LastDispatchTime     *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"lastDispatchTime"`
	// Serialized as "lastDispatchBatch", not "lastDispatchNumber".
	LastDispatchNumber   int64           `ffstruct:"EventStreamStatistics" json:"lastDispatchBatch"`
	LastDispatchAttempts int             `ffstruct:"EventStreamStatistics" json:"lastDispatchAttempts,omitempty"`
	LastDispatchFailure  string          `ffstruct:"EventStreamStatistics" json:"lastDispatchFailure,omitempty"`
	// NOTE(review): serialized as "lastDispatchComplete" although the value is
	// a DispatchStatus enum rather than a boolean — confirm the key is intended.
	LastDispatchStatus   DispatchStatus  `ffstruct:"EventStreamStatistics" json:"lastDispatchComplete"`
	// NOTE(review): the three fields below are presumably sequence IDs in the
	// EventCommon.SequenceID format — confirm.
	HighestDetected      string          `ffstruct:"EventStreamStatistics" json:"highestDetected"`
	HighestDispatched    string          `ffstruct:"EventStreamStatistics" json:"highestDispatched"`
	Checkpoint           string          `ffstruct:"EventStreamStatistics" json:"checkpoint"`
}

type EventStreamStatus

// EventStreamStatus is an alias for fftypes.FFEnum; the EventStreamStatus*
// variables hold its "esstatus" values.
type EventStreamStatus = fftypes.FFEnum

type EventStreamType

// EventStreamType is an alias for fftypes.FFEnum; the EventStreamType*
// variables hold its "estype" values.
type EventStreamType = fftypes.FFEnum

type EventStreamWithStatus

// EventStreamWithStatus is an EventStreamSpec plus a status and optional
// statistics. It is the type returned by the Manager get/list calls and by
// EventStreamActions.Status.
type EventStreamWithStatus[CT any] struct {
	*EventStreamSpec[CT]
	// Status is declared at a shallower depth than the embedded
	// EventStreamSpec.Status pointer, so it takes precedence both for the
	// selector x.Status and for the "status" JSON key.
	Status     EventStreamStatus      `ffstruct:"EventStream" json:"status"`
	Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"`
}

type IDValidator added in v1.4.1

// IDValidator is the function type passed to NewEventStreamPersistence. It
// receives an ID string and returns an error.
// NOTE(review): presumably a non-nil error rejects the ID — confirm.
type IDValidator func(ctx context.Context, idStr string) error

type Manager

// Manager is the interface returned by NewEventStreamManager for managing a
// set of event streams of configuration type CT.
type Manager[CT any] interface {
	// NOTE(review): the bool result presumably reports whether the stream was
	// newly created rather than updated — confirm.
	UpsertStream(ctx context.Context, esSpec *EventStreamSpec[CT]) (bool, error)
	// GetStreamByID returns a single stream with its status, looked up by ID.
	GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error)
	// ListStreams returns the streams matching filter, along with the filter result.
	ListStreams(ctx context.Context, filter ffapi.Filter) ([]*EventStreamWithStatus[CT], *ffapi.FilterResult, error)
	StopStream(ctx context.Context, id string) error
	StartStream(ctx context.Context, id string) error
	// NOTE(review): presumably repositions the stream at sequenceID — confirm
	// exact semantics (inclusive/exclusive, restart behavior).
	ResetStream(ctx context.Context, id string, sequenceID string) error
	DeleteStream(ctx context.Context, id string) error
	// Close returns no error to the caller.
	Close(ctx context.Context)
}

func NewEventStreamManager

func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config, p Persistence[CT], wsChannels wsserver.WebSocketChannels, source Runtime[CT, DT]) (es Manager[CT], err error)

type Persistence

// Persistence is the plug-in storage interface passed to
// NewEventStreamManager. NewEventStreamPersistence returns an implementation
// built on a dbsql.Database.
type Persistence[CT any] interface {
	// EventStreams returns the CRUD collection of event stream specs.
	EventStreams() dbsql.CRUD[*EventStreamSpec[CT]]
	// Checkpoints returns the CRUD collection of checkpoints.
	Checkpoints() dbsql.CRUD[*EventStreamCheckpoint]
	Close()
}

func NewEventStreamPersistence

func NewEventStreamPersistence[CT any](db *dbsql.Database, idValidator IDValidator) Persistence[CT]

type Runtime

// Runtime is the extension interface a caller implements and passes to
// NewEventStreamManager as the event source. It supplies resource IDs,
// validates the caller's configuration type, and runs the event detection loop.
type Runtime[ConfigType any, DataType any] interface {
	// Generate a new unique resource ID (such as a UUID)
	NewID() string
	// Type specific config validation goes here
	Validate(ctx context.Context, config *ConfigType) error
	// The run function should execute in a loop detecting events until instructed to stop:
	// - The Run function should block when no events are available
	//   - Must detect if the context is closed (see below)
	// - The Deliver function will block if the stream is blocked:
	//   - Blocked means the previous batch is being processed, and the current batch is full
	// - If the stream stops, the Exit instruction will be returned from deliver
	// - The supplied context will be cancelled as well on exit, so should be used:
	//   1. In any blocking i/o functions
	//   2. To wake any sleeps early, such as batch polling scenarios
	// - If the function returns without an Exit instruction, it will be restarted from the last checkpoint
	Run(ctx context.Context, spec *EventStreamSpec[ConfigType], checkpointSequenceID string, deliver Deliver[DataType]) error
}

Runtime is the required implementation extension for the EventStream common utility

type SourceInstruction

// SourceInstruction is the value a Deliver callback returns to the Runtime.Run
// loop that called it.
type SourceInstruction int
const (
	// Continue is the zero value (iota 0).
	Continue SourceInstruction = iota
	// Exit (iota 1) is returned from deliver when the stream stops, per the
	// Runtime.Run documentation.
	Exit
)

type WebSocketConfig

// WebSocketConfig is the WebSocket-specific part of an EventStreamSpec. Its
// Scan and Value methods are documented as storing it in the DB as JSON.
type WebSocketConfig struct {
	// Optional distribution mode, omitted from JSON when nil.
	DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"`
}

func (*WebSocketConfig) Scan

func (wc *WebSocketConfig) Scan(src interface{}) error

Store in DB as JSON

func (*WebSocketConfig) Value

func (wc *WebSocketConfig) Value() (driver.Value, error)

Store in DB as JSON

type WebhookConfig

// WebhookConfig is the webhook-specific part of an EventStreamSpec. Its Scan
// and Value methods are documented as storing it in the DB as JSON. Every
// exported field is optional and is omitted from JSON when unset.
type WebhookConfig struct {
	URL           *string             `ffstruct:"whconfig" json:"url,omitempty"`
	Method        *string             `ffstruct:"whconfig" json:"method,omitempty"`
	Headers       map[string]string   `ffstruct:"whconfig" json:"headers,omitempty"`
	// NOTE(review): presumably a key into Config.TLSConfigs — confirm.
	TLSConfigName *string             `ffstruct:"whconfig" json:"tlsConfigName,omitempty"`
	HTTP          *ffresty.HTTPConfig `ffstruct:"whconfig" json:"http,omitempty"`
	// contains filtered or unexported fields
}

func (*WebhookConfig) Scan

func (wc *WebhookConfig) Scan(src interface{}) error

Store in DB as JSON

func (*WebhookConfig) Value

func (wc *WebhookConfig) Value() (driver.Value, error)

Store in DB as JSON

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL