Documentation
¶
Index ¶
- Constants
- Variables
- func InitConfig(conf config.Section)
- type CheckpointsTuningConfig
- type Config
- type ConfigWebhookDefaults
- type ConfigWebsocketDefaults
- type DBSerializable
- type Deliver
- type DispatchStatus
- type DistributionMode
- type ErrorHandlingType
- type Event
- type EventBatch
- type EventBatchDispatcher
- type EventCommon
- type EventStreamActions
- type EventStreamCheckpoint
- type EventStreamDefaults
- type EventStreamSpec
- type EventStreamStatistics
- type EventStreamStatus
- type EventStreamType
- type EventStreamWithStatus
- type IDValidator
- type Manager
- type Persistence
- type Runtime
- type SourceInstruction
- type WebSocketConfig
- type WebhookConfig
Constants ¶
View Source
const ( ConfigTLSConfigName = "name" ConfigCheckpointsAsynchronous = "asynchronous" ConfigCheckpointsUnmatchedEventThreshold = "unmatchedEventThreshold" ConfigDisablePrivateIPs = "disablePrivateIPs" ConfigWebhooksDefaultTLSConfig = "tlsConfigName" ConfigWebSocketsDistributionMode = "distributionMode" ConfigDefaultsErrorHandling = "errorHandling" ConfigDefaultsBatchSize = "batchSize" ConfigDefaultsBatchTimeout = "batchTimeout" ConfigDefaultsRetryTimeout = "retryTimeout" ConfigDefaultsBlockedRetryDelay = "blockedRetryDelay" )
View Source
const MessageTypeEventBatch = "event_batch"
Variables ¶
View Source
var ( EventStreamTypeWebhook = fftypes.FFEnumValue("estype", "webhook") EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket") )
View Source
var ( ErrorHandlingTypeBlock = fftypes.FFEnumValue("ehtype", "block") ErrorHandlingTypeSkip = fftypes.FFEnumValue("ehtype", "skip") )
View Source
var ( DispatchStatusDispatching = fftypes.FFEnumValue("edstatus", "dispatching") DispatchStatusRetrying = fftypes.FFEnumValue("edstatus", "retrying") DispatchStatusBlocked = fftypes.FFEnumValue("edstatus", "blocked") DispatchStatusComplete = fftypes.FFEnumValue("edstatus", "complete") DispatchStatusSkipped = fftypes.FFEnumValue("edstatus", "skipped") )
View Source
var ( EventStreamStatusStarted = fftypes.FFEnumValue("esstatus", "started") EventStreamStatusStopped = fftypes.FFEnumValue("esstatus", "stopped") EventStreamStatusDeleted = fftypes.FFEnumValue("esstatus", "deleted") EventStreamStatusStopping = fftypes.FFEnumValue("esstatus", "stopping") // not persisted EventStreamStatusStoppingDeleted = fftypes.FFEnumValue("esstatus", "stopping_deleted") // not persisted EventStreamStatusUnknown = fftypes.FFEnumValue("esstatus", "unknown") // not persisted )
View Source
var ( DistributionModeBroadcast = fftypes.FFEnumValue("distmode", "broadcast") DistributionModeLoadBalance = fftypes.FFEnumValue("distmode", "load_balance") )
View Source
var CheckpointFilters = &ffapi.QueryFields{ "id": &ffapi.StringField{}, "created": &ffapi.TimeField{}, "updated": &ffapi.TimeField{}, "sequenceid": &ffapi.StringField{}, }
View Source
var CheckpointsConfig config.Section
View Source
var DefaultsConfig config.Section
View Source
var EventStreamFilters = &ffapi.QueryFields{ "id": &ffapi.StringField{}, "created": &ffapi.TimeField{}, "updated": &ffapi.TimeField{}, "name": &ffapi.StringField{}, "status": &ffapi.StringField{}, "type": &ffapi.StringField{}, "topicfilter": &ffapi.StringField{}, }
View Source
var RetrySection config.Section
View Source
var RootConfig config.Section
View Source
var TLSConfigs config.ArraySection
View Source
var WebSocketsDefaultsConfig config.Section
View Source
var WebhookDefaultsConfig config.Section
Functions ¶
func InitConfig ¶
Due to how arrays work currently in the config system, this can only be initialized in one section for the whole process.
Types ¶
type CheckpointsTuningConfig ¶
type Config ¶
type Config struct {
TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
Checkpoints CheckpointsTuningConfig `ffstruct:"EventStreamConfig" json:"checkpoints"`
Defaults EventStreamDefaults `ffstruct:"EventStreamConfig" json:"defaults,omitempty"`
}
func GenerateConfig ¶
Optional function to generate config directly from YAML configuration using the config package. You can also generate the configuration programmatically
type ConfigWebhookDefaults ¶
type ConfigWebhookDefaults struct {
ffresty.HTTPConfig
}
type ConfigWebsocketDefaults ¶
type ConfigWebsocketDefaults struct {
DefaultDistributionMode DistributionMode `ffstruct:"EventStreamConfig" json:"distributionMode"`
}
type DBSerializable ¶
Let's us check that the config serializes
type Deliver ¶
type Deliver[DT any] func(events []*Event[DT]) SourceInstruction
type DispatchStatus ¶
type DistributionMode ¶
type ErrorHandlingType ¶
type Event ¶
type Event[DataType any] struct { EventCommon // Data can be anything to deliver for the event - must be JSON marshalable. // Will be flattened into the struct. // Can define topic and/or sequenceId, but these will overridden with EventCommon strings in the JSON serialization. Data *DataType `json:"-"` }
func (Event[DataType]) MarshalJSON ¶
func (*Event[DataType]) UnmarshalJSON ¶
type EventBatch ¶
type EventBatch[DataType any] struct { Type string `json:"type"` // always MessageTypeEventBatch (for consistent WebSocket flow control) StreamID string `json:"stream"` // the ID of the event stream for this event BatchNumber int64 `json:"batchNumber"` // should be provided back in the ack Events []*Event[DataType] `json:"events"` // an array of events allows efficient batch acknowledgment }
type EventBatchDispatcher ¶
type EventCommon ¶
type EventCommon struct {
Topic string `json:"topic,omitempty"` // describes the sub-stream of events (optional) allowing sever-side event filtering (regexp)
SequenceID string `json:"sequenceId"` // deterministic ID for the event, that must be alpha-numerically orderable within the stream (numbers must be left-padded hex/decimal strings for ordering)
}
type EventStreamActions ¶
type EventStreamCheckpoint ¶
type EventStreamCheckpoint struct {
ID *string `ffstruct:"EventStreamCheckpoint" json:"id"`
Created *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"created"`
Updated *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"updated"`
SequenceID *string `ffstruct:"EventStreamCheckpoint" json:"sequenceId,omitempty"`
}
func (*EventStreamCheckpoint) GetID ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) GetID() string
func (*EventStreamCheckpoint) SetCreated ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) SetCreated(t *fftypes.FFTime)
func (*EventStreamCheckpoint) SetUpdated ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) SetUpdated(t *fftypes.FFTime)
type EventStreamDefaults ¶
type EventStreamDefaults struct {
ErrorHandling ErrorHandlingType `ffstruct:"EventStreamDefaults" json:"errorHandling"`
BatchSize int `ffstruct:"EventStreamDefaults" json:"batchSize"`
BatchTimeout fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"batchTimeout"`
RetryTimeout fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"retryTimeout"`
BlockedRetryDelay fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"blockedRetryDelay"`
WebSocketDefaults ConfigWebsocketDefaults `ffstruct:"EventStreamDefaults" json:"webSockets,omitempty"`
WebhookDefaults ConfigWebhookDefaults `ffstruct:"EventStreamDefaults" json:"webhooks,omitempty"`
}
type EventStreamSpec ¶
type EventStreamSpec[CT any] struct { ID *string `ffstruct:"eventstream" json:"id"` Created *fftypes.FFTime `ffstruct:"eventstream" json:"created"` Updated *fftypes.FFTime `ffstruct:"eventstream" json:"updated"` Name *string `ffstruct:"eventstream" json:"name,omitempty"` Status *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"` Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"` InitialSequenceID *string `ffstruct:"eventstream" json:"initialSequenceID,omitempty" ffenum:"estype"` TopicFilter *string `ffstruct:"eventstream" json:"topicFilter,omitempty" ffenum:"estype"` Config *CT `ffstruct:"eventstream" json:"config,omitempty"` ErrorHandling *ErrorHandlingType `ffstruct:"eventstream" json:"errorHandling"` BatchSize *int `ffstruct:"eventstream" json:"batchSize"` BatchTimeout *fftypes.FFDuration `ffstruct:"eventstream" json:"batchTimeout"` RetryTimeout *fftypes.FFDuration `ffstruct:"eventstream" json:"retryTimeout"` BlockedRetryDelay *fftypes.FFDuration `ffstruct:"eventstream" json:"blockedRetryDelay"` Webhook *WebhookConfig `ffstruct:"eventstream" json:"webhook,omitempty"` WebSocket *WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"` // contains filtered or unexported fields }
func (*EventStreamSpec[CT]) GetID ¶
func (esc *EventStreamSpec[CT]) GetID() string
func (*EventStreamSpec[CT]) SetCreated ¶
func (esc *EventStreamSpec[CT]) SetCreated(t *fftypes.FFTime)
func (*EventStreamSpec[CT]) SetUpdated ¶
func (esc *EventStreamSpec[CT]) SetUpdated(t *fftypes.FFTime)
type EventStreamStatistics ¶
type EventStreamStatistics struct {
StartTime *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"startTime"`
LastDispatchTime *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"lastDispatchTime"`
LastDispatchNumber int64 `ffstruct:"EventStreamStatistics" json:"lastDispatchBatch"`
LastDispatchAttempts int `ffstruct:"EventStreamStatistics" json:"lastDispatchAttempts,omitempty"`
LastDispatchFailure string `ffstruct:"EventStreamStatistics" json:"lastDispatchFailure,omitempty"`
LastDispatchStatus DispatchStatus `ffstruct:"EventStreamStatistics" json:"lastDispatchComplete"`
HighestDetected string `ffstruct:"EventStreamStatistics" json:"highestDetected"`
HighestDispatched string `ffstruct:"EventStreamStatistics" json:"highestDispatched"`
Checkpoint string `ffstruct:"EventStreamStatistics" json:"checkpoint"`
}
type EventStreamStatus ¶
type EventStreamType ¶
type EventStreamWithStatus ¶
type EventStreamWithStatus[CT any] struct { *EventStreamSpec[CT] Status EventStreamStatus `ffstruct:"EventStream" json:"status"` Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"` }
type Manager ¶
type Manager[CT any] interface { UpsertStream(ctx context.Context, esSpec *EventStreamSpec[CT]) (bool, error) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error) ListStreams(ctx context.Context, filter ffapi.Filter) ([]*EventStreamWithStatus[CT], *ffapi.FilterResult, error) StopStream(ctx context.Context, id string) error StartStream(ctx context.Context, id string) error ResetStream(ctx context.Context, id string, sequenceID string) error DeleteStream(ctx context.Context, id string) error Close(ctx context.Context) }
func NewEventStreamManager ¶
type Persistence ¶
type Persistence[CT any] interface { EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] Close() }
func NewEventStreamPersistence ¶
func NewEventStreamPersistence[CT any](db *dbsql.Database, idValidator IDValidator) Persistence[CT]
type Runtime ¶
type Runtime[ConfigType any, DataType any] interface { // Generate a new unique resource ID (such as a UUID) NewID() string // Type specific config validation goes here Validate(ctx context.Context, config *ConfigType) error // The run function should execute in a loop detecting events until instructed to stop: // - The Run function should block when no events are available // - Must detect if the context is closed (see below) // - The Deliver function will block if the stream is blocked: // - Blocked means the previous batch is being processed, and the current batch is full // - If the stream stops, the Exit instruction will be returned from deliver // - The supplied context will be cancelled as well on exit, so should be used: // 1. In any blocking i/o functions // 2. To wake any sleeps early, such as batch polling scenarios // - If the function returns without an Exit instruction, it will be restarted from the last checkpoint Run(ctx context.Context, spec *EventStreamSpec[ConfigType], checkpointSequenceID string, deliver Deliver[DataType]) error }
Runtime is the required implementation extension for the EventStream common utility
type SourceInstruction ¶
type SourceInstruction int
const ( Continue SourceInstruction = iota Exit )
type WebSocketConfig ¶
type WebSocketConfig struct {
DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"`
}
func (*WebSocketConfig) Scan ¶
func (wc *WebSocketConfig) Scan(src interface{}) error
Store in DB as JSON
type WebhookConfig ¶
type WebhookConfig struct {
URL *string `ffstruct:"whconfig" json:"url,omitempty"`
Method *string `ffstruct:"whconfig" json:"method,omitempty"`
Headers map[string]string `ffstruct:"whconfig" json:"headers,omitempty"`
TLSConfigName *string `ffstruct:"whconfig" json:"tlsConfigName,omitempty"`
HTTP *ffresty.HTTPConfig `ffstruct:"whconfig" json:"http,omitempty"`
// contains filtered or unexported fields
}
func (*WebhookConfig) Scan ¶
func (wc *WebhookConfig) Scan(src interface{}) error
Store in DB as JSON
Click to show internal directories.
Click to hide internal directories.