eventqueue

v1.0.2
Published: Dec 15, 2025 License: MIT Imports: 9 Imported by: 0

README

Go Event Queue

A lightweight, lock-free event queue library for Go that supports both sequential and parallel event processing with deadline management and result handling.

Features

  • Auto-incrementing event IDs
  • Context-based event data passing
  • Event deadlines with automatic expiration
  • Result handling with Done/Wait pattern
  • Two processing modes: Sequential and Parallel
  • Lock-free queue implementation for sequential mode
  • Configurable buffer size
  • Multiple handlers per event type
  • Comprehensive unit tests (95% coverage)

Installation

go get github.com/chronnie/go-event-queue

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    eventqueue "github.com/chronnie/go-event-queue"
)

func main() {
    // Create event queue
    config := eventqueue.EventQueueConfig{
        BufferSize:     100,
        ProcessingMode: eventqueue.Sequential,
    }
    queue := eventqueue.NewEventQueue(config)

    // Register handler
    queue.RegisterHandler("user.created", eventqueue.EventHandlerFunc(
        func(ctx context.Context, event eventqueue.IEvent) error {
            userID := ctx.Value("user_id")
            fmt.Printf("Processing user: %v (Event ID: %d)\n", userID, event.GetID())
            return nil
        },
    ))

    // Start queue
    queue.Start(context.Background())
    defer queue.Stop()

    // Create event with context data
    ctx := context.WithValue(context.Background(), "user_id", 123)
    event := eventqueue.NewEvent("user.created", ctx)

    // Enqueue and wait for result
    if err := queue.Enqueue(event); err != nil {
        log.Fatal(err) // e.g. the buffer is full or the queue is stopped
    }
    result, err := event.Wait()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Result: %v\n", result)
}

Core Concepts

IEvent

Events are the fundamental unit of work in the queue. Each event has:

type IEvent interface {
    GetID() uint64                     // Auto-incrementing unique ID
    GetType() string                    // Event type for routing to handlers
    GetContext() context.Context        // Context with event data
    GetTimestamp() time.Time            // Creation time
    GetDeadline() time.Time             // Processing deadline
    HasDeadline() bool                  // Whether deadline is set
    IsExpired() bool                    // Whether deadline has passed
    Done(result interface{}, err error) // Signal completion
    Wait() (interface{}, error)         // Wait for completion
}

Auto-Incrementing IDs

Event IDs are automatically generated and increment globally:

event1 := eventqueue.NewEvent("test", ctx)  // ID: 1
event2 := eventqueue.NewEvent("test", ctx)  // ID: 2
event3 := eventqueue.NewEvent("test", ctx)  // ID: 3
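
A globally incrementing ID of this kind is commonly built on an atomic counter. The sketch below is an illustration only, not the library's source: `idCounter` and `nextID` are hypothetical names, and `atomic.Uint64` assumes Go 1.19 or newer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// idCounter is a hypothetical package-level counter; the library's real
// internals are not shown in this README.
var idCounter atomic.Uint64

// nextID returns 1, 2, 3, ... and is safe to call from many goroutines.
func nextID() uint64 {
	return idCounter.Add(1)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			nextID()
		}()
	}
	wg.Wait()
	fmt.Println(nextID()) // 101: each of the 100 earlier calls took a distinct ID
}
```

Because `Add` is a single atomic operation, no two callers can observe the same value, which is what makes the IDs unique without a mutex.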

Context-Based Data

Event data is passed via context.Context:

ctx := context.Background()
ctx = context.WithValue(ctx, "user_id", 123)
ctx = context.WithValue(ctx, "email", "user@example.com")

event := eventqueue.NewEvent("user.created", ctx)

Handlers access data from the context:

func(ctx context.Context, event eventqueue.IEvent) error {
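    userID := ctx.Value("user_id").(int)
    email := ctx.Value("email").(string)
    // Process event (the values must be used, or the handler will not compile)
    log.Printf("processing user %d <%s>", userID, email)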
    return nil
}
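
The snippets above use plain string keys and unchecked type assertions, which panic if a value is missing. The standard `context` documentation recommends a dedicated key type to avoid collisions between packages. A minimal sketch of that variant follows; `ctxKey`, `userIDKey`, `userIDFrom`, and `roundTrip` are hypothetical names, not part of the library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ctxKey is a hypothetical unexported key type; it is not part of the library.
type ctxKey string

const userIDKey ctxKey = "user_id"

// userIDFrom reads the user ID and returns an error instead of panicking
// when the value is missing or has the wrong type.
func userIDFrom(ctx context.Context) (int, error) {
	id, ok := ctx.Value(userIDKey).(int)
	if !ok {
		return 0, errors.New("user_id missing or not an int")
	}
	return id, nil
}

// roundTrip stores id in a context and reads it back through userIDFrom.
func roundTrip(id int) (int, error) {
	ctx := context.WithValue(context.Background(), userIDKey, id)
	return userIDFrom(ctx)
}

func main() {
	id, err := roundTrip(123)
	fmt.Println(id, err) // 123 <nil>

	_, err = userIDFrom(context.Background())
	fmt.Println(err) // user_id missing or not an int
}
```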

Event Deadlines

Events can have deadlines. Expired events are automatically dropped:

// With timeout duration
timed := eventqueue.NewEvent("task", ctx,
    eventqueue.WithTimeout(5*time.Second))

// With specific deadline time
deadline := time.Now().Add(10*time.Second)
scheduled := eventqueue.NewEvent("task", ctx,
    eventqueue.WithDeadline(deadline))

// Check expiration
if timed.IsExpired() || scheduled.IsExpired() {
    // At least one event has expired
}
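
To make the expiration rule concrete, here is one way such a check could be written, assuming (as the API reference states for GetDeadline) that a zero time means no deadline is set. `timedEvent` and `newTimedEvent` are hypothetical stand-ins, not the library's Event type.

```go
package main

import (
	"fmt"
	"time"
)

// timedEvent is a hypothetical stand-in for the library's Event type.
type timedEvent struct {
	deadline time.Time // the zero value means "no deadline"
}

// newTimedEvent sets the deadline relative to the moment of creation.
func newTimedEvent(timeout time.Duration) timedEvent {
	return timedEvent{deadline: time.Now().Add(timeout)}
}

// HasDeadline reports whether a deadline was set.
func (e timedEvent) HasDeadline() bool { return !e.deadline.IsZero() }

// IsExpired reports whether a deadline is set and has already passed.
func (e timedEvent) IsExpired() bool {
	return e.HasDeadline() && time.Now().After(e.deadline)
}

func main() {
	noDeadline := timedEvent{}
	past := newTimedEvent(-time.Second)
	future := newTimedEvent(time.Hour)
	fmt.Println(noDeadline.IsExpired(), past.IsExpired(), future.IsExpired()) // false true false
}
```

An event without a deadline never expires in this sketch, so callers do not need to special-case it.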

Result Handling

Events provide a Done/Wait pattern for result handling:

event := eventqueue.NewEvent("task", ctx)
queue.Enqueue(event)

// Wait for processing to complete
result, err := event.Wait()
if err != nil {
    log.Printf("Processing failed: %v", err)
} else {
    log.Printf("Processing succeeded: %v", result)
}

The queue automatically calls Done() after processing:

  • On success: event.Done("processed", nil)
  • On deadline expiration: event.Done(nil, errors.New("event expired: deadline exceeded"))
  • On no handler: event.Done(nil, errors.New("no handler registered for event type"))
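
A Done/Wait pair like this is often implemented with a channel that is closed exactly once. The following is a sketch under that assumption; `completion` and `newCompletion` are hypothetical and may differ from how EventContext actually works.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// completion is a hypothetical helper, not the library's EventContext.
type completion struct {
	once   sync.Once
	done   chan struct{}
	result interface{}
	err    error
}

func newCompletion() *completion {
	return &completion{done: make(chan struct{})}
}

// Done records the outcome; only the first call has any effect.
func (c *completion) Done(result interface{}, err error) {
	c.once.Do(func() {
		c.result, c.err = result, err
		close(c.done)
	})
}

// Wait blocks until Done has been called, then returns the outcome.
func (c *completion) Wait() (interface{}, error) {
	<-c.done
	return c.result, c.err
}

func main() {
	ok := newCompletion()
	go ok.Done("processed", nil)
	result, err := ok.Wait()
	fmt.Println(result, err) // processed <nil>

	failed := newCompletion()
	failed.Done(nil, errors.New("event expired: deadline exceeded"))
	_, err = failed.Wait()
	fmt.Println(err) // event expired: deadline exceeded
}
```

Closing the channel (rather than sending on it) lets any number of goroutines call Wait and all observe the same result.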

IEventHandler

Handlers process events:

type IEventHandler interface {
    Handle(ctx context.Context, event IEvent) error
}

Use EventHandlerFunc for function-based handlers:

handler := eventqueue.EventHandlerFunc(func(ctx context.Context, event eventqueue.IEvent) error {
    // Process event
    return nil
})
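
The function-adapter idea is the same one `net/http` uses for `http.HandlerFunc`: a named function type gets a method that calls itself, so a plain function satisfies the interface. A reduced sketch, with hypothetical `handler`, `handlerFunc`, and `dispatch` names and a simplified signature:

```go
package main

import "fmt"

// handler and handlerFunc are hypothetical, simplified stand-ins for
// IEventHandler and EventHandlerFunc.
type handler interface {
	Handle(eventType string) error
}

type handlerFunc func(eventType string) error

// Handle calls the underlying function, so handlerFunc satisfies handler.
func (f handlerFunc) Handle(eventType string) error { return f(eventType) }

// dispatch only knows about the interface, not the concrete type.
func dispatch(h handler, eventType string) error { return h.Handle(eventType) }

func main() {
	var seen []string
	h := handlerFunc(func(eventType string) error {
		seen = append(seen, eventType)
		return nil
	})
	err := dispatch(h, "user.created")
	fmt.Println(seen, err) // [user.created] <nil>
}
```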

IEventQueue

The queue manages event processing:

type IEventQueue interface {
    Enqueue(event IEvent) error
    Start(ctx context.Context) error
    Stop() error
    RegisterHandler(eventType string, handler IEventHandler)
    GetQueueSize() int
}

Processing Modes

Sequential Mode

Events are processed one at a time in order. Ideal for:

  • Accessing shared resources without locks (maps, databases, etc.)
  • Maintaining strict event ordering
  • Preventing race conditions

config := eventqueue.EventQueueConfig{
    BufferSize:     100,
    ProcessingMode: eventqueue.Sequential,
}
queue := eventqueue.NewEventQueue(config)

In sequential mode, the queue uses lock-free atomic operations for better performance.
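
The reason sequential processing makes shared state safe can be shown without the library: when a single goroutine drains the queue, only that goroutine touches the state. The sketch below uses a plain channel as the queue; `countSequentially` is a hypothetical helper.

```go
package main

import "fmt"

// countSequentially is a sketch: one goroutine drains the channel, so the
// map is only ever touched by a single goroutine and needs no mutex.
func countSequentially(events []string) map[string]int {
	queue := make(chan string, len(events))
	counts := make(map[string]int)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for ev := range queue {
			counts[ev]++ // single consumer: no lock required
		}
	}()

	for _, ev := range events {
		queue <- ev
	}
	close(queue)
	<-done // wait for the consumer before reading counts
	return counts
}

func main() {
	counts := countSequentially([]string{"a", "b", "a"})
	fmt.Println(counts["a"], counts["b"]) // 2 1
}
```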

Parallel Mode

Events are processed concurrently. Ideal for:

  • Independent events
  • Read-only operations on shared resources
  • Maximizing throughput

config := eventqueue.EventQueueConfig{
    BufferSize:     100,
    ProcessingMode: eventqueue.Parallel,
}
queue := eventqueue.NewEventQueue(config)
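
With concurrent processing, any state shared between handlers needs its own synchronization. The sketch below illustrates this with one goroutine per event and an atomic accumulator; `processParallel` is a hypothetical helper, and the library may schedule its workers differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processParallel runs handle once per event, each in its own goroutine,
// and returns after all of them have finished.
func processParallel(events []int, handle func(int)) {
	var wg sync.WaitGroup
	for _, ev := range events {
		wg.Add(1)
		go func(ev int) {
			defer wg.Done()
			handle(ev)
		}(ev)
	}
	wg.Wait()
}

func main() {
	var sum atomic.Int64 // shared state must be synchronized in parallel mode
	processParallel([]int{1, 2, 3, 4}, func(ev int) { sum.Add(int64(ev)) })
	fmt.Println(sum.Load()) // 10
}
```

The order in which handlers run is not defined here, which is why the example only relies on a commutative operation.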

Advanced Usage

Multiple Handlers

Register multiple handlers for the same event type:

queue.RegisterHandler("order.created", inventoryHandler)
queue.RegisterHandler("order.created", emailHandler)
queue.RegisterHandler("order.created", analyticsHandler)

  • Sequential mode: Handlers execute in registration order
  • Parallel mode: Handlers execute concurrently

Custom Handlers

Implement IEventHandler for custom logic:

type EmailNotificationHandler struct {
    smtpServer string
}

func (h *EmailNotificationHandler) Handle(ctx context.Context, event eventqueue.IEvent) error {
    email := ctx.Value("email").(string)
    // Send email using h.smtpServer
    _ = email // placeholder so the snippet compiles until sending is implemented
    return nil
}

queue.RegisterHandler("user.created", &EmailNotificationHandler{
    smtpServer: "smtp.example.com",
})

Context Cancellation

The queue respects context cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

queue.Start(ctx)
// Queue stops when context is cancelled or timeout occurs
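
A processing loop that honors cancellation typically selects on `ctx.Done()` alongside the event channel. The sketch below shows that shape only; `run` and `demo` are hypothetical, and an explicit `cancel()` stands in for the timeout so the result is deterministic.

```go
package main

import (
	"context"
	"fmt"
)

// run handles events until ctx is cancelled and reports how many it handled.
func run(ctx context.Context, events <-chan int) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case <-events:
			handled++
		}
	}
}

// demo sends three events on an unbuffered channel, then cancels.
// Each send completes only once run has received it, so run sees
// exactly three events before the cancellation.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			events <- i
		}
		cancel() // stand-in for a timeout or shutdown signal
	}()
	return run(ctx, events)
}

func main() {
	fmt.Println(demo()) // 3
}
```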

Lock-Free Design

The event queue uses a lock-free design for sequential processing:

  • Event ID generation uses atomic operations
  • Queue state uses atomic.Bool for running status
  • Processing mode uses atomic.Int32
  • No locks in the hot path for sequential mode

This design is intentional: sequential mode processes one event at a time, so handlers never run concurrently with each other and can access shared resources without locks.
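
As an illustration of the running-status flag, `atomic.Bool.CompareAndSwap` can guard start and stop transitions without a mutex. This is a sketch, not the library's code: `lifecycle` is a hypothetical type and the error messages are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// lifecycle is a hypothetical type that models only the running flag.
type lifecycle struct {
	running atomic.Bool
}

// Start flips the flag from false to true; a second Start fails.
func (l *lifecycle) Start() error {
	if !l.running.CompareAndSwap(false, true) {
		return errors.New("queue is already running") // assumed message
	}
	return nil
}

// Stop flips the flag from true to false; a second Stop fails.
func (l *lifecycle) Stop() error {
	if !l.running.CompareAndSwap(true, false) {
		return errors.New("queue is already stopped") // assumed message
	}
	return nil
}

func main() {
	var l lifecycle
	fmt.Println(l.Start()) // <nil>
	fmt.Println(l.Start()) // queue is already running
	fmt.Println(l.Stop())  // <nil>
	fmt.Println(l.Stop())  // queue is already stopped
}
```

The compare-and-swap both checks and updates the state in one step, so two concurrent Start calls cannot both succeed.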

Configuration

type EventQueueConfig struct {
    BufferSize     int                // Event buffer size (default: 100)
    ProcessingMode ProcessingMode     // Sequential or Parallel
}

Error Handling

  • Enqueue returns an error if:
    • Queue is stopped
    • Buffer is full
    • Context is cancelled
  • Start returns an error if the queue is already running
  • Stop returns an error if the queue is already stopped
  • Expired events automatically return an "event expired: deadline exceeded" error
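
The "buffer is full" case can be pictured as a non-blocking send on a buffered channel. The sketch below assumes that model; `tryEnqueue` and `errBufferFull` are hypothetical names, and the library's actual error values are not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a hypothetical sentinel error.
var errBufferFull = errors.New("buffer is full")

// tryEnqueue never blocks: it reports an error when the buffer has no room.
func tryEnqueue(buf chan string, ev string) error {
	select {
	case buf <- ev:
		return nil
	default:
		return errBufferFull
	}
}

func main() {
	buf := make(chan string, 2) // comparable to BufferSize: 2
	fmt.Println(tryEnqueue(buf, "a")) // <nil>
	fmt.Println(tryEnqueue(buf, "b")) // <nil>
	fmt.Println(tryEnqueue(buf, "c")) // buffer is full
	fmt.Println(len(buf))             // 2
}
```

Since a rejected event is never processed, callers should check the Enqueue error before calling Wait on that event.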

Best Practices

  1. Register handlers before calling Start() to avoid race conditions
  2. Use Sequential mode when handlers access shared mutable state
  3. Use Parallel mode when handlers are independent or only read shared data
  4. Set appropriate deadlines for time-sensitive events
  5. Use context.WithValue for passing event data
  6. Call Stop() to ensure graceful shutdown
  7. Use Wait() when you need to know the processing result

Examples

See the examples directory for complete working examples.

Run examples:

cd examples/basic
go run main.go

Testing

Run tests:

go test -v ./...

Run tests with coverage:

go test -v -cover ./...

Current test coverage: 95%

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Types

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event is the default implementation of IEvent

func NewEvent

func NewEvent(eventType string, ctx context.Context, options ...EventOption) *Event

NewEvent creates a new event instance with auto-incrementing ID

func (*Event) Done

func (e *Event) Done(result interface{}, err error)

Done signals that the event processing is complete

func (*Event) GetContext

func (e *Event) GetContext() context.Context

GetContext returns the event context

func (*Event) GetDeadline

func (e *Event) GetDeadline() time.Time

GetDeadline returns the deadline for processing this event

func (*Event) GetID

func (e *Event) GetID() uint64

GetID returns the unique identifier of the event

func (*Event) GetTimestamp

func (e *Event) GetTimestamp() time.Time

GetTimestamp returns when the event was created

func (*Event) GetType

func (e *Event) GetType() string

GetType returns the type/category of the event

func (*Event) HasDeadline

func (e *Event) HasDeadline() bool

HasDeadline returns true if the event has a deadline

func (*Event) IsExpired

func (e *Event) IsExpired() bool

IsExpired checks if the event has passed its deadline

func (*Event) Wait

func (e *Event) Wait() (interface{}, error)

Wait waits for the event to be processed and returns the result

type EventContext

type EventContext struct {
	// contains filtered or unexported fields
}

EventContext holds the context data and completion channel for an event

func NewEventContext

func NewEventContext(ctx context.Context) *EventContext

NewEventContext creates a new event context

func (*EventContext) Context

func (ec *EventContext) Context() context.Context

Context returns the underlying context

func (*EventContext) Done

func (ec *EventContext) Done(result interface{}, err error)

Done signals completion with result

func (*EventContext) Wait

func (ec *EventContext) Wait() (interface{}, error)

Wait waits for completion and returns the result

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, event IEvent) error

EventHandlerFunc is a function type that implements IEventHandler

func (EventHandlerFunc) Handle

func (f EventHandlerFunc) Handle(ctx context.Context, event IEvent) error

Handle implements IEventHandler interface

type EventOption

type EventOption func(*Event)

EventOption is a function that configures an Event

func WithDeadline

func WithDeadline(deadline time.Time) EventOption

WithDeadline sets a deadline for the event

func WithTimeout

func WithTimeout(timeout time.Duration) EventOption

WithTimeout sets a timeout duration for the event from creation time

type EventQueue

type EventQueue struct {
	// contains filtered or unexported fields
}

EventQueue is the default implementation of IEventQueue. It uses a lock-free design for sequential processing.

func NewDefaultEventQueue

func NewDefaultEventQueue() *EventQueue

NewDefaultEventQueue creates a new event queue with the default configuration:

  • BufferSize: 1000
  • ProcessingMode: Sequential

func NewEventQueue

func NewEventQueue(config EventQueueConfig) *EventQueue

NewEventQueue creates a new event queue with the given configuration

func (*EventQueue) Enqueue

func (eq *EventQueue) Enqueue(event IEvent) error

Enqueue adds an event to the queue

func (*EventQueue) GetQueueSize

func (eq *EventQueue) GetQueueSize() int

GetQueueSize returns the current number of events in the queue

func (*EventQueue) RegisterHandler

func (eq *EventQueue) RegisterHandler(eventType string, handler IEventHandler)

RegisterHandler registers a handler for a specific event type. Only one handler per event type is allowed; registering a new handler replaces the existing one. Note: it should be called before Start() to avoid race conditions.

func (*EventQueue) Start

func (eq *EventQueue) Start(ctx context.Context) error

Start begins processing events from the queue

func (*EventQueue) Stop

func (eq *EventQueue) Stop() error

Stop gracefully stops the queue processing

type EventQueueConfig

type EventQueueConfig struct {
	BufferSize     int
	ProcessingMode ProcessingMode
}

EventQueueConfig holds configuration for creating an event queue

type IEvent

type IEvent interface {
	// GetID returns the unique identifier of the event
	GetID() uint64
	// GetType returns the type/category of the event
	GetType() string
	// GetContext returns the event context
	GetContext() context.Context
	// GetTimestamp returns when the event was created
	GetTimestamp() time.Time
	// GetDeadline returns the deadline for processing this event
	// Returns zero time if no deadline is set
	GetDeadline() time.Time
	// HasDeadline returns true if the event has a deadline
	HasDeadline() bool
	// IsExpired checks if the event has passed its deadline
	IsExpired() bool
	// Done signals that the event processing is complete and sets the result
	Done(result interface{}, err error)
	// Wait waits for the event to be processed and returns the result
	Wait() (interface{}, error)
}

IEvent represents a basic event interface that can be queued and processed

type IEventHandler

type IEventHandler interface {
	// Handle processes the event and returns an error if processing fails
	Handle(ctx context.Context, event IEvent) error
}

IEventHandler defines the interface for processing events

type IEventQueue

type IEventQueue interface {
	// Enqueue adds an event to the queue
	Enqueue(event IEvent) error
	// Start begins processing events from the queue
	Start(ctx context.Context) error
	// Stop gracefully stops the queue processing
	Stop() error
	// RegisterHandler registers a handler for a specific event type
	RegisterHandler(eventType string, handler IEventHandler)
	// GetQueueSize returns the current number of events in the queue
	GetQueueSize() int
}

IEventQueue defines the interface for an event queue

type ProcessingMode

type ProcessingMode int

ProcessingMode defines how events should be processed

const (
	// Sequential mode processes events one at a time in order
	Sequential ProcessingMode = iota
	// Parallel mode processes events concurrently
	Parallel
)

func (ProcessingMode) String added in v1.0.2

func (pm ProcessingMode) String() string

String returns the string representation of ProcessingMode
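
A String method lets a mode print by name wherever `fmt` formats it. The sketch below shows the usual `fmt.Stringer` shape; the `mode` type is hypothetical and the returned names are guesses, since the exact strings are not documented here.

```go
package main

import "fmt"

// mode is a hypothetical stand-in for ProcessingMode.
type mode int

const (
	sequential mode = iota
	parallel
)

// String makes mode satisfy fmt.Stringer. The returned names are assumptions.
func (m mode) String() string {
	switch m {
	case sequential:
		return "Sequential"
	case parallel:
		return "Parallel"
	default:
		return fmt.Sprintf("mode(%d)", int(m))
	}
}

func main() {
	fmt.Println(sequential, parallel, mode(7)) // Sequential Parallel mode(7)
}
```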
