Documentation
¶
Overview ¶
Package eventmodels has the interfaces needed to use the events package
Index ¶
- Constants
- Variables
- func SetErrorHandling(err error, handling ErrorHandling) error
- type AbstractDB
- type AbstractID
- type AbstractIDMethods
- type AbstractTX
- type BatchHandlerInterface
- type BatchHandlerTxInterface
- type BinaryEventID
- type BoundTopic
- func (t BoundTopic[E]) BatchHandler(callback func(context.Context, []Event[E]) error) BatchHandlerInterface
- func (t BoundTopic[E]) Event(key string, model E) SimpleEvent
- func (t BoundTopic[E]) Handler(callback func(context.Context, Event[E]) error) HandlerInterface
- func (t BoundTopic[E]) Topic() string
- type BoundTopicTx
- type CanAugment
- type CanTransact
- type EnhancedTX
- type Error
- type ErrorHandling
- type Event
- type HandlerInfo
- type HandlerInterface
- type HandlerTxInterface
- type LibraryInterface
- type LibraryInterfaceTx
- type OnFailure
- type ProduceMethod
- type Producer
- type ProducingEvent
- type SelfMarshalingEvent
- type SharedHandlerInterface
- type SimpleEvent
- func (e SimpleEvent) DataSchema(s string) SimpleEvent
- func (e SimpleEvent) GetHeaders() map[string][]string
- func (e SimpleEvent) GetKey() string
- func (e SimpleEvent) GetTimestamp() time.Time
- func (e SimpleEvent) GetTopic() string
- func (e SimpleEvent) Header() map[string][]string
- func (e SimpleEvent) ID(id string) SimpleEvent
- func (e SimpleEvent) MarshalJSON() ([]byte, error)
- func (e SimpleEvent) Source(source string) SimpleEvent
- func (e SimpleEvent) SpecVersion(v string) SimpleEvent
- func (e SimpleEvent) Subject(sub string) SimpleEvent
- func (e SimpleEvent) Time(t time.Time) SimpleEvent
- func (e SimpleEvent) Type(t string) SimpleEvent
- type StringEventID
- type Tracer
- type TracerConfig
- type TracerProvider
Constants ¶
const ErrAlreadyProcessed errors.String = "event already processed"
const NotImplementedErr errors.String = "not implemented"
This can be returned to prevent retry
const TimeoutErr errors.String = "did not obtain lock before deadline"
Variables ¶
var ErrDecode errors.String = "could not unmarshal message"
Functions ¶
func SetErrorHandling ¶
func SetErrorHandling(err error, handling ErrorHandling) error
SetErrorHandling can be used by a handler when it is returning an error to control retry. Some errors mean that there is no point in the event framework attempting to re-deliver the event.
Types ¶
type AbstractDB ¶
type AbstractDB[ID AbstractID[ID], TX AbstractTX] interface { CanTransact[TX] // ProduceSpecificTxEvents should not be called by transaction wrappers because // sending a batch to Kafka usually has a one second latecny. ProduceSpecificTxEvents(context.Context, []ID) (int, error) ProduceDroppedTxEvents(ctx context.Context, batchSize int) (int, error) LockOrError(ctx context.Context, key uint32, timeout time.Duration) (unlock func() error, err error) // MarkEventProcessed is always called inside a transaction. It must return // ErrAlreadyProcessed if the event has already been processed. The DB should not // be used by MarkEventProcessed, it is a DB method for simplicity of wrapping. MarkEventProcessed(ctx context.Context, tx TX, topic string, source string, id string, handlerName string) error }
AbstractDB is what events.Library consume. It is implemented by a combination of the db packages.
type AbstractID ¶
type AbstractID[ID AbstractIDMethods] interface { AbstractIDMethods New() ID }
type AbstractIDMethods ¶
type AbstractTX ¶
type BatchHandlerInterface ¶
type BatchHandlerInterface interface {
HandlerInterface
// contains filtered or unexported methods
}
type BatchHandlerTxInterface ¶
type BatchHandlerTxInterface[ID AbstractID[ID], TX AbstractTX] interface { HandlerTxInterface[ID, TX] // contains filtered or unexported methods }
type BinaryEventID ¶
type BinaryEventID struct {
googleuuid.UUID
}
BinraryEventID is a uuid type that presents itself the the sql driver as binary bytes. This is compatible with SingleStore
func (BinaryEventID) New ¶
func (u BinaryEventID) New() BinaryEventID
NewBinaryEventID generates a random event id
type BoundTopic ¶
func BindTopic ¶
func BindTopic[E any](name string) BoundTopic[E]
BindTopic does not validate topics. cpevents.BindTopic does, use it instead.
func (BoundTopic[E]) BatchHandler ¶
func (t BoundTopic[E]) BatchHandler(callback func(context.Context, []Event[E]) error) BatchHandlerInterface
func (BoundTopic[E]) Event ¶
func (t BoundTopic[E]) Event(key string, model E) SimpleEvent
func (BoundTopic[E]) Handler ¶
func (t BoundTopic[E]) Handler(callback func(context.Context, Event[E]) error) HandlerInterface
func (BoundTopic[E]) Topic ¶
func (t BoundTopic[E]) Topic() string
type BoundTopicTx ¶
type BoundTopicTx[E any, ID AbstractID[ID], TX AbstractTX, DB AbstractDB[ID, TX]] struct { BoundTopic[E] }
func BindTopicTx ¶
func BindTopicTx[E any, ID AbstractID[ID], TX AbstractTX, DB AbstractDB[ID, TX]](name string) BoundTopicTx[E, ID, TX, DB]
func (BoundTopicTx[E, ID, TX, DB]) BatchHandlerTx ¶
func (t BoundTopicTx[E, ID, TX, DB]) BatchHandlerTx(callback func(context.Context, TX, []Event[E]) error) BatchHandlerTxInterface[ID, TX]
func (BoundTopicTx[E, ID, TX, DB]) HandlerTx ¶
func (t BoundTopicTx[E, ID, TX, DB]) HandlerTx(callback func(context.Context, TX, Event[E]) error) HandlerTxInterface[ID, TX]
type CanAugment ¶
type CanAugment[ID AbstractID[ID], TX AbstractTX] interface { AugmentWithProducer(Producer[ID, TX]) }
If DB implements CanAugment then AugmentWithProducer will be invoked by the event framework.
type CanTransact ¶
type CanTransact[TX AbstractTX] interface { AbstractTX Transact(context.Context, func(TX) error) error }
CanTransact is a is a DB interface that implements that transaction part of AbstractDB
type EnhancedTX ¶
type EnhancedTX interface {
AbstractTX
Produce(events ...ProducingEvent)
}
EnhancedTX is used by event tests. It is the suggested interface for how transactions should enable events to be sent. It is implemented by eventdb.WrappedTX
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error provides a way for handlers to explicitly control retry and timeout by the event consuming framework.
type ErrorHandling ¶
type ErrorHandling int
const ( RetryUntilTimeout ErrorHandling = iota IgnoreError DoNotRetry )
func GetErrorHandling ¶
func GetErrorHandling(err error) ErrorHandling
type Event ¶
type Event[E any] struct { Topic string // may be a dead-letter topic. Un-prefixed. Key string Data []byte Payload E Headers map[string][]string Timestamp time.Time ID string // CloudEvents field, defaults to a checksum if not present ContentType string // Optional field, required for CloudEvents messages Subject string // CloudEvents field, defaults to Kafka Key Type string // CloudEvents field, defaults to Kafka Topic Source string // CloudEvents field, empty for non-CloudEvents messages SpecVersion string // CloudEvents field, empty for non-CloudEvents messages DataSchema string // optional CloudEvents field, empty if not set ConsumerGroup string HandlerName string BaseTopic string // always the non-dead-letter topic, different from Topic when processing dead letters. Un-prefixed. // contains filtered or unexported fields }
Event abstracts away the underlying message system (Kafka)
It is a superset of CloudEvent (https://github.com/cloudevents/spec) and Kafka data.
ID is either a CloudEvent header (if present) or a checksum of the event data. ID plus Source should be unique (or if not unique, then the message is a duplicate).
For Kafka events that follow the CloudEvents spec, Headers is the CloudEvents headers so the "id" header will be the CloudEvents id and any other CloudEvents. For non-CloudEvents-compliant messages, Headers will reflect the Kafka Headers.
type HandlerInfo ¶
type HandlerInterface ¶
type HandlerInterface interface {
SharedHandlerInterface
SetLibrary(LibraryInterface)
// contains filtered or unexported methods
}
type HandlerTxInterface ¶
type HandlerTxInterface[ID AbstractID[ID], TX AbstractTX] interface { SharedHandlerInterface SetLibrary(LibraryInterfaceTx[ID, TX]) // contains filtered or unexported methods }
type LibraryInterface ¶
type LibraryInterfaceTx ¶
type LibraryInterfaceTx[ID AbstractID[ID], TX AbstractTX] interface { LibraryInterface DB() AbstractDB[ID, TX] }
type OnFailure ¶
type OnFailure int
const ( OnFailureDiscard OnFailure = iota // failed messages will be dropped OnFailureBlock // failed messages will retry forever and block the consumer OnFailureRetryLater // failed messages will go to the dead letter queue and be retried OnFailureSave // failed messages will go to the dead letter queue and not be retried )
type ProduceMethod ¶
type ProduceMethod string
const ( ProduceInTransaction ProduceMethod = "tx" ProduceCatchUp ProduceMethod = "catchUp" ProduceImmediate ProduceMethod = "immediate" )
type Producer ¶
type Producer[ID AbstractID[ID], TX AbstractTX] interface { DB() AbstractDB[ID, TX] Produce(context.Context, ProduceMethod, ...ProducingEvent) error // ProduceFromTable should be called by transaction wappers. It will // send ids to a central thread that will in turn call ProduceSpecificTxEvents. ProduceFromTable(ctx context.Context, eventIDs map[string][]ID) error RecordError(context.Context, string, error) error // pauses to avoid spamming RecordErrorNoWait(context.Context, string, error) error // does not pause IsConfigured() bool ValidateTopics(context.Context, []string) error TracerProvider(context.Context) Tracer }
Producer is implemented by events.Library. It's an interface so that import cycles can be avoided.
type ProducingEvent ¶
type ProducingEvent interface {
GetKey() string
GetTimestamp() time.Time // used for the cloudevents time header
GetTopic() string // Topic is also used to generate the cloudevents type header. The topic is un-prefixed. Suggested format: noun.verb
GetHeaders() map[string][]string
}
ProducingEvent is used when generating an event. The body of the event is created by JSON marshaling.
type SelfMarshalingEvent ¶
SelfMarshalingEvent is a variant of ProducingEvent that has a custom marshal function.
type SharedHandlerInterface ¶
type SharedHandlerInterface interface {
// Handle synchronously invokes the handler function for a message.
}
type SimpleEvent ¶
type SimpleEvent struct {
// contains filtered or unexported fields
}
func (SimpleEvent) DataSchema ¶
func (e SimpleEvent) DataSchema(s string) SimpleEvent
func (SimpleEvent) GetHeaders ¶
func (e SimpleEvent) GetHeaders() map[string][]string
GetHeaders fills in default values for the headers and returns a http.Header. It is required to fulfill the ProducingEvent interface contract.
func (SimpleEvent) GetKey ¶
func (e SimpleEvent) GetKey() string
func (SimpleEvent) GetTimestamp ¶
func (e SimpleEvent) GetTimestamp() time.Time
func (SimpleEvent) GetTopic ¶
func (e SimpleEvent) GetTopic() string
func (SimpleEvent) Header ¶
func (e SimpleEvent) Header() map[string][]string
Header returns a map[string][]string object that contains the data that will become the event headers.
All headers, except "content-type" should be prefixed with "ce_" to indicate that they're CloudEvent headers. The "ce_" will be stripped on the receiving side.
func (SimpleEvent) ID ¶
func (e SimpleEvent) ID(id string) SimpleEvent
func (SimpleEvent) MarshalJSON ¶
func (e SimpleEvent) MarshalJSON() ([]byte, error)
func (SimpleEvent) Source ¶
func (e SimpleEvent) Source(source string) SimpleEvent
func (SimpleEvent) SpecVersion ¶
func (e SimpleEvent) SpecVersion(v string) SimpleEvent
func (SimpleEvent) Subject ¶
func (e SimpleEvent) Subject(sub string) SimpleEvent
func (SimpleEvent) Time ¶
func (e SimpleEvent) Time(t time.Time) SimpleEvent
func (SimpleEvent) Type ¶
func (e SimpleEvent) Type(t string) SimpleEvent
type StringEventID ¶
StringEventUUID is a UUID type that presents itself to the sql driver as a string This is compatible with PostgreSQL
func (StringEventID) New ¶
func (u StringEventID) New() StringEventID
type TracerConfig ¶ added in v0.8.0
type TracerConfig struct {
BeginSpan func(context.Context, map[string]string) (context.Context, func()) // start of a span. All logs are inside spans.
Handle func(context.Context, bool, string, *kafka.Message) (context.Context, func()) // string is the handler name, bool is true for deadletter delivery
}
TracerConfig is how tracing becomes richer. All fields are optional. Additional fields may be added in the future. BeginSpan/DoneSpan will wrap many things. The returned funcs are for noting the end of the span/handle/thread etc.
type TracerProvider ¶ added in v0.8.0
TracerProvider is a minimal logger-from-context function. Given a context, it returns a Logf. Logging behavior can be significantly enhanced with SetTracerConfig().