progress

package
v2.0.21
Published: May 18, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

sse/progress — Publisher-driven async-progress SSE

progress.Tracker is the publisher-driven sibling of pkg/sse. Use it when an HTTP-triggered async operation has a definite terminal (success or failure) and the client wants to observe progress over an SSE stream that opens slightly before, during, or shortly after the job runs.

Scope — when to use this package

This package is designed for bounded async operations with a terminal, emitting monotonic snapshot events:

  • A trigger starts the work (typically an HTTP POST).
  • Work emits a sequence of progress events ending in Job.Finish.
  • Each event is a snapshot — later events subsume earlier ones (e.g. progress=70% carries everything progress=30% did; result={count: 10, items: [...]} carries everything result={count: 5, ...} did).
  • Subscribers observe events from any point in time; late subscribers see the latest cached event then live tail.

Memory cost is O(1) per Job — only the most recent payload is held. Retention drops the Job after the terminal window expires.

Cache-last vs full-log replay. Earlier revisions retained the full event log so late subscribers saw the complete history. The current design caches only the last event because the current use cases (discovery progress, auto-tagging results) emit cumulative snapshots, which makes intermediate events redundant. Caching one event eliminates the replay-window event-drop race and bounds memory cost. Delta-style events (e.g. emitting "found device A" and "found device B" as separate events that don't include the cumulative list) are not supported; encode such state into a snapshot payload instead, or use pkg/sse.Manager with caller-side history.
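The snapshot requirement above can be met by accumulating deltas on the publisher side and emitting the cumulative state each time. The following is a minimal sketch, not code from this package: Snapshot and Accumulator are hypothetical names, and the payload shape is only an assumption modelled on the result={count, items} example.

```go
package main

import "fmt"

// Snapshot is a hypothetical cumulative payload: each value carries every
// device found so far, so a later Snapshot subsumes every earlier one.
type Snapshot struct {
	Count   int      `json:"count"`
	Devices []string `json:"devices"`
}

// Accumulator turns per-device deltas into cumulative snapshots.
type Accumulator struct {
	devices []string
}

// Add records one newly found device and returns the snapshot to publish.
// The slice is copied so an already published snapshot is never mutated.
func (a *Accumulator) Add(device string) Snapshot {
	a.devices = append(a.devices, device)
	out := make([]string, len(a.devices))
	copy(out, a.devices)
	return Snapshot{Count: len(out), Devices: out}
}

func main() {
	var acc Accumulator
	for _, d := range []string{"device A", "device B"} {
		snap := acc.Add(d)
		// In a real flow this value would be handed to job.Publish(snap).
		fmt.Printf("%d %v\n", snap.Count, snap.Devices)
	}
}
```

A late subscriber that only receives the second snapshot still learns about both devices, which is the property the cache-last design relies on.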

Do NOT use this package for:

| Scenario | Use instead |
| --- | --- |
| High-volume event streams (MQTT bus relay, telemetry) | pkg/sse.Manager |
| Subscriber-driven polling without a terminal (lists, dashboards, logs) | pkg/sse.NewPolling |
| Delta-style events where every payload carries unique information | Rethink: either coalesce into snapshots, or use pkg/sse.Manager with caller-side history |

Quick start

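import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/IOTechSystems/go-mod-edge-utils/v2/pkg/sse/progress"
    "github.com/labstack/echo/v4" // assumed Echo major version; match your service's go.mod
)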

// At service startup, one Tracker per progress-style flow.
tracker := progress.New(serviceCtx, 30*time.Second, 30*time.Second, lc)

// On the POST that triggers the work:
job, isNew := tracker.Start("discovery/" + edgeInst + "/" + service)
if !isNew {
    // Coalesce: another trigger is already running on this topic; the
    // caller typically returns 200 to signal "joined existing".
    return ok200(c)
}

go func() {
    // Safety-net terminal in case work returns early without calling
    // Finish (idempotent — the happy-path Finish below wins if called).
    defer job.Finish(progressFailed(errors.New("job did not complete")))

    if err := doWork(ctx, job); err != nil {
        job.Finish(progressFailed(err))
        return
    }
    job.Finish(progressDone())
}()
return accepted202(c)

// On the GET that streams progress:
func (c controller) listen(ec echo.Context) error {
    err := tracker.Subscribe(ec, topic)
    if errors.Is(err, progress.ErrNoTopic) {
        return ec.JSON(http.StatusNotFound, ...)
    }
    return err
}

// doWork emits progress directly on the Job — no callback indirection.
func doWork(ctx context.Context, job *progress.Job) error {
    job.Publish(progressStarted())
    // ... work ...
    return nil
}

Two entry points: Start vs StartOrJoin

Both atomically return (*Job, isNew bool). The difference is what happens when the topic has a retained terminal entry:

| Entry point | Running entry | Retained terminal entry | No entry |
| --- | --- | --- | --- |
| Start | Join (isNew=false) | Discard and create fresh (isNew=true) | Create (isNew=true) |
| StartOrJoin | Join (isNew=false) | Reuse retained (isNew=false) | Create (isNew=true) |

Selection rule:

  • Trigger-driven flows (user clicks "Start scan", HTTP POST) → Start. Each trigger conceptually starts a new run; the retained terminal from a previous run must not be observed as if it were the new run's result.
  • Subscribe-driven flows ("compute the tag-scan result once; many dashboards subscribe to it") → StartOrJoin. Late subscribers should see the cached terminal, not retrigger the work.

Both methods are atomic over Tracker.mu; concurrent callers on the same topic see exactly one isNew=true and the rest coalesce.
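The decision table can be read as two small functions over a mutex-guarded map. The sketch below is a simplified model under that assumption, not the package's implementation: toyJob, toyTracker, start and startOrJoin are hypothetical names, and retention timing is left out entirely.

```go
package main

import (
	"fmt"
	"sync"
)

// toyJob and toyTracker are a simplified model, not the package's types.
type toyJob struct {
	terminal bool // true once the job has finished and is merely retained
}

type toyTracker struct {
	mu   sync.Mutex
	jobs map[string]*toyJob
}

func newToyTracker() *toyTracker {
	return &toyTracker{jobs: make(map[string]*toyJob)}
}

// start models the trigger-driven entry: join a running job, but replace a
// retained terminal job with a fresh one.
func (t *toyTracker) start(topic string) (*toyJob, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if j, ok := t.jobs[topic]; ok && !j.terminal {
		return j, false
	}
	j := &toyJob{}
	t.jobs[topic] = j
	return j, true
}

// startOrJoin models the subscriber-driven entry: reuse any existing job,
// running or retained.
func (t *toyTracker) startOrJoin(topic string) (*toyJob, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if j, ok := t.jobs[topic]; ok {
		return j, false
	}
	j := &toyJob{}
	t.jobs[topic] = j
	return j, true
}

func main() {
	tr := newToyTracker()
	j1, isNew := tr.start("scan")
	fmt.Println(isNew) // true: no entry
	_, isNew = tr.start("scan")
	fmt.Println(isNew) // false: running entry is joined
	j1.terminal = true // single goroutine here, so no lock is needed
	_, isNew = tr.startOrJoin("scan")
	fmt.Println(isNew) // false: retained entry is reused
	_, isNew = tr.start("scan")
	fmt.Println(isNew) // true: retained entry is discarded
}
```

Because each function holds the mutex for the whole check-and-insert, concurrent callers on one topic cannot both observe isNew=true.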

Lifecycle

        (no entry)
             |
             | Start / StartOrJoin (isNew=true)
             v
        +---------+
        | Running |  -- Publish (updates last + fans out latest-wins)
        +---------+
             |
             | Finish (terminal, idempotent)
             v
        +----------+
        | Retained |  -- late subscribers within retention see the
        +----------+     cached terminal and close.
             |
             | cleanup (retention expires OR tracker ctx cancelled)
             v
        (no entry)

Start on a retained entry creates a fresh Job, replacing the map entry. The previous Job's cleanup goroutine sees the identity check fail (jobs[topic] != prevJob) and leaves the new entry alone.
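The identity check described above can be sketched as a delete that only succeeds when the map still points at the caller's own entry. This is an illustrative model only; entry, registry, replace and cleanup are hypothetical names, and the real cleanup runs in a goroutine after the retention window rather than synchronously.

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for a job; the id field only makes the demo output readable.
type entry struct{ id int }

type registry struct {
	mu   sync.Mutex
	jobs map[string]*entry
}

// replace installs a fresh entry for topic, overwriting any previous one.
func (r *registry) replace(topic string, id int) *entry {
	r.mu.Lock()
	defer r.mu.Unlock()
	e := &entry{id: id}
	r.jobs[topic] = e
	return e
}

// cleanup deletes topic only if it still maps to the entry the caller owns.
// It reports whether the entry was removed.
func (r *registry) cleanup(topic string, owned *entry) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.jobs[topic] != owned {
		return false // a replacement took over; leave it alone
	}
	delete(r.jobs, topic)
	return true
}

// has reports whether topic currently has an entry.
func (r *registry) has(topic string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.jobs[topic]
	return ok
}

func main() {
	r := &registry{jobs: make(map[string]*entry)}
	old := r.replace("scan", 1)
	fresh := r.replace("scan", 2) // models Start on a retained entry

	fmt.Println(r.cleanup("scan", old), r.has("scan"))   // false true
	fmt.Println(r.cleanup("scan", fresh), r.has("scan")) // true false
}
```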

API

| Symbol | Role |
| --- | --- |
| New(ctx, retention, heartbeat, lc) | Construct a Tracker; zero/negative durations use 30s defaults. |
| Tracker.Start(topic) | Trigger-driven entry. Returns (*Job, isNew bool). |
| Tracker.StartOrJoin(topic) | Subscriber-driven entry. Reuses retained. Returns (*Job, isNew bool). |
| Tracker.Subscribe(c, topic) | Attach an SSE subscriber. Writes the cached last event then live tail. Returns ErrNoTopic if topic is unknown. |
| Job.Publish(data) | Update cached last + fan out (latest-wins). No-op once terminal. |
| Job.Finish(data) | Cache terminal + close subscribers + start retention. Idempotent. |
| ErrNoTopic | Sentinel: no active or recent Job for topic. |

Runtime requirement

This package (and the parent pkg/sse) requires the ResponseWriter chain to expose SetWriteDeadline via http.ResponseController — directly or through Unwrap(). It is the only standard way Go offers to bound a stuck Write on a slow/dead client.

Stock Echo + net/http meets this out of the box. Custom middleware that wraps the writer MUST forward Unwrap(). If the requirement is not met:

  • WriteSSEHeaders returns an error before flushing any bytes, so Subscribe exits with a proper 5xx instead of committing 200 and dying on the first event.
  • The diagnostic Errorf log names the failed SetWriteDeadline capability so ops can locate the offending middleware.

Fail-loud is intentional. Tolerating a missing deadline silently disables slow-client protection — a slow reader could otherwise keep a server-side goroutine blocked in fmt.Fprintf indefinitely, since no in-Go mechanism can cancel that write without SetWriteDeadline.

Tests use a fixture that implements SetWriteDeadline as a no-op (see flushableRecorder in subscribe_test.go).
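The capability requirement can be probed with the standard library alone. The sketch below (Go 1.20 or later) is an illustration, not this package's code: deadlineRecorder, forwardingWriter, opaqueWriter and deadlineSupported are hypothetical names. It shows a wrapper that forwards Unwrap passing the check, a wrapper that hides the inner writer failing it, and a no-op SetWriteDeadline fixture of the kind the tests are described as using.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// deadlineRecorder is a hypothetical test fixture: a recorder whose
// SetWriteDeadline is a no-op, so the capability check passes in tests.
type deadlineRecorder struct {
	*httptest.ResponseRecorder
}

func (deadlineRecorder) SetWriteDeadline(time.Time) error { return nil }

// newRecorder builds the fixture.
func newRecorder() deadlineRecorder {
	return deadlineRecorder{ResponseRecorder: httptest.NewRecorder()}
}

// forwardingWriter models middleware that wraps the writer and forwards
// Unwrap, so http.ResponseController can reach the inner writer.
type forwardingWriter struct {
	http.ResponseWriter
}

func (w forwardingWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter }

// opaqueWriter models middleware that wraps the writer without Unwrap.
// Embedding the interface promotes only Header, Write and WriteHeader.
type opaqueWriter struct {
	http.ResponseWriter
}

// deadlineSupported tries to set a write deadline through the writer chain
// and reports false when the chain answers http.ErrNotSupported.
func deadlineSupported(w http.ResponseWriter) bool {
	deadline := time.Now().Add(5 * time.Second)
	err := http.NewResponseController(w).SetWriteDeadline(deadline)
	return !errors.Is(err, http.ErrNotSupported)
}

func main() {
	base := newRecorder()
	fmt.Println(deadlineSupported(base))                                   // true
	fmt.Println(deadlineSupported(forwardingWriter{ResponseWriter: base})) // true
	fmt.Println(deadlineSupported(opaqueWriter{ResponseWriter: base}))     // false
}
```

Running a check of this kind against a service's full middleware chain in a unit test is one way to catch a wrapper that drops Unwrap before it reaches production.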

Note: no current deployment of this package uses middleware that breaks ResponseController — every active call site sits on stock Echo. This section is recorded as a forward reference: if some future integration ever returns 5xx on its first SSE request, the diagnostic log here is the first thing to check, and the path forward is either to fix the offending middleware to forward Unwrap() or to introduce an explicit fallback (e.g. goroutine + select timeout) in pkg/sse/utils.go.

What this package does NOT do

  • Mutual exclusion across calls: Start coalesces concurrent same-topic triggers, but cross-topic mutual exclusion (e.g. profile scan blocking discovery on the same edge instance) belongs in the caller. Use a per-resource lock alongside the tracker.
  • De-duplication of payloads — emit at whatever cadence your work loop generates. A subscriber may briefly observe the cached event echoed on the wire if a same-value Publish lands between attach and the live loop's first iteration; harmless for idempotent progress payloads.
  • Full event history replay — late subscribers receive only the latest cached event. Design events as monotonic snapshots (see "Scope" above). The previous full-log design was reverted because its replay window introduced an event-drop race with no benefit for snapshot-style payloads.
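For the mutual-exclusion point in the list above, a per-resource lock kept next to the tracker might look like the following. This is a sketch under assumptions: resourceLocks, tryAcquire and release are hypothetical names, and keying the lock by edge instance is only one possible choice.

```go
package main

import (
	"fmt"
	"sync"
)

// resourceLocks is a hypothetical per-resource guard kept next to the tracker.
type resourceLocks struct {
	mu   sync.Mutex
	held map[string]string // resource -> operation currently holding it
}

func newResourceLocks() *resourceLocks {
	return &resourceLocks{held: make(map[string]string)}
}

// tryAcquire claims resource for op without blocking. It returns false when
// another operation already holds the resource.
func (l *resourceLocks) tryAcquire(resource, op string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, busy := l.held[resource]; busy {
		return false
	}
	l.held[resource] = op
	return true
}

// release frees resource so the next operation can claim it.
func (l *resourceLocks) release(resource string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, resource)
}

func main() {
	locks := newResourceLocks()
	fmt.Println(locks.tryAcquire("edge-1", "profile-scan")) // true
	fmt.Println(locks.tryAcquire("edge-1", "discovery"))    // false: caller could answer 409
	locks.release("edge-1")                                 // e.g. deferred next to job.Finish
	fmt.Println(locks.tryAcquire("edge-1", "discovery"))    // true
}
```

A handler would typically call tryAcquire before Tracker.Start and release the resource in the same goroutine that calls Finish, so the lock and the job end together.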

See also

  • Parent package: pkg/sse (Manager, broadcaster, PollingService).
  • The internal lock order is Tracker.mu → Job.mu, never reversed. Publish / Finish touch only Job.mu; Start / StartOrJoin / cleanup touch only Tracker.mu for the brief map mutation (with isTerminal() on a retained entry the only Job.mu acquisition under Tracker.mu, matching the lock order). Cleanup goroutines wake on j.finished or tracker.ctx.Done(); deletion is gated by an identity check so a replacement Job survives an orphaned old Job's cleanup pass.

Documentation

Constants

This section is empty.

Variables

var ErrNoTopic = errors.New("progress: no active or recent job for topic")

ErrNoTopic — no active or recently-retained Job for the topic. Callers typically map to HTTP 404.

Functions

This section is empty.

Types

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job is the publisher handle for one topic-bound async operation. Owned by the goroutine that observed isNew=true from Start / StartOrJoin; subscribers reach it only via Tracker.Subscribe.

func (*Job) Finish

func (j *Job) Finish(data any)

Finish marks the job done, caches the terminal in j.last, and closes subscribers (signal only; they read the terminal from j.last under j.mu). Idempotent: a deferred job.Finish(failed) safety net coexists with an explicit happy-path Finish, and the first call wins.

Success vs failure is encoded in the payload, not in two methods.
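The idempotent-terminal pattern can be modelled with a flag under a mutex. The sketch below is not the package's implementation: outcome, toyJob, finish, run and runRecovered are hypothetical names, and the payload shape is an assumption. It shows the deferred safety net losing to an explicit finish on normal paths and taking effect when the work panics.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// outcome is a hypothetical terminal payload; success and failure share one type.
type outcome struct {
	Status string
	Reason string
}

// toyJob models only the terminal bookkeeping of a job.
type toyJob struct {
	mu       sync.Mutex
	terminal bool
	last     outcome
}

// finish stores the first terminal payload; later calls are no-ops.
func (j *toyJob) finish(o outcome) {
	j.mu.Lock()
	defer j.mu.Unlock()
	if j.terminal {
		return
	}
	j.terminal = true
	j.last = o
}

// result returns the cached terminal payload.
func (j *toyJob) result() outcome {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.last
}

var errBoom = errors.New("boom")

// run mirrors the worker shape from the quick start: a deferred safety-net
// finish plus explicit finishes on the error and success paths.
func run(j *toyJob, work func() error) {
	defer j.finish(outcome{Status: "failed", Reason: "job did not complete"})
	if err := work(); err != nil {
		j.finish(outcome{Status: "failed", Reason: err.Error()})
		return
	}
	j.finish(outcome{Status: "done"})
}

// runRecovered contains a panic from work so the demo keeps running.
func runRecovered(j *toyJob, work func() error) {
	defer func() { _ = recover() }()
	run(j, work)
}

func main() {
	ok := &toyJob{}
	run(ok, func() error { return nil })
	fmt.Println(ok.result().Status) // done

	bad := &toyJob{}
	run(bad, func() error { return errBoom })
	fmt.Println(bad.result().Reason) // boom

	crashed := &toyJob{}
	runRecovered(crashed, func() error { panic("unexpected") })
	fmt.Println(crashed.result().Reason) // job did not complete
}
```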

func (*Job) Publish

func (j *Job) Publish(data any)

Publish updates the cached last payload and fans it out to subscribers with latest-wins semantics. A no-op once terminal. A slow subscriber that has not drained its previous payload sees that payload replaced by this newer one — for monotonic snapshot events, the most recent state is the only state that matters.
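One common way to get latest-wins delivery in Go is a channel of capacity 1 whose pending value is dropped before the newer one is queued. The sketch below illustrates that technique; it is not taken from this package. offerLatest is a hypothetical helper, and it assumes a single publisher per channel (for example, calls serialized under the job's mutex).

```go
package main

import "fmt"

// offerLatest delivers v on a channel of capacity 1, replacing any payload
// the subscriber has not drained yet. It assumes one publisher per channel.
func offerLatest(ch chan string, v string) {
	for {
		select {
		case ch <- v:
			return // buffer had room; v is now the pending payload
		default:
		}
		// Buffer full: drop the stale payload, then retry the send.
		select {
		case <-ch:
		default: // the subscriber drained it in the meantime
		}
	}
}

func main() {
	ch := make(chan string, 1)
	offerLatest(ch, "progress=30%")
	offerLatest(ch, "progress=70%") // replaces the undrained 30% payload
	fmt.Println(<-ch)               // progress=70%
	fmt.Println(len(ch))            // 0
}
```

The publisher never blocks on a slow subscriber, and the subscriber always reads the most recent state, which is sufficient when every payload is a monotonic snapshot.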

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker is a per-service registry of in-flight and recently-finished Jobs, keyed by topic. See README for the API and the Start vs StartOrJoin selection rule.

Tracker holds the caller-supplied ctx directly (no WithCancel wrap): its lifecycle matches the parent, so cancelling the parent wakes every watchAndCleanup goroutine and every in-flight Subscribe loop. If a future use case needs to shut down a Tracker independent of its parent, reintroduce a cancel field + Close() — it isn't needed today.

func New

func New(ctx context.Context, retention, heartbeat time.Duration, lc log.Logger) *Tracker

New constructs a Tracker bound to ctx. Zero or negative durations use 30s defaults. Cancelling ctx wakes all cleanup goroutines.

func (*Tracker) Start

func (t *Tracker) Start(topic string) (job *Job, isNew bool)

Start is the trigger-driven entry. Running entry → join; retained terminal → discard and create fresh; absent → create.

The retained-discard case is the load-bearing difference vs StartOrJoin: a new trigger after the previous run finished must produce a fresh run, not silently observe the prior terminal.

func (*Tracker) StartOrJoin

func (t *Tracker) StartOrJoin(topic string) (job *Job, isNew bool)

StartOrJoin is the subscriber-driven entry. Any existing entry — running or retained terminal — is reused; absent → create.

The retained-reuse case is the load-bearing difference vs Start: late subscribers on a compute-once flow observe the cached terminal instead of retriggering work.

func (*Tracker) Subscribe

func (t *Tracker) Subscribe(c echo.Context, topic string) error

Subscribe attaches an SSE subscriber to topic.

  • Running: write the latest cached event (if any), then live tail until terminal / client disconnect / Tracker shutdown.
  • Retained: write the cached terminal and close.
  • Absent: ErrNoTopic.

Late subscribers see only the most recent payload — callers must design events as monotonic snapshots (see package README).
