Documentation
¶
Index ¶
- Constants
- func PushEvent[T any](...) chan result.Result[*PubAckInfo]
- func PushRpcEvent[T proto.Message](handler RpcEventHandler[T], ctx context.Context, t T, ...) chan error
- func Redelivery(delay time.Duration, errs ...error) error
- func RegisterJobHandler[T proto.Message](jobCli *Client, jobName, topic string, handler EventHandler[T], ...)
- func Reject(errs ...error) error
- func WithPushOpt(opts ...func(opt *cloudeventpb.PushEventOptions)) *cloudeventpb.PushEventOptions
- func WrapHandler[Req, Rsp proto.Message](handler func(ctx context.Context, req Req) (Rsp, error)) func(ctx context.Context, req Req) error
- type Client
- type Config
- type Consumer
- type ConsumerConfig
- type Context
- type EventHandler
- type EventRegister
- type JobEventConfig
- type Options
- type Params
- type PubAckInfo
- type PushEventOpt
- type RpcEventHandler
- type StreamConfig
Constants ¶
View Source
const (
	DefaultPrefix = "acj"
	DefaultTimeout = 15 * time.Second
	DefaultMaxRetry = 3
	DefaultRetryBackoff = time.Second
	DefaultSenderKey = "sender"
	DefaultCloudEventDelayKey = "__cloudevent_delay_run_at"
	DefaultJobName = "default"
	DefaultConcurrent = 100
	DefaultMaxConcurrent = 1000
	DefaultMinConcurrent = 1
)
Variables ¶
This section is empty.
Functions ¶
func PushEvent ¶
func PushEvent[T any](handler func(*Client, context.Context, T, ...*cloudeventpb.PushEventOptions) (*PubAckInfo, error), jobCli *Client, ctx context.Context, t T, opts ...*cloudeventpb.PushEventOptions) chan result.Result[*PubAckInfo]
func PushRpcEvent ¶
func PushRpcEvent[T proto.Message](handler RpcEventHandler[T], ctx context.Context, t T, opts ...*cloudeventpb.PushEventOptions) chan error
PushRpcEvent pushes an event asynchronously.
func RegisterJobHandler ¶
func RegisterJobHandler[T proto.Message](jobCli *Client, jobName, topic string, handler EventHandler[T], opts ...*cloudeventpb.RegisterJobOptions)
func WithPushOpt ¶
func WithPushOpt(opts ...func(opt *cloudeventpb.PushEventOptions)) *cloudeventpb.PushEventOptions
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) GetSubject ¶
func (c *Client) GetSubject(name string) *cloudeventpb.CloudEventMethodOptions
func (*Client) Publish ¶
func (c *Client) Publish(ctx context.Context, topic string, args proto.Message, opts ...*cloudeventpb.PushEventOptions) (*PubAckInfo, error)
type Config ¶
type Config struct {
// Streams: nats stream config
Streams map[string]*StreamConfig `yaml:"streams"`
// Consumers: nats consumer config
Consumers map[string]typex.YamlListType[*ConsumerConfig] `yaml:"consumers"`
}
type Consumer ¶
type Consumer struct {
jetstream.Consumer
Config *ConsumerConfig
}
type ConsumerConfig ¶
type ConsumerConfig struct {
// Consumer name without prefix
Consumer *string `yaml:"consumer"`
// Concurrent default: 100
Concurrent *int `yaml:"concurrent"`
// Stream name without prefix
Stream string `yaml:"stream"`
// Subjects config
Subjects typex.YamlListType[*strOrJobConfig] `yaml:"subjects"`
// Job event config
Job *JobEventConfig `yaml:"job"`
}
type Context ¶
type Context struct {
// Header jetstream.Headers().
Header http.Header
// NumDelivered jetstream.MsgMetadata{}.NumDelivered
NumDelivered uint64
// NumPending jetstream.MsgMetadata{}.NumPending
NumPending uint64
// Timestamp jetstream.MsgMetadata{}.Timestamp
Timestamp time.Time
// Stream jetstream.MsgMetadata{}.Stream
Stream string
// Consumer jetstream.MsgMetadata{}.Consumer
Consumer string
// Subject is the subject (topic) name, taken from jetstream.Msg().Subject()
Subject string
// Config job config from config file or default
Config *JobEventConfig
}
func GetEventContext ¶
type EventRegister ¶
type EventRegister interface {
RegisterCloudEvent(jobCli *Client)
}
type JobEventConfig ¶
type JobEventConfig struct {
// Name subject name
Name *string `yaml:"name"`
// Timeout job executor timeout, default: DefaultTimeout
Timeout *time.Duration `yaml:"timeout"`
// MaxRetry max retries, default: DefaultMaxRetry
MaxRetry *int `yaml:"max_retries"`
// RetryBackoff retry backoff, default: DefaultRetryBackoff
RetryBackoff *time.Duration `yaml:"retry_backoff"`
}
type Options ¶
type Options = cloudeventpb.PushEventOptions
type PubAckInfo ¶
type PushEventOpt ¶
type PushEventOpt func(opts *Options)
type RpcEventHandler ¶
type StreamConfig ¶
type StreamConfig struct {
// Storage jetstream.StorageType
Storage string `yaml:"storage"`
// Subjects lists the subjects the stream subscribes to, without prefix, e.g. nvr.speaker.*
Subjects typex.YamlListType[string] `yaml:"subjects"`
}
Click to show internal directories.
Click to hide internal directories.