cloudevents

package
v0.5.1
This package is not in the latest version of its module.
Published: Dec 3, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow. This package implements the CloudEvents specification with protoflow-specific extensions for reliability semantics.

Constants

const (
	// ExtAttempt is the current retry attempt number (1-based).
	ExtAttempt = "pf_attempt"

	// ExtMaxAttempts is the maximum number of retry attempts allowed.
	ExtMaxAttempts = "pf_max_attempts"

	// ExtNextAttemptAt is the RFC3339 timestamp or Unix time for the next retry.
	ExtNextAttemptAt = "pf_next_attempt_at"

	// ExtDeadLetter indicates the event has been moved to DLQ.
	ExtDeadLetter = "pf_dead_letter"

	// ExtTraceID is the distributed trace ID (W3C traceparent compatible).
	ExtTraceID = "pf_trace_id"

	// ExtParentID is the parent span ID for trace correlation.
	ExtParentID = "pf_parent_id"

	// ExtDelayMs is the delay in milliseconds before processing.
	ExtDelayMs = "pf_delay_ms"

	// ExtEventVersion is an optional version number for the event schema.
	ExtEventVersion = "pf_event_version"

	// ExtOriginalTopic stores the original topic when moved to DLQ.
	ExtOriginalTopic = "pf_original_topic"

	// ExtErrorMessage stores the last error message when moved to DLQ.
	ExtErrorMessage = "pf_error_message"

	// ExtCorrelationID is a correlation identifier for request tracing.
	ExtCorrelationID = "pf_correlation_id"
)

Protoflow extension keys for CloudEvents. These extensions provide reliability semantics for event processing.

const (
	// TimeFormat is the standard CloudEvents time format (RFC3339).
	TimeFormat = time.RFC3339

	// TimeFormatNano is the RFC3339 format with nanosecond precision.
	TimeFormatNano = time.RFC3339Nano
)

Time format constants for CloudEvents.

const (
	DefaultMaxAttempts = 3
)

Default values for protoflow extensions.

const SpecVersion = "1.0"

SpecVersion is the CloudEvents specification version implemented.

Variables

var (
	// ErrRetry signals that the message should be retried with the default delay.
	// The retry middleware will apply exponential backoff.
	ErrRetry = errors.New("protoflow: retry message")

	// ErrDeadLetter signals that the message should be sent to the dead letter queue
	// without further retry attempts.
	ErrDeadLetter = errors.New("protoflow: send to dead letter queue")

	// ErrSkip signals that the message should be acknowledged without processing.
	// Use this for messages that are intentionally ignored (e.g., duplicates).
	ErrSkip = errors.New("protoflow: skip message")

	// ErrUnprocessable signals that the message is permanently invalid and cannot
	// be processed. It will be sent to DLQ with the unprocessable flag.
	ErrUnprocessable = errors.New("protoflow: unprocessable message")
)
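
The comment on ErrRetry mentions exponential backoff but gives no parameters. As a rough illustration of what such a schedule can look like, the sketch below doubles a base delay per attempt and caps it. The function name, the base delay, and the cap are all hypothetical and are not taken from protoflow.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns base * 2^(attempt-1), capped at max.
// attempt is 1-based, matching the documented pf_attempt extension.
// base and max are hypothetical parameters; the package does not document its own values.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, backoffDelay(attempt, 100*time.Millisecond, time.Second))
	}
}
```

With a 100ms base and a 1s cap, attempts 1 through 5 yield 100ms, 200ms, 400ms, 800ms, and 1s.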

Functions

func CopyTracingContext

func CopyTracingContext(src Event, dst *Event)

CopyTracingContext copies tracing extensions from source to destination event.

func DLQTopic

func DLQTopic(eventType string) string

DLQTopic returns the dead letter queue topic name for a given event type. Convention: <eventType>.dead

func ExceedsMaxAttempts

func ExceedsMaxAttempts(evt Event) bool

ExceedsMaxAttempts returns true if the current attempt exceeds the max.
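
The check can be pictured as a comparison between the two attempt extensions, using the documented defaults (0 for an unset attempt, DefaultMaxAttempts for an unset maximum). The sketch below works on a plain extensions map rather than the package's Event type, and it assumes a strict "greater than" comparison. The helper names are hypothetical.

```go
package main

import "fmt"

const (
	extAttempt         = "pf_attempt"
	extMaxAttempts     = "pf_max_attempts"
	defaultMaxAttempts = 3
)

// intExt reads an int extension, returning def when the key is absent or not an int.
func intExt(ext map[string]any, key string, def int) int {
	if v, ok := ext[key].(int); ok {
		return v
	}
	return def
}

// exceedsMaxAttempts assumes a strict comparison: attempt > max.
func exceedsMaxAttempts(ext map[string]any) bool {
	attempt := intExt(ext, extAttempt, 0)
	max := intExt(ext, extMaxAttempts, defaultMaxAttempts)
	return attempt > max
}

func main() {
	fmt.Println(exceedsMaxAttempts(map[string]any{extAttempt: 3}))                    // false
	fmt.Println(exceedsMaxAttempts(map[string]any{extAttempt: 4}))                    // true
	fmt.Println(exceedsMaxAttempts(map[string]any{extAttempt: 4, extMaxAttempts: 5})) // false
}
```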

func FormatTime

func FormatTime(t time.Time) string

FormatTime formats a time value for CloudEvents.

func GetAttempt

func GetAttempt(evt Event) int

GetAttempt returns the current attempt number (1-based). Returns 0 if not set.

func GetCorrelationID

func GetCorrelationID(evt Event) string

GetCorrelationID returns the correlation ID for request tracing.

func GetDelay

func GetDelay(evt Event) time.Duration

GetDelay returns the delay as a time.Duration.

func GetDelayMs

func GetDelayMs(evt Event) int64

GetDelayMs returns the delay in milliseconds. Returns 0 if not set.
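
GetDelay and GetDelayMs expose the same pf_delay_ms value in two units. The conversion between them can be sketched with the standard library alone. The helper names below are hypothetical, and truncation to whole milliseconds is an assumption about how sub-millisecond durations would be handled.

```go
package main

import (
	"fmt"
	"time"
)

// msToDuration converts a millisecond count, as stored under pf_delay_ms, to a time.Duration.
func msToDuration(ms int64) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

// durationToMs truncates a duration to whole milliseconds.
func durationToMs(d time.Duration) int64 {
	return d.Milliseconds()
}

func main() {
	fmt.Println(msToDuration(1500))                    // 1.5s
	fmt.Println(durationToMs(2 * time.Second))         // 2000
	fmt.Println(durationToMs(1500 * time.Microsecond)) // 1
}
```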

func GetErrorMessage

func GetErrorMessage(evt Event) string

GetErrorMessage returns the last error message.

func GetEventVersion

func GetEventVersion(evt Event) string

GetEventVersion returns the event schema version.

func GetMaxAttempts

func GetMaxAttempts(evt Event) int

GetMaxAttempts returns the maximum number of attempts. Returns DefaultMaxAttempts if not set.

func GetNextAttemptAt

func GetNextAttemptAt(evt Event) time.Time

GetNextAttemptAt returns the scheduled time for the next retry attempt. Returns zero time if not set.

func GetOriginalTopic

func GetOriginalTopic(evt Event) string

GetOriginalTopic returns the original topic before DLQ routing.

func GetParentID

func GetParentID(evt Event) string

GetParentID returns the parent span ID.

func GetTraceID

func GetTraceID(evt Event) string

GetTraceID returns the distributed trace ID.

func IncrementAttempt

func IncrementAttempt(evt *Event) int

IncrementAttempt increments the attempt counter and returns the new value.

func IsDeadLetter

func IsDeadLetter(evt Event) bool

IsDeadLetter returns true if the event has been marked for DLQ.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable returns true if the error indicates the message should be retried.

func Now

func Now() time.Time

Now returns the current UTC time.

func ParseTime

func ParseTime(s string) (time.Time, error)

ParseTime parses a time string in various formats. Supports RFC3339, RFC3339Nano, and Unix timestamps.
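
A parser that accepts these three inputs can be sketched with the standard library as follows. This is a stand-in and not the package's implementation. The function name is hypothetical, and Unix timestamps are assumed to be whole seconds, as the GetExtensionTime documentation states for extension values.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseTime tries RFC3339 (with or without fractional seconds), then Unix seconds.
// Go's RFC3339Nano layout also accepts input that has no fractional part.
func parseTime(s string) (time.Time, error) {
	if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
		return t, nil
	}
	if secs, err := strconv.ParseInt(s, 10, 64); err == nil {
		return time.Unix(secs, 0).UTC(), nil
	}
	return time.Time{}, fmt.Errorf("unrecognized time format: %q", s)
}

func main() {
	for _, in := range []string{
		"2025-12-03T10:00:00Z",
		"2025-12-03T10:00:00.123456789Z",
		"0",
		"not a time",
	} {
		t, err := parseTime(in)
		fmt.Println(t, err)
	}
}
```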

func PrepareForDLQ

func PrepareForDLQ(evt *Event, originalTopic string, err error)

PrepareForDLQ prepares an event for dead letter queue by:

- Setting the dead letter flag
- Storing the original topic
- Storing the error message
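
The three steps can be sketched against a plain extensions map, using the documented extension keys. The function names are hypothetical, and leaving the error message unset when err is nil is an assumption. The dlqTopic helper mirrors the documented <eventType>.dead convention.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	extDeadLetter    = "pf_dead_letter"
	extOriginalTopic = "pf_original_topic"
	extErrorMessage  = "pf_error_message"
)

// prepareForDLQ applies the three documented steps to an extensions map.
// Leaving pf_error_message unset when err is nil is an assumption.
func prepareForDLQ(ext map[string]any, originalTopic string, err error) map[string]any {
	if ext == nil {
		ext = make(map[string]any)
	}
	ext[extDeadLetter] = true
	ext[extOriginalTopic] = originalTopic
	if err != nil {
		ext[extErrorMessage] = err.Error()
	}
	return ext
}

// dlqTopic follows the documented convention: <eventType>.dead
func dlqTopic(eventType string) string {
	return eventType + ".dead"
}

func main() {
	ext := prepareForDLQ(nil, "orders", errors.New("invalid payload"))
	fmt.Println(dlqTopic("order.created"))
	fmt.Println(ext)
}
```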

func PrepareForRetry

func PrepareForRetry(evt *Event, delay time.Duration)

PrepareForRetry prepares an event for retry by incrementing the attempt counter and setting the next attempt time.
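
A minimal sketch of those two steps, again on a plain extensions map. The function names are hypothetical, the clock is passed in so the result is deterministic, and storing the next attempt time as an RFC3339 string is an assumption (the constant's comment allows either RFC3339 or Unix time).

```go
package main

import (
	"fmt"
	"time"
)

const (
	extAttempt       = "pf_attempt"
	extNextAttemptAt = "pf_next_attempt_at"
)

// prepareForRetry increments the attempt counter and schedules the next attempt at now+delay.
// Storing the time as an RFC3339 string is an assumption.
func prepareForRetry(ext map[string]any, now time.Time, delay time.Duration) int {
	attempt, _ := ext[extAttempt].(int) // 0 when unset
	attempt++
	ext[extAttempt] = attempt
	ext[extNextAttemptAt] = now.Add(delay).UTC().Format(time.RFC3339)
	return attempt
}

// exampleRetry runs one retry preparation against a fixed clock.
func exampleRetry() (int, string) {
	ext := map[string]any{}
	now := time.Date(2025, 12, 3, 10, 0, 0, 0, time.UTC)
	n := prepareForRetry(ext, now, 30*time.Second)
	return n, ext[extNextAttemptAt].(string)
}

func main() {
	n, at := exampleRetry()
	fmt.Println(n, at) // 1 2025-12-03T10:00:30Z
}
```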

func SetAttempt

func SetAttempt(evt *Event, n int)

SetAttempt sets the current attempt number.

func SetCorrelationID

func SetCorrelationID(evt *Event, correlationID string)

SetCorrelationID sets the correlation ID.

func SetDeadLetter

func SetDeadLetter(evt *Event, isDead bool)

SetDeadLetter marks the event as dead-lettered.

func SetDelay

func SetDelay(evt *Event, d time.Duration)

SetDelay sets the delay from a time.Duration.

func SetDelayMs

func SetDelayMs(evt *Event, delayMs int64)

SetDelayMs sets the delay in milliseconds.

func SetErrorMessage

func SetErrorMessage(evt *Event, msg string)

SetErrorMessage stores an error message on the event.

func SetEventVersion

func SetEventVersion(evt *Event, version string)

SetEventVersion sets the event schema version.

func SetMaxAttempts

func SetMaxAttempts(evt *Event, n int)

SetMaxAttempts sets the maximum number of attempts.

func SetNextAttemptAfter

func SetNextAttemptAfter(evt *Event, d time.Duration)

SetNextAttemptAfter sets the next attempt time as a duration from now.

func SetNextAttemptAt

func SetNextAttemptAt(evt *Event, t time.Time)

SetNextAttemptAt sets the time for the next retry attempt.

func SetOriginalTopic

func SetOriginalTopic(evt *Event, topic string)

SetOriginalTopic stores the original topic when moving to DLQ.

func SetParentID

func SetParentID(evt *Event, parentID string)

SetParentID sets the parent span ID.

func SetTraceID

func SetTraceID(evt *Event, traceID string)

SetTraceID sets the distributed trace ID.

func ShouldDeadLetter

func ShouldDeadLetter(err error) bool

ShouldDeadLetter returns true if the error indicates the message should go to DLQ.

Types

type DeadLetterError

type DeadLetterError struct {
	Reason string
	Cause  error
}

DeadLetterError signals that a message should be sent to DLQ with a reason.

func ErrDeadLetterWithReason

func ErrDeadLetterWithReason(reason string, cause error) *DeadLetterError

ErrDeadLetterWithReason creates a DeadLetterError with a specific reason.

Example:

return nil, cloudevents.ErrDeadLetterWithReason("payment already processed", nil)

func (*DeadLetterError) Error

func (e *DeadLetterError) Error() string

func (*DeadLetterError) Is

func (e *DeadLetterError) Is(target error) bool

Is implements errors.Is for DeadLetterError.

func (*DeadLetterError) Unwrap

func (e *DeadLetterError) Unwrap() error
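
The Is and Unwrap methods suggest that a DeadLetterError can be matched with errors.Is and errors.As even when wrapped. The sketch below shows that pattern with a local stand-in type. It assumes that Is matches the ErrDeadLetter sentinel and that Unwrap returns Cause. Neither is confirmed by the documentation above, and the message format is invented.

```go
package main

import (
	"errors"
	"fmt"
)

// errDeadLetter stands in for the package's ErrDeadLetter sentinel.
var errDeadLetter = errors.New("protoflow: send to dead letter queue")

// deadLetterError is a local stand-in for DeadLetterError.
type deadLetterError struct {
	Reason string
	Cause  error
}

func (e *deadLetterError) Error() string {
	if e.Cause != nil {
		return fmt.Sprintf("dead letter: %s: %v", e.Reason, e.Cause)
	}
	return "dead letter: " + e.Reason
}

// Is reports a match against the sentinel (assumed behavior).
func (e *deadLetterError) Is(target error) bool { return target == errDeadLetter }

// Unwrap exposes the cause so errors.Is and errors.As can keep walking the chain.
func (e *deadLetterError) Unwrap() error { return e.Cause }

// matchesSentinel reports whether err is, or wraps, a dead letter signal.
func matchesSentinel(err error) bool { return errors.Is(err, errDeadLetter) }

func main() {
	cause := errors.New("card declined")
	err := fmt.Errorf("handler failed: %w", &deadLetterError{Reason: "payment rejected", Cause: cause})

	fmt.Println(matchesSentinel(err))  // true
	fmt.Println(errors.Is(err, cause)) // true

	var dl *deadLetterError
	if errors.As(err, &dl) {
		fmt.Println(dl.Reason) // payment rejected
	}
}
```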

type Event

type Event struct {

	// SpecVersion is the version of the CloudEvents specification.
	// MUST be set to "1.0" for CloudEvents v1.0.
	SpecVersion string `json:"specversion"`

	// Type describes the type of event related to the originating occurrence.
	// Recommended format: <resource>.<action>[.v<version>]
	// Examples: customer.created, order.reserved.v2
	Type string `json:"type"`

	// Source identifies the context in which an event happened.
	// Often contains the service name or URI of the producer.
	Source string `json:"source"`

	// ID uniquely identifies the event. If not set, a ULID will be generated.
	ID string `json:"id"`

	// Time is the timestamp when the occurrence happened.
	// If not set, the current time is used.
	Time time.Time `json:"time,omitempty"`

	// DataContentType describes the content type of the data attribute.
	// Common values: "application/json", "application/protobuf"
	DataContentType *string `json:"datacontenttype,omitempty"`

	// DataSchema identifies the schema that data adheres to.
	DataSchema *string `json:"dataschema,omitempty"`

	// Subject describes the subject of the event in the context of the source.
	Subject *string `json:"subject,omitempty"`

	// Data is the event payload. Can be any type that is JSON-serializable.
	Data any `json:"data,omitempty"`

	// DataBase64 contains base64-encoded binary data when Data cannot be
	// directly serialized (e.g., binary protobuf).
	DataBase64 *string `json:"data_base64,omitempty"`

	// Extensions contains CloudEvents extension attributes.
	// Protoflow uses extensions prefixed with "pf_" for reliability semantics.
	Extensions map[string]any `json:"extensions,omitempty"`
}

Event represents a CloudEvents v1.0 compliant event with protoflow extensions. See https://github.com/cloudevents/spec/blob/v1.0/spec.md for specification details.

func New

func New(eventType, source string, data any) Event

New creates a new CloudEvent with required fields populated. ID is auto-generated using ULID, Time is set to current time.

func NewWithID

func NewWithID(id, eventType, source string, data any) Event

NewWithID creates a new CloudEvent with a specific ID.

func (Event) Clone

func (e Event) Clone() Event

Clone creates a deep copy of the event.

func (Event) GetExtension

func (e Event) GetExtension(key string) any

GetExtension retrieves an extension value by key. Returns nil if the extension does not exist.

func (Event) GetExtensionBool

func (e Event) GetExtensionBool(key string) bool

GetExtensionBool retrieves an extension value as a bool. Returns false if the extension does not exist or is not a bool.

func (Event) GetExtensionInt

func (e Event) GetExtensionInt(key string) int

GetExtensionInt retrieves an extension value as an int. Returns 0 if the extension does not exist or cannot be converted.

func (Event) GetExtensionInt64

func (e Event) GetExtensionInt64(key string) int64

GetExtensionInt64 retrieves an extension value as an int64. Returns 0 if the extension does not exist or cannot be converted.
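
Which source types count as convertible is not spelled out here. One reason conversion matters: when encoding/json decodes into a map[string]any, every JSON number arrives as float64, so a plain type assertion to int would fail after a round trip. The sketch below shows one tolerant conversion. The function name and the set of accepted types are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// toInt64 converts common dynamic representations to int64, returning 0 otherwise.
// The accepted types are an assumption, not the package's documented list.
func toInt64(v any) int64 {
	switch n := v.(type) {
	case int:
		return int64(n)
	case int64:
		return n
	case float64: // what encoding/json produces for numbers decoded into any
		return int64(n)
	case string:
		i, err := strconv.ParseInt(n, 10, 64)
		if err != nil {
			return 0
		}
		return i
	}
	return 0
}

func main() {
	var ext map[string]any
	if err := json.Unmarshal([]byte(`{"pf_attempt": 2, "pf_delay_ms": "1500"}`), &ext); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", ext["pf_attempt"]) // float64
	fmt.Println(toInt64(ext["pf_attempt"]), toInt64(ext["pf_delay_ms"]), toInt64(ext["missing"])) // 2 1500 0
}
```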

func (Event) GetExtensionString

func (e Event) GetExtensionString(key string) string

GetExtensionString retrieves an extension value as a string. Returns empty string if the extension does not exist or is not a string.

func (Event) GetExtensionTime

func (e Event) GetExtensionTime(key string) time.Time

GetExtensionTime retrieves an extension value as a time.Time. Supports RFC3339 strings and Unix timestamps (seconds). Returns zero time if the extension does not exist or cannot be parsed.

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler for CloudEvents JSON format.

func (*Event) UnmarshalJSON

func (e *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for CloudEvents JSON format.
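
In the CloudEvents v1.0 JSON event format, extension attributes are serialized as top-level members next to the core attributes. Whether this package's marshaler does exactly that is not stated above, so the following is only a sketch of the technique, using a reduced stand-in type with hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a reduced stand-in for the package's Event type.
type event struct {
	SpecVersion string
	Type        string
	Source      string
	ID          string
	Data        any
	Extensions  map[string]any
}

// MarshalJSON flattens extensions into the top-level object.
// Core attributes are written last so an extension cannot shadow them.
func (e event) MarshalJSON() ([]byte, error) {
	out := make(map[string]any, 5+len(e.Extensions))
	for k, v := range e.Extensions {
		out[k] = v
	}
	out["specversion"] = e.SpecVersion
	out["type"] = e.Type
	out["source"] = e.Source
	out["id"] = e.ID
	if e.Data != nil {
		out["data"] = e.Data
	}
	return json.Marshal(out)
}

// encode marshals a fixed sample event.
func encode() (string, error) {
	e := event{
		SpecVersion: "1.0",
		Type:        "order.created",
		Source:      "orders-service",
		ID:          "evt-1",
		Data:        map[string]string{"order_id": "42"},
		Extensions:  map[string]any{"pf_attempt": 1},
	}
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	s, err := encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

Because encoding/json sorts map keys, the sample prints its members in alphabetical order, with pf_attempt sitting between id and source.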

func (Event) Validate

func (e Event) Validate() error

Validate checks that the event has all required CloudEvents attributes.
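
CloudEvents v1.0 defines four required context attributes: id, source, specversion, and type. A check along those lines might look like the sketch below. The stand-in type, the error messages, and the choice to stop at the first failure are assumptions, not the package's behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// event carries only the four attributes that CloudEvents v1.0 requires.
type event struct {
	SpecVersion, Type, Source, ID string
}

// validate returns an error for the first required attribute that is missing or wrong.
func validate(e event) error {
	if e.SpecVersion != "1.0" {
		return fmt.Errorf("specversion must be %q, got %q", "1.0", e.SpecVersion)
	}
	if e.Type == "" {
		return errors.New("type is required")
	}
	if e.Source == "" {
		return errors.New("source is required")
	}
	if e.ID == "" {
		return errors.New("id is required")
	}
	return nil
}

func main() {
	fmt.Println(validate(event{SpecVersion: "1.0", Type: "order.created", Source: "orders-service", ID: "evt-1"})) // <nil>
	fmt.Println(validate(event{SpecVersion: "1.0", Type: "order.created", Source: "orders-service"}))              // id is required
}
```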

func (Event) WithDataContentType

func (e Event) WithDataContentType(contentType string) Event

WithDataContentType sets the data content type and returns the event.

func (Event) WithDataSchema

func (e Event) WithDataSchema(schema string) Event

WithDataSchema sets the data schema and returns the event.

func (Event) WithExtension

func (e Event) WithExtension(key string, value any) Event

WithExtension sets an extension attribute and returns the event.

func (Event) WithSubject

func (e Event) WithSubject(subject string) Event

WithSubject sets the subject field and returns the event.
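
The With* methods take a value receiver and return an Event, so the caller has to keep the returned value; calls can also be chained. The sketch below shows that pattern for the pointer-typed optional fields, using a local stand-in type with hypothetical method names.

```go
package main

import "fmt"

// event is a reduced stand-in with two of the optional pointer fields.
type event struct {
	Type            string
	Subject         *string
	DataContentType *string
}

// withSubject sets Subject on a copy of the receiver and returns the copy.
func (e event) withSubject(s string) event {
	e.Subject = &s
	return e
}

// withDataContentType sets DataContentType on a copy of the receiver and returns the copy.
func (e event) withDataContentType(ct string) event {
	e.DataContentType = &ct
	return e
}

func main() {
	base := event{Type: "order.created"}
	evt := base.withSubject("order/42").withDataContentType("application/json")

	fmt.Println(base.Subject == nil)                // true: base was copied, not modified
	fmt.Println(*evt.Subject, *evt.DataContentType) // order/42 application/json
}
```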

type HandlerResult

type HandlerResult int

HandlerResult represents the outcome of processing an event.

const (
	// ResultAck indicates successful processing; message should be acknowledged.
	ResultAck HandlerResult = iota

	// ResultRetry indicates the message should be retried.
	ResultRetry

	// ResultRetryAfter indicates the message should be retried after a delay.
	ResultRetryAfter

	// ResultDeadLetter indicates the message should be sent to DLQ.
	ResultDeadLetter

	// ResultSkip indicates the message should be skipped (ack without processing).
	ResultSkip
)

func ClassifyError

func ClassifyError(err error) (HandlerResult, time.Duration)

ClassifyError determines the appropriate action based on the handler error. Returns the result type and optional delay for RetryAfter errors.
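
One way such a classification can be built is with errors.Is for the sentinel errors and errors.As for the delay-carrying error type. The sketch below uses local stand-ins for the package's errors and result constants. The mapping of ErrUnprocessable to the dead letter result and the treatment of unknown errors as retryable are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type handlerResult int

const (
	resultAck handlerResult = iota
	resultRetry
	resultRetryAfter
	resultDeadLetter
	resultSkip
)

// Local stand-ins for the package's sentinel errors.
var (
	errDeadLetter    = errors.New("protoflow: send to dead letter queue")
	errSkip          = errors.New("protoflow: skip message")
	errUnprocessable = errors.New("protoflow: unprocessable message")
)

// retryAfterError is a local stand-in for RetryAfterError.
type retryAfterError struct {
	Delay time.Duration
	Cause error
}

func (e *retryAfterError) Error() string { return fmt.Sprintf("retry after %s", e.Delay) }
func (e *retryAfterError) Unwrap() error { return e.Cause }

// classify maps a handler error to a result and an optional delay.
func classify(err error) (handlerResult, time.Duration) {
	var ra *retryAfterError
	switch {
	case err == nil:
		return resultAck, 0
	case errors.As(err, &ra):
		return resultRetryAfter, ra.Delay
	case errors.Is(err, errSkip):
		return resultSkip, 0
	case errors.Is(err, errDeadLetter), errors.Is(err, errUnprocessable):
		return resultDeadLetter, 0
	default:
		// Treating unknown errors as retryable is an assumption.
		return resultRetry, 0
	}
}

func main() {
	wrapped := fmt.Errorf("handler: %w", &retryAfterError{Delay: 5 * time.Second})
	fmt.Println(classify(wrapped))                // 2 5s
	fmt.Println(classify(errSkip))                // 4 0s
	fmt.Println(classify(errors.New("timeout"))) // 1 0s
}
```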

type RetryAfterError

type RetryAfterError struct {
	Delay time.Duration
	Cause error
}

RetryAfterError signals that a message should be retried after a specific delay.

func ErrRetryAfter

func ErrRetryAfter(delay time.Duration, cause error) *RetryAfterError

ErrRetryAfter creates a RetryAfterError with the specified delay. The handler will be retried after the delay expires.

Example:

return nil, cloudevents.ErrRetryAfter(5*time.Second, nil)
return nil, cloudevents.ErrRetryAfter(time.Minute, fmt.Errorf("rate limited"))

func (*RetryAfterError) Error

func (e *RetryAfterError) Error() string

func (*RetryAfterError) Is

func (e *RetryAfterError) Is(target error) bool

Is implements errors.Is for RetryAfterError.

func (*RetryAfterError) Unwrap

func (e *RetryAfterError) Unwrap() error
