go_event_client

package module
v1.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

Go Event Client

A Go client library for the Ambient Event Bus that provides real-time event messaging with WebSocket connections, OAuth authentication, and advanced subscription features.

Features

  • Real-time messaging via WebSocket connections
  • OAuth 2.0 authentication with automatic token management
  • Regex pattern matching for flexible topic subscriptions
  • Aggregate-based subscriptions for type-specific and resource-specific events
  • HTTP API publishing in addition to WebSocket publishing
  • UUID support for all identifiers
  • Comprehensive testing with unit and integration tests
  • Type-safe Go structs for all event data
  • Connection resilience with ping/pong heartbeat mechanism

Installation

go get github.com/ambientlabscomputing/go_event_client

Quick Start

package main

import (
    "context"
    "fmt"
    "log/slog"
    "os"
    
    "github.com/ambientlabscomputing/go_event_client"
)

func main() {
    // Create logger
    logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
    
    // Configure client options
    options := go_event_client.EventClientOptions{
        EventAPIURL:  "http://events.ambientlabsdev.io",
        SocketsURL:   "wss://sockets.ambientlabsdev.io",
        PingInterval: 30, // seconds
    }
    
    // OAuth token callback function
    getToken := func(ctx context.Context) (string, error) {
        return go_event_client.GetOAuthToken(ctx, 
            "https://oauth.ambientlabsdev.io/oauth/token",
            "your-client-id", 
            "your-client-secret")
    }
    
    // Create client
    client := go_event_client.NewEventClient(context.Background(), options, getToken, logger)
    
    // Add message handler
    err := client.AddHandler("^user\\..*", func(message go_event_client.Message) {
        fmt.Printf("Received user event: %s\n", message.Message)
    })
    if err != nil {
        panic(err)
    }
    
    // Start client
    if err := client.Start(); err != nil {
        panic(err)
    }
    defer client.Stop()
    
    // Create subscription
    err = client.NewSubscription(context.Background(), "user.created")
    if err != nil {
        panic(err)
    }
    
    // Publish message
    err = client.Publish("user.created", map[string]interface{}{
        "user_id": 123,
        "email":   "user@example.com",
    })
    if err != nil {
        panic(err)
    }
}

Advanced Features

Aggregate-Based Subscriptions

Subscribe to events based on resource types and specific resource IDs:

// Subscribe to all events for a specific aggregate type
err := client.NewAggregateTypeSubscription(ctx, "node.events", "node", false)

// Subscribe to events for a specific aggregate instance
err := client.NewAggregateSubscription(ctx, "user.events", "user", 123, false)

// Publish with aggregate information
err := client.PublishWithAggregate("user.updated", userData, "user", &userID)
Regex Topic Matching

Use regex patterns for flexible topic matching:

// Regex subscription for all user events
err := client.NewSubscriptionWithOptions(ctx, "user\\..*", "", nil, true)

// Regex with aggregate type
err := client.NewAggregateTypeSubscription(ctx, "node\\.(created|updated)", "node", true)
HTTP API Publishing

In addition to WebSocket publishing, you can publish messages via HTTP API:

// Publish via HTTP API (requires established WebSocket connection for connection_id)
err := client.PublishViaAPI(ctx, "user.notification", messageData, "user", &userID)
Message Structure

Messages include comprehensive metadata and optional aggregate information:

type Message struct {
    ID            string `json:"id"`              // UUID
    CreatedAt     string `json:"created_at"`     // ISO timestamp
    Topic         string `json:"topic"`          // Event topic
    Message       string `json:"message"`        // JSON payload
    SubscriberID  string `json:"subscriber_id"`  // Subscriber UUID
    SessionID     string `json:"session_id"`     // Session UUID
    ConnectionID  string `json:"connection_id"`  // Connection UUID
    Timestamp     string `json:"timestamp"`      // Processing timestamp
    AggregateType string `json:"aggregate_type,omitempty"` // Resource type
    AggregateID   *int   `json:"aggregate_id,omitempty"`   // Resource ID
}

Configuration

Environment Variables

Create a .env file with your configuration:

AMBIENT_EVENT_API_URL=http://events.ambientlabsdev.io
AMBIENT_SOCKETS_URL=wss://sockets.ambientlabsdev.io
AMBIENT_OAUTH_URL=https://oauth.ambientlabsdev.io/oauth/token
AMBIENT_CLIENT_ID=your-client-id
AMBIENT_CLIENT_SECRET=your-client-secret
OAuth Token Management

The client includes a built-in OAuth token helper:

token, err := go_event_client.GetOAuthToken(ctx, oauthURL, clientID, clientSecret)

API Reference

Client Interface
type EventClient interface {
    Start() error
    Stop() error
    AddHandler(expr string, handler func(Message)) error
    Publish(topic string, v interface{}) error
    PublishWithAggregate(topic string, v interface{}, aggregateType string, aggregateID *int) error
    PublishViaAPI(ctx context.Context, topic string, v interface{}, aggregateType string, aggregateID *int) error
    NewSubscription(ctx context.Context, topic string) error
    NewSubscriptionWithOptions(ctx context.Context, topic string, aggregateType string, aggregateID *int, isRegex bool) error
    NewAggregateTypeSubscription(ctx context.Context, topic string, aggregateType string, isRegex bool) error
    NewAggregateSubscription(ctx context.Context, topic string, aggregateType string, aggregateID int, isRegex bool) error
}
Subscription Methods
Method Description
NewSubscription() Basic topic subscription
NewSubscriptionWithOptions() Full control over subscription options
NewAggregateTypeSubscription() Subscribe to all events of an aggregate type
NewAggregateSubscription() Subscribe to events for a specific aggregate instance
Publishing Methods
Method Description
Publish() Basic message publishing via WebSocket
PublishWithAggregate() Publish with aggregate information via WebSocket
PublishViaAPI() Publish via HTTP API

Make Commands

The project includes a comprehensive Makefile for easy development and testing:

# Show all available commands
make help

# Run all tests
make test

# Run specific test types
make test-unit              # Unit tests only
make test-integration       # Integration tests only
make test-models           # Model/structure tests
make test-oauth            # OAuth authentication tests

# Run tests by functionality
make test-basic            # Basic subscription test
make test-aggregate        # Aggregate subscription tests
make test-regex            # Regex subscription test

# Build and examples
make build                 # Build library and examples
make run-demo             # Run comprehensive demo
make examples             # Build all examples

# Development helpers
make dev-setup            # Set up development environment
make check-env            # Check environment configuration
make format               # Format Go code
make lint                 # Run linter
make clean                # Clean build artifacts

# Verbose output (add V=1 to any test command)
make test-unit V=1        # Run unit tests with verbose output

For a complete list of commands, run make help.

Examples

See the examples/ directory for comprehensive examples demonstrating all features:

  • Basic subscriptions and publishing
  • Aggregate type subscriptions
  • Specific aggregate subscriptions
  • Regex pattern subscriptions
  • HTTP API publishing
  • Real-time message handling

Run the comprehensive demo:

cd examples
cp .env.example .env
# Edit .env with your credentials
source .env && go run comprehensive_demo.go

Testing

The library includes comprehensive test coverage:

# Run unit tests
go test ./...

# Run integration tests (requires .env configuration)
go test -v -run "TestIntegration"

# Run OAuth token test
go test -v -run "TestOAuth"
Integration Test Requirements

Integration tests require a .env file with valid credentials:

cp .env.example .env
# Edit .env with your actual API credentials

API Endpoints

The client uses the following API v2 endpoints:

  • POST /v2/subscribers - Register event subscriber
  • POST /v2/sessions - Create WebSocket session
  • POST /v2/subscriptions - Create event subscriptions
  • POST /v2/messages - Publish messages via HTTP API
  • GET /ws/{token} - WebSocket connection endpoint

Error Handling

The client provides detailed error logging and handles various failure scenarios:

  • Connection failures with automatic cleanup
  • Authentication errors with clear error messages
  • API errors with detailed response information
  • WebSocket disconnections with graceful shutdown

Changelog

Latest Version
  • OAuth Authentication: Built-in OAuth 2.0 client credentials flow
  • UUID Support: All identifiers use UUID format
  • Aggregate Subscriptions: Type-based and instance-based event filtering
  • Regex Topics: Pattern matching for flexible subscriptions
  • HTTP API Publishing: Alternative to WebSocket publishing
  • Enhanced Message Structure: Complete metadata and aggregate information
  • v2 API Endpoints: Updated to use latest API version
  • Integration Testing: Comprehensive end-to-end testing
  • Connection Management: Improved WebSocket connection handling
  • Error Handling: Enhanced error messages and logging

License

MIT License

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

For bugs and feature requests, please create an issue.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTestLogger added in v1.0.0

func CreateTestLogger(level slog.Level) *slog.Logger

CreateTestLogger creates a logger for testing

func GetOAuthToken added in v1.0.0

func GetOAuthToken(ctx context.Context, oauthURL, clientID, clientSecret string) (string, error)

GetOAuthToken is a public convenience function for obtaining OAuth tokens

func GetTestToken added in v1.0.0

func GetTestToken(ctx context.Context, config *TestConfig) (string, error)

GetTestToken fetches an OAuth token for testing

Types

type EventClient

type EventClient interface {
	Start() error
	Stop() error
	AddHandler(expr string, handler func(Message)) error
	Publish(topic string, v interface{}) error
	PublishWithAggregate(topic string, v interface{}, aggregateType string, aggregateID *int) error
	PublishViaAPI(ctx context.Context, topic string, v interface{}, aggregateType string, aggregateID *int) error
	NewSubscription(ctx context.Context, topic string) error
	NewSubscriptionWithOptions(ctx context.Context, topic string, aggregateType string, aggregateID *int, isRegex bool) error
	NewAggregateTypeSubscription(ctx context.Context, topic string, aggregateType string, isRegex bool) error
	NewAggregateSubscription(ctx context.Context, topic string, aggregateType string, aggregateID int, isRegex bool) error
}

func CreateTestClient added in v1.0.0

func CreateTestClient(ctx context.Context, config *TestConfig) (EventClient, error)

CreateTestClient creates a test client with proper configuration

func NewEventClient

func NewEventClient(ctx context.Context, options EventClientOptions, getTokenCallback GetTokenCallback, logger *slog.Logger) EventClient

type EventClientImpl

type EventClientImpl struct {
	Ctx              context.Context
	Options          EventClientOptions
	GetTokenCallback GetTokenCallback
	Logger           *slog.Logger
	HTTPClient       *http.Client

	// internal values set during runtime
	Subscriber   *Subscriber
	Session      *Session
	ConnectionID string
	// contains filtered or unexported fields
}

func (*EventClientImpl) AddHandler

func (e *EventClientImpl) AddHandler(expr string, handler func(Message)) error

AddHandler registers a callback for topics matching the given regex The expr should be a valid Go regex (e.g. "^user\\..*$" to match "user.*").

func (*EventClientImpl) ListHandlers added in v1.3.2

func (e *EventClientImpl) ListHandlers() []string

ListHandlers returns information about currently registered handlers (for debugging)

func (*EventClientImpl) LogHandlerState added in v1.3.2

func (e *EventClientImpl) LogHandlerState()

LogHandlerState logs the current state of all handlers (for debugging)

func (*EventClientImpl) NewAggregateSubscription added in v1.0.0

func (e *EventClientImpl) NewAggregateSubscription(ctx context.Context, topic string, aggregateType string, aggregateID int, isRegex bool) error

NewAggregateSubscription creates a subscription for a specific aggregate type and ID

func (*EventClientImpl) NewAggregateTypeSubscription added in v1.0.0

func (e *EventClientImpl) NewAggregateTypeSubscription(ctx context.Context, topic string, aggregateType string, isRegex bool) error

NewAggregateTypeSubscription creates a subscription for all messages of a specific aggregate type

func (*EventClientImpl) NewSubscription

func (e *EventClientImpl) NewSubscription(ctx context.Context, topic string) error

func (*EventClientImpl) NewSubscriptionWithOptions added in v1.0.0

func (e *EventClientImpl) NewSubscriptionWithOptions(ctx context.Context, topic string, aggregateType string, aggregateID *int, isRegex bool) error

func (*EventClientImpl) Publish

func (e *EventClientImpl) Publish(topic string, v interface{}) error

Publish sends a topic and payload. It blocks only if the send buffer is full.

func (*EventClientImpl) PublishViaAPI added in v1.0.0

func (e *EventClientImpl) PublishViaAPI(ctx context.Context, topic string, v interface{}, aggregateType string, aggregateID *int) error

PublishViaAPI publishes a message via HTTP API instead of WebSocket (useful for testing)

func (*EventClientImpl) PublishWithAggregate added in v1.0.0

func (e *EventClientImpl) PublishWithAggregate(topic string, v interface{}, aggregateType string, aggregateID *int) error

PublishWithAggregate sends a topic and payload with aggregate information

func (*EventClientImpl) RegisterSubscriber

func (e *EventClientImpl) RegisterSubscriber() error

func (*EventClientImpl) RequestSession

func (e *EventClientImpl) RequestSession() error

func (*EventClientImpl) Start

func (e *EventClientImpl) Start() error

func (*EventClientImpl) Stop

func (e *EventClientImpl) Stop() error

type EventClientOptions

type EventClientOptions struct {
	EventAPIURL          string
	SocketsURL           string
	PingInterval         int
	MaxReconnectAttempts int           // Maximum number of reconnection attempts (0 = infinite)
	ReconnectBackoff     time.Duration // Initial backoff duration between reconnection attempts
	MaxReconnectBackoff  time.Duration // Maximum backoff duration
}

type GetTokenCallback

type GetTokenCallback func(ctx context.Context) (string, error)

type KeyValuePair added in v1.3.0

type KeyValuePair struct {
	Key   string      `json:"Key"`
	Value interface{} `json:"Value"`
}

KeyValuePair represents a key-value pair from the server

type Message

type Message struct {
	ID            string         `json:"id"`
	CreatedAt     string         `json:"created_at"`
	Topic         string         `json:"topic"`
	Content       MessageContent `json:"content"`
	SubscriberID  string         `json:"subscriber_id"`
	ConnectionID  string         `json:"connection_id"`
	SessionID     string         `json:"session_id"`
	Timestamp     string         `json:"timestamp"`
	Priority      string         `json:"priority,omitempty"`
	AggregateType string         `json:"aggregate_type,omitempty"`
	AggregateID   *int           `json:"aggregate_id,omitempty"`
}

type MessageContent added in v1.3.0

type MessageContent struct {
	// contains filtered or unexported fields
}

MessageContent represents flexible content that can be either a string or key-value pairs

func NewMessageContentFromMap added in v1.3.0

func NewMessageContentFromMap(content map[string]string) MessageContent

NewMessageContentFromMap creates a MessageContent from a map

func NewMessageContentFromString added in v1.3.0

func NewMessageContentFromString(content string) MessageContent

NewMessageContentFromString creates a MessageContent from a string

func (*MessageContent) AsMap added in v1.3.0

func (mc *MessageContent) AsMap() (map[string]string, bool)

AsMap returns the content as a map if it was parsed from key-value pairs

func (*MessageContent) AsString added in v1.3.0

func (mc *MessageContent) AsString() string

AsString returns the content as a string (same as String() but more explicit)

func (*MessageContent) GetValue added in v1.3.0

func (mc *MessageContent) GetValue(key string) string

GetValue gets a value by key if content is a map, otherwise returns empty string

func (*MessageContent) IsMap added in v1.3.0

func (mc *MessageContent) IsMap() bool

IsMap returns true if the content was parsed as key-value pairs

func (*MessageContent) IsString added in v1.3.0

func (mc *MessageContent) IsString() bool

IsString returns true if the content is a simple string

func (MessageContent) MarshalJSON added in v1.3.0

func (mc MessageContent) MarshalJSON() ([]byte, error)

MarshalJSON implements custom marshaling for MessageContent

func (*MessageContent) String added in v1.3.0

func (mc *MessageContent) String() string

String returns the content as a string

func (*MessageContent) UnmarshalJSON added in v1.3.0

func (mc *MessageContent) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom unmarshaling for MessageContent

type MessageCreate

type MessageCreate struct {
	Topic         string `json:"topic"`
	Content       string `json:"content"`
	AggregateType string `json:"aggregate_type,omitempty"`
	AggregateID   *int   `json:"aggregate_id,omitempty"`
}

type OAuthToken added in v1.0.0

type OAuthToken struct {
	AccessToken string `json:"access_token"`
	TokenType   string `json:"token_type"`
	ExpiresIn   int    `json:"expires_in"`
}

OAuthToken represents an OAuth token response

type Session

type Session struct {
	ID            string `json:"id"`
	CreatedAt     string `json:"created_at"`
	SubscriberID  string `json:"subscriber_id"`
	Status        string `json:"status"`
	LastConnected string `json:"last_connected"`
	Token         string `json:"token"`
}

type SessionCreate added in v1.2.0

type SessionCreate struct {
	SubscriberID string `json:"subscriber_id"`
}

type Subscriber

type Subscriber struct {
	ID        string `json:"id"`
	CreatedAt string `json:"created_at"`
	UserID    string `json:"user_id"`
}

type Subscription

type Subscription struct {
	ID            string `json:"id"`
	CreatedAt     string `json:"created_at"`
	Topic         string `json:"topic"`
	SubscriberID  string `json:"subscriber_id"`
	AggregateType string `json:"aggregate_type,omitempty"`
	AggregateID   *int   `json:"aggregate_id,omitempty"`
	IsRegex       bool   `json:"is_regex,omitempty"`
}

type SubscriptionCreate

type SubscriptionCreate struct {
	Topic         string `json:"topic"`
	SubscriberID  string `json:"subscriber_id"`
	AggregateType string `json:"aggregate_type,omitempty"`
	AggregateID   *int   `json:"aggregate_id,omitempty"`
	IsRegex       bool   `json:"is_regex,omitempty"`
}

type TestConfig added in v1.0.0

type TestConfig struct {
	OAuthClientID     string
	OAuthClientSecret string
	OAuthTokenURL     string
	EventAPIURL       string
	SocketsURL        string
	TestTimeout       time.Duration
	LogLevel          slog.Level
}

TestConfig holds configuration for tests

func LoadTestConfig added in v1.0.0

func LoadTestConfig() (*TestConfig, error)

LoadTestConfig loads test configuration from environment variables

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL