Documentation
¶
Overview ¶
Package event contains an interface as well as helpers for go-orb events.
Index ¶
- Constants
- Variables
- func HandleRequest[TReq any, TResp any](ctx context.Context, handler Client, topic string, ...)
- func Register(name string, factory ProviderFunc)
- func Request[TResp any, TReq any](ctx context.Context, handler Client, topic string, req TReq, ...) (*TResp, error)
- type AckFunc
- type Client
- type Config
- type ConfigType
- type ConsumeOption
- type ConsumeOptions
- type Event
- type NackFunc
- type Option
- type ProviderFunc
- type PublishOption
- type PublishOptions
- type Req
- type RequestOption
- type RequestOptions
- type Type
Constants ¶
const ComponentType = "event"
ComponentType is the client component type name.
Variables ¶
var ( // DefaultEventPlugin is the default client implementation to use. DefaultEventPlugin = "natsjs" // DefaultConfigSection is the default config section for the client. DefaultConfigSection = "client" // DefaultRequestContentType is the default content type used to transport data around. DefaultRequestContentType = codecs.MimeProto // DefaultPublishContentType is the default content type used to transport events around. DefaultPublishContentType = codecs.MimeJSON // DefaultRequestTimeout is the default request timeout. DefaultRequestTimeout = time.Second * 30 )
var ( // ErrMissingTopic happens whenever the user doesnt give a topic. ErrMissingTopic = errors.New("missing topic") // ErrEncodingMessage is returned from publish if there was an error encoding the message option. ErrEncodingMessage = errors.New("encoding message") )
Functions ¶
func HandleRequest ¶
func HandleRequest[TReq any, TResp any]( ctx context.Context, handler Client, topic string, callback func(ctx context.Context, req *TReq) (*TResp, error), )
HandleRequest subscribes to the given topic and handles the requests.
func Register ¶
func Register(name string, factory ProviderFunc)
Register makes a plugin available by the provided name.
func Request ¶
func Request[TResp any, TReq any]( ctx context.Context, handler Client, topic string, req TReq, opts ...RequestOption, ) (*TResp, error)
Request makes a request with using events, it's a shortcut for NewRequest(...).Request(...) Example:
resp , err := events.Request[FooResponse](context.Background(), eventsHandler, "user.new", fooRequest)
Response will be of type *FooResponse.
Types ¶
type AckFunc ¶ added in v0.3.0
type AckFunc func() error
AckFunc is the function to call to acknowledge a message.
type Client ¶ added in v0.3.0
type Client interface {
types.Component
// Request runs a REST like call on the given topic.
// This is an internal function, clients MUST use event.Request().
Request(ctx context.Context, req *Req[[]byte, any], opts ...RequestOption) ([]byte, error)
// HandleRequest subscribes to the given topic and handles the requests.
// This is a blocking operation.
// This is an internal function, clients MUST use event.HandleRequest().
HandleRequest(ctx context.Context, topic string, cb func(context.Context, *Req[[]byte, []byte]))
// Clone creates a clone of the handler, this is useful for parallel requests.
Clone() Type
// GetCodec returns the codec used by the handler for publish and subscribe.
GetPublishCodec() codecs.Marshaler
// Publish publishes a Event to the given topic.
Publish(ctx context.Context, topic string, event any, opts ...PublishOption) error
// Consume lets you consume events from a given topic.
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}
Client is the client interface for events plugins.
type Config ¶
type Config struct {
// Plugin selects the client implementation.
Plugin string `json:"plugin" yaml:"plugin"`
}
Config are the Client options.
type ConfigType ¶
type ConfigType interface {
// contains filtered or unexported methods
}
ConfigType is used in the functional options as type to identify a registry option. It is used over a static *Config type as this way plugins can also easilty set functional options without the complication of contexts, as was done in v4. This is possible because plugins will nest the registry.Config type, and thus inherit the interface that is used to identify the registry config.
type ConsumeOption ¶ added in v0.3.0
type ConsumeOption func(o *ConsumeOptions)
ConsumeOption sets attributes on ConsumeOptions.
func WithAutoAck ¶ added in v0.3.0
func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption
WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received the message is requeued in case auto ack is turned off.
func WithGroup ¶ added in v0.3.0
func WithGroup(q string) ConsumeOption
WithGroup sets the consumer group to be part of when consuming events.
func WithOffset ¶ added in v0.3.0
func WithOffset(t time.Time) ConsumeOption
WithOffset sets the offset time at which to start consuming events.
func WithRetryLimit ¶ added in v0.3.0
func WithRetryLimit(retries int) ConsumeOption
WithRetryLimit sets the RetryLimit field on ConsumeOptions. Set to -1 for infinite retries (default).
type ConsumeOptions ¶ added in v0.3.0
type ConsumeOptions struct {
// Offset is the time from which the messages should be consumed from. If not provided then
// the messages will be consumed starting from the moment the Subscription starts.
Offset time.Time
// Group is the name of the consumer group, if two consumers have the same group the events
// are distributed between them
Group string
AckWait time.Duration
// RetryLimit indicates number of times a message is retried
RetryLimit int
// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered.
// If false specifies that each message need ts to be manually acknowledged by the subscriber.
// If processing is successful the message should be ack'ed to remove the message from the stream.
// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will
// remain on the stream to be processed again.
AutoAck bool
// CustomRetries indicates whether to use RetryLimit
CustomRetries bool
}
ConsumeOptions contains all the options which can be provided when subscribing to a topic.
func NewConsumeOptions ¶ added in v0.3.0
func NewConsumeOptions(opts ...ConsumeOption) ConsumeOptions
NewConsumeOptions generates new subscribe options with defaults.
func (ConsumeOptions) GetRetryLimit ¶ added in v0.3.0
func (s ConsumeOptions) GetRetryLimit() int
GetRetryLimit returns the RetryLimit field on ConsumeOptions.
type Event ¶ added in v0.3.0
type Event struct {
// Handler is a reference to the client.
Handler Client
// Timestamp of the event
Timestamp time.Time
// Metadata contains the values the event was indexed by
Metadata map[string]string
// ID to uniquely identify the event
ID string
// Topic of event, e.g. "registry.service.created"
Topic string
// Payload contains the encoded message
Payload []byte
// contains filtered or unexported fields
}
Event is the object returned by the broker when you subscribe to a topic.
func (*Event) Ack ¶ added in v0.3.0
Ack acknowledges successful processing of the event in ManualAck mode.
func (*Event) Nack ¶ added in v0.3.0
Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode.
func (*Event) SetAckFunc ¶ added in v0.3.0
SetAckFunc sets the AckFunc for the event.
func (*Event) SetNackFunc ¶ added in v0.3.0
SetNackFunc sets the NackFunc for the event.
type NackFunc ¶ added in v0.3.0
type NackFunc func() error
NackFunc is the function to call to negatively acknowledge a message.
type Option ¶
type Option func(ConfigType)
Option is a functional option type for the registry.
func WithClientPlugin ¶
WithClientPlugin set the client implementation to use.
type ProviderFunc ¶
type ProviderFunc func( configData map[string]any, logger log.Logger, opts ...Option, ) (Type, error)
ProviderFunc is provider function type used by plugins to create a new client.
type PublishOption ¶ added in v0.3.0
type PublishOption func(o *PublishOptions)
PublishOption sets attributes on PublishOptions.
func WithPublishMetadata ¶ added in v0.3.0
func WithPublishMetadata(md map[string]string) PublishOption
WithPublishMetadata sets the Metadata field on PublishOptions.
func WithPublishTimestamp ¶ added in v0.3.0
func WithPublishTimestamp(t time.Time) PublishOption
WithPublishTimestamp sets the timestamp field on PublishOptions.
type PublishOptions ¶ added in v0.3.0
type PublishOptions struct {
// Metadata contains any keys which can be used to query the data, for example a customer id
Metadata map[string]string
// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
Timestamp time.Time
}
PublishOptions contains all the options which can be provided when publishing an event.
func NewPublishOptions ¶ added in v0.3.0
func NewPublishOptions(opts ...PublishOption) PublishOptions
NewPublishOptions generates new publish options with defaults.
type Req ¶
type Req[TReq any, TResp any] struct { Topic string `json:"topic"` ContentType string `json:"contentType"` // The Data of type TReq Data TReq `json:"data" yaml:"data"` // Err is an error that might happened during encoding. Err error // contains filtered or unexported fields }
Req contains all data for a request call.
func NewRequest ¶
NewRequest creates a event for the given topic.
type RequestOption ¶
type RequestOption func(o *RequestOptions)
RequestOption sets attributes on Calloptions.
func WithRequestContentType ¶
func WithRequestContentType(ct string) RequestOption
WithRequestContentType sets the ContentType field to use for transporting the message.
func WithRequestResponseMetadata ¶
func WithRequestResponseMetadata(md map[string]string) RequestOption
WithRequestResponseMetadata will write response Metadata into the given map.
func WithRequestTimeout ¶
func WithRequestTimeout(t time.Duration) RequestOption
WithRequestTimeout sets the timeout for a request.
type RequestOptions ¶
type RequestOptions struct {
// ContentType for transporting the message.
ContentType string
// Metadata contains keys which can be used to query the data, for example a customer id
Metadata map[string]string
// RequestTimeout defines how long to wait for the server to reply on a request.
RequestTimeout time.Duration
}
RequestOptions contains options for a call.
func NewRequestOptions ¶
func NewRequestOptions(opts ...RequestOption) RequestOptions
NewRequestOptions generates new calloptions with defaults.
type Type ¶ added in v0.3.0
type Type struct {
Client
}
Type is the client implementation for events.
func Provide ¶
func Provide( svcCtx *cli.ServiceContextWithConfig, components *types.Components, logger log.Logger, opts ...Option) (Type, error)
Provide creates a new client instance with the implementation from cfg.Plugin.
func ProvideNoOpts ¶ added in v0.3.0
func ProvideNoOpts( svcCtx *cli.ServiceContextWithConfig, components *types.Components, logger log.Logger, ) (Type, error)
ProvideNoOpts creates a new client instance with the implementation from cfg.Plugin.