cloudevent

package
v2.0.0-alpha.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2025 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultPrefix             = "acj"
	DefaultTimeout            = 15 * time.Second
	DefaultMaxRetry           = 3
	DefaultRetryBackoff       = time.Second
	DefaultSenderKey          = "sender"
	DefaultCloudEventDelayKey = "__cloudevent_delay_run_at"
	DefaultJobName            = "default"
	DefaultConcurrent         = 100
	DefaultMaxConcurrent      = 1000
	DefaultMinConcurrent      = 1
)

Variables

This section is empty.

Functions

func PushEvent

func PushEvent[T any](handler func(*Client, context.Context, T, ...*cloudeventpb.PushEventOptions) (*PubAckInfo, error), jobCli *Client, ctx context.Context, t T, opts ...*cloudeventpb.PushEventOptions) chan result.Result[*PubAckInfo]

func PushRpcEvent

func PushRpcEvent[T proto.Message](handler RpcEventHandler[T], ctx context.Context, t T, opts ...*cloudeventpb.PushEventOptions) chan error

PushRpcEvent push event async

func Redelivery

func Redelivery(delay time.Duration, errs ...error) error

func RegisterJobHandler

func RegisterJobHandler[T proto.Message](jobCli *Client, jobName, topic string, handler EventHandler[T], opts ...*cloudeventpb.RegisterJobOptions)

func Reject

func Reject(errs ...error) error

func WithPushOpt

func WithPushOpt(opts ...func(opt *cloudeventpb.PushEventOptions)) *cloudeventpb.PushEventOptions

func WrapHandler

func WrapHandler[Req, Rsp proto.Message](handler func(ctx context.Context, req Req) (Rsp, error)) func(ctx context.Context, req Req) error

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(p Params) *Client

func (*Client) GetSubject

func (c *Client) GetSubject(name string) *cloudeventpb.CloudEventMethodOptions

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, topic string, args proto.Message, opts ...*cloudeventpb.PushEventOptions) (*PubAckInfo, error)

func (*Client) Start

func (c *Client) Start() error

type Config

type Config struct {
	// Streams: nats stream config
	Streams map[string]*StreamConfig `yaml:"streams"`

	// Consumers: nats consumer config
	Consumers map[string]typex.YamlListType[*ConsumerConfig] `yaml:"consumers"`
}

type Consumer

type Consumer struct {
	jetstream.Consumer
	Config *ConsumerConfig
}

type ConsumerConfig

type ConsumerConfig struct {
	// Consumer name without prefix
	Consumer *string `yaml:"consumer"`

	// Concurrent default: 100
	Concurrent *int `yaml:"concurrent"`

	// Stream name without prefix
	Stream string `yaml:"stream"`

	// Subjects config
	Subjects typex.YamlListType[*strOrJobConfig] `yaml:"subjects"`

	// Job event config
	Job *JobEventConfig `yaml:"job"`
}

type Context

type Context struct {
	// Header jetstream.Headers().
	Header http.Header

	// NumDelivered jetstream.MsgMetadata{}.NumDelivered
	NumDelivered uint64

	// NumPending jetstream.MsgMetadata{}.NumPending
	NumPending uint64

	// Timestamp jetstream.MsgMetadata{}.Timestamp
	Timestamp time.Time

	// Stream jetstream.MsgMetadata{}.Stream
	Stream string

	// Consumer jetstream.MsgMetadata{}.Consumer
	Consumer string

	// Subject|Topic name jetstream.Msg().Subject()
	Subject string

	// Config job config from config file or default
	Config *JobEventConfig
}

func GetEventContext

func GetEventContext(ctx context.Context) *Context

type EventHandler

type EventHandler[T proto.Message] func(ctx context.Context, args T) error

type EventRegister

type EventRegister interface {
	RegisterCloudEvent(jobCli *Client)
}

type JobEventConfig

type JobEventConfig struct {
	// Name subject name
	Name *string `yaml:"name"`

	// Timeout job executor timeout, default: DefaultTimeout
	Timeout *time.Duration `yaml:"timeout"`

	// MaxRetry max retries, default: DefaultMaxRetry
	MaxRetry *int `yaml:"max_retries"`

	// RetryBackoff retry backoff, default: DefaultRetryBackoff
	RetryBackoff *time.Duration `yaml:"retry_backoff"`
}

type Options

type Params

type Params struct {
	Nc  *natsclient.Client
	Cfg *Config
	Lc  lifecycle.Lifecycle
}

type PubAckInfo

type PubAckInfo struct {
	AckInfo *jetstream.PubAck
	Header  nats.Header
	MsgId   string
}

type PushEventOpt

type PushEventOpt func(opts *Options)

type RpcEventHandler

type RpcEventHandler[T proto.Message] func(ctx context.Context, args T) (*emptypb.Empty, error)

type StreamConfig

type StreamConfig struct {
	// Storage jetstream.StorageType
	Storage string `yaml:"storage"`

	// Subjects stream subscribe subject, e.g. nvr.speaker.* without prefix
	Subjects typex.YamlListType[string] `yaml:"subjects"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL