cloudevent

package
v2.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 License: MIT Imports: 35 Imported by: 0

README

CloudEvent Module

NATS JetStream-backed job runtime for protobuf-defined CloudEvents. Handlers and publishers are wired from service/method options in .proto files and generated by protoc-gen-go-cloudevent2.

Features

  • JetStream jobs: Streams, consumers, retries, and concurrency from YAML config
  • Protobuf-first: Job name and subject/topic come from proto extensions
  • Codegen: Register*CloudEvent, *Publisher, and subject/job key constants
  • Interceptors: Subscribe and publish middleware chains
  • Delivery control: Reject, Redelivery, and ForceRetry for consumer semantics
  • Typed results: Publish returns result.Result[*PubAckInfo]

Installation

go get github.com/pubgo/funk/v2/component/cloudevent
go install github.com/pubgo/funk/v2/cmds/protoc-gen-go-cloudevent2@latest

From this repository root:

make protobuf   # regenerates proto output

Protobuf layout

CloudEvent uses two proto packages on purpose:

Package Go import Purpose
proto/cloudevent github.com/pubgo/funk/v2/proto/cloudevent (cloudeventpb) Message types: PushEventOptions, RegisterJobOptions, option payloads
proto/cloudeventoption github.com/pubgo/funk/v2/proto/cloudeventoption (cloudeventoptionpb) Descriptor extensions E_Job and E_Subject on services/methods

Message types and extensions are split so application protos can import only the extension package (which re-exports the option message types) without pulling extension definitions into every generated API surface.

Annotate a service
syntax = "proto3";

import "cloudeventoption/options.proto";

service GidInnerService {
  option (lava.cloudevent.job) = { name: "gid" };

  rpc ProxyExec(ProxyExecReq) returns (google.protobuf.Empty) {
    option (lava.cloudevent.subject) = { name: "gid.proxy.exec" };
  }
}
  • job.name must match a consumer group key in YAML config (consumers.<job>).
  • subject.name must match a subject entry under that consumer.
Codegen

Add to your protoc invocation (see root protobuf.yaml):

--go-cloudevent2_out=paths=source_relative:.

Generated artifacts per annotated service include:

  • <Service>CloudEventJobKey — job name constant
  • <Method>CloudEventSubjectKey — subject/topic constant
  • <Service>CloudEvent — handler struct with On<Method> fields
  • Register<Service>CloudEvent(client, event, opts...) — registers non-nil handlers
  • <Service>Publisher{ Client, Opt, Interceptors } with Push<Method>Event methods

Configuration

Jobs are declared in YAML (see config.yaml for shape):

jobs:
  streams:
    gid:
      storage: "file"
      subjects: ["gid.>"]
  consumers:
    gid:
      - consumer: "test:gid"
        stream: "gid"
        subjects: "gid.proxy.exec"
        job:
          timeout: "1m"
          max_retries: 10

Load config and create a client with your NATS JetStream connection. Subject names in config are prefixed at runtime (DefaultPrefix, default acj).

Defaults (overridable per job):

Setting Default
Handler timeout 15s
Max retries 3
Retry backoff 1s
Consumer AckWait 5m

Quick start

Register handlers (generated)
RegisterGidInnerServiceCloudEvent(jobCli, GidInnerServiceCloudEvent{
    OnProxyExec: func(ctx context.Context, req *gidpb.ProxyExecReq) error {
        evt := cloudevent.GetContext(ctx)
        // evt.Subject, evt.NumDelivered, evt.Config, ...
        return nil
    },
})
Register handlers (manual)
cloudevent.RegisterJobHandler(jobCli, "gid", "gid.proxy.exec",
    func(ctx context.Context, req *gidpb.ProxyExecReq) error { return nil },
    cloudevent.ProtoRegisterOpts(registerOpts...),
    cloudevent.WithSubInterceptors(myInterceptor),
)
Publish (generated publisher)
pub := GidInnerServicePublisher{
    Client: jobCli,
    Opt:    cloudevent.ProtoPubOpts(defaultPushOpts...),
}
ack := pub.PushProxyExecEvent(ctx, req).Unwrap()
Publish (direct)
ack := cloudevent.Publish(jobCli, ctx, subjectKey, req, interceptors, opts...).Unwrap()

Publish merges PubOpt values, sets default content_type to application/json, and fills sender from build metadata when omitted.

Interceptors

Subscribe (SubInterceptor): wrap the handler invocation. Attach per registration via WithSubInterceptors or RegisterOpt.

Publish (PubInterceptor): wrap the publish call. Pass on Client.Publish / generated Publisher.Interceptors.

Both follow a func(next ...) ... middleware pattern.

Delivery errors

Return these from handlers to control JetStream acknowledgment:

Function Behavior
Reject(errs...) Ack and discard; no further retries
Redelivery(delay, errs...) Nak with delay (scheduled redelivery)
ForceRetry(errs...) Immediate nak / force redelivery

Ordinary returned errors trigger the configured retry/backoff policy until max_retries is exhausted.

Context

GetContext(ctx) returns delivery metadata: subject, stream, consumer, delivery counts, timestamp, HTTP-style headers, and resolved JobEventConfig. Replaces the old GetEventContext name.

Migration from pre-v2 funk cloudevent

Breaking changes when moving from the previous funk layout:

Before After
proto/cloudevent/options.proto with extensions proto/cloudeventoption/options.proto (cloudeventoptionpb.E_Job, E_Subject)
GetEventContext GetContext
PushEvent / PushRpcEvent helpers Generated *Publisher.Push*Event or Publish
Publish returning (ack, error) Publishresult.Result[*PubAckInfo]
RegisterJobHandler without opts RegisterJobHandler(..., opts ...RegisterOpt)
Single proto package Split cloudeventpb + cloudeventoptionpb

Checklist for downstream services:

  1. Update proto imports to cloudeventoption/options.proto.
  2. Regenerate with protoc-gen-go-cloudevent2 (not protoc-gen-go-cloudevent).
  3. Replace handler registration with generated Register*CloudEvent or pass RegisterOpt.
  4. Replace publish calls with *Publisher or Publish + result unwrapping.
  5. Rename GetEventContextGetContext.
  6. Align YAML job/subject names with proto job / subject options.

Example

Runnable end-to-end demo (NATS + JetStream required):

docker run --rm -p 4222:4222 nats:latest -js
go run ./component/cloudevent/example

Source layout:

  • example/main.go — wires NATS, YAML config, generated register/publish APIs
  • example/demopb/demo.proto — sample service annotations
  • example/config.yaml — stream/consumer config matching the proto subjects

Package-level godoc examples live in example_test.go.

References

Documentation

Index

Examples

Constants

View Source
const (
	DefaultPrefix        = "acj"
	DefaultTimeout       = 15 * time.Second
	DefaultMaxRetry      = 3
	DefaultRetryBackoff  = time.Second
	SenderHeaderKey      = "__cloudevent_sender"
	DelayHeaderKey       = "__cloudevent_delay_run_at"
	DefaultJobName       = "default"
	DefaultConcurrent    = 100
	DefaultMaxConcurrent = 1000
	DefaultMinConcurrent = 1
)

Variables

This section is empty.

Functions

func ForceRetry added in v2.0.4

func ForceRetry(errs ...error) error

func Publish added in v2.0.4

func Publish(jobCli *Client, ctx context.Context, topic string, args proto.Message, interceptors []PubInterceptor, opts ...PubOpt) result.Result[*PubAckInfo]

func Redelivery

func Redelivery(delay time.Duration, errs ...error) error

func RegisterJobHandler

func RegisterJobHandler[T proto.Message](jobCli *Client, jobName, topic string, handler Handler[T], opts ...RegisterOpt)

func Reject

func Reject(errs ...error) error

func WithPushOpt

func WithPushOpt(opts ...func(opt *cloudeventpb.PushEventOptions)) *cloudeventpb.PushEventOptions
Example
package main

import (
	"fmt"

	"github.com/samber/lo"

	"github.com/pubgo/funk/v2/component/cloudevent"
	cloudeventpb "github.com/pubgo/funk/v2/proto/cloudevent"
)

func main() {
	opt := cloudevent.WithPushOpt(func(o *cloudeventpb.PushEventOptions) {
		o.ContentType = lo.ToPtr("application/protobuf")
	})
	fmt.Println(opt.GetContentType())

}
Output:
application/protobuf

func WrapHandler

func WrapHandler[Req, Rsp proto.Message](handler func(ctx context.Context, req Req) (Rsp, error)) func(ctx context.Context, req Req) error

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(p Params) *Client

func (*Client) GetSubject

func (c *Client) GetSubject(name string) *cloudeventpb.CloudEventMethodOptions

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, topic string, args proto.Message, interceptors []PubInterceptor, opts ...PubOpt) result.Result[*PubAckInfo]

func (*Client) Start

func (c *Client) Start() error

type Config

type Config struct {
	Streams   map[string]*StreamConfig                       `yaml:"streams"`
	Consumers map[string]typex.YamlListType[*ConsumerConfig] `yaml:"consumers"`
}

type Consumer

type Consumer struct {
	jetstream.Consumer
	Config *ConsumerConfig
}

type ConsumerConfig

type ConsumerConfig struct {
	Consumer   *string                             `yaml:"consumer"`
	Concurrent *int                                `yaml:"concurrent"`
	Stream     string                              `yaml:"stream"`
	Subjects   typex.YamlListType[*strOrJobConfig] `yaml:"subjects"`
	Job        *JobEventConfig                     `yaml:"job"`
}

type Context

type Context struct {
	Header       http.Header
	NumDelivered uint64
	NumPending   uint64
	Timestamp    time.Time
	Stream       string
	Consumer     string
	Subject      string
	Config       *JobEventConfig
}

func GetContext added in v2.0.4

func GetContext(ctx context.Context) *Context

type Handler added in v2.0.4

type Handler[T proto.Message] func(ctx context.Context, args T) error

type JobEventConfig

type JobEventConfig struct {
	Name         *string        `yaml:"name"`
	Timeout      *time.Duration `yaml:"timeout"`
	MaxRetry     *int           `yaml:"max_retries"`
	RetryBackoff *time.Duration `yaml:"retry_backoff"`
}

type Params

type Params struct {
	Nc  *natsclient.Client
	Cfg *Config
	Lc  lifecycle.Lifecycle
}

type PubAckInfo

type PubAckInfo struct {
	AckInfo *jetstream.PubAck
	Header  nats.Header
	MsgId   string
}

type PubInterceptFunc added in v2.0.4

type PubInterceptFunc func(ctx context.Context, topic string, args proto.Message, opts *PubOptions, handler func(ctx context.Context, topic string, args proto.Message, opts *PubOptions) result.Result[*PubAckInfo]) result.Result[*PubAckInfo]

type PubInterceptor added in v2.0.4

type PubInterceptor func(next PubInterceptFunc) PubInterceptFunc

type PubOpt added in v2.0.4

type PubOpt func(opts *PubOptions)

func ProtoPubOpts added in v2.0.4

func ProtoPubOpts(opts ...*cloudeventpb.PushEventOptions) []PubOpt

type PubOptions added in v2.0.4

type PubOptions = cloudeventpb.PushEventOptions

type Register added in v2.0.4

type Register interface {
	RegisterCloudEvent(jobCli *Client)
}

type RegisterJobOptions added in v2.0.4

type RegisterJobOptions struct {
	Opts         *cloudeventpb.RegisterJobOptions
	Interceptors []SubInterceptor
}

type RegisterOpt added in v2.0.4

type RegisterOpt func(opts *RegisterJobOptions)

func ProtoRegisterOpts added in v2.0.4

func ProtoRegisterOpts(opts ...*cloudeventpb.RegisterJobOptions) RegisterOpt
Example
package main

import (
	"fmt"

	"github.com/samber/lo"

	"github.com/pubgo/funk/v2/component/cloudevent"
	cloudeventpb "github.com/pubgo/funk/v2/proto/cloudevent"
)

func main() {
	opt := cloudevent.ProtoRegisterOpts(&cloudeventpb.RegisterJobOptions{
		JobName: lo.ToPtr("demo"),
	})
	ro := &cloudevent.RegisterJobOptions{Opts: new(cloudeventpb.RegisterJobOptions)}
	opt(ro)
	fmt.Println(lo.FromPtr(ro.Opts.JobName))

}
Output:
demo

func WithSubInterceptors added in v2.0.4

func WithSubInterceptors(interceptors ...SubInterceptor) RegisterOpt

type RpcHandler added in v2.0.4

type RpcHandler[T proto.Message] func(ctx context.Context, args T) (*emptypb.Empty, error)

type StreamConfig

type StreamConfig struct {
	Storage  string                     `yaml:"storage"`
	Subjects typex.YamlListType[string] `yaml:"subjects"`
}

type SubInterceptFunc added in v2.0.4

type SubInterceptFunc func(ctx context.Context, args proto.Message, handler func(ctx context.Context, args proto.Message) error) error

type SubInterceptor added in v2.0.4

type SubInterceptor func(next SubInterceptFunc) SubInterceptFunc

Directories

Path Synopsis
Runnable CloudEvent demo.
Runnable CloudEvent demo.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL