cloudevents

package
v1.2.4
Published: Jan 1, 2026 License: Apache-2.0 Imports: 9 Imported by: 2

README

CloudEvents

CloudEvents client and server implementation for event-driven architectures.

Overview

The cloudevents package provides a simplified wrapper around the official CloudEvents SDK for Go. It enables sending, receiving, and processing CloudEvents over HTTP with support for both one-way messaging and request-response patterns.

CloudEvents is a specification for describing event data in a common way, promoting interoperability across services, platforms, and systems.
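
To make the "common way" concrete, here is a minimal sketch of an event in the CloudEvents v1.0 JSON format, built with only the standard library. The `envelope` struct and the sample values are assumptions for illustration and are not types from this package; the attribute names (`specversion`, `id`, `source`, `type`, `datacontenttype`, `data`) come from the specification.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical stand-in for a CloudEvent in JSON format.
// It is not a type from this package.
type envelope struct {
	SpecVersion     string            `json:"specversion"`
	ID              string            `json:"id"`
	Source          string            `json:"source"`
	Type            string            `json:"type"`
	DataContentType string            `json:"datacontenttype,omitempty"`
	Data            map[string]string `json:"data,omitempty"`
}

// encode renders the envelope as a JSON document.
func encode(e envelope) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(envelope{
		SpecVersion:     "1.0",
		ID:              "abc-123",
		Source:          "example/users",
		Type:            "com.example.user.created",
		DataContentType: "application/json",
		Data:            map[string]string{"user_id": "123"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```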

Features

  • HTTP Client & Server - Send and receive CloudEvents over HTTP
  • Request-Response Pattern - Synchronous request-response event flow
  • Asynchronous Receiver - Background event receiver with lifecycle management
  • Result Types - Detailed delivery status (ACK/NACK/Undelivered)
  • CloudEvents v1.0 Compliant - Full specification compliance
  • Flexible Configuration - Customizable HTTP options and client settings
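
For HTTP, the CloudEvents specification defines a binary content mode in which context attributes travel as `ce-` prefixed headers and the payload is the request body. The sketch below builds such a request with only the standard library. `newBinaryRequest` and the sample values are assumptions for illustration, and this document does not state which content mode the package uses by default.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newBinaryRequest builds an HTTP POST in CloudEvents binary content mode:
// context attributes go into ce-* headers and the payload is the body.
// This helper is hypothetical and not part of the package.
func newBinaryRequest(url, id, source, eventType string, body []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("ce-specversion", "1.0")
	req.Header.Set("ce-id", id)
	req.Header.Set("ce-source", source)
	req.Header.Set("ce-type", eventType)
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := newBinaryRequest("http://localhost:8080", "abc-123",
		"example/users", "com.example.user.created", []byte(`{"user_id":"123"}`))
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println(req.Method, req.Header.Get("ce-type"))
}
```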

Installation

go get -u github.com/common-library/go/event/cloudevents
go get -u github.com/cloudevents/sdk-go/v2

Quick Start

Sending Events
import (
    "log"

    "github.com/common-library/go/event/cloudevents"
)

func main() {
    // Create client
    client, err := cloudevents.NewHttp("http://localhost:8080", nil, nil)
    if err != nil {
        log.Fatal(err)
    }
    
    // Create event
    event := cloudevents.NewEvent()
    event.SetType("com.example.user.created")
    event.SetSource("example/users")
    event.SetData("application/json", map[string]string{
        "user_id": "123",
        "name":    "Alice",
    })
    
    // Send event
    result := client.Send(event)
    if result.IsUndelivered() {
        log.Printf("Failed to send: %s", result.Error())
    } else {
        log.Println("Event sent successfully")
    }
}
Receiving Events
import (
    "context"
    "log"

    "github.com/cloudevents/sdk-go/v2/protocol/http"
    "github.com/common-library/go/event/cloudevents"
)

func main() {
    // Create receiver with port configuration
    httpOpts := []http.Option{http.WithPort(8080)}
    receiver, err := cloudevents.NewHttp("", httpOpts, nil)
    if err != nil {
        log.Fatal(err)
    }
    
    // Define event handler
    handler := func(ctx context.Context, event cloudevents.Event) {
        log.Printf("Received event: %s", event.Type())
        log.Printf("Data: %s", event.Data()) // Data() returns the raw payload bytes
    }
    
    // Define failure handler
    failureFunc := func(err error) {
        log.Printf("Receiver error: %v", err)
    }
    
    // Start receiver
    receiver.StartReceiver(handler, failureFunc)
    defer receiver.StopReceiver()
    
    // Wait for shutdown signal
    // ...
}
Request-Response Pattern
// Client side - send request and wait for response
event := cloudevents.NewEvent()
event.SetType("com.example.query")
event.SetSource("example/client")
event.SetData("application/json", map[string]string{"query": "status"})

response, result := client.Request(event)
if result.IsUndelivered() {
    log.Printf("Request failed: %s", result.Error())
} else {
    log.Printf("Response: %s", response.Data()) // Data() returns the raw payload bytes
}
Running a Server
import (
    "log"
    "time"

    "github.com/common-library/go/event/cloudevents"
)

func main() {
    var server cloudevents.Server
    
    // Define request handler
    handler := func(event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
        log.Printf("Received: %s", event.Type())
        
        // Process event...
        
        // Return response event
        response := cloudevents.NewEvent()
        response.SetType("com.example.response")
        response.SetSource("example/server")
        response.SetData("application/json", map[string]string{
            "status": "processed",
        })
        
        return &response, cloudevents.NewHTTPResult(200, "OK")
    }
    
    failureFunc := func(err error) {
        log.Printf("Server error: %v", err)
    }
    
    // Start server
    err := server.Start(":8080", handler, failureFunc)
    if err != nil {
        log.Fatal(err)
    }
    
    // ... wait for shutdown signal ...
    
    // Graceful shutdown
    server.Stop(10 * time.Second)
}

CloudEvents Structure

A CloudEvent has required and optional attributes:

event := cloudevents.NewEvent()

// Required attributes
event.SetID("abc-123")                    // Unique identifier
event.SetType("com.example.object.action") // Event type
event.SetSource("example/source")          // Event source

// Optional attributes
event.SetSubject("user/123")              // Subject within source
event.SetTime(time.Now())                 // Timestamp
event.SetDataContentType("application/json") // Content type

// Event data
event.SetData("application/json", myData)

// Access attributes
id := event.ID()
eventType := event.Type()
source := event.Source()
data := event.Data()

Result Handling

Result Types
result := client.Send(event)

// Check delivery status
if result.IsACK() {
    // Event acknowledged (success)
    log.Println("Event delivered and acknowledged")
}

if result.IsNACK() {
    // Event not acknowledged (rejected)
    log.Printf("Event rejected: %s", result.Error())
}

if result.IsUndelivered() {
    // Event could not be delivered (network/connection error)
    log.Printf("Delivery failed: %s", result.Error())
}

// Get HTTP status code (for HTTP transport)
statusCode, err := result.GetHttpStatusCode()
if err == nil {
    log.Printf("HTTP Status: %d", statusCode)
}
Creating Results
// Generic result
result := cloudevents.NewResult("event processed")

// HTTP result with status code
result = cloudevents.NewHTTPResult(200, "OK")
result = cloudevents.NewHTTPResult(400, "invalid event type: %s", eventType)
result = cloudevents.NewHTTPResult(500, "processing error: %v", err)

Complete Examples

Microservice Event Producer
package main

import (
    "log"
    "time"
    
    "github.com/common-library/go/event/cloudevents"
)

func main() {
    client, err := cloudevents.NewHttp("http://event-gateway:8080", nil, nil)
    if err != nil {
        log.Fatal(err)
    }
    
    // Publish user creation event
    event := cloudevents.NewEvent()
    event.SetType("com.example.user.created")
    event.SetSource("users-service")
    event.SetData("application/json", map[string]interface{}{
        "user_id":    "12345",
        "email":      "alice@example.com",
        "created_at": time.Now(),
    })
    
    result := client.Send(event)
    if result.IsUndelivered() {
        log.Fatal(result.Error())
    }
    
    statusCode, _ := result.GetHttpStatusCode()
    log.Printf("Event sent with status: %d", statusCode)
}
Event Consumer Service
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    
    "github.com/common-library/go/event/cloudevents"
    "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func main() {
    httpOpts := []http.Option{http.WithPort(8080)}
    receiver, err := cloudevents.NewHttp("", httpOpts, nil)
    if err != nil {
        log.Fatal(err)
    }
    
    handler := func(ctx context.Context, event cloudevents.Event) {
        switch event.Type() {
        case "com.example.user.created":
            handleUserCreated(event)
        case "com.example.user.updated":
            handleUserUpdated(event)
        default:
            log.Printf("Unknown event type: %s", event.Type())
        }
    }
    
    failureFunc := func(err error) {
        log.Fatalf("Receiver error: %v", err)
    }
    
    receiver.StartReceiver(handler, failureFunc)
    defer receiver.StopReceiver()
    
    log.Println("Event receiver started on :8080")
    
    // Wait for interrupt signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan
    
    log.Println("Shutting down...")
}

func handleUserCreated(event cloudevents.Event) {
    log.Printf("User created: %s", event.Data())
    // Process event...
}

func handleUserUpdated(event cloudevents.Event) {
    log.Printf("User updated: %s", event.Data())
    // Process event...
}
Request-Response Service
package main

import (
    "log"

    "github.com/common-library/go/event/cloudevents"
)

func main() {
    var server cloudevents.Server
    
    handler := func(event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
        log.Printf("Request: %s", event.Type())
        
        switch event.Type() {
        case "com.example.query.status":
            // Process query and create response
            response := cloudevents.NewEvent()
            response.SetType("com.example.response.status")
            response.SetSource("status-service")
            response.SetData("application/json", map[string]string{
                "status": "healthy",
                "uptime": "24h",
            })
            return &response, cloudevents.NewHTTPResult(200, "OK")
            
        default:
            return nil, cloudevents.NewHTTPResult(400, "unknown query type")
        }
    }
    
    failureFunc := func(err error) {
        log.Printf("Server error: %v", err)
    }
    
    err := server.Start(":8080", handler, failureFunc)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println("Request-response server started on :8080")
    
    // Keep running...
    select {}
}

API Reference

Client Creation
NewHttp(address string, httpOption []http.Option, clientOption []client.Option) (*client, error)

Creates an HTTP CloudEvents client.

Parameters:

  • address - Target URL for sending events (e.g., "http://localhost:8080")
  • httpOption - HTTP protocol options (WithPort, WithPath, etc.)
  • clientOption - Client configuration options

Returns: Client instance and error

Client Methods
Send(event Event) Result

Sends a CloudEvent (one-way).

Request(event Event) (*Event, Result)

Sends a CloudEvent and waits for response.

StartReceiver(handler func(context.Context, Event), failureFunc func(error))

Starts receiving events asynchronously.

StopReceiver()

Stops the event receiver gracefully.

Server Methods
Start(address string, handler func(Event) (*Event, Result), listenAndServeFailureFunc func(error)) error

Starts the CloudEvents HTTP server.

Stop(shutdownTimeout time.Duration) error

Stops the server gracefully.

Result Methods
NewResult(format string, arguments ...any) Result

Creates a generic result.

NewHTTPResult(statusCode int, format string, arguments ...any) Result

Creates an HTTP result with status code.

IsACK() bool

Checks if event was acknowledged.

IsNACK() bool

Checks if event was not acknowledged.

IsUndelivered() bool

Checks if event could not be delivered.

GetHttpStatusCode() (int, error)

Gets HTTP status code from result.

Error() string

Gets error message.

Event Methods
NewEvent() Event

Creates a new CloudEvent.

Common Event Methods:

  • SetID(string) / ID() string
  • SetType(string) / Type() string
  • SetSource(string) / Source() string
  • SetSubject(string) / Subject() string
  • SetTime(time.Time) / Time() time.Time
  • SetData(contentType string, data interface{}) error / Data() []byte
  • SetDataContentType(string) / DataContentType() string

Best Practices

1. Event Type Naming

Use reverse DNS notation for event types:

// Good
event.SetType("com.example.users.created")
event.SetType("com.example.orders.shipped")

// Avoid
event.SetType("user_created")
event.SetType("shipped")
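
A naming convention like this can be checked before an event is published. The sketch below uses a regular expression that requires at least three dot-separated lowercase segments; the exact rule and the `isReverseDNSType` helper are assumptions chosen for illustration, not something this package enforces.

```go
package main

import (
	"fmt"
	"regexp"
)

// reverseDNSType matches dot-separated lowercase segments with at least
// three parts, such as "com.example.users.created". The rule itself is a
// hypothetical convention chosen for this sketch.
var reverseDNSType = regexp.MustCompile(`^[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*){2,}$`)

// isReverseDNSType reports whether eventType follows the convention.
func isReverseDNSType(eventType string) bool {
	return reverseDNSType.MatchString(eventType)
}

func main() {
	for _, t := range []string{"com.example.users.created", "user_created", "shipped"} {
		fmt.Printf("%-28s %v\n", t, isReverseDNSType(t))
	}
}
```
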
2. Source Identification

Use clear, consistent source identifiers:

// Good
event.SetSource("users-service")
event.SetSource("example.com/payment-gateway")

// Avoid
event.SetSource("server1")
event.SetSource("app")
3. Error Handling

Always check result status:

// Good
result := client.Send(event)
if result.IsUndelivered() {
    // Retry or log error
    log.Printf("Send failed: %s", result.Error())
    return
}

// Avoid
client.Send(event) // Ignoring result
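
The "retry or log" step can be sketched as a small loop that retries only undelivered events and backs off between attempts. The `outcome` type and the `send` callback are hypothetical stand-ins for the three delivery states; in real code the callback would wrap client.Send and map IsACK, IsNACK, and IsUndelivered onto them.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// outcome is a hypothetical stand-in for the three delivery states.
type outcome int

const (
	ack outcome = iota
	nack
	undelivered
)

// sendWithRetry calls send up to maxAttempts times. It retries only on
// undelivered outcomes and doubles the wait between attempts.
// It returns the number of attempts made.
func sendWithRetry(send func() outcome, maxAttempts int, baseDelay time.Duration) (int, error) {
	delay := baseDelay
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		switch send() {
		case ack:
			return attempt, nil
		case nack:
			// The receiver rejected the event; resending it unchanged is unlikely to help.
			return attempt, errors.New("event rejected by receiver")
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return maxAttempts, errors.New("event undelivered after retries")
}

func main() {
	calls := 0
	// Fake sender: fails to deliver twice, then succeeds.
	send := func() outcome {
		calls++
		if calls < 3 {
			return undelivered
		}
		return ack
	}
	attempts, err := sendWithRetry(send, 5, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```
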
4. Resource Cleanup

Stop receivers when shutting down:

receiver.StartReceiver(handler, failureFunc)
defer receiver.StopReceiver() // Ensure cleanup

// Or with signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
receiver.StopReceiver()
5. Structured Data

Use structured data formats:

// Good - structured JSON
event.SetData("application/json", map[string]interface{}{
    "user_id": 123,
    "action":  "login",
})

// Avoid - unstructured strings
event.SetData("text/plain", "user 123 logged in")
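
One practical benefit of structured payloads is that a consumer can decode them into a typed value instead of parsing free text. The sketch below does this with the standard library; `loginEvent` and `decodeLogin` are hypothetical names, and the byte slice stands in for whatever payload the received event carries.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// loginEvent is a hypothetical payload type matching the structured example.
type loginEvent struct {
	UserID int    `json:"user_id"`
	Action string `json:"action"`
}

// decodeLogin parses a JSON payload into a typed value.
func decodeLogin(payload []byte) (loginEvent, error) {
	var e loginEvent
	err := json.Unmarshal(payload, &e)
	return e, err
}

func main() {
	payload := []byte(`{"user_id": 123, "action": "login"}`)
	e, err := decodeLogin(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(e.UserID, e.Action)
}
```
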
6. Server Graceful Shutdown

Use appropriate timeout for graceful shutdown:

// Give active connections time to complete
server.Stop(30 * time.Second)

Event Patterns

Fire-and-Forget
result := client.Send(event)
// No response expected
Request-Response
response, result := client.Request(event)
// Synchronous response
Publish-Subscribe
// Publisher
client.Send(event)

// Multiple subscribers
receiver1.StartReceiver(handler1, failureFunc)
receiver2.StartReceiver(handler2, failureFunc)
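
Because an HTTP client targets a single address, reaching several subscribers usually means sending the event to each one, or to a broker that does so. The sketch below shows the first option as a plain fan-out loop; `fanOut`, the `send` callback, and the addresses are assumptions for illustration, and in real code the callback would wrap a client created with NewHttp for that address.

```go
package main

import "fmt"

// fanOut delivers one event to every subscriber address and returns the
// addresses for which delivery failed. send is a hypothetical per-target
// sender that reports whether delivery succeeded.
func fanOut(targets []string, send func(target string) bool) []string {
	var failed []string
	for _, target := range targets {
		if !send(target) {
			failed = append(failed, target)
		}
	}
	return failed
}

func main() {
	targets := []string{"http://receiver1:8080", "http://receiver2:8080"}
	// Fake sender: the second receiver is unreachable.
	failed := fanOut(targets, func(target string) bool {
		return target != "http://receiver2:8080"
	})
	fmt.Println(failed)
}
```
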

Error Handling

Network Errors
result := client.Send(event)
if result.IsUndelivered() {
    log.Printf("Network error: %s", result.Error())
    // Connection refused, timeout, etc.
}
Application Errors
result := client.Send(event)
if result.IsNACK() {
    statusCode, _ := result.GetHttpStatusCode()
    log.Printf("Application rejected: %d - %s", statusCode, result.Error())
    // 4xx or 5xx status codes
}
Validation Errors
event := cloudevents.NewEvent()
// Missing required fields
result := client.Send(event)
// Will fail validation
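
A cheap pre-send check can report which required attributes are still empty. The CloudEvents v1.0 specification requires `id`, `source`, `specversion`, and `type`; the sketch below checks the three that callers normally set themselves. `missingRequired` is a hypothetical helper, and the package's own validation may behave differently.

```go
package main

import (
	"fmt"
	"strings"
)

// missingRequired returns the names of required context attributes that
// are empty. It is a hypothetical helper, not part of this package.
func missingRequired(id, source, eventType string) []string {
	var missing []string
	if strings.TrimSpace(id) == "" {
		missing = append(missing, "id")
	}
	if strings.TrimSpace(source) == "" {
		missing = append(missing, "source")
	}
	if strings.TrimSpace(eventType) == "" {
		missing = append(missing, "type")
	}
	return missing
}

func main() {
	fmt.Println(missingRequired("abc-123", "", "com.example.test"))
}
```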

Testing

Unit Testing with Mock Events
func TestEventHandler(t *testing.T) {
    event := cloudevents.NewEvent()
    event.SetType("com.example.test")
    event.SetSource("test")
    event.SetData("application/json", map[string]string{"key": "value"})
    
    // Call the handler under test; the response event is not inspected here
    _, result := handler(event)
    
    if !result.IsACK() {
        t.Errorf("Expected ACK, got: %s", result.Error())
    }
}
Integration Testing
func TestClientServer(t *testing.T) {
    // Start server
    var server cloudevents.Server
    handler := func(event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
        return nil, cloudevents.NewHTTPResult(200, "OK")
    }
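    // The failure callback may run on another goroutine, so it uses t.Errorf rather than t.Fatal
    if err := server.Start(":8081", handler, func(err error) { t.Errorf("server error: %v", err) }); err != nil {
        t.Fatal(err)
    }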
    defer server.Stop(5 * time.Second)
    
    // Test client
    client, err := cloudevents.NewHttp("http://localhost:8081", nil, nil)
    if err != nil {
        t.Fatal(err)
    }
    event := cloudevents.NewEvent()
    event.SetType("test")
    event.SetSource("test")
    
    result := client.Send(event)
    if result.IsUndelivered() {
        t.Fatal(result.Error())
    }
}

Performance Considerations

  1. Connection Pooling - HTTP client reuses connections
  2. Async Processing - Use StartReceiver for non-blocking event receipt
  3. Batch Events - Group related events when possible
  4. Timeout Configuration - Set appropriate timeouts for network operations
  5. Resource Cleanup - Always stop receivers and servers
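
As an illustration of the pooling and timeout points, the sketch below configures a standard-library HTTP client with an overall request timeout and a bounded idle-connection pool. All values are illustrative assumptions. How such a client is wired into NewHttp depends on the SDK's HTTP options and is not shown here.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newHTTPClient builds a client with an overall request timeout and a
// bounded pool of idle keep-alive connections. The limits are examples,
// not recommendations from this package.
func newHTTPClient(timeout time.Duration) *http.Client {
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
	}
	return &http.Client{Timeout: timeout, Transport: transport}
}

func main() {
	c := newHTTPClient(5 * time.Second)
	fmt.Println(c.Timeout)
}
```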

Dependencies

  • github.com/cloudevents/sdk-go/v2 - Official CloudEvents SDK
  • github.com/common-library/go/http - HTTP server utilities

CloudEvents Specification

This package implements the CloudEvents v1.0 specification.

Documentation

Overview

Package cloudevents provides CloudEvents client and server implementations.

This package wraps the official CloudEvents SDK for Go, offering simplified interfaces for sending, receiving, and processing CloudEvents over HTTP and other protocols.

Features:

  • HTTP client for sending and receiving CloudEvents
  • Request-response pattern support
  • Asynchronous event receiver with lifecycle management
  • Result types for event delivery status
  • CloudEvents v1.0 specification compliance

Example:

client, _ := cloudevents.NewHttp("http://localhost:8080", nil, nil)
event := cloudevents.NewEvent()
event.SetType("com.example.event")
event.SetSource("example/source")
result := client.Send(event)

Constants

This section is empty.

Variables

var NewEvent = v2.NewEvent

NewEvent creates a new CloudEvent with default attributes.

Returns:

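  • Event: A new CloudEvent instance with the spec version set (v1.0 by default)

The returned event must have at minimum Type and Source set before it is valid according to the CloudEvents specification. ID is also a required attribute, so set it with SetID unless your client assigns one when sending.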

Example:

event := cloudevents.NewEvent()
event.SetType("com.example.user.created")
event.SetSource("example/users")
event.SetData("application/json", userData)

Functions

func NewHttp

func NewHttp(address string, httpOption []http.Option, clientOption []cloudeventssdk_client.Option) (*client, error)

NewHttp creates and returns an HTTP client for CloudEvents.

Parameters:

  • address: Target URL for sending events (e.g., "http://localhost:8080")
  • httpOption: HTTP protocol options from CloudEvents SDK (e.g., WithPort, WithPath)
  • clientOption: Client configuration options from CloudEvents SDK

Returns:

  • *client: Configured CloudEvents client
  • error: Error if client creation or protocol initialization fails

The client can be used for sending events, making request-response calls, or receiving events. For receivers, leave address empty and configure port via httpOption.

Example:

// Sender client
client, err := cloudevents.NewHttp("http://localhost:8080", nil, nil)
if err != nil {
    log.Fatal(err)
}

// Receiver client
httpOpts := []http.Option{http.WithPort(8080)}
receiver, err := cloudevents.NewHttp("", httpOpts, nil)

Types

type Event

type Event = v2.Event

Event represents a CloudEvent conforming to the CloudEvents v1.0 specification.

Events have required attributes (Type, Source, ID, SpecVersion) and optional attributes (DataContentType, DataSchema, Subject, Time). Events can carry data payloads of any type.

Example:

event := cloudevents.NewEvent() // use NewEvent so the event context is initialized before calling setters
event.SetID("abc-123")
event.SetType("com.example.object.action")
event.SetSource("example/source")
event.SetData("application/json", myData)

// Access attributes
eventType := event.Type()
eventSource := event.Source()
data := event.Data()

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result describes the outcome of an event delivery: acknowledged (ACK), rejected (NACK), or undelivered.

func NewHTTPResult

func NewHTTPResult(statusCode int, format string, arguments ...any) Result

NewHTTPResult creates and returns an HTTP-specific CloudEvents result with status code.

Parameters:

  • statusCode: HTTP status code (e.g., 200, 400, 500)
  • format: Printf-style format string describing the result
  • arguments: Optional format arguments

Returns:

  • Result: CloudEvents result with HTTP status code and formatted message

This result type includes HTTP-specific information and can be retrieved using GetHttpStatusCode method.

Example:

// Success with 200 OK
result := cloudevents.NewHTTPResult(200, "event accepted")

// Client error with 400 Bad Request
result = cloudevents.NewHTTPResult(400, "invalid event type: %s", eventType)

// Server error with 500 Internal Server Error
result = cloudevents.NewHTTPResult(500, "processing error: %v", err)

func NewResult

func NewResult(format string, arguments ...any) Result

NewResult creates and returns a generic CloudEvents result.

Parameters:

  • format: Printf-style format string describing the result
  • arguments: Optional format arguments

Returns:

  • Result: CloudEvents result with the formatted message

Use this for protocol-agnostic results. For HTTP-specific results with status codes, use NewHTTPResult instead.

Example:

// Success result
result := cloudevents.NewResult("event processed successfully")

// Error result with details
result = cloudevents.NewResult("validation failed: %s", validationError)

func (*Result) Error

func (r *Result) Error() string

Error returns the error message string from the result.

Returns:

  • string: Error message, or empty string if no error

This implements the error interface, allowing Result to be used as an error type. The message includes details about why an event was NACK'd or undelivered.

Example:

result := client.Send(event)
if !result.IsACK() {
    log.Printf("Event failed: %s", result.Error())
}

func (*Result) GetHttpStatusCode

func (r *Result) GetHttpStatusCode() (int, error)

GetHttpStatusCode extracts the HTTP status code from an HTTP result.

Returns:

  • int: HTTP status code (e.g., 200, 404, 500), or -1 if not an HTTP result
  • error: Error if result is not an HTTP result, nil otherwise

This method only works with results created by NewHTTPResult or returned from HTTP-based event operations. Returns an error for non-HTTP results.

Example:

result := client.Send(event)
statusCode, err := result.GetHttpStatusCode()
if err != nil {
    log.Println("Not an HTTP result")
} else {
    log.Printf("HTTP Status: %d", statusCode)
    if statusCode >= 400 {
        log.Println("HTTP error occurred")
    }
}

func (*Result) IsACK

func (r *Result) IsACK() bool

IsACK returns whether the recipient acknowledged the event.

Returns:

  • bool: true if event was acknowledged (ACK), false otherwise

An ACK indicates successful delivery and processing. For HTTP, this typically corresponds to 2xx status codes.

Example:

result := client.Send(event)
if result.IsACK() {
    log.Println("Event acknowledged")
}

func (*Result) IsNACK

func (r *Result) IsNACK() bool

IsNACK returns whether the recipient did not acknowledge the event.

Returns:

  • bool: true if event was not acknowledged (NACK), false otherwise

A NACK indicates the event was delivered but rejected or failed processing. For HTTP, this typically corresponds to 4xx or 5xx status codes.

Example:

result := client.Send(event)
if result.IsNACK() {
    log.Printf("Event rejected: %s", result.Error())
}

func (*Result) IsUndelivered

func (r *Result) IsUndelivered() bool

IsUndelivered returns whether the event could not be delivered.

Returns:

  • bool: true if event was undelivered, false if delivery was attempted

Undelivered indicates a network error, connection failure, or other issue that prevented the event from reaching the recipient. This is different from NACK where the event was delivered but rejected.
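
The distinction between the three states can be summarized as a small decision function. This is a sketch of the rule of thumb described here (2xx for ACK, other responses for NACK, no response for undelivered), not the package's actual logic; `classify` and its parameters are hypothetical, and treating 1xx and 3xx as NACK is an assumption of this sketch.

```go
package main

import "fmt"

// classify maps an HTTP exchange to one of the three delivery states.
// delivered is false when no response was received at all, for example
// after a connection error or timeout.
func classify(delivered bool, statusCode int) string {
	switch {
	case !delivered:
		return "undelivered"
	case statusCode >= 200 && statusCode < 300:
		return "ACK"
	default:
		// Assumption: any non-2xx response counts as a rejection.
		return "NACK"
	}
}

func main() {
	fmt.Println(classify(true, 202))
	fmt.Println(classify(true, 400))
	fmt.Println(classify(false, 0))
}
```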

Example:

result := client.Send(event)
if result.IsUndelivered() {
    log.Printf("Delivery failed: %s", result.Error())
    // Retry or handle connection error
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a CloudEvents HTTP server that passes each incoming event to a handler and can return a response event.

func (*Server) Start

func (s *Server) Start(address string, handler func(Event) (*Event, Result), listenAndServeFailureFunc func(error)) error

Start starts the CloudEvents HTTP server on the specified address.

Parameters:

  • address: Server address to bind to (e.g., "localhost:8080" or ":8080")
  • handler: Function to process incoming events and optionally return response events
  • listenAndServeFailureFunc: Function called if the HTTP server encounters a fatal error

Returns:

  • error: Error if server initialization fails, nil if started successfully

The handler function receives each incoming CloudEvent and can return a response event for request-response patterns. Return nil for the event if no response is needed. The server runs asynchronously; this method returns after starting the server.

Example:

var server cloudevents.Server

handler := func(event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
    log.Printf("Received: %s", event.Type())

    // Process event...

    // Return response event
    response := cloudevents.NewEvent()
    response.SetType("com.example.response")
    response.SetSource("example/server")
    return &response, cloudevents.NewHTTPResult(200, "OK")
}

failureFunc := func(err error) {
    log.Printf("Server error: %v", err)
}

err := server.Start(":8080", handler, failureFunc)
if err != nil {
    log.Fatal(err)
}

func (*Server) Stop

func (s *Server) Stop(shutdownTimeout time.Duration) error

Stop gracefully shuts down the CloudEvents server.

Parameters:

  • shutdownTimeout: Maximum duration to wait for active connections to complete

Returns:

  • error: Error if shutdown fails or times out, nil if successful

The server stops accepting new connections immediately and waits for active connections to complete within the timeout period. After the timeout, the server forcefully closes remaining connections.
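
The same semantics can be expressed with the standard library's net/http server, which may help when reasoning about the timeout. The sketch below is not this package's implementation, and whether Stop is built this way is an assumption; `stopWithTimeout` and `runAndStop` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"
)

// stopWithTimeout stops accepting new connections and waits at most
// timeout for in-flight requests. If the deadline passes first, it
// forcefully closes the remaining connections and returns the error.
func stopWithTimeout(srv *http.Server, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		_ = srv.Close() // deadline exceeded: drop what is still open
		return err
	}
	return nil
}

// runAndStop starts a server on a free local port and stops it again.
// It returns the shutdown error and the error Serve exited with.
func runAndStop(timeout time.Duration) (shutdownErr, serveErr error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err, nil
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}
	done := make(chan error, 1)
	go func() { done <- srv.Serve(ln) }()
	shutdownErr = stopWithTimeout(srv, timeout)
	return shutdownErr, <-done // Serve returns http.ErrServerClosed after Shutdown
}

func main() {
	shutdownErr, serveErr := runAndStop(2 * time.Second)
	fmt.Println("shutdown error:", shutdownErr)
	fmt.Println("serve returned:", serveErr)
}
```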

Example:

server.Start(":8080", handler, failureFunc)
// ... server running ...

// Graceful shutdown with 10 second timeout
err := server.Stop(10 * time.Second)
if err != nil {
    log.Printf("Shutdown error: %v", err)
}
log.Println("Server stopped")
