signals

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2025 License: MIT Imports: 3 Imported by: 9

README

Signals

High-Performance, Production-Ready Event System for Go

The signals library delivers sub-10 nanosecond performance with zero-allocation critical paths, making it perfect for mission-critical applications like high-frequency trading, real-time control systems, and embedded applications.

Key Features

  • Ultra-Fast Performance: 5.66ns/op single listener emit with zero allocations
  • 🛡️ Context-Aware: All listeners receive context for cancellation and timeouts
  • 🚨 Error-Safe Operations: Fast-failing error propagation with TryEmit for transaction safety
  • 🔒 Thread-Safe: Race-condition free design tested under extreme concurrency
  • 🎯 Transaction-Safe: Perfect for database transactions and critical workflows
  • 📦 Zero Dependencies: Pure Go, no external dependencies
  • 🚀 Async & Sync: Both fire-and-forget and error-handling patterns

💯 93.5% test coverage 💯 | Enterprise-grade reliability

Production-Ready: Used by ManiarTech®️ and other companies in mission-critical applications.

GoReportCard example made-with-Go GoDoc reference example

Quick Start

Installation
go get github.com/maniartech/signals@latest
Choose Your Signal Type
// For fire-and-forget async operations
var UserRegistered = signals.New[User]()

// For transaction-safe operations with error handling
var OrderProcessed = signals.NewSync[Order]()

Practical Examples

1. Simple Async Events (Fire-and-Forget)
package main

import (
    "context"
    "fmt"
    "github.com/maniartech/signals"
)

type User struct {
    ID   int
    Name string
}

// Async signals for non-critical events
var UserRegistered = signals.New[User]()
var EmailSent = signals.New[string]()

func main() {
    // Add listeners for user registration
    UserRegistered.AddListener(func(ctx context.Context, user User) {
        fmt.Printf("📧 Sending welcome email to %s\n", user.Name)
        EmailSent.Emit(ctx, user.Name)
    })

    UserRegistered.AddListener(func(ctx context.Context, user User) {
        fmt.Printf("📊 Adding user %s to analytics\n", user.Name)
    })

    // Emit user registration event
    ctx := context.Background()
    UserRegistered.Emit(ctx, User{ID: 1, Name: "John Doe"})
}
2. Transaction-Safe Error Handling (Mission-Critical)
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
    "github.com/maniartech/signals"
)

type Order struct {
    ID     int
    Amount float64
    UserID int
}

// Sync signal for transaction-safe operations
var OrderProcessed = signals.NewSync[Order]()

func main() {
    // Add error-returning listeners for critical operations
    OrderProcessed.AddListenerWithErr(func(ctx context.Context, order Order) error {
        fmt.Printf("💳 Processing payment for order %d\n", order.ID)
        if order.Amount > 10000 {
            return errors.New("payment declined: amount too high")
        }
        return nil
    })

    OrderProcessed.AddListenerWithErr(func(ctx context.Context, order Order) error {
        fmt.Printf("📦 Creating shipping label for order %d\n", order.ID)
        return nil // Success
    })

    // Emit with error handling and timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    order := Order{ID: 123, Amount: 15000, UserID: 456}

    if err := OrderProcessed.TryEmit(ctx, order); err != nil {
        fmt.Printf("❌ Order processing failed: %v\n", err)
        // Rollback transaction, notify user, etc.
    } else {
        fmt.Printf("✅ Order %d processed successfully\n", order.ID)
    }
}
3. Real-Time System Events
// High-frequency trading or real-time control systems
var PriceUpdated = signals.New[PriceUpdate]()
var SystemAlert = signals.NewSync[Alert]()

// Zero-allocation performance for critical paths
PriceUpdated.AddListener(func(ctx context.Context, update PriceUpdate) {
    // Process price update with sub-10ns latency
    handlePriceChange(update)
})

// Context cancellation for graceful shutdowns
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := SystemAlert.TryEmit(ctx, criticalAlert); err != nil {
    // Handle system failure
}

Performance Benchmarks

Zero-Allocation Critical Paths
Benchmark Iterations Time/Op Memory/Op Allocs/Op Performance Rating
Single Listener 196,613,109 5.66ns 0 B 0 allocs Sub-10ns
Concurrent Emit 41,751,328 28.55ns 0 B 0 allocs 🚀 Race-free
100 Listeners 34,066 35.87μs 42 B 2 allocs 🎯 Optimized
Why These Numbers Matter
  • 5.66ns Single Listener: Faster than most function calls - suitable for high-frequency trading
  • Zero Allocations: No GC pressure in critical paths - perfect for real-time control systems
  • 28.55ns Concurrent: Extreme thread safety without performance compromise
  • Stress-Tested: 100 goroutines × 1000 operations under adversarial conditions
Real-World Performance
// This emits 1 million events in ~5.66ms
for i := 0; i < 1_000_000; i++ {
    signal.Emit(ctx, data) // 5.66ns per emit
}

// Perfect for:
// ✅ High-frequency trading (microsecond latency requirements)
// ✅ Real-time control systems (deterministic timing)
// ✅ Embedded applications (memory-constrained environments)
// ✅ Mission-critical workflows (zero-failure tolerance)

API Reference

AsyncSignal (Fire-and-Forget)
// Create async signal
var UserLoggedIn = signals.New[User]()

// Add listeners
UserLoggedIn.AddListener(func(ctx context.Context, user User) {
    // Handle event (no error return)
}, "optional-key")

// Emit (waits for all listeners to complete)
UserLoggedIn.Emit(ctx, user)

// Non-blocking emit
go UserLoggedIn.Emit(ctx, user)

// Remove listener
UserLoggedIn.RemoveListener("optional-key")
SyncSignal (Error-Safe, Transaction-Ready)
// Create sync signal
var OrderCreated = signals.NewSync[Order]()

// Add error-returning listeners
OrderCreated.AddListenerWithErr(func(ctx context.Context, order Order) error {
    return processPayment(order) // Can return errors
})

// Error-safe emit with context cancellation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := OrderCreated.TryEmit(ctx, order); err != nil {
    // Handle error or timeout
    // Subsequent listeners won't execute if error occurs
}
Advanced Patterns
// Conditional listeners
if isProduction {
    UserRegistered.AddListener(sendToAnalytics)
}

// Dynamic listener management
key := UserRegistered.AddListener(temporaryHandler)
// Later...
UserRegistered.RemoveListener(key)

// Context cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(1*time.Second)
    cancel() // Cancels in-flight TryEmit operations
}()

Documentation

GoDoc

License

License

You Need Some Go Experts, Right?

As a software development firm, ManiarTech® specializes in Golang-based projects. Our team has an in-depth understanding of Enterprise Process Automation, Open Source, and SaaS. Also, we have extensive experience porting code from Python and Node.js to Golang. We have a team of Golang experts here at ManiarTech® that is well-versed in all aspects of the language and its ecosystem. At ManiarTech®, we have a team of Golang experts who are well-versed in all facets of the technology.

In short, if you're looking for experts to assist you with Golang-related projects, don't hesitate to get in touch with us. Send an email to contact@maniartech.com to get in touch.

Do you consider yourself an "Expert Golang Developer"?

If so, you may be interested in the challenging and rewarding work that is waiting for you. Use careers@maniartech.com to submit your resume.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncSignal added in v1.1.0

type AsyncSignal[T any] struct {
	// contains filtered or unexported fields
}

AsyncSignal is a struct that implements the Signal interface. This is the default implementation. It provides the same functionality as the SyncSignal but the listeners are called in a separate goroutine. This means that all listeners are called asynchronously. However, the method waits for all the listeners to finish before returning. If you don't want to wait for the listeners to finish, you can call the Emit method in a separate goroutine.

func New

func New[T any]() *AsyncSignal[T]

New creates a new signal that can be used to emit and listen to events asynchronously.

Example:

signal := signals.New[int]()
signal.AddListener(func(ctx context.Context, payload int) {
    // Listener implementation
    // ...
})
signal.Emit(context.Background(), 42)

func (*AsyncSignal[T]) AddListener added in v1.3.0

func (s *AsyncSignal[T]) AddListener(listener SignalListener[T], key ...string) int

AddListener adds a listener to the signal. Promoted from baseSignal.

func (*AsyncSignal[T]) Emit added in v1.1.0

func (s *AsyncSignal[T]) Emit(ctx context.Context, payload T)

func (*AsyncSignal[T]) IsEmpty added in v1.3.0

func (s *AsyncSignal[T]) IsEmpty() bool

IsEmpty checks if the signal has any subscribers. Promoted from baseSignal.

func (*AsyncSignal[T]) Len added in v1.3.0

func (s *AsyncSignal[T]) Len() int

Len returns the number of listeners. Promoted from baseSignal.

func (*AsyncSignal[T]) RemoveListener added in v1.3.0

func (s *AsyncSignal[T]) RemoveListener(key string) int

RemoveListener removes a listener from the signal. Promoted from baseSignal.

func (*AsyncSignal[T]) Reset added in v1.3.0

func (s *AsyncSignal[T]) Reset()

Reset resets the signal. Promoted from baseSignal.

type BaseSignal added in v1.1.0

type BaseSignal[T any] struct {
	// contains filtered or unexported fields
}

BaseSignal provides the base implementation of the Signal interface. It is intended to be used as an abstract base for underlying signal mechanisms.

Example:

type MyDerivedSignal[T any] struct {
	BaseSignal[T]
	// Additional fields or methods specific to MyDerivedSignal
}

func (s *MyDerivedSignal[T]) Emit(ctx context.Context, payload T) {
	// Custom implementation for emitting the signal
}

func NewBaseSignal added in v1.3.0

func NewBaseSignal[T any](opts *SignalOptions) *BaseSignal[T]

NewBaseSignal creates a BaseSignal with optional allocation/growth options.

func (*BaseSignal[T]) AddListener added in v1.1.0

func (s *BaseSignal[T]) AddListener(listener SignalListener[T], key ...string) int

AddListener adds a listener to the signal. The listener will be called whenever the signal is emitted. It returns the number of subscribers after the listener was added. It accepts an optional key that can be used to remove the listener later or to check if the listener was already added. It returns -1 if the listener with the same key was already added to the signal.

Example:

signal := signals.New[int]()
count := signal.AddListener(func(ctx context.Context, payload int) {
	// Listener implementation
	// ...
}, "key1")
fmt.Println("Number of subscribers after adding listener:", count)

func (*BaseSignal[T]) AddListenerWithErr added in v1.3.0

func (s *BaseSignal[T]) AddListenerWithErr(listener SignalListenerErr[T], key ...string) int

AddListenerErr adds an error-returning listener. It behaves like AddListener but the listener may return an error. If a key is provided and already exists, it returns -1.

func (*BaseSignal[T]) Emit added in v1.1.0

func (s *BaseSignal[T]) Emit(ctx context.Context, payload T)

Emit is not implemented in BaseSignal and panics if called. It should be implemented by a derived type.

Example:

type MyDerivedSignal[T any] struct {
	BaseSignal[T]
	// Additional fields or methods specific to MyDerivedSignal
}

func (s *MyDerivedSignal[T]) Emit(ctx context.Context, payload T) {
	// Custom implementation for emitting the signal
}

func (*BaseSignal[T]) IsEmpty added in v1.1.0

func (s *BaseSignal[T]) IsEmpty() bool

func (*BaseSignal[T]) Len added in v1.1.0

func (s *BaseSignal[T]) Len() int

func (*BaseSignal[T]) RemoveListener added in v1.1.0

func (s *BaseSignal[T]) RemoveListener(key string) int

RemoveListener removes a listener from the signal. It returns the number of subscribers after the listener was removed. It returns -1 if the listener was not found.

Example:

signal := signals.New[int]()
signal.AddListener(func(ctx context.Context, payload int) {
	// Listener implementation
	// ...
}, "key1")
count := signal.RemoveListener("key1")
fmt.Println("Number of subscribers after removing listener:", count)

func (*BaseSignal[T]) Reset added in v1.1.0

func (s *BaseSignal[T]) Reset()

Reset resets the signal by removing all subscribers from the signal, effectively clearing the list of subscribers. This can be used when you want to stop all listeners from receiving further signals.

Example:

signal := signals.New[int]()
signal.AddListener(func(ctx context.Context, payload int) {
	// Listener implementation
	// ...
})
signal.Reset() // Removes all listeners
fmt.Println("Number of subscribers after resetting:", signal.Len())

type Signal

type Signal[T any] interface {
	// Emit notifies all subscribers of the signal and passes the context and the payload.
	//
	// If the context has a deadline or cancellable property, the listeners
	// must respect it. If the signal is async (default), the listeners are called
	// in a separate goroutine.
	//
	// Example:
	//	signal := signals.New[int]()
	//	signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	})
	//	signal.Emit(context.Background(), 42)
	Emit(ctx context.Context, payload T)

	// AddListener adds a listener to the signal.
	//
	// The listener will be called whenever the signal is emitted. It returns the
	// number of subscribers after the listener was added. It accepts an optional key
	// that can be used to remove the listener later or to check if the listener
	// was already added. It returns -1 if the listener with the same key
	// was already added to the signal.
	//
	// Example:
	//	signal := signals.NewSync[int]()
	//	count := signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	})
	//	fmt.Println("Number of subscribers after adding listener:", count)
	AddListener(handler SignalListener[T], key ...string) int

	// RemoveListener removes a listener from the signal.
	//
	// It returns the number of subscribers after the listener was removed.
	// It returns -1 if the listener was not found.
	//
	// Example:
	//	signal := signals.NewSync[int]()
	//	signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	}, "key1")
	//	count := signal.RemoveListener("key1")
	//	fmt.Println("Number of subscribers after removing listener:", count)
	RemoveListener(key string) int

	// Reset resets the signal by removing all subscribers from the signal,
	// effectively clearing the list of subscribers.
	//
	// This can be used when you want to stop all listeners from receiving
	// further signals.
	//
	// Example:
	//	signal := signals.New[int]()
	//	signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	})
	//	signal.Reset() // Removes all listeners
	//	fmt.Println("Number of subscribers after resetting:", signal.Len())
	Reset()

	// Len returns the number of listeners subscribed to the signal.
	//
	// This can be used to check how many listeners are currently waiting for a signal.
	// The returned value is of type int.
	//
	// Example:
	//	signal := signals.NewSync[int]()
	//	signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	})
	//	fmt.Println("Number of subscribers:", signal.Len())
	Len() int

	// IsEmpty checks if the signal has any subscribers.
	//
	// It returns true if the signal has no subscribers, and false otherwise.
	// This can be used to check if there are any listeners before emitting a signal.
	//
	// Example:
	//	signal := signals.New[int]()
	//	fmt.Println("Is signal empty?", signal.IsEmpty()) // Should print true
	//	signal.AddListener(func(ctx context.Context, payload int) {
	//		// Listener implementation
	//		// ...
	//	})
	//	fmt.Println("Is signal empty?", signal.IsEmpty()) // Should print false
	IsEmpty() bool
}

Signal is the interface that represents a signal that can be subscribed to emitting a payload of type T.

func NewWithOptions added in v1.3.0

func NewWithOptions[T any](opts *SignalOptions) Signal[T]

NewWithOptions creates a new async Signal with custom allocation/growth options.

type SignalListener

type SignalListener[T any] func(context.Context, T)

SignalListener is a type definition for a function that will act as a listener for signals. This function takes two parameters:

  1. A context of type `context.Context`. This is typically used for timeout and cancellation signals, and can carry request-scoped values across API boundaries and between processes.
  2. A payload of generic type `T`. This can be any type, and represents the data or signal that the listener function will process.

The function does not return any value.

type SignalListenerErr added in v1.3.0

type SignalListenerErr[T any] func(context.Context, T) error

SignalListenerErr is a type definition for a function that will act as an error-returning listener for signals. This function takes two parameters:

  1. A context of type `context.Context`. This is typically used for timeout and cancellation signals, and can carry request-scoped values across API boundaries and between processes.
  2. A payload of generic type `T`. This can be any type, and represents the data or signal that the listener function will process.

The function returns an error value, which can be used to indicate if the listener encountered any issues while processing the signal.

type SignalOptions added in v1.3.0

type SignalOptions struct {
	InitialCapacity int
	GrowthFunc      func(currentCap int) int
}

SignalOptions allows advanced users to configure allocation and growth strategy for listeners.

type SyncSignal added in v1.1.0

type SyncSignal[T any] struct {
	// contains filtered or unexported fields
}

SyncSignal is a struct that implements the Signal interface. It provides a synchronous way of notifying all subscribers of a signal. The type parameter `T` is a placeholder for any type.

func NewSync added in v1.1.0

func NewSync[T any]() *SyncSignal[T]

NewSync creates a new signal that can be used to emit and listen to events synchronously.

Example:

signal := signals.NewSync[int]()
signal.AddListener(func(ctx context.Context, payload int) {
    // Listener implementation
    // ...
})
signal.Emit(context.Background(), 42)

func (*SyncSignal[T]) AddListener added in v1.3.0

func (s *SyncSignal[T]) AddListener(listener SignalListener[T], key ...string) int

AddListener adds a listener to the signal. Promoted from baseSignal.

func (*SyncSignal[T]) AddListenerWithErr added in v1.3.0

func (s *SyncSignal[T]) AddListenerWithErr(listener SignalListenerErr[T], key ...string) int

AddListenerWithErr adds an error-returning listener. Promoted from baseSignal.

func (*SyncSignal[T]) Emit added in v1.1.0

func (s *SyncSignal[T]) Emit(ctx context.Context, payload T)

Emit notifies all subscribers of the signal and passes the payload in a synchronous way.

func (*SyncSignal[T]) IsEmpty added in v1.3.0

func (s *SyncSignal[T]) IsEmpty() bool

IsEmpty checks if the signal has any subscribers. Promoted from baseSignal.

func (*SyncSignal[T]) Len added in v1.3.0

func (s *SyncSignal[T]) Len() int

Len returns the number of listeners. Promoted from baseSignal.

func (*SyncSignal[T]) RemoveListener added in v1.3.0

func (s *SyncSignal[T]) RemoveListener(key string) int

RemoveListener removes a listener from the signal. Promoted from baseSignal.

func (*SyncSignal[T]) Reset added in v1.3.0

func (s *SyncSignal[T]) Reset()

Reset resets the signal. Promoted from baseSignal.

func (*SyncSignal[T]) TryEmit added in v1.3.0

func (s *SyncSignal[T]) TryEmit(ctx context.Context, payload T) error

TryEmit behaves like Emit but returns an error when the provided context is canceled or when any error-returning listener returns a non-nil error. It stops invoking further listeners as soon as an error or cancellation is observed. If no error occurs, it returns nil.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL