eventmodels

package
v0.9.0 Latest
Published: Jan 27, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package eventmodels provides the interfaces needed to use the events package.

Index

Constants

View Source
const ErrAlreadyProcessed errors.String = "event already processed"
View Source
const NotImplementedErr errors.String = "not implemented"

This can be returned to prevent retry.

View Source
const TimeoutErr errors.String = "did not obtain lock before deadline"

Variables

View Source
var ErrDecode errors.String = "could not unmarshal message"

Functions

func SetErrorHandling

func SetErrorHandling(err error, handling ErrorHandling) error

SetErrorHandling can be used by a handler, when returning an error, to control retry. Some errors mean that there is no point in the event framework attempting to re-deliver the event.

Types

type AbstractDB

type AbstractDB[ID AbstractID[ID], TX AbstractTX] interface {
	CanTransact[TX]
	// ProduceSpecificTxEvents should not be called by transaction wrappers because
	// sending a batch to Kafka usually has a one-second latency.
	ProduceSpecificTxEvents(context.Context, []ID) (int, error)
	ProduceDroppedTxEvents(ctx context.Context, batchSize int) (int, error)
	LockOrError(ctx context.Context, key uint32, timeout time.Duration) (unlock func() error, err error)

	// MarkEventProcessed is always called inside a transaction. It must return
	// ErrAlreadyProcessed if the event has already been processed. The DB should not
	// be used by MarkEventProcessed, it is a DB method for simplicity of wrapping.
	MarkEventProcessed(ctx context.Context, tx TX, topic string, source string, id string, handlerName string) error
}

AbstractDB is what events.Library consumes. It is implemented by a combination of the db packages.

type AbstractID

type AbstractID[ID AbstractIDMethods] interface {
	AbstractIDMethods
	New() ID
}

type AbstractIDMethods

type AbstractIDMethods interface {
	Value() (driver.Value, error)
	String() string
}

type AbstractTX

type AbstractTX interface {
	ExecContext(context.Context, string, ...any) (sql.Result, error)
	QueryContext(context.Context, string, ...any) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...any) *sql.Row
}

type BatchHandlerInterface

type BatchHandlerInterface interface {
	HandlerInterface
	// contains filtered or unexported methods
}

type BatchHandlerTxInterface

type BatchHandlerTxInterface[ID AbstractID[ID], TX AbstractTX] interface {
	HandlerTxInterface[ID, TX]
	// contains filtered or unexported methods
}

type BinaryEventID

type BinaryEventID struct {
	googleuuid.UUID
}

BinaryEventID is a UUID type that presents itself to the sql driver as binary bytes. This is compatible with SingleStore.

func (BinaryEventID) New

func (u BinaryEventID) New() BinaryEventID

New generates a random event ID.

func (BinaryEventID) Value

func (u BinaryEventID) Value() (driver.Value, error)

type BoundTopic

type BoundTopic[E any] string

func BindTopic

func BindTopic[E any](name string) BoundTopic[E]

BindTopic does not validate topics. cpevents.BindTopic does, use it instead.

func (BoundTopic[E]) BatchHandler

func (t BoundTopic[E]) BatchHandler(callback func(context.Context, []Event[E]) error) BatchHandlerInterface

func (BoundTopic[E]) Event

func (t BoundTopic[E]) Event(key string, model E) SimpleEvent

func (BoundTopic[E]) Handler

func (t BoundTopic[E]) Handler(callback func(context.Context, Event[E]) error) HandlerInterface

func (BoundTopic[E]) Topic

func (t BoundTopic[E]) Topic() string

type BoundTopicTx

type BoundTopicTx[E any, ID AbstractID[ID], TX AbstractTX, DB AbstractDB[ID, TX]] struct {
	BoundTopic[E]
}

func BindTopicTx

func BindTopicTx[E any, ID AbstractID[ID], TX AbstractTX, DB AbstractDB[ID, TX]](name string) BoundTopicTx[E, ID, TX, DB]

func (BoundTopicTx[E, ID, TX, DB]) BatchHandlerTx

func (t BoundTopicTx[E, ID, TX, DB]) BatchHandlerTx(callback func(context.Context, TX, []Event[E]) error) BatchHandlerTxInterface[ID, TX]

func (BoundTopicTx[E, ID, TX, DB]) HandlerTx

func (t BoundTopicTx[E, ID, TX, DB]) HandlerTx(callback func(context.Context, TX, Event[E]) error) HandlerTxInterface[ID, TX]

type CanAugment

type CanAugment[ID AbstractID[ID], TX AbstractTX] interface {
	AugmentWithProducer(Producer[ID, TX])
}

If DB implements CanAugment then AugmentWithProducer will be invoked by the event framework.

type CanTransact

type CanTransact[TX AbstractTX] interface {
	AbstractTX
	Transact(context.Context, func(TX) error) error
}

CanTransact is a DB interface that implements the transaction part of AbstractDB.

type EnhancedTX

type EnhancedTX interface {
	AbstractTX
	Produce(events ...ProducingEvent)
}

EnhancedTX is used by event tests. It is the suggested interface for how transactions should enable events to be sent. It is implemented by eventdb.WrappedTX.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error provides a way for handlers to explicitly control retry and timeout by the event consuming framework.

func (Error) Error

func (e Error) Error() string

func (Error) Unwrap

func (e Error) Unwrap() error

type ErrorHandling

type ErrorHandling int
const (
	RetryUntilTimeout ErrorHandling = iota
	IgnoreError
	DoNotRetry
)

func GetErrorHandling

func GetErrorHandling(err error) ErrorHandling

type Event

type Event[E any] struct {
	Topic         string // may be a dead-letter topic. Un-prefixed.
	Key           string
	Data          []byte
	Payload       E
	Headers       map[string][]string
	Timestamp     time.Time
	ID            string // CloudEvents field, defaults to a checksum if not present
	ContentType   string // Optional field, required for CloudEvents messages
	Subject       string // CloudEvents field, defaults to Kafka Key
	Type          string // CloudEvents field, defaults to Kafka Topic
	Source        string // CloudEvents field, empty for non-CloudEvents messages
	SpecVersion   string // CloudEvents field, empty for non-CloudEvents messages
	DataSchema    string // optional CloudEvents field, empty if not set
	ConsumerGroup string
	HandlerName   string
	BaseTopic     string // always the non-dead-letter topic, different from Topic when processing dead letters. Un-prefixed.
	// contains filtered or unexported fields
}

Event abstracts away the underlying message system (Kafka).

It is a superset of CloudEvent (https://github.com/cloudevents/spec) and Kafka data.

ID is either a CloudEvent header (if present) or a checksum of the event data. ID plus Source should be unique (or if not unique, then the message is a duplicate).

For Kafka events that follow the CloudEvents spec, Headers holds the CloudEvents headers, so the "id" header will be the CloudEvents id, and likewise for the other CloudEvents attributes. For non-CloudEvents-compliant messages, Headers will reflect the Kafka headers.

type HandlerInfo

type HandlerInfo interface {
	Name() string
	BaseTopic() string
	ConsumerGroup() string
	IsDeadLetter() bool
}

type HandlerInterface

type HandlerInterface interface {
	SharedHandlerInterface
	SetLibrary(LibraryInterface)
	// contains filtered or unexported methods
}

type HandlerTxInterface

type HandlerTxInterface[ID AbstractID[ID], TX AbstractTX] interface {
	SharedHandlerInterface
	SetLibrary(LibraryInterfaceTx[ID, TX])
	// contains filtered or unexported methods
}

type LibraryInterface

type LibraryInterface interface {
	RemovePrefix(string) string
	TracerProvider(context.Context) Tracer
	TracerConfig() TracerConfig
}

type LibraryInterfaceTx

type LibraryInterfaceTx[ID AbstractID[ID], TX AbstractTX] interface {
	LibraryInterface
	DB() AbstractDB[ID, TX]
}

type OnFailure

type OnFailure int
const (
	OnFailureDiscard    OnFailure = iota // failed messages will be dropped
	OnFailureBlock                       // failed messages will retry forever and block the consumer
	OnFailureRetryLater                  // failed messages will go to the dead letter queue and be retried
	OnFailureSave                        // failed messages will go to the dead letter queue and not be retried
)

type ProduceMethod

type ProduceMethod string
const (
	ProduceInTransaction ProduceMethod = "tx"
	ProduceCatchUp       ProduceMethod = "catchUp"
	ProduceImmediate     ProduceMethod = "immediate"
)

type Producer

type Producer[ID AbstractID[ID], TX AbstractTX] interface {
	DB() AbstractDB[ID, TX]
	Produce(context.Context, ProduceMethod, ...ProducingEvent) error
	// ProduceFromTable should be called by transaction wrappers. It will
	// send ids to a central thread that will in turn call ProduceSpecificTxEvents.
	ProduceFromTable(ctx context.Context, eventIDs map[string][]ID) error
	RecordError(context.Context, string, error) error       // pauses to avoid spamming
	RecordErrorNoWait(context.Context, string, error) error // does not pause
	IsConfigured() bool
	ValidateTopics(context.Context, []string) error
	TracerProvider(context.Context) Tracer
}

Producer is implemented by events.Library. It's an interface so that import cycles can be avoided.

type ProducingEvent

type ProducingEvent interface {
	GetKey() string
	GetTimestamp() time.Time // used for the cloudevents time header
	GetTopic() string        // Topic is also used to generate the cloudevents type header. The topic is un-prefixed. Suggested format: noun.verb
	GetHeaders() map[string][]string
}

ProducingEvent is used when generating an event. The body of the event is created by JSON marshaling.

type SelfMarshalingEvent

type SelfMarshalingEvent interface {
	Marshal() (encoded []byte, contentType string, err error)
}

SelfMarshalingEvent is a variant of ProducingEvent that has a custom marshal function.

type SharedHandlerInterface

type SharedHandlerInterface interface {
	GetTopic() string
	// Handle synchronously invokes the handler function for a message.
	Handle(ctx context.Context, handlerInfo HandlerInfo, messages []*kafka.Message) []error
	Batch() bool
}

type SimpleEvent

type SimpleEvent struct {
	// contains filtered or unexported fields
}

func (SimpleEvent) DataSchema

func (e SimpleEvent) DataSchema(s string) SimpleEvent

func (SimpleEvent) GetHeaders

func (e SimpleEvent) GetHeaders() map[string][]string

GetHeaders fills in default values for the headers and returns them as a map[string][]string. It is required to fulfill the ProducingEvent interface contract.

func (SimpleEvent) GetKey

func (e SimpleEvent) GetKey() string

func (SimpleEvent) GetTimestamp

func (e SimpleEvent) GetTimestamp() time.Time

func (SimpleEvent) GetTopic

func (e SimpleEvent) GetTopic() string

func (SimpleEvent) Header

func (e SimpleEvent) Header() map[string][]string

Header returns a map[string][]string object that contains the data that will become the event headers.

All headers except "content-type" should be prefixed with "ce_" to indicate that they're CloudEvents headers. The "ce_" prefix will be stripped on the receiving side.

func (SimpleEvent) ID

func (e SimpleEvent) ID(id string) SimpleEvent

func (SimpleEvent) MarshalJSON

func (e SimpleEvent) MarshalJSON() ([]byte, error)

func (SimpleEvent) Source

func (e SimpleEvent) Source(source string) SimpleEvent

func (SimpleEvent) SpecVersion

func (e SimpleEvent) SpecVersion(v string) SimpleEvent

func (SimpleEvent) Subject

func (e SimpleEvent) Subject(sub string) SimpleEvent

func (SimpleEvent) Time

func (e SimpleEvent) Time(t time.Time) SimpleEvent

func (SimpleEvent) Type

func (e SimpleEvent) Type(t string) SimpleEvent

type StringEventID

type StringEventID struct {
	baseuuid.UUID
}

StringEventID is a UUID type that presents itself to the sql driver as a string. This is compatible with PostgreSQL.

func (StringEventID) New

func (u StringEventID) New() StringEventID

type Tracer

type Tracer func(string, ...any)

Tracer is a Logf/Printf-style function signature.

type TracerConfig added in v0.8.0

type TracerConfig struct {
	BeginSpan func(context.Context, map[string]string) (context.Context, func())            // start of a span. All logs are inside spans.
	Handle    func(context.Context, bool, string, *kafka.Message) (context.Context, func()) // string is the handler name, bool is true for deadletter delivery
}

TracerConfig is how tracing becomes richer. All fields are optional. Additional fields may be added in the future. BeginSpan/DoneSpan will wrap many things. The returned funcs are for noting the end of the span/handle/thread etc.

type TracerProvider added in v0.8.0

type TracerProvider func(context.Context) Tracer

TracerProvider is a minimal logger-from-context function. Given a context, it returns a Logf. Logging behavior can be significantly enhanced with SetTracerConfig().
