event

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2025 License: Apache-2.0 Imports: 13 Imported by: 6

Documentation

Overview

Package event contains an interface as well as helpers for go-orb events.

Index

Constants

View Source
const ComponentType = "event"

ComponentType is the client component type name.

Variables

View Source
var (
	// DefaultEventPlugin is the default client implementation to use.
	DefaultEventPlugin = "natsjs"

	// DefaultConfigSection is the default config section for the client.
	DefaultConfigSection = "client"

	// DefaultRequestContentType is the default content type used to transport data around.
	DefaultRequestContentType = codecs.MimeProto

	// DefaultPublishContentType is the default content type used to transport events around.
	DefaultPublishContentType = codecs.MimeJSON

	// DefaultRequestTimeout is the default request timeout.
	DefaultRequestTimeout = time.Second * 30
)
View Source
var (
	// ErrMissingTopic happens whenever the user doesnt give a topic.
	ErrMissingTopic = errors.New("missing topic")

	// ErrEncodingMessage is returned from publish if there was an error encoding the message option.
	ErrEncodingMessage = errors.New("encoding message")
)

Functions

func HandleRequest

func HandleRequest[TReq any, TResp any](
	ctx context.Context,
	handler Client,
	topic string,
	callback func(ctx context.Context, req *TReq) (*TResp, error),
)

HandleRequest subscribes to the given topic and handles the requests.

func Register

func Register(name string, factory ProviderFunc)

Register makes a plugin available by the provided name.

func Request

func Request[TResp any, TReq any](
	ctx context.Context,
	handler Client,
	topic string,
	req TReq,
	opts ...RequestOption,
) (*TResp, error)

Request makes a request with using events, it's a shortcut for NewRequest(...).Request(...) Example:

resp , err := events.Request[FooResponse](context.Background(), eventsHandler, "user.new", fooRequest)

Response will be of type *FooResponse.

Types

type AckFunc added in v0.3.0

type AckFunc func() error

AckFunc is the function to call to acknowledge a message.

type Client added in v0.3.0

type Client interface {
	types.Component

	// Request runs a REST like call on the given topic.
	// This is an internal function, clients MUST use event.Request().
	Request(ctx context.Context, req *Req[[]byte, any], opts ...RequestOption) ([]byte, error)

	// HandleRequest subscribes to the given topic and handles the requests.
	// This is a blocking operation.
	// This is an internal function, clients MUST use event.HandleRequest().
	HandleRequest(ctx context.Context, topic string, cb func(context.Context, *Req[[]byte, []byte]))

	// Clone creates a clone of the handler, this is useful for parallel requests.
	Clone() Type

	// GetCodec returns the codec used by the handler for publish and subscribe.
	GetPublishCodec() codecs.Marshaler

	// Publish publishes a Event to the given topic.
	Publish(ctx context.Context, topic string, event any, opts ...PublishOption) error

	// Consume lets you consume events from a given topic.
	Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}

Client is the client interface for events plugins.

type Config

type Config struct {
	// Plugin selects the client implementation.
	Plugin string `json:"plugin" yaml:"plugin"`
}

Config are the Client options.

func NewConfig

func NewConfig(opts ...Option) Config

NewConfig generates a new config with all the defaults.

type ConfigType

type ConfigType interface {
	// contains filtered or unexported methods
}

ConfigType is used in the functional options as type to identify a registry option. It is used over a static *Config type as this way plugins can also easilty set functional options without the complication of contexts, as was done in v4. This is possible because plugins will nest the registry.Config type, and thus inherit the interface that is used to identify the registry config.

type ConsumeOption added in v0.3.0

type ConsumeOption func(o *ConsumeOptions)

ConsumeOption sets attributes on ConsumeOptions.

func WithAutoAck added in v0.3.0

func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption

WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received the message is requeued in case auto ack is turned off.

func WithGroup added in v0.3.0

func WithGroup(q string) ConsumeOption

WithGroup sets the consumer group to be part of when consuming events.

func WithOffset added in v0.3.0

func WithOffset(t time.Time) ConsumeOption

WithOffset sets the offset time at which to start consuming events.

func WithRetryLimit added in v0.3.0

func WithRetryLimit(retries int) ConsumeOption

WithRetryLimit sets the RetryLimit field on ConsumeOptions. Set to -1 for infinite retries (default).

type ConsumeOptions added in v0.3.0

type ConsumeOptions struct {
	// Offset is the time from which the messages should be consumed from. If not provided then
	// the messages will be consumed starting from the moment the Subscription starts.
	Offset time.Time
	// Group is the name of the consumer group, if two consumers have the same group the events
	// are distributed between them
	Group   string
	AckWait time.Duration
	// RetryLimit indicates number of times a message is retried
	RetryLimit int
	// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered.
	// If false specifies that each message need ts to be manually acknowledged by the subscriber.
	// If processing is successful the message should be ack'ed to remove the message from the stream.
	// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will
	// remain on the stream to be processed again.
	AutoAck bool
	// CustomRetries indicates whether to use RetryLimit
	CustomRetries bool
}

ConsumeOptions contains all the options which can be provided when subscribing to a topic.

func NewConsumeOptions added in v0.3.0

func NewConsumeOptions(opts ...ConsumeOption) ConsumeOptions

NewConsumeOptions generates new subscribe options with defaults.

func (ConsumeOptions) GetRetryLimit added in v0.3.0

func (s ConsumeOptions) GetRetryLimit() int

GetRetryLimit returns the RetryLimit field on ConsumeOptions.

type Event added in v0.3.0

type Event struct {
	// Handler is a reference to the client.
	Handler Client

	// Timestamp of the event
	Timestamp time.Time
	// Metadata contains the values the event was indexed by
	Metadata map[string]string

	// ID to uniquely identify the event
	ID string
	// Topic of event, e.g. "registry.service.created"
	Topic string
	// Payload contains the encoded message
	Payload []byte
	// contains filtered or unexported fields
}

Event is the object returned by the broker when you subscribe to a topic.

func (*Event) Ack added in v0.3.0

func (e *Event) Ack() error

Ack acknowledges successful processing of the event in ManualAck mode.

func (*Event) Nack added in v0.3.0

func (e *Event) Nack() error

Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode.

func (*Event) SetAckFunc added in v0.3.0

func (e *Event) SetAckFunc(f AckFunc)

SetAckFunc sets the AckFunc for the event.

func (*Event) SetNackFunc added in v0.3.0

func (e *Event) SetNackFunc(f NackFunc)

SetNackFunc sets the NackFunc for the event.

func (*Event) Unmarshal added in v0.3.0

func (e *Event) Unmarshal(v any) error

Unmarshal the events message into an object.

type NackFunc added in v0.3.0

type NackFunc func() error

NackFunc is the function to call to negatively acknowledge a message.

type Option

type Option func(ConfigType)

Option is a functional option type for the registry.

func WithClientPlugin

func WithClientPlugin(n string) Option

WithClientPlugin set the client implementation to use.

type ProviderFunc

type ProviderFunc func(
	configData map[string]any,
	logger log.Logger,
	opts ...Option,
) (Type, error)

ProviderFunc is provider function type used by plugins to create a new client.

type PublishOption added in v0.3.0

type PublishOption func(o *PublishOptions)

PublishOption sets attributes on PublishOptions.

func WithPublishMetadata added in v0.3.0

func WithPublishMetadata(md map[string]string) PublishOption

WithPublishMetadata sets the Metadata field on PublishOptions.

func WithPublishTimestamp added in v0.3.0

func WithPublishTimestamp(t time.Time) PublishOption

WithPublishTimestamp sets the timestamp field on PublishOptions.

type PublishOptions added in v0.3.0

type PublishOptions struct {
	// Metadata contains any keys which can be used to query the data, for example a customer id
	Metadata map[string]string
	// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
	Timestamp time.Time
}

PublishOptions contains all the options which can be provided when publishing an event.

func NewPublishOptions added in v0.3.0

func NewPublishOptions(opts ...PublishOption) PublishOptions

NewPublishOptions generates new publish options with defaults.

type Req

type Req[TReq any, TResp any] struct {
	Topic       string `json:"topic"`
	ContentType string `json:"contentType"`

	// The Data of type TReq
	Data TReq `json:"data" yaml:"data"`
	// Err is an error that might happened during encoding.
	Err error
	// contains filtered or unexported fields
}

Req contains all data for a request call.

func NewRequest

func NewRequest[TResp, TReq any](req TReq) *Req[TReq, TResp]

NewRequest creates a event for the given topic.

func (*Req[TReq, TResp]) Request

func (e *Req[TReq, TResp]) Request(ctx context.Context, handler Client, topic string, opts ...RequestOption) (*TResp, error)

Request runs a REST like call on the events topic.

func (*Req[TReq, TResp]) SetReplyFunc

func (e *Req[TReq, TResp]) SetReplyFunc(h func(ctx context.Context, result TResp, err error))

SetReplyFunc sets the internal reply func (for example nats.Msg) for the client.

type RequestOption

type RequestOption func(o *RequestOptions)

RequestOption sets attributes on Calloptions.

func WithRequestContentType

func WithRequestContentType(ct string) RequestOption

WithRequestContentType sets the ContentType field to use for transporting the message.

func WithRequestResponseMetadata

func WithRequestResponseMetadata(md map[string]string) RequestOption

WithRequestResponseMetadata will write response Metadata into the given map.

func WithRequestTimeout

func WithRequestTimeout(t time.Duration) RequestOption

WithRequestTimeout sets the timeout for a request.

type RequestOptions

type RequestOptions struct {
	// ContentType for transporting the message.
	ContentType string
	// Metadata contains keys which can be used to query the data, for example a customer id
	Metadata map[string]string

	// RequestTimeout defines how long to wait for the server to reply on a request.
	RequestTimeout time.Duration
}

RequestOptions contains options for a call.

func NewRequestOptions

func NewRequestOptions(opts ...RequestOption) RequestOptions

NewRequestOptions generates new calloptions with defaults.

type Type added in v0.3.0

type Type struct {
	Client
}

Type is the client implementation for events.

func Provide

func Provide(
	svcCtx *cli.ServiceContextWithConfig,
	components *types.Components,
	logger log.Logger,
	opts ...Option) (Type, error)

Provide creates a new client instance with the implementation from cfg.Plugin.

func ProvideNoOpts added in v0.3.0

func ProvideNoOpts(
	svcCtx *cli.ServiceContextWithConfig,
	components *types.Components,
	logger log.Logger,
) (Type, error)

ProvideNoOpts creates a new client instance with the implementation from cfg.Plugin.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL