concurrency

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package concurrency ships Harbor's runtime concurrency primitives — Phase 14 of the runtime kernel chain (RFC §6.1).

Two stateless helpers:

  • MapConcurrent: runs fn over each input envelope with at most maxConcurrency goroutines in flight; preserves output order.
  • JoinK: reads exactly K envelopes from a channel; cancels the remaining producers via ctx; short-read returns ErrJoinKShortRead.

Both are pure functions — no shared state, no compiled artifacts, trivially safe to call from N concurrent runs (D-025 N/A by construction).

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidConcurrency = errors.New("concurrency: maxConcurrency must be > 0")

ErrInvalidConcurrency — MapConcurrent was called with maxConcurrency <= 0. Wraps the offending value for operator visibility.

View Source
var ErrInvalidK = errors.New("concurrency: k must be > 0")

ErrInvalidK — JoinK was called with k <= 0.

View Source
var ErrJoinKShortRead = errors.New("concurrency: JoinK source closed before K envelopes arrived")

ErrJoinKShortRead — the input channel closed before K envelopes arrived. The slice returned alongside the error contains however many envelopes did arrive (may be empty).

Functions

func JoinK

func JoinK(ctx context.Context, in <-chan messages.Envelope, k int) ([]messages.Envelope, error)

JoinK reads exactly K envelopes from `in`. After K, derives ctx cancellation so upstream producers blocked on send observe the cancellation and exit. Returns the K envelopes (caller-owned slice).

If `in` closes before K arrive, returns ErrJoinKShortRead alongside the partial slice. If the caller's ctx cancels mid-read, returns ctx.Err() with the partial slice (which may be empty).

Cancellation note: JoinK derives a child ctx and returns it via the `cancel` parameter so the caller wires upstream producers to honor it. Callers who don't need upstream cancellation can ignore the derived ctx; the cancel function is invoked internally on the happy path.

nil channel is rejected at call time. K must be > 0.

func MapConcurrent

func MapConcurrent(
	ctx context.Context,
	in []messages.Envelope,
	fn func(context.Context, messages.Envelope) (messages.Envelope, error),
	maxConcurrency int,
) ([]messages.Envelope, error)

MapConcurrent runs fn over each envelope in `in` with at most maxConcurrency goroutines in flight. Output preserves input order. Returns the first error encountered; remaining work is cancelled.

Cancellation: the function derives a child ctx from the caller's ctx. On the first fn error, the child ctx is cancelled, so any in-flight fn invocations that honor ctx will exit promptly. The caller's ctx is never cancelled.

Order preservation: a pre-allocated output slice is indexed by input position; goroutines write to their own slot. No locks needed for the slice itself — distinct indices are written by distinct goroutines.

nil fn is rejected at call time. Empty input returns nil, nil.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL