Documentation
¶
Overview ¶
Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow. This package implements the CloudEvents specification with protoflow-specific extensions for reliability semantics.
Index ¶
- Constants
- Variables
- func CopyTracingContext(src Event, dst *Event)
- func DLQTopic(eventType string) string
- func ExceedsMaxAttempts(evt Event) bool
- func FormatTime(t time.Time) string
- func GetAttempt(evt Event) int
- func GetCorrelationID(evt Event) string
- func GetDelay(evt Event) time.Duration
- func GetDelayMs(evt Event) int64
- func GetErrorMessage(evt Event) string
- func GetEventVersion(evt Event) string
- func GetMaxAttempts(evt Event) int
- func GetNextAttemptAt(evt Event) time.Time
- func GetOriginalTopic(evt Event) string
- func GetParentID(evt Event) string
- func GetTraceID(evt Event) string
- func IncrementAttempt(evt *Event) int
- func IsDeadLetter(evt Event) bool
- func IsRetryable(err error) bool
- func Now() time.Time
- func ParseTime(s string) (time.Time, error)
- func PrepareForDLQ(evt *Event, originalTopic string, err error)
- func PrepareForRetry(evt *Event, delay time.Duration)
- func SetAttempt(evt *Event, n int)
- func SetCorrelationID(evt *Event, correlationID string)
- func SetDeadLetter(evt *Event, isDead bool)
- func SetDelay(evt *Event, d time.Duration)
- func SetDelayMs(evt *Event, delayMs int64)
- func SetErrorMessage(evt *Event, msg string)
- func SetEventVersion(evt *Event, version string)
- func SetMaxAttempts(evt *Event, n int)
- func SetNextAttemptAfter(evt *Event, d time.Duration)
- func SetNextAttemptAt(evt *Event, t time.Time)
- func SetOriginalTopic(evt *Event, topic string)
- func SetParentID(evt *Event, parentID string)
- func SetTraceID(evt *Event, traceID string)
- func ShouldDeadLetter(err error) bool
- type DeadLetterError
- type Event
- func (e Event) Clone() Event
- func (e Event) GetExtension(key string) any
- func (e Event) GetExtensionBool(key string) bool
- func (e Event) GetExtensionInt(key string) int
- func (e Event) GetExtensionInt64(key string) int64
- func (e Event) GetExtensionString(key string) string
- func (e Event) GetExtensionTime(key string) time.Time
- func (e Event) MarshalJSON() ([]byte, error)
- func (e *Event) UnmarshalJSON(data []byte) error
- func (e Event) Validate() error
- func (e Event) WithDataContentType(contentType string) Event
- func (e Event) WithDataSchema(schema string) Event
- func (e Event) WithExtension(key string, value any) Event
- func (e Event) WithSubject(subject string) Event
- type HandlerResult
- type RetryAfterError
Constants ¶
const ( // ExtAttempt is the current retry attempt number (1-based). ExtAttempt = "pf_attempt" // ExtMaxAttempts is the maximum number of retry attempts allowed. ExtMaxAttempts = "pf_max_attempts" // ExtNextAttemptAt is the RFC3339 timestamp or unix time for the next retry. ExtNextAttemptAt = "pf_next_attempt_at" // ExtDeadLetter indicates the event has been moved to DLQ. ExtDeadLetter = "pf_dead_letter" // ExtTraceID is the distributed trace ID (W3C traceparent compatible). ExtTraceID = "pf_trace_id" // ExtParentID is the parent span ID for trace correlation. ExtParentID = "pf_parent_id" // ExtDelayMs is the delay in milliseconds before processing. ExtDelayMs = "pf_delay_ms" // ExtEventVersion is an optional version number for the event schema. ExtEventVersion = "pf_event_version" // ExtOriginalTopic stores the original topic when moved to DLQ. ExtOriginalTopic = "pf_original_topic" // ExtErrorMessage stores the last error message when moved to DLQ. ExtErrorMessage = "pf_error_message" // ExtCorrelationID is a correlation identifier for request tracing. ExtCorrelationID = "pf_correlation_id" )
Protoflow extension keys for CloudEvents. These extensions provide reliability semantics for event processing.
const ( // TimeFormat is the standard CloudEvents time format (RFC3339). TimeFormat = time.RFC3339 // TimeFormatNano is the RFC3339 format with nanosecond precision. TimeFormatNano = time.RFC3339Nano )
Time format constants for CloudEvents.
const (
DefaultMaxAttempts = 3
)
Default values for protoflow extensions.
const SpecVersion = "1.0"
SpecVersion is the CloudEvents specification version implemented.
Variables ¶
var ( // ErrRetry signals that the message should be retried with the default delay. // The retry middleware will apply exponential backoff. ErrRetry = errors.New("protoflow: retry message") // ErrDeadLetter signals that the message should be sent to the dead letter queue // without further retry attempts. ErrDeadLetter = errors.New("protoflow: send to dead letter queue") // ErrSkip signals that the message should be acknowledged without processing. // Use this for messages that are intentionally ignored (e.g., duplicates). ErrSkip = errors.New("protoflow: skip message") // ErrUnprocessable signals that the message is permanently invalid and cannot // be processed. It will be sent to DLQ with the unprocessable flag. ErrUnprocessable = errors.New("protoflow: unprocessable message") )
Functions ¶
func CopyTracingContext ¶
CopyTracingContext copies tracing extensions from source to destination event.
func DLQTopic ¶
DLQTopic returns the dead letter queue topic name for a given event type. Convention: <eventType>.dead
func ExceedsMaxAttempts ¶
ExceedsMaxAttempts returns true if the current attempt exceeds the max.
func FormatTime ¶
FormatTime formats a time value for CloudEvents.
func GetAttempt ¶
GetAttempt returns the current attempt number (1-based). Returns 0 if not set.
func GetCorrelationID ¶
GetCorrelationID returns the correlation ID for request tracing.
func GetDelayMs ¶
GetDelayMs returns the delay in milliseconds. Returns 0 if not set.
func GetErrorMessage ¶
GetErrorMessage returns the last error message.
func GetEventVersion ¶
GetEventVersion returns the event schema version.
func GetMaxAttempts ¶
GetMaxAttempts returns the maximum number of attempts. Returns DefaultMaxAttempts if not set.
func GetNextAttemptAt ¶
GetNextAttemptAt returns the scheduled time for the next retry attempt. Returns zero time if not set.
func GetOriginalTopic ¶
GetOriginalTopic returns the original topic before DLQ routing.
func IncrementAttempt ¶
IncrementAttempt increments the attempt counter and returns the new value.
func IsDeadLetter ¶
IsDeadLetter returns true if the event has been marked for DLQ.
func IsRetryable ¶
IsRetryable returns true if the error indicates the message should be retried.
func ParseTime ¶
ParseTime parses a time string in various formats. Supports RFC3339, RFC3339Nano, and Unix timestamps.
func PrepareForDLQ ¶
PrepareForDLQ prepares an event for dead letter queue by: - Setting the dead letter flag - Storing the original topic - Storing the error message
func PrepareForRetry ¶
PrepareForRetry prepares an event for retry by incrementing the attempt counter and setting the next attempt time.
func SetCorrelationID ¶
SetCorrelationID sets the correlation ID.
func SetDeadLetter ¶
SetDeadLetter marks the event as dead-lettered.
func SetDelayMs ¶
SetDelayMs sets the delay in milliseconds.
func SetErrorMessage ¶
SetErrorMessage stores an error message on the event.
func SetEventVersion ¶
SetEventVersion sets the event schema version.
func SetMaxAttempts ¶
SetMaxAttempts sets the maximum number of attempts.
func SetNextAttemptAfter ¶
SetNextAttemptAfter sets the next attempt time as a duration from now.
func SetNextAttemptAt ¶
SetNextAttemptAt sets the time for the next retry attempt.
func SetOriginalTopic ¶
SetOriginalTopic stores the original topic when moving to DLQ.
func SetParentID ¶
SetParentID sets the parent span ID.
func SetTraceID ¶
SetTraceID sets the distributed trace ID.
func ShouldDeadLetter ¶
ShouldDeadLetter returns true if the error indicates the message should go to DLQ.
Types ¶
type DeadLetterError ¶
DeadLetterError signals that a message should be sent to DLQ with a reason.
func ErrDeadLetterWithReason ¶
func ErrDeadLetterWithReason(reason string, cause error) *DeadLetterError
ErrDeadLetterWithReason creates a DeadLetterError with a specific reason.
Example:
return nil, cloudevents.ErrDeadLetterWithReason("payment already processed", nil)
func (*DeadLetterError) Error ¶
func (e *DeadLetterError) Error() string
func (*DeadLetterError) Is ¶
func (e *DeadLetterError) Is(target error) bool
Is implements errors.Is for DeadLetterError.
func (*DeadLetterError) Unwrap ¶
func (e *DeadLetterError) Unwrap() error
type Event ¶
type Event struct {
// SpecVersion is the version of the CloudEvents specification.
// MUST be set to "1.0" for CloudEvents v1.0.
SpecVersion string `json:"specversion"`
// Type describes the type of event related to the originating occurrence.
// Recommended format: <resource>.<action>[.v<version>]
// Examples: customer.created, order.reserved.v2
Type string `json:"type"`
// Source identifies the context in which an event happened.
// Often contains the service name or URI of the producer.
Source string `json:"source"`
// ID uniquely identifies the event. If not set, a ULID will be generated.
ID string `json:"id"`
// Time is the timestamp when the occurrence happened.
// If not set, the current time is used.
Time time.Time `json:"time,omitempty"`
// DataContentType describes the content type of the data attribute.
// Common values: "application/json", "application/protobuf"
DataContentType *string `json:"datacontenttype,omitempty"`
// DataSchema identifies the schema that data adheres to.
DataSchema *string `json:"dataschema,omitempty"`
// Subject describes the subject of the event in the context of the source.
Subject *string `json:"subject,omitempty"`
// Data is the event payload. Can be any type that is JSON-serializable.
Data any `json:"data,omitempty"`
// DataBase64 contains base64-encoded binary data when Data cannot be
// directly serialized (e.g., binary protobuf).
DataBase64 *string `json:"data_base64,omitempty"`
// Extensions contains CloudEvents extension attributes.
// Protoflow uses extensions prefixed with "pf_" for reliability semantics.
Extensions map[string]any `json:"extensions,omitempty"`
}
Event represents a CloudEvents v1.0 compliant event with protoflow extensions. See https://github.com/cloudevents/spec/blob/v1.0/spec.md for specification details.
func New ¶
New creates a new CloudEvent with required fields populated. ID is auto-generated using ULID, Time is set to current time.
func (Event) GetExtension ¶
GetExtension retrieves an extension value by key. Returns nil if the extension does not exist.
func (Event) GetExtensionBool ¶
GetExtensionBool retrieves an extension value as a bool. Returns false if the extension does not exist or is not a bool.
func (Event) GetExtensionInt ¶
GetExtensionInt retrieves an extension value as an int. Returns 0 if the extension does not exist or cannot be converted.
func (Event) GetExtensionInt64 ¶
GetExtensionInt64 retrieves an extension value as an int64. Returns 0 if the extension does not exist or cannot be converted.
func (Event) GetExtensionString ¶
GetExtensionString retrieves an extension value as a string. Returns empty string if the extension does not exist or is not a string.
func (Event) GetExtensionTime ¶
GetExtensionTime retrieves an extension value as a time.Time. Supports RFC3339 strings and unix timestamps (seconds). Returns zero time if the extension does not exist or cannot be parsed.
func (Event) MarshalJSON ¶
MarshalJSON implements json.Marshaler for CloudEvents JSON format.
func (*Event) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler for CloudEvents JSON format.
func (Event) WithDataContentType ¶
WithDataContentType sets the data content type and returns the event.
func (Event) WithDataSchema ¶
WithDataSchema sets the data schema and returns the event.
func (Event) WithExtension ¶
WithExtension sets an extension attribute and returns the event.
func (Event) WithSubject ¶
WithSubject sets the subject field and returns the event.
type HandlerResult ¶
type HandlerResult int
HandlerResult represents the outcome of processing an event.
const ( // ResultAck indicates successful processing; message should be acknowledged. ResultAck HandlerResult = iota // ResultRetry indicates the message should be retried. ResultRetry // ResultRetryAfter indicates the message should be retried after a delay. ResultRetryAfter // ResultDeadLetter indicates the message should be sent to DLQ. ResultDeadLetter // ResultSkip indicates the message should be skipped (ack without processing). ResultSkip )
func ClassifyError ¶
func ClassifyError(err error) (HandlerResult, time.Duration)
ClassifyError determines the appropriate action based on the handler error. Returns the result type and optional delay for RetryAfter errors.
type RetryAfterError ¶
RetryAfterError signals that a message should be retried after a specific delay.
func ErrRetryAfter ¶
func ErrRetryAfter(delay time.Duration, cause error) *RetryAfterError
ErrRetryAfter creates a RetryAfterError with the specified delay. The handler will be retried after the delay expires.
Example:
return nil, cloudevents.ErrRetryAfter(5*time.Second, nil)
return nil, cloudevents.ErrRetryAfter(time.Minute, fmt.Errorf("rate limited"))
func (*RetryAfterError) Error ¶
func (e *RetryAfterError) Error() string
func (*RetryAfterError) Is ¶
func (e *RetryAfterError) Is(target error) bool
Is implements errors.Is for RetryAfterError.
func (*RetryAfterError) Unwrap ¶
func (e *RetryAfterError) Unwrap() error