realtime

package
v0.3.1-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterRoutes

func RegisterRoutes(router *gin.Engine, deps *ServerDependencies, authMiddleware gin.HandlerFunc)

Types

type ProgressAdapter

type ProgressAdapter struct {
	// contains filtered or unexported fields
}

ProgressAdapter bridges worker.ProgressTracker updates to WebSocket broadcasts for API batch jobs. It provides thread-safe mapping between task IDs and file paths for real-time progress tracking.

Usage Example:

// Create adapter for a batch job
adapter := NewProgressAdapter(job.ID, job, nil) // nil uses global wsHub

// Create progress tracker with adapter's channel
progressTracker := worker.NewProgressTracker(adapter.GetChannel())

// Start adapter (non-blocking)
adapter.Start()
defer adapter.Stop() // Safe to call multiple times

// Register each task before submitting to worker pool
for i, filePath := range job.Files {
    taskID := fmt.Sprintf("batch-scrape-%s-%d", job.ID, i)
    adapter.RegisterTask(taskID, i, filePath)

    task := NewBatchScrapeTask(taskID, filePath, progressTracker, ...)
    pool.Submit(task)
}

// Progress updates are automatically broadcast to WebSocket clients

func NewProgressAdapter

func NewProgressAdapter(jobID string, job *worker.BatchJob, broadcaster ProgressBroadcaster) *ProgressAdapter

NewProgressAdapter creates a new progress adapter for a batch job. The adapter listens to progress updates and broadcasts them via WebSocket. If broadcaster is nil, the default runtime hub is used when available.

func (*ProgressAdapter) GetChannel

func (a *ProgressAdapter) GetChannel() chan<- worker.ProgressUpdate

GetChannel returns a write-only channel for ProgressTracker to send updates. This channel should be passed to worker.NewProgressTracker().

func (*ProgressAdapter) GetRegisteredTaskCount

func (a *ProgressAdapter) GetRegisteredTaskCount() int

GetRegisteredTaskCount returns the number of currently registered tasks. Useful for debugging and monitoring.

func (*ProgressAdapter) RegisterTask

func (a *ProgressAdapter) RegisterTask(taskID string, fileIndex int, filePath string)

RegisterTask maps a task ID to its corresponding file index and path. This mapping is used to correlate progress updates with specific files. Thread-safe for concurrent registration during task submission.

func (*ProgressAdapter) Start

func (a *ProgressAdapter) Start()

Start launches the adapter's update processing goroutine. It listens for progress updates and converts them to WebSocket messages. This method is non-blocking; use Stop() to shut down gracefully.

func (*ProgressAdapter) Stop

func (a *ProgressAdapter) Stop()

Stop signals the adapter to shut down and waits for the processing goroutine to finish. This ensures graceful cleanup of resources. Stop is idempotent and safe to call multiple times.

func (*ProgressAdapter) UnregisterTask

func (a *ProgressAdapter) UnregisterTask(taskID string)

UnregisterTask removes a task from the mapping (optional cleanup). Not strictly necessary as the adapter is typically short-lived per job, but provided for completeness and memory efficiency in long-running jobs.

type ProgressBroadcaster

type ProgressBroadcaster interface {
	BroadcastProgress(msg *websocket.ProgressMessage) error
}

ProgressBroadcaster is an interface for broadcasting progress messages. This interface allows for dependency injection and testing with mocks.

type ServerDependencies

type ServerDependencies = core.ServerDependencies

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL