sse

package
v2.39.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2026 License: CC0-1.0 Imports: 9 Imported by: 0

README

SSE Token Authentication Guide

Overview

The SSE endpoint (/v1/sse) supports token/API-key authentication to prevent unauthorized access to real-time events. When a token validator is configured, clients must provide a valid token to establish an SSE connection.

How It Works

1. Token Validation Flow
Client Request → SSE Endpoint → Check for Token → Validate Token → Accept/Reject
                                                   ↓
                                    401 (Missing Token)
                                    403 (Invalid Token)
                                    200 (Valid Token)
2. Token Sources

The endpoint checks for tokens in the following order:

  1. Authorization Header - Authorization: Bearer YOUR_TOKEN
  2. Query Parameter - GET /v1/sse?token=YOUR_TOKEN

The system automatically strips the "Bearer " prefix if present.

Implementation

Basic Setup
1. Define Your Token Validator Function
// In loader/app.go or a separate auth package
// MyTokenValidator reports whether token belongs to a known user whose access
// has not expired. A failed lookup is handed back to the caller unchanged.
func MyTokenValidator(token string) (bool, error) {
    user, err := db.GetUserByToken(token)
    switch {
    case err != nil:
        // The lookup itself failed, so there is no verdict to give.
        return false, err
    case user == nil:
        // No user is associated with this token.
        return false, nil
    default:
        return !user.IsExpired(), nil
    }
}
2. Enable SSE with Token Validation
import "loader"

// In your main.go or initialization code
app := loader.NewApp(
    loader.WithSSE(),
    loader.WithSSETokenValidator(MyTokenValidator),
)
Advanced Examples
JWT Token Validation
import "github.com/golang-jwt/jwt/v4"

// ValidateJWTToken reports whether tokenString is a well-formed, unexpired JWT
// signed with the HMAC secret held in the JWT_SECRET environment variable.
//
// A token that fails verification yields (false, nil), so the SSE endpoint
// answers 403 Forbidden. A non-nil error is reserved for server-side problems
// (here, a missing secret), which the endpoint reports as 500.
func ValidateJWTToken(tokenString string) (bool, error) {
    // An unset secret must never be used as a signing key; treat it as a
    // configuration error instead of verifying against an empty key.
    secret := os.Getenv("JWT_SECRET")
    if secret == "" {
        return false, fmt.Errorf("JWT_SECRET is not set")
    }

    token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
        // Accept HMAC only, so the token cannot pick its own algorithm.
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return []byte(secret), nil
    })
    if err != nil {
        // Malformed, badly signed or expired token: a rejection, not a server fault.
        return false, nil
    }

    claims, ok := token.Claims.(jwt.MapClaims)
    if !ok || !token.Valid {
        return false, nil
    }

    // Explicit expiry check, applied only when a numeric "exp" claim is present.
    if exp, ok := claims["exp"].(float64); ok && int64(exp) < time.Now().Unix() {
        return false, nil // Token expired
    }
    return true, nil
}
Database Lookup with Caching
import "time"

// APIKeyCache remembers recently validated API keys so that repeated checks
// can be answered from memory until the cached entry's deadline passes.
type APIKeyCache struct {
    // keys maps a token to the moment its cached entry stops being trusted.
    keys  map[string]time.Time
    // mu guards keys.
    mu    sync.RWMutex
    // ttl is the lifetime given to each newly cached entry.
    ttl   time.Duration
}

// ValidateToken reports whether token is a known API key. A fresh cache entry
// answers the question directly; otherwise the database is consulted and the
// cache is updated to match what it says.
//
// It returns (false, nil) for an unknown key and a non-nil error only when the
// database lookup itself fails.
func (c *APIKeyCache) ValidateToken(token string) (bool, error) {
    c.mu.RLock()
    expiry, cached := c.keys[token]
    c.mu.RUnlock()

    if cached && expiry.After(time.Now()) {
        return true, nil
    }

    // Cache miss or stale entry: ask the database again. A stale entry only
    // means the cached answer is too old to trust, not that the key is bad,
    // so it must not be turned into a rejection.
    dbKey, err := db.GetAPIKey(token)
    if err != nil {
        return false, err
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    if dbKey == nil {
        // Key is unknown (or was revoked): make sure no stale entry survives.
        delete(c.keys, token)
        return false, nil
    }

    // Allow a zero-value APIKeyCache to be used without a constructor.
    if c.keys == nil {
        c.keys = make(map[string]time.Time)
    }
    c.keys[token] = time.Now().Add(c.ttl)
    return true, nil
}

Client Usage

JavaScript/Browser Example
Using Authorization Header
// Connect with Authorization header
// NOTE(review): the browser's built-in EventSource constructor accepts only
// `withCredentials` in its options object and ignores `headers`, so this form
// needs an EventSource implementation that supports custom headers (a
// polyfill or a non-browser client). In a plain browser, use the query
// parameter form instead.
const eventSource = new EventSource('/v1/sse', {
    headers: {
        'Authorization': 'Bearer your-secret-token'
    }
});

// Fires once the connection has been established.
eventSource.onopen = () => {
    console.log('SSE Connected');
};

// Events of type "message".
eventSource.addEventListener('message', (event) => {
    console.log('Received:', event.data);
});

// Events named "heartbeat".
eventSource.addEventListener('heartbeat', (event) => {
    console.log('Heartbeat:', event.data);
});

// NOTE(review): a standard EventSource error event carries no `status` field;
// these branches only run with an implementation that adds one — confirm
// against the client library in use.
// The handler always closes the connection, which also stops EventSource's
// automatic reconnection.
eventSource.onerror = (error) => {
    if (error.status === 401) {
        console.error('Unauthorized: Missing or invalid token');
    } else if (error.status === 403) {
        console.error('Forbidden: Invalid or expired token');
    }
    eventSource.close();
};
Using Query Parameter
// Pass the token in the URL instead of a header. encodeURIComponent escapes
// characters that would otherwise be read as URL syntax.
const token = 'your-secret-token';
const eventSource = new EventSource(`/v1/sse?token=${encodeURIComponent(token)}`);

// Same event handlers as above
React Hook Example
import { useEffect, useState } from 'react';

/**
 * Subscribes to the SSE endpoint with the given token and exposes the latest
 * message, the latest error and the connection state. The connection is
 * reopened whenever `token` changes and closed on unmount.
 *
 * @param {string} token - Token sent as the `token` query parameter.
 * @returns {{message: *, error: (string|null), isConnected: boolean}}
 */
function useSSE(token) {
    const [message, setMessage] = useState(null);
    const [error, setError] = useState(null);
    const [isConnected, setIsConnected] = useState(false);

    useEffect(() => {
        // Without a token the URL would carry the literal text "undefined";
        // report the problem instead of opening a doomed connection.
        if (!token) {
            setIsConnected(false);
            setError('Missing token');
            return undefined;
        }

        const eventSource = new EventSource(`/v1/sse?token=${encodeURIComponent(token)}`);

        eventSource.onopen = () => {
            setIsConnected(true);
            setError(null);
        };

        eventSource.onmessage = (event) => {
            // A payload that is not JSON must not throw out of the handler.
            try {
                setMessage(JSON.parse(event.data));
            } catch (parseError) {
                setError('Received a message that is not valid JSON');
            }
        };

        eventSource.addEventListener('heartbeat', () => {
            // Heartbeat received, connection is alive
        });

        eventSource.onerror = () => {
            // close() below turns off EventSource's automatic reconnect, so
            // every error ends the connection: report it unconditionally.
            setIsConnected(false);
            setError('Connection closed');
            eventSource.close();
        };

        return () => eventSource.close();
    }, [token]);

    return { message, error, isConnected };
}

// Usage
function App() {
    const { message, error, isConnected } = useSSE('your-token');
    
    return (
        <div>
            <p>Status: {isConnected ? 'Connected' : 'Disconnected'}</p>
            {error && <p style={{ color: 'red' }}>{error}</p>}
            {message && <pre>{JSON.stringify(message, null, 2)}</pre>}
        </div>
    );
}
Go Client Example
import "net/http"

// main connects to the local SSE endpoint with a bearer token and prints the
// event stream until the server closes it, exiting with an error otherwise.
func main() {
    if err := streamEvents("http://localhost:8000/v1/sse", "your-secret-token"); err != nil {
        log.Fatal(err)
    }
}

// streamEvents opens url with token as a bearer credential and prints every
// line of the response body. It returns an error when the request fails, the
// server answers with anything other than 200, or reading the stream fails.
func streamEvents(url, token string) error {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return fmt.Errorf("building request: %w", err)
    }

    // Set authorization header
    req.Header.Set("Authorization", "Bearer "+token)

    // No client-wide Timeout here: it would also cap how long the stream may
    // stay open.
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("connecting to %s: %w", url, err)
    }
    // Registered before any early return so the body is closed on every path.
    defer resp.Body.Close()

    switch resp.StatusCode {
    case http.StatusOK:
        // Accepted; continue below and read the stream.
    case http.StatusUnauthorized:
        return fmt.Errorf("Missing or invalid token")
    case http.StatusForbidden:
        return fmt.Errorf("Token is invalid or expired")
    default:
        return fmt.Errorf("unexpected status: %s", resp.Status)
    }

    // Read SSE messages
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
    // Scan stops silently on a read error, so it has to be checked afterwards.
    if err := scanner.Err(); err != nil {
        return fmt.Errorf("reading event stream: %w", err)
    }
    return nil
}

Error Responses

401 Unauthorized - Missing Token
{
  "code": "UNAUTHORIZED",
  "message": "Missing token/api-key"
}

Cause: No token provided in Authorization header or query parameter

Solution: Add token to request:

  • Header: Authorization: Bearer YOUR_TOKEN
  • Query: /v1/sse?token=YOUR_TOKEN
403 Forbidden - Invalid Token
{
  "code": "FORBIDDEN",
  "message": "Invalid or expired token/api-key"
}

Cause: Token failed validation (invalid, expired, revoked)

Solution:

  • Verify token is correct
  • Check if token has expired
  • Refresh token if applicable
500 Internal Server Error - Validation Failure
{
  "code": "SERVER_ERROR",
  "message": "Token validation failed"
}

Cause: Unexpected error during token validation (e.g., database connection error)

Solution: Check server logs for detailed error information

Disabling Token Validation

If you don't provide a token validator, the SSE endpoint will accept connections without authentication:

// SSE without authentication
app := loader.NewApp(
    loader.WithSSE(),
    // No WithSSETokenValidator provided
)

To require authentication, always provide a validator:

app := loader.NewApp(
    loader.WithSSE(),
    // Required for auth: without a validator the endpoint accepts every
    // connection. MyTokenValidator is the function from Basic Setup.
    loader.WithSSETokenValidator(MyTokenValidator),
)

Best Practices

1. Token Generation
  • Use cryptographically secure random generation
  • Store tokens hashed (never plain text)
  • Include expiration times
2. Token Validation
  • Cache valid tokens with TTL for performance
  • Log validation failures for security auditing
  • Don't log actual token values in error messages
3. Transport Security
  • Always use HTTPS in production
  • Never transmit tokens over unencrypted connections
  • Consider short-lived tokens with refresh mechanisms
4. Token Rotation
  • Implement token expiration
  • Provide refresh endpoints for valid users
  • Revoke tokens on logout
5. Rate Limiting
  • Consider rate limiting token validation attempts
  • Implement exponential backoff for failed attempts

Testing

// TestSSEWithValidToken prepares a request that carries the one token the
// validator accepts. Calling the endpoint is left to the reader.
func TestSSEWithValidToken(t *testing.T) {
    // Accepts exactly "valid-token" and never reports an error.
    validator := func(token string) (bool, error) {
        return token == "valid-token", nil
    }
    
    req := httptest.NewRequest("GET", "/v1/sse", nil)
    req.Header.Set("Authorization", "Bearer valid-token")
    
    // Test your endpoint
    // NOTE(review): validator is not used yet, and Go rejects unused local
    // variables. Install it on the hub under test (see SetTokenValidator) and
    // pass req with an httptest.NewRecorder() to the hub's Handle method
    // before compiling this skeleton.
}

// TestSSEWithInvalidToken prepares a request that carries a token the
// validator rejects. Calling the endpoint is left to the reader.
func TestSSEWithInvalidToken(t *testing.T) {
    // Accepts exactly "valid-token" and never reports an error.
    validator := func(token string) (bool, error) {
        return token == "valid-token", nil
    }
    
    req := httptest.NewRequest("GET", "/v1/sse", nil)
    req.Header.Set("Authorization", "Bearer invalid-token")
    
    // Should return 403
    // NOTE(review): validator is not used yet, and Go rejects unused local
    // variables. Install it on the hub under test (see SetTokenValidator),
    // pass req with an httptest.NewRecorder() to the hub's Handle method and
    // assert on the recorded status code.
}

// TestSSEWithoutToken prepares a request that carries no token at all.
// Calling the endpoint is left to the reader.
func TestSSEWithoutToken(t *testing.T) {
    // Accepts exactly "valid-token" and never reports an error.
    validator := func(token string) (bool, error) {
        return token == "valid-token", nil
    }
    
    req := httptest.NewRequest("GET", "/v1/sse", nil)
    // No token header
    
    // Should return 401
    // NOTE(review): validator and req are not used yet, and Go rejects unused
    // local variables. Install validator on the hub under test (see
    // SetTokenValidator), pass req with an httptest.NewRecorder() to the
    // hub's Handle method and assert on the recorded status code.
}

Troubleshooting

Connection Refused with 401
  • Issue: Token not provided
  • Solution: Add Authorization: Bearer YOUR_TOKEN header or ?token=YOUR_TOKEN query param
Connection Refused with 403
  • Issue: Token validation failed
  • Cause: Invalid token, expired token, or revoked token
  • Solution: Verify token and refresh if necessary
Connection Closes Immediately
  • Issue: Token validation error in validator function
  • Solution: Check server logs for validator function errors and database connectivity
No Heartbeat After Connect
  • Issue: Client connected successfully but receives no heartbeat
  • Solution: A heartbeat event is expected every 30 seconds; if none arrives, check network connectivity between the browser and the server

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferedMessage

type BufferedMessage struct {
	ID        string                 `json:"id"`
	Event     string                 `json:"event"`
	Data      string                 `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
	CreatedAt time.Time              `json:"-"`
	ExpiresAt time.Time              `json:"-"`
	Retry     int                    `json:"retry,omitempty"`
	Extra     map[string]interface{} `json:"extra,omitempty"`
}

BufferedMessage represents a message with delivery metadata

type Client

type Client struct {
	ID      string
	Channel chan *Message
	Request *http.Request
	Writer  http.ResponseWriter
	Flusher http.Flusher
	// contains filtered or unexported fields
}

Client represents a single SSE connection

type ClientDeliveryManager

type ClientDeliveryManager struct {
	// contains filtered or unexported fields
}

ClientDeliveryManager manages delivery tracking for multiple clients

func NewClientDeliveryManager

func NewClientDeliveryManager() *ClientDeliveryManager

NewClientDeliveryManager creates a new delivery manager

func (*ClientDeliveryManager) GetAllClients

func (cdm *ClientDeliveryManager) GetAllClients() []string

GetAllClients returns all registered client IDs

func (*ClientDeliveryManager) GetClientTracker

func (cdm *ClientDeliveryManager) GetClientTracker(clientID string) *ClientDeliveryTracker

GetClientTracker returns the tracker for a client

func (*ClientDeliveryManager) RegisterClient

func (cdm *ClientDeliveryManager) RegisterClient(clientID string) *ClientDeliveryTracker

RegisterClient registers a new client

func (*ClientDeliveryManager) UnregisterClient

func (cdm *ClientDeliveryManager) UnregisterClient(clientID string)

UnregisterClient removes a client tracker

func (*ClientDeliveryManager) UpdateClientAcknowledgment

func (cdm *ClientDeliveryManager) UpdateClientAcknowledgment(clientID string, messageID string)

UpdateClientAcknowledgment updates the last acknowledged message for a client

type ClientDeliveryTracker

type ClientDeliveryTracker struct {
	// contains filtered or unexported fields
}

ClientDeliveryTracker tracks which messages have been delivered to which clients

func NewClientDeliveryTracker

func NewClientDeliveryTracker(clientID string) *ClientDeliveryTracker

NewClientDeliveryTracker creates a tracker for a client

func (*ClientDeliveryTracker) GetLastMessageID

func (cdt *ClientDeliveryTracker) GetLastMessageID() string

GetLastMessageID returns the last acknowledged message ID

func (*ClientDeliveryTracker) UpdateLastMessageID

func (cdt *ClientDeliveryTracker) UpdateLastMessageID(messageID string)

UpdateLastMessageID updates the last acknowledged message ID

type EncryptedTokenValidator

type EncryptedTokenValidator func(encryptedToken string) (bool, error)

EncryptedTokenValidator is a function type for validating encrypted tokens

type Events added in v2.39.2

type Events interface {
	Broadcast(message *Message)
}

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages all SSE connections

func NewHub

func NewHub(ctx context.Context) *Hub

NewHub creates a new SSE hub

func (*Hub) Broadcast

func (hub *Hub) Broadcast(message *Message)

Broadcast sends a message to all connected clients and buffers it for later retrieval

func (*Hub) GetClientCount

func (hub *Hub) GetClientCount() int

GetClientCount returns the number of connected clients

func (*Hub) GetDeliveryManager

func (hub *Hub) GetDeliveryManager() *ClientDeliveryManager

GetDeliveryManager returns the delivery manager

func (*Hub) GetMessageBuffer

func (hub *Hub) GetMessageBuffer() *MessageBuffer

GetMessageBuffer returns the message buffer

func (*Hub) GetMissedMessages

func (hub *Hub) GetMissedMessages(clientID string, lastReceivedID string) []*BufferedMessage

GetMissedMessages returns messages that were sent while a client was offline. clientID: the client requesting missed messages. lastReceivedID: the ID of the last message the client successfully received.

func (*Hub) Handle

func (hub *Hub) Handle(w http.ResponseWriter, r *http.Request)

Handle is an HTTP handler for SSE connections

func (*Hub) Run

func (hub *Hub) Run()

Run starts the SSE hub

func (*Hub) SendToClient

func (hub *Hub) SendToClient(clientID string, message *Message) error

SendToClient sends a message to a specific client

func (*Hub) SetCryptoManager

func (hub *Hub) SetCryptoManager(cm *helper.CryptoManager)

SetCryptoManager sets the crypto manager for decrypting tokens

func (*Hub) SetEncryptedTokenValidator

func (hub *Hub) SetEncryptedTokenValidator(validator EncryptedTokenValidator)

SetEncryptedTokenValidator sets the encrypted token validation function

func (*Hub) SetMessageBuffer

func (hub *Hub) SetMessageBuffer(mb *MessageBuffer)

SetMessageBuffer sets a custom message buffer

func (*Hub) SetTokenValidator

func (hub *Hub) SetTokenValidator(validator TokenValidator)

SetTokenValidator sets the token validation function

func (*Hub) Stop

func (hub *Hub) Stop()

Stop gracefully stops the SSE hub

type Message

type Message struct {
	Event string
	Data  interface{}
	ID    string
	Retry int
}

Message represents a message to be sent via SSE

func NewSSEMessage

func NewSSEMessage(event string, data interface{}) *Message

NewSSEMessage creates a new SSE message

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer stores SSE messages with ID tracking for reliable delivery

func NewMessageBuffer

func NewMessageBuffer(maxSize int, ttl time.Duration) *MessageBuffer

NewMessageBuffer creates a new message buffer

func (*MessageBuffer) AddMessage

func (mb *MessageBuffer) AddMessage(msg *BufferedMessage)

AddMessage adds a message to the buffer

func (*MessageBuffer) Clear

func (mb *MessageBuffer) Clear()

Clear removes all messages from buffer

func (*MessageBuffer) GetMessagesSince

func (mb *MessageBuffer) GetMessagesSince(messageID string) []*BufferedMessage

GetMessagesSince returns all messages since a given message ID

func (*MessageBuffer) GetSize

func (mb *MessageBuffer) GetSize() int

GetSize returns the current number of buffered messages

func (*MessageBuffer) GetUndeliveredMessages

func (mb *MessageBuffer) GetUndeliveredMessages(clientID string, lastAcknowledgedID string) []*BufferedMessage

GetUndeliveredMessages returns messages that haven't been delivered to a client

type TokenValidator

type TokenValidator func(token string) (bool, error)

TokenValidator is a function type for validating tokens/api-keys

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL