events

package
v0.0.0-...-4c964c4
Warning

This package is not in the latest version of its module.

Published: Jul 29, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var WireSet = wire.NewSet(
	ProvideSystem,
)

WireSet provides a wire set for this package.

Functions

func NewDiscardEventError

func NewDiscardEventError(inner error) error

func NewDiscardEventErrorf

func NewDiscardEventErrorf(format string, args ...interface{}) error

func ReaderRegisterEvent

func ReaderRegisterEvent[T interface{}](reader *GenericReader,
	eventType EventType, fn HandlerFunc[T], opts ...HandlerOption) error

ReaderRegisterEvent registers a type-safe handler function on the reader for a specific event. It lets callers register type-safe handlers without having to handle the raw stream payload. NOTE: Go does not allow type parameters on methods, hence the reader is passed as an input parameter.
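The pattern described in the note can be sketched as follows. This is a minimal illustration, not the package's implementation: `reader`, `rawHandler`, `registerEvent`, `dispatch`, and `userCreated` are hypothetical stand-ins, the `Event` type is simplified, and decoding the payload from JSON is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// EventType and HandlerFunc mirror the declarations in this package.
type EventType string

// Event is simplified here: the documented type also has a Timestamp field.
type Event[T any] struct {
	ID      string `json:"id"`
	Payload T      `json:"payload"`
}

type HandlerFunc[T any] func(context.Context, *Event[T]) error

// rawHandler is a hypothetical untyped handler that receives the raw payload.
type rawHandler func(ctx context.Context, id string, raw []byte) error

// reader is a hypothetical stand-in for GenericReader.
type reader struct {
	handlers map[EventType]rawHandler
}

// registerEvent is a free generic function because Go methods cannot declare
// their own type parameters. It wraps the typed handler in an untyped one
// that decodes the raw payload into T before calling fn.
func registerEvent[T any](r *reader, eventType EventType, fn HandlerFunc[T]) error {
	if _, ok := r.handlers[eventType]; ok {
		return fmt.Errorf("handler for %q already registered", eventType)
	}
	r.handlers[eventType] = func(ctx context.Context, id string, raw []byte) error {
		var payload T
		if err := json.Unmarshal(raw, &payload); err != nil {
			return fmt.Errorf("decode payload of %q: %w", eventType, err)
		}
		return fn(ctx, &Event[T]{ID: id, Payload: payload})
	}
	return nil
}

// dispatch simulates the stream delivering one raw message to the reader.
func (r *reader) dispatch(ctx context.Context, eventType EventType, id string, raw []byte) error {
	h, ok := r.handlers[eventType]
	if !ok {
		return fmt.Errorf("no handler for %q", eventType)
	}
	return h(ctx, id, raw)
}

// userCreated is a hypothetical payload type.
type userCreated struct {
	Name string `json:"name"`
}

// runRegisterDemo registers a typed handler and feeds it one raw message.
func runRegisterDemo() (string, error) {
	r := &reader{handlers: map[EventType]rawHandler{}}
	var seen string
	err := registerEvent[userCreated](r, "user-created",
		func(_ context.Context, e *Event[userCreated]) error {
			seen = e.ID + ":" + e.Payload.Name
			return nil
		})
	if err != nil {
		return "", err
	}
	err = r.dispatch(context.Background(), "user-created", "1-0", []byte(`{"name":"ada"}`))
	return seen, err
}

func main() {
	seen, err := runRegisterDemo()
	fmt.Println(seen, err)
}
```

The handler itself never sees bytes or maps; all decoding lives in the wrapper that the generic function creates.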

func ReporterSendEvent

func ReporterSendEvent[T interface{}](reporter *GenericReporter, ctx context.Context,
	eventType EventType, payload T) (string, error)

ReporterSendEvent reports an event using the provided GenericReporter. It returns the reported event's ID on success. NOTE: This call blocks until the event has been sent (not until it has been processed).

Types

type Config

type Config struct {
	Mode                  Mode
	Namespace             string
	MaxStreamLength       int64
	ApproxMaxStreamLength bool
}

Config defines the config of the events system.

func (*Config) Validate

func (c *Config) Validate() error

type Event

type Event[T interface{}] struct {
	ID        string    `json:"id"`
	Timestamp time.Time `json:"timestamp"`
	Payload   T         `json:"payload"`
}
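The struct tags above determine how an Event looks when encoded with encoding/json. The sketch below mirrors the struct locally to show that shape; `pullRequestOpened` is a hypothetical payload type, and whether the package itself puts this exact JSON on the stream is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event mirrors the struct declared in this package.
type Event[T any] struct {
	ID        string    `json:"id"`
	Timestamp time.Time `json:"timestamp"`
	Payload   T         `json:"payload"`
}

// pullRequestOpened is a hypothetical payload type used for illustration.
type pullRequestOpened struct {
	Number int `json:"number"`
}

// encodeSample marshals one event with a fixed ID and timestamp.
func encodeSample() (string, error) {
	e := Event[pullRequestOpened]{
		ID:        "1-0",
		Timestamp: time.Date(2025, time.January, 2, 3, 4, 5, 0, time.UTC),
		Payload:   pullRequestOpened{Number: 7},
	}
	b, err := json.Marshal(e)
	return string(b), err
}

// decodeNumber unmarshals an encoded event and returns the payload number.
func decodeNumber(s string) (int, error) {
	var e Event[pullRequestOpened]
	if err := json.Unmarshal([]byte(s), &e); err != nil {
		return 0, err
	}
	return e.Payload.Number, nil
}

func main() {
	s, err := encodeSample()
	fmt.Println(s, err)
	// {"id":"1-0","timestamp":"2025-01-02T03:04:05Z","payload":{"number":7}} <nil>
}
```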

type EventType

type EventType string

EventType describes the type of event.

type GenericReader

type GenericReader struct {
	// contains filtered or unexported fields
}

GenericReader represents an event reader that supports registering type-safe handlers for an arbitrary set of custom events within a given event category using the ReaderRegisterEvent function. NOTE: Ideally this would be an interface with a RegisterEvent[T] method, but that is currently not possible in Go. IMPORTANT: This reader should not be instantiated from external packages.

func (*GenericReader) Configure

func (r *GenericReader) Configure(opts ...ReaderOption)

type GenericReporter

type GenericReporter struct {
	// contains filtered or unexported fields
}

GenericReporter represents an event reporter that supports sending type-safe messages for an arbitrary set of custom events within an event category using the ReporterSendEvent function. NOTE: Ideally this would be an interface with a SendEvent[T] method, but that is not possible in Go.

func NewReporter

func NewReporter(system *System, category string) (*GenericReporter, error)

type HandlerFunc

type HandlerFunc[T interface{}] func(context.Context, *Event[T]) error

type HandlerOption

type HandlerOption stream.HandlerOption

HandlerOption can be used to configure event handlers.

func WithIdleTimeout

func WithIdleTimeout(timeout time.Duration) HandlerOption

WithIdleTimeout can be used to set the idle timeout for a specific event handler.

func WithMaxRetries

func WithMaxRetries(maxRetries int) HandlerOption

WithMaxRetries can be used to set the max retry count for a specific event handler.

type Mode

type Mode string

Mode defines the different modes of the event framework.

const (
	ModeRedis    Mode = "redis"
	ModeInMemory Mode = "inmemory"
)
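Because Mode is a plain string type, a value read from configuration is worth checking against the two declared constants before use. The sketch below mirrors the type locally; `parseMode` is a hypothetical helper and is not part of this package.

```go
package main

import "fmt"

// Mode and its constants mirror the declarations in this package.
type Mode string

const (
	ModeRedis    Mode = "redis"
	ModeInMemory Mode = "inmemory"
)

// parseMode is a hypothetical helper that accepts only the declared modes.
func parseMode(s string) (Mode, error) {
	switch m := Mode(s); m {
	case ModeRedis, ModeInMemory:
		return m, nil
	default:
		return "", fmt.Errorf("unsupported events mode %q", s)
	}
}

func main() {
	for _, s := range []string{"redis", "inmemory", "kafka"} {
		m, err := parseMode(s)
		fmt.Println(m, err)
	}
}
```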

type Reader

type Reader interface {
	Configure(opts ...ReaderOption)
}

Reader specifies the minimum functionality a reader should expose. NOTE: we don't want to enforce any event registration methods here, allowing full control for customized readers.

type ReaderCanceler

type ReaderCanceler struct {
	// contains filtered or unexported fields
}

ReaderCanceler exposes the functionality to cancel a reader explicitly.

func (*ReaderCanceler) Cancel

func (d *ReaderCanceler) Cancel() error

type ReaderFactory

type ReaderFactory[R Reader] struct {
	// contains filtered or unexported fields
}

ReaderFactory launches event readers of type R (either GenericReader or a customized reader).

func NewReaderFactory

func NewReaderFactory[R Reader](system *System, category string, fn ReaderFactoryFunc[R]) (*ReaderFactory[R], error)

func (*ReaderFactory[R]) Launch

func (f *ReaderFactory[R]) Launch(ctx context.Context,
	groupName string, readerName string, setup func(R) error) (*ReaderCanceler, error)

Launch launches a new reader for the provided group and reader name. The setup function should be used to register the events the reader will act on. To stop the reader and clean up its resources, use the returned ReaderCanceler. The reader is also canceled automatically when the provided context is canceled. NOTE: Do not set up the reader outside of the setup function!
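The lifecycle described above (setup first, then run, then stop via the canceler or the context) can be sketched with local stand-ins. Everything here is hypothetical and only mimics the documented contract: `fakeReader`, `canceler`, and `launch` are not the package's types, and the real reader consumes stream messages instead of just waiting.

```go
package main

import (
	"context"
	"fmt"
)

// fakeReader is a hypothetical reader that only records registered events.
type fakeReader struct {
	events []string
}

// canceler is a hypothetical stand-in for ReaderCanceler.
type canceler struct {
	cancel context.CancelFunc
	done   <-chan struct{}
}

// Cancel stops the reader and waits until its loop has exited.
func (c *canceler) Cancel() error {
	c.cancel()
	<-c.done
	return nil
}

// launch mimics the shape of Launch: it creates a fresh reader, runs setup
// on it, and only then starts the reader loop.
func launch(ctx context.Context, setup func(*fakeReader) error) (*canceler, error) {
	r := &fakeReader{}
	if err := setup(r); err != nil {
		return nil, fmt.Errorf("setup failed: %w", err)
	}
	runCtx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-runCtx.Done() // a real reader would consume messages until canceled
	}()
	return &canceler{cancel: cancel, done: done}, nil
}

// runLaunchDemo registers one event during setup and stops via the canceler.
func runLaunchDemo() ([]string, error) {
	var registered []string
	c, err := launch(context.Background(), func(r *fakeReader) error {
		r.events = append(r.events, "user-created")
		registered = r.events
		return nil
	})
	if err != nil {
		return nil, err
	}
	return registered, c.Cancel()
}

// stopsWithContext reports whether canceling the parent context stops the reader.
func stopsWithContext() bool {
	ctx, cancel := context.WithCancel(context.Background())
	c, err := launch(ctx, func(*fakeReader) error { return nil })
	if err != nil {
		cancel()
		return false
	}
	cancel()
	<-c.done
	return true
}

func main() {
	events, err := runLaunchDemo()
	fmt.Println(events, err, stopsWithContext())
}
```

Keeping all registration inside the setup callback means the reader never starts with a partially configured handler set, which is presumably why the note forbids setting it up elsewhere.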

type ReaderFactoryFunc

type ReaderFactoryFunc[R Reader] func(reader *GenericReader) (R, error)

ReaderFactoryFunc is an abstraction of a factory method that creates customized Reader implementations (type R). It is called by the ReaderFactory to create a new instance of the Reader to launch. The provided GenericReader is available exclusively to the factory method (every call receives a fresh instance) and should be used as the base of any custom Reader implementation (use ReaderRegisterEvent to register custom handlers).
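One way to build a custom reader on top of the provided GenericReader is to embed it, so that Configure is promoted and the Reader interface is satisfied. The sketch below uses simplified local stand-ins: the shape of `ReaderOption` and the `concurrency` field are assumptions, and `UserReader` and `newUserReader` are hypothetical names.

```go
package main

import "fmt"

// GenericReader is a simplified stand-in; its real fields are unexported.
type GenericReader struct {
	concurrency int
}

// ReaderOption is modeled as a function here. In the package it is defined
// in terms of stream.ConsumerOption, so this shape is an assumption.
type ReaderOption func(*GenericReader)

// Configure applies the given options, mirroring the documented signature.
func (r *GenericReader) Configure(opts ...ReaderOption) {
	for _, opt := range opts {
		opt(r)
	}
}

// Reader and ReaderFactoryFunc mirror the declarations in this package.
type Reader interface {
	Configure(opts ...ReaderOption)
}

type ReaderFactoryFunc[R Reader] func(reader *GenericReader) (R, error)

// UserReader is a hypothetical custom reader. Embedding *GenericReader
// promotes Configure, so *UserReader satisfies Reader.
type UserReader struct {
	*GenericReader
}

// newUserReader is a hypothetical factory with the ReaderFactoryFunc shape.
func newUserReader(inner *GenericReader) (*UserReader, error) {
	if inner == nil {
		return nil, fmt.Errorf("inner reader must not be nil")
	}
	return &UserReader{GenericReader: inner}, nil
}

// Compile-time check that the factory matches ReaderFactoryFunc.
var _ ReaderFactoryFunc[*UserReader] = newUserReader

// buildReader creates a custom reader and configures it through the
// promoted Configure method.
func buildReader() (int, error) {
	r, err := newUserReader(&GenericReader{})
	if err != nil {
		return 0, err
	}
	r.Configure(func(g *GenericReader) { g.concurrency = 4 })
	return r.concurrency, nil
}

func main() {
	n, err := buildReader()
	fmt.Println(n, err)
}
```

A custom reader built this way can then expose its own typed registration methods that call ReaderRegisterEvent on the embedded reader.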

type ReaderOption

type ReaderOption stream.ConsumerOption

ReaderOption can be used to configure event readers.

func WithConcurrency

func WithConcurrency(concurrency int) ReaderOption

WithConcurrency sets up the concurrency of the reader.

func WithHandlerOptions

func WithHandlerOptions(opts ...HandlerOption) ReaderOption

WithHandlerOptions sets up the default options for event handlers.
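The relationship between reader-wide defaults and per-handler options can be sketched with the functional options pattern. This is only an illustration under assumptions: `handlerConfig` and `resolve` are hypothetical, the function-based `HandlerOption` is not the package's actual definition, and the rule that per-handler options override defaults is assumed rather than documented.

```go
package main

import (
	"fmt"
	"time"
)

// handlerConfig is a hypothetical settings struct for one event handler.
type handlerConfig struct {
	idleTimeout time.Duration
	maxRetries  int
}

// HandlerOption is modeled as a function over handlerConfig (assumption).
type HandlerOption func(*handlerConfig)

// WithIdleTimeout and WithMaxRetries reuse the documented names and signatures.
func WithIdleTimeout(timeout time.Duration) HandlerOption {
	return func(c *handlerConfig) { c.idleTimeout = timeout }
}

func WithMaxRetries(maxRetries int) HandlerOption {
	return func(c *handlerConfig) { c.maxRetries = maxRetries }
}

// resolve applies reader-wide defaults first and per-handler options last,
// so a handler can override a default. This ordering is an assumption.
func resolve(defaults, perHandler []HandlerOption) handlerConfig {
	var c handlerConfig
	for _, opt := range defaults {
		opt(&c)
	}
	for _, opt := range perHandler {
		opt(&c)
	}
	return c
}

// resolveDemo overrides only the retry count for one handler.
func resolveDemo() (int, string) {
	defaults := []HandlerOption{WithIdleTimeout(30 * time.Second), WithMaxRetries(3)}
	perHandler := []HandlerOption{WithMaxRetries(1)}
	c := resolve(defaults, perHandler)
	return c.maxRetries, c.idleTimeout.String()
}

func main() {
	retries, timeout := resolveDemo()
	fmt.Println(retries, timeout) // 1 30s
}
```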

type StreamConsumer

type StreamConsumer interface {
	Register(streamID string, handler stream.HandlerFunc, opts ...stream.HandlerOption) error
	Configure(opts ...stream.ConsumerOption)
	Start(ctx context.Context) error
	Errors() <-chan error
	Infos() <-chan string
}

StreamConsumer is an abstraction of a consumer from the streams package.

type StreamConsumerFactoryFunc

type StreamConsumerFactoryFunc func(groupName string, consumerName string) (StreamConsumer, error)

StreamConsumerFactoryFunc is an abstraction of a factory method for stream consumers.

type StreamProducer

type StreamProducer interface {
	Send(ctx context.Context, streamID string, payload map[string]interface{}) (string, error)
}

StreamProducer is an abstraction of a producer from the streams package.
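Since StreamProducer is a one-method interface, a small in-memory implementation is enough for tests. The sketch below mirrors the interface locally; `recordingProducer` is hypothetical, and the ID format it returns is made up for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// StreamProducer mirrors the interface declared in this package.
type StreamProducer interface {
	Send(ctx context.Context, streamID string, payload map[string]interface{}) (string, error)
}

// recordingProducer is a hypothetical in-memory producer that stores every
// payload per stream and hands out sequential IDs.
type recordingProducer struct {
	mu   sync.Mutex
	sent map[string][]map[string]interface{}
	next int
}

// Send records the payload and returns a generated message ID.
func (p *recordingProducer) Send(ctx context.Context, streamID string,
	payload map[string]interface{}) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.sent == nil {
		p.sent = map[string][]map[string]interface{}{}
	}
	p.next++
	p.sent[streamID] = append(p.sent[streamID], payload)
	return fmt.Sprintf("%d-0", p.next), nil
}

// Compile-time check that recordingProducer satisfies StreamProducer.
var _ StreamProducer = (*recordingProducer)(nil)

// sendDemo sends one payload and reports the ID and the recorded count.
func sendDemo() (string, int, error) {
	p := &recordingProducer{}
	id, err := p.Send(context.Background(), "events:users",
		map[string]interface{}{"name": "ada"})
	return id, len(p.sent["events:users"]), err
}

func main() {
	id, n, err := sendDemo()
	fmt.Println(id, n, err)
}
```

A producer like this could be passed to NewSystem together with a matching consumer factory to exercise reporters without a Redis instance.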

type System

type System struct {
	// contains filtered or unexported fields
}

System represents a single, self-contained event system that is used to set up event Reporters and ReaderFactories.

func NewSystem

func NewSystem(streamConsumerFactoryFunc StreamConsumerFactoryFunc, streamProducer StreamProducer) (*System, error)

func ProvideSystem

func ProvideSystem(config Config, redisClient redis.UniversalClient) (*System, error)
