Documentation ¶
Overview ¶
Package takt provides an abstract proactor runner for code.hybscloud.com/kont computations.
Responsibility remains split by layer:
- code.hybscloud.com/iox classifies outcome/progress evidence
- code.hybscloud.com/kont defines suspension and resumption shape
- code.hybscloud.com/cove carries explicit context across suspension boundaries
- takt advances and resubmits suspended computations
takt dispatches suspended operations through Dispatcher and uses `iox` classification at the execution boundary instead of redefining it.
Primary Operational Surface ¶
- Stepping: AdvanceSuspension and Advance dispatch one observed suspension at a time
- Event loop: Loop drives computations through a Backend (submit/poll)
- Stream loop: SubscriptionLoop drives route-indexed stream observations through a SubscriptionBackend
Convenience Surface ¶
- Blocking: Exec/ExecExpr wait on ErrWouldBlock with adaptive backoff
- Error-aware blocking: ExecError/ExecErrorExpr return code.hybscloud.com/kont.Either
- Stepping with errors: StepError/AdvanceError preserve the code.hybscloud.com/kont.Either result at each step
- Contextual stepping: AdvanceSuspension applies the same one-operation movement law to contextual suspension carriers such as code.hybscloud.com/cove.SuspensionView without making takt own their context
- Bridge helpers: Step reuses code.hybscloud.com/kont.StepExpr; Reify and Reflect re-export the `kont` conversions so callers do not need a second import
- Lifecycle: Loop.Failed, Loop.Drain, SubscriptionLoop.Failed, SubscriptionLoop.Drain, ErrDisposed, ErrLiveTokenReuse, ErrLiveRouteReuse, and ErrUnsupportedMultishot expose terminal fatal states
iox Classification ¶
- nil: completed
- code.hybscloud.com/iox.ErrMore: progress with a live frontier
- code.hybscloud.com/iox.ErrWouldBlock: no progress, retry later
- failure: infrastructure error
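The four-way classification above can be sketched as a single switch. This is a minimal illustration, not takt's code: `errMore`, `errWouldBlock`, and `errBroken` are local stand-ins for the `iox` sentinels and for an arbitrary infrastructure error, and `classify` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for iox.ErrMore and iox.ErrWouldBlock; the real
// sentinels live in code.hybscloud.com/iox.
var (
	errMore       = errors.New("more")
	errWouldBlock = errors.New("would block")
	errBroken     = errors.New("device unavailable") // any other error
)

// classify maps an outcome error onto the four cases listed above.
func classify(err error) string {
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, errMore):
		return "progress"
	case errors.Is(err, errWouldBlock):
		return "retry"
	default:
		return "failure"
	}
}

func main() {
	for _, err := range []error{nil, errMore, errWouldBlock, errBroken} {
		fmt.Println(classify(err))
	}
}
```

Using `errors.Is` rather than `==` keeps the sketch correct if a caller wraps one of the sentinels.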
Event-loop backends report infrastructure poll failures directly from [Backend.Poll]. Loop.Poll and Loop.Run keep poll failures separate from completion classification: poll-level code.hybscloud.com/iox.ErrWouldBlock is treated as idle, while completion-level code.hybscloud.com/iox.ErrWouldBlock triggers a fresh submission for the same suspension without marking that tick as progress.
[Backend.Submit] must return a token that is unique among all submissions still live in the loop; if a backend reuses a live token, Loop.Poll and Loop.Run surface ErrLiveTokenReuse after draining every pending suspension exactly once.
Loop.Poll and Loop.Run return ErrUnsupportedMultishot for completion-level code.hybscloud.com/iox.ErrMore: a completion was received from the backend, but its [Completion.Value] is not resumed into the suspended operation because the submitted backend operation remains active and may produce later same-token completions. Generic Loop has no subscription or cancel carrier for that still-live operation, so multishot stream ownership belongs in a concrete layer above takt.
SubscriptionLoop is the abstract stream counterpart to Loop. It uses a RouteID made from a Token and generation, returns opaque Subscription handles, and polls StreamCompletion values. [StreamCompletion.More] is route-liveness evidence: when it is true, SubscriptionLoop.Poll emits a non-final StreamEvent and keeps the route live; when it is false, Poll emits a final event and retires the route.
[StreamCompletion.HasValue] and [StreamCompletion.EventErr] describe payload evidence at that boundary and are independent of More, so an empty live boundary and a payload error on a still-live route are both representable without collapsing them into terminal success or failure.
Poll-level code.hybscloud.com/iox.ErrWouldBlock remains idle/no mutation. A SubscriptionBackend must not return ready stream completions together with a non-nil poll-level error; payload errors belong in [StreamCompletion.EventErr]. Stale completions for already retired routes are ignored.
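The route-liveness rules above can be illustrated with a small route table. This is a sketch under simplifying assumptions: `streamCompletion`, `streamEvent`, and `routeTable` are hypothetical local types, routes are plain ints rather than RouteID values, and cancellation is left out.

```go
package main

import "fmt"

// streamCompletion is a simplified stand-in for takt.StreamCompletion.
type streamCompletion struct {
	route    int
	value    string
	hasValue bool
	more     bool
	eventErr error
}

// streamEvent is the user-visible event produced by this sketch.
type streamEvent struct {
	route    int
	value    string
	hasValue bool
	final    bool
	eventErr error
}

// routeTable tracks which routes are still live.
type routeTable struct {
	live map[int]bool
}

// apply turns ready completions into events: More keeps the route live,
// its absence retires the route, and completions for routes that are no
// longer live are dropped as stale.
func (rt *routeTable) apply(cs []streamCompletion) []streamEvent {
	var events []streamEvent
	for _, c := range cs {
		if !rt.live[c.route] {
			continue // stale completion for an already retired route
		}
		if !c.more {
			delete(rt.live, c.route)
		}
		events = append(events, streamEvent{
			route: c.route, value: c.value, hasValue: c.hasValue,
			final: !c.more, eventErr: c.eventErr,
		})
	}
	return events
}

func main() {
	rt := &routeTable{live: map[int]bool{1: true}}
	events := rt.apply([]streamCompletion{
		{route: 1, more: true},                    // empty live boundary
		{route: 1, value: "a", hasValue: true},    // final payload
		{route: 1, value: "late", hasValue: true}, // stale, ignored
	})
	for _, e := range events {
		fmt.Printf("route=%d value=%q hasValue=%v final=%v\n", e.route, e.value, e.hasValue, e.final)
	}
}
```

Payload evidence (`hasValue`, `eventErr`) is copied through untouched, so it never influences whether the route stays live.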
CompletionMemory supplies the Completion slice that a Loop passes to [Backend.Poll]. Use NewLoop with functional options: WithMemory installs a custom provider, WithMaxCompletions caps the visible slice length, HeapMemory is the default, and BoundedMemory provides a single bounded pool of default-sized 128 KiB slabs. Loop.Drain releases that slice exactly once through [CompletionMemory.Release]. Custom providers must hand each live loop an exclusive non-overlapping slab and may treat Release as ownership transfer back to the provider.
Error Handling ¶
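ExecError/ExecErrorExpr/StepError/AdvanceError combine Dispatcher with `kont` Error effects. Error operations are handled before dispatcher operations: Throw discards the suspension and yields Left, while every other operation goes through the dispatcher. Results are code.hybscloud.com/kont.Either: Right on success, Left on Throw.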
code.hybscloud.com/cove.SuspensionView already satisfies SuspensionLike through its `Op` and `Resume` methods, so it can be passed directly to AdvanceSuspension.
Stepping Behavior ¶
Each AdvanceSuspension call handles exactly one suspended operation. If resuming that operation produces another suspension, the caller receives it back and decides how to continue.
AdvanceSuspension resumes on code.hybscloud.com/iox.ErrMore because `iox` classifies ErrMore as progress with a live frontier. It returns the original suspension unchanged on code.hybscloud.com/iox.ErrWouldBlock or ordinary failure.
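The one-operation stepping rule can be sketched with a toy suspension. Everything here is a local assumption for illustration: `suspension` and `advance` are hypothetical, the dispatcher is a plain function rather than a Dispatcher, and the sentinels stand in for the `iox` errors. The sketch reports a nil error after any resume, which may differ from what AdvanceSuspension itself returns for ErrMore.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for iox.ErrMore and iox.ErrWouldBlock.
var (
	errMore       = errors.New("more")
	errWouldBlock = errors.New("would block")
)

// suspension is a toy frontier: it waits for `remaining` values and sums them.
type suspension struct {
	remaining int
	sum       int
}

// Op names the pending operation.
func (s *suspension) Op() string { return "read" }

// Resume feeds one value in and returns a result or the next frontier.
func (s *suspension) Resume(v int) (int, *suspension) {
	next := &suspension{remaining: s.remaining - 1, sum: s.sum + v}
	if next.remaining == 0 {
		return next.sum, nil
	}
	return 0, next
}

// advance handles exactly one suspended operation. A nil error and errMore
// resume; errWouldBlock and any other error hand the suspension back unchanged.
func advance(dispatch func(op string) (int, error), s *suspension) (int, *suspension, error) {
	v, err := dispatch(s.Op())
	if err != nil && !errors.Is(err, errMore) {
		return 0, s, err
	}
	result, next := s.Resume(v)
	return result, next, nil
}

func main() {
	calls := 0
	dispatch := func(op string) (int, error) {
		calls++
		if calls == 1 {
			return 0, errWouldBlock // not ready on the first attempt
		}
		return 10, nil
	}
	s := &suspension{remaining: 2}
	for s != nil {
		result, next, err := advance(dispatch, s)
		if errors.Is(err, errWouldBlock) {
			continue // the same suspension came back; retry it
		}
		if err != nil {
			fmt.Println("failed:", err)
			return
		}
		if next == nil {
			fmt.Println("result:", result) // prints "result: 20"
		}
		s = next
	}
}
```

The caller owns the loop: `advance` never decides on its own whether to retry or continue, which mirrors the "caller receives it back and decides" rule above.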
Loop stores pending `*kont.Suspension` frontiers produced by code.hybscloud.com/kont.StepExpr (or by reifying code.hybscloud.com/kont.Eff into Expr form first). This gives the backend one pending suspension per token while that submission remains live.
Loop instances are single-owner runners: callers must serialize SubmitExpr, Submit, Poll, Run, Drain, Pending, and Failed calls on the same loop. The internal pending map uses sharding for token lookup, not as a public concurrent-use guarantee.
Execution Styles ¶
For deterministic dispatchers, Exec, a loop built from Step/Advance, and Loop.Run produce the same externally visible results. Choose the style that fits your integration: blocking execution, manual stepping, or a backend-driven event loop.
Index ¶
- Constants
- Variables
- func Advance[D Dispatcher[D], R any](d D, susp *kont.Suspension[R]) (R, *kont.Suspension[R], error)
- func AdvanceError[E any, D Dispatcher[D], R any](d D, susp *kont.Suspension[kont.Either[E, R]]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]], error)
- func AdvanceSuspension[D Dispatcher[D], S SuspensionLike[S, R], R any](d D, susp S) (R, S, error)
- func Exec[D Dispatcher[D], R any](d D, m kont.Eff[R]) R
- func ExecError[E any, D Dispatcher[D], R any](d D, m kont.Eff[R]) kont.Either[E, R]
- func ExecErrorExpr[E any, D Dispatcher[D], R any](d D, m kont.Expr[R]) kont.Either[E, R]
- func ExecExpr[D Dispatcher[D], R any](d D, m kont.Expr[R]) R
- func Reflect[A any](m kont.Expr[A]) kont.Eff[A]
- func Reify[A any](m kont.Eff[A]) kont.Expr[A]
- func Step[R any](m kont.Expr[R]) (R, *kont.Suspension[R])
- func StepError[E, R any](m kont.Expr[R]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]])
- type Backend
- type BoundedMemory
- type BoundedMemoryOption
- type Completion
- type CompletionBufOption
- type CompletionMemory
- type Dispatcher
- type HeapMemory
- type Loop
- func (l *Loop[B, R]) Drain() int
- func (l *Loop[B, R]) Failed() error
- func (l *Loop[B, R]) Pending() int
- func (l *Loop[B, R]) Poll() ([]R, error)
- func (l *Loop[B, R]) Run() ([]R, error)
- func (l *Loop[B, R]) Submit(m kont.Eff[R]) (R, bool, error)
- func (l *Loop[B, R]) SubmitExpr(m kont.Expr[R]) (R, bool, error)
- type Option
- type RouteID
- type StreamCompletion
- type StreamEvent
- type Subscription
- type SubscriptionBackend
- type SubscriptionLoop
- func (l *SubscriptionLoop[B, A]) Cancel(sub Subscription[A]) error
- func (l *SubscriptionLoop[B, A]) Drain() int
- func (l *SubscriptionLoop[B, A]) Failed() error
- func (l *SubscriptionLoop[B, A]) Pending() int
- func (l *SubscriptionLoop[B, A]) Poll() ([]StreamEvent[A], error)
- func (l *SubscriptionLoop[B, A]) Subscribe(op kont.Operation) (Subscription[A], error)
- type SubscriptionOption
- type SuspensionLike
- type Token
Constants ¶
const DefaultCompletionBufBytes = iobuf.BufferSizeLarge
DefaultCompletionBufBytes anchors the default completion slab byte budget to iobuf's canonical "register buffer" size class (iobuf.BufferSizeLarge, 128 KiB).
const DefaultCompletionBufSize = defaultCompletionBufSize
DefaultCompletionBufSize is the number of Completion entries that fit in one DefaultCompletionBufBytes block. This is the slab length HeapMemory returns from HeapMemory.CompletionBuf when no WithSize option is supplied, and the implicit NewLoop choice when WithMaxCompletions is not given.
const DefaultPoolCapacity = 8
DefaultPoolCapacity is the default bounded-pool capacity used by BoundedMemory when WithPoolCapacity is not supplied. `iobuf` rounds the configured capacity up to the next power of two. With DefaultCompletionBufBytes set to iobuf.BufferSizeLarge, the default bounded pool keeps a contiguous 1 MiB backing block of 8 slabs.
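The sizing arithmetic above can be checked with a few lines of Go. `roundUpPow2` and `poolBytes` are hypothetical helpers that illustrate the described rounding; they are not iobuf's implementation, and `slabBytes` hard-codes the 128 KiB figure stated for iobuf.BufferSizeLarge.

```go
package main

import (
	"fmt"
	"math/bits"
)

const slabBytes = 128 << 10 // 128 KiB, as stated for iobuf.BufferSizeLarge

// roundUpPow2 returns the smallest power of two that is >= n, for n >= 1.
func roundUpPow2(n int) int {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(n-1))
}

// poolBytes is the backing size of a pool with the given requested capacity.
func poolBytes(capacity int) int {
	return roundUpPow2(capacity) * slabBytes
}

func main() {
	fmt.Println(roundUpPow2(8), poolBytes(8)) // 8 1048576 (1 MiB)
	fmt.Println(roundUpPow2(5), poolBytes(5)) // 8 1048576
	fmt.Println(roundUpPow2(9), poolBytes(9)) // 16 2097152 (2 MiB)
}
```

Under this rounding, a requested capacity of 9 would double the backing block compared with the default of 8.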
Variables ¶
var ErrDisposed = errors.New("takt: loop disposed")
ErrDisposed reports that a loop has been explicitly disposed via Loop.Drain or SubscriptionLoop.Drain and can no longer accept submissions or produce completions.
var ErrInvalidRouteID = errors.New("takt: backend returned an invalid route id")
ErrInvalidRouteID reports that a SubscriptionBackend returned the reserved zero RouteID.
var ErrLiveRouteReuse = errors.New("takt: backend reused a live route")
ErrLiveRouteReuse reports that a SubscriptionBackend reused a route that is still live in the current SubscriptionLoop.
var ErrLiveTokenReuse = errors.New("takt: backend reused a live token")
ErrLiveTokenReuse reports that a Backend reused a token that is still live in the current Loop. Reusing a live token would alias two pending frontiers under one correlation key and break token-to-suspension tracking.
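A minimal sketch of why live-token reuse is rejected: a pending table keyed by token can hold only one suspension per key, so a second registration under a live token would overwrite the first. The `pending` type, its methods, and `errLiveTokenReuse` are hypothetical stand-ins, and strings take the place of suspensions.

```go
package main

import (
	"errors"
	"fmt"
)

// errLiveTokenReuse is a local stand-in for takt.ErrLiveTokenReuse.
var errLiveTokenReuse = errors.New("backend reused a live token")

// pending maps a correlation token to the one suspension waiting on it.
type pending struct {
	byToken map[uint64]string
}

// track registers a new submission, refusing a token that is still live.
func (p *pending) track(token uint64, susp string) error {
	if _, live := p.byToken[token]; live {
		return errLiveTokenReuse
	}
	p.byToken[token] = susp
	return nil
}

// retire removes a completed submission so its token may be reused.
func (p *pending) retire(token uint64) {
	delete(p.byToken, token)
}

func main() {
	p := &pending{byToken: map[uint64]string{}}
	fmt.Println(p.track(7, "read A")) // <nil>
	fmt.Println(p.track(7, "read B")) // backend reused a live token
	p.retire(7)
	fmt.Println(p.track(7, "read B")) // <nil>: the token retired first
}
```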
var ErrUnknownSubscription = errors.New("takt: unknown subscription")
ErrUnknownSubscription reports that a Subscription is not live in the current SubscriptionLoop.
var ErrUnsupportedMultishot = errors.New("takt: unsupported multishot completion")
ErrUnsupportedMultishot reports that a backend produced a multishot completion for Loop. iox.ErrMore means the submitted backend operation remains active after the completion queue entry (CQE); generic Loop tracks affine one-shot suspensions and has no subscription or cancel carrier for later same-token completions.
Functions ¶
func Advance ¶
func Advance[D Dispatcher[D], R any](d D, susp *kont.Suspension[R]) (R, *kont.Suspension[R], error)
Advance dispatches one suspended operation via a Dispatcher. Use AdvanceSuspension when the suspension also carries external context. A nil dispatch error and iox.ErrMore resume the suspension; iox.ErrWouldBlock and ordinary failure return it unchanged.
func AdvanceError ¶
func AdvanceError[E any, D Dispatcher[D], R any](d D, susp *kont.Suspension[kont.Either[E, R]]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]], error)
AdvanceError dispatches the suspended operation. Throw discards the suspension and returns Left; other operations go through the dispatcher with the usual `iox` classification. A nil dispatch error and iox.ErrMore resume the suspension; iox.ErrWouldBlock and ordinary failure return it unchanged.
func AdvanceSuspension ¶ added in v0.1.2
func AdvanceSuspension[D Dispatcher[D], S SuspensionLike[S, R], R any](d D, susp S) (R, S, error)
AdvanceSuspension dispatches one suspended operation through a Dispatcher. Exec blocks over the same operation sequence, while Loop drives it through submit/poll calls. A nil dispatch error and iox.ErrMore resume the suspension and return the next frontier. iox.ErrWouldBlock and ordinary failure return the original suspension value unchanged.
func Exec ¶
func Exec[D Dispatcher[D], R any](d D, m kont.Eff[R]) R
Exec runs a kont.Eff computation to completion via a Dispatcher.
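The blocking style can be pictured as a retry loop that waits whenever the operation reports "would block". The sketch below is an assumption-laden illustration: `retryWithBackoff` is a hypothetical helper, `errWouldBlock` stands in for iox.ErrWouldBlock, and the doubling policy is chosen for simplicity; takt's adaptive backoff may behave differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errWouldBlock is a local stand-in for iox.ErrWouldBlock.
var errWouldBlock = errors.New("would block")

// retryWithBackoff calls op until it stops reporting errWouldBlock, doubling
// the wait between attempts up to ceiling. It returns the value, the number
// of attempts made, and the final error.
func retryWithBackoff(op func() (int, error), initial, ceiling time.Duration) (int, int, error) {
	wait := initial
	attempts := 0
	for {
		attempts++
		v, err := op()
		if !errors.Is(err, errWouldBlock) {
			return v, attempts, err // completed or failed: stop waiting
		}
		time.Sleep(wait)
		wait *= 2
		if wait > ceiling {
			wait = ceiling
		}
	}
}

func main() {
	calls := 0
	op := func() (int, error) {
		calls++
		if calls < 3 {
			return 0, errWouldBlock // not ready for the first two attempts
		}
		return 42, nil
	}
	v, attempts, err := retryWithBackoff(op, time.Millisecond, 8*time.Millisecond)
	fmt.Println(v, attempts, err) // 42 3 <nil>
}
```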
func ExecError ¶
func ExecError[E any, D Dispatcher[D], R any](d D, m kont.Eff[R]) kont.Either[E, R]
ExecError runs a kont.Eff computation with error handling. It returns Right on success and Left on Throw.
func ExecErrorExpr ¶
func ExecErrorExpr[E any, D Dispatcher[D], R any](d D, m kont.Expr[R]) kont.Either[E, R]
ExecErrorExpr is the kont.Expr form of ExecError. It waits on iox.ErrWouldBlock via adaptive backoff.
func ExecExpr ¶
func ExecExpr[D Dispatcher[D], R any](d D, m kont.Expr[R]) R
ExecExpr runs a kont.Expr computation to completion via a Dispatcher.
func Reflect ¶
func Reflect[A any](m kont.Expr[A]) kont.Eff[A]
Reflect converts kont.Expr to kont.Eff. Prefer package-owned runners such as Exec, Advance, or Loop when executing the result.
func Reify ¶
func Reify[A any](m kont.Eff[A]) kont.Expr[A]
Reify converts kont.Eff to kont.Expr. Prefer package-owned runners such as ExecExpr, Advance, or Loop when executing the result.
func Step ¶
func Step[R any](m kont.Expr[R]) (R, *kont.Suspension[R])
Step evaluates until the first effect suspension. Prefer AdvanceSuspension or Advance once a suspension has already been observed.
Types ¶
type Backend ¶
type Backend[B Backend[B]] interface {
	// Submit sends an operation and returns a correlation token.
	// Returned tokens must be unique among all submissions that are still live in
	// the [Loop]; once a submission has retired, the backend may reuse its token.
	// Tokens are correlation keys, not sequence numbers, so a concrete backend may
	// use a kernel user_data value directly.
	Submit(op kont.Operation) (Token, error)

	// Poll writes ready completions into completions and reports any
	// infrastructure wait failure. Per-completion outcomes are reported in
	// [Completion.Err]; poll-level errors are reserved for failures of the
	// polling operation itself.
	Poll(completions []Completion) (int, error)
}
Backend is the interface for asynchronous submit/poll execution. Poll reports whether the poll call itself succeeded separately from which token-correlated completions were written into the provided buffer. A backend must not return n > 0 and err != nil together; Loop handles poll errors separately from the per-completion outcome recorded in each Completion.
type BoundedMemory ¶ added in v0.1.2
type BoundedMemory struct {
// contains filtered or unexported fields
}
BoundedMemory is an `iobuf`-backed CompletionMemory provider for steady-state workloads.
It keeps a single bounded pool of default-sized 128 KiB slabs backed by iobuf's lock-free MPMC pool (iobuf.BoundedPool). Requests up to DefaultCompletionBufSize reuse the default slab shape; larger requests fall back to fresh exact-size allocations. Overflow slabs are discarded in [Release] so the bounded memory budget stays anchored to the steady-state default slab.
func NewBoundedMemory ¶ added in v0.1.2
func NewBoundedMemory(opts ...BoundedMemoryOption) *BoundedMemory
NewBoundedMemory constructs a BoundedMemory with the given options. The zero value `&BoundedMemory{}` is also valid and equivalent to `NewBoundedMemory()`; the single bounded default-slab pool defaults to DefaultPoolCapacity.
func (*BoundedMemory) CompletionBuf ¶ added in v0.1.2
func (b *BoundedMemory) CompletionBuf(opts ...CompletionBufOption) []Completion
CompletionBuf returns a Completion slab of the length n requested through WithSize (defaulting to DefaultCompletionBufSize when WithSize is omitted). Requests up to the default size reuse a bounded pool of default-sized slabs and are sliced back to the requested length; larger requests allocate exact-size overflow slabs.
func (*BoundedMemory) Release ¶ added in v0.1.2
func (b *BoundedMemory) Release(buf []Completion)
Release returns a default-sized slab to the bounded pool when it was originally allocated by this BoundedMemory; exact-size overflow slabs are dropped.
type BoundedMemoryOption ¶ added in v0.1.2
type BoundedMemoryOption interface {
// contains filtered or unexported methods
}
BoundedMemoryOption configures a BoundedMemory at construction.
func WithPoolCapacity ¶ added in v0.1.2
func WithPoolCapacity(n int) BoundedMemoryOption
WithPoolCapacity sets the bounded-pool capacity for BoundedMemory's single default-sized slab pool. The capacity is rounded up to the next power of two by iobuf. Panics if n < 1.
type Completion ¶
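Completion carries token-correlated backend evidence together with an `iox` outcome. Loop resumes the correlated suspension with Value only when Err is nil: completion-level iox.ErrWouldBlock triggers a fresh submission, iox.ErrMore is rejected with ErrUnsupportedMultishot, and any other error is treated as a failure.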
type CompletionBufOption ¶ added in v0.1.2
type CompletionBufOption interface {
// contains filtered or unexported methods
}
CompletionBufOption tunes a single [CompletionMemory.CompletionBuf] call.
func WithSize ¶ added in v0.1.2
func WithSize(n int) CompletionBufOption
WithSize requests an explicit completion-slab length. Providers may round up to an internal storage boundary; the returned slice still satisfies `len >= n`. Panics if n <= 0; omit WithSize to request the provider's default slab length.
type CompletionMemory ¶ added in v0.1.2
type CompletionMemory interface {
CompletionBuf(opts ...CompletionBufOption) []Completion
Release(buf []Completion)
}
CompletionMemory provides the Completion buffer used by a Loop.
It is consulted twice per Loop lifetime: once at construction ([CompletionMemory.CompletionBuf]) and once at termination ([CompletionMemory.Release]). It is a loop-level resource rather than a per-dispatch callback, so implementations can choose any allocation strategy that fits their workload.
CompletionMemory only manages storage for observed completions. It does not change Backend, Token, Completion, or the outcome semantics reported by the backend.
takt ships with two built-ins: HeapMemory (the implicit default of NewLoop) and BoundedMemory (a single bounded pool of default-sized 128 KiB slabs). Custom providers may implement CompletionMemory directly.
Requirements:
- when WithSize is supplied, CompletionBuf must return at least that many slots; Loop trims the visible length back to the requested size so WithMaxCompletions remains a true cap.
- when no size hint is supplied, CompletionBuf must return a non-empty default slab.
- live slabs must not overlap: a provider must not hand out aliased storage to two loops (or two live CompletionBuf calls) at the same time.
The slice passed to Release must have been obtained from CompletionBuf on the same CompletionMemory. Release transfers ownership of that slab back to the provider: the caller must stop reading, mutating, or retaining aliases to buf after Release returns.
type Dispatcher ¶
Dispatcher is the non-blocking execution boundary for one suspended operation. Dispatch returns (value, nil) on completion, (value, iox.ErrMore) when progress was made and the operation reports a live frontier, (nil, iox.ErrWouldBlock) when no progress is currently possible, or (nil, error) on infrastructure failure.
type HeapMemory ¶ added in v0.1.2
type HeapMemory struct {
// contains filtered or unexported fields
}
HeapMemory is the default CompletionMemory: a sync.Pool-backed cache of default-sized typed Completion slabs. Slabs of non-default length bypass the pool and are freshly allocated each call.
Pass `&takt.HeapMemory{}` (or the address of an existing HeapMemory variable) to WithMemory so the underlying pool is shared across Loop instances. Copying a HeapMemory value copies the embedded sync.Pool state and therefore does not share recycled slabs.
func (*HeapMemory) CompletionBuf ¶ added in v0.1.2
func (h *HeapMemory) CompletionBuf(opts ...CompletionBufOption) []Completion
CompletionBuf returns a Completion slab of the requested length. The default length is DefaultCompletionBufSize; only default-sized slabs participate in the internal sync.Pool.
func (*HeapMemory) Release ¶ added in v0.1.2
func (h *HeapMemory) Release(buf []Completion)
Release returns the slab to the internal pool when its capacity matches DefaultCompletionBufSize; non-default slabs are left for the GC to reclaim.
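The pooling rule described for HeapMemory (default-sized slabs are recycled, other sizes bypass the pool) can be sketched with `sync.Pool`. This is an illustrative approximation, not takt's source: `slabCache`, `completion`, and the tiny `defaultSlabLen` are hypothetical, and sizes are passed as a plain int instead of through CompletionBufOption.

```go
package main

import (
	"fmt"
	"sync"
)

const defaultSlabLen = 4 // tiny stand-in for DefaultCompletionBufSize

// completion is a placeholder for takt.Completion.
type completion struct{ token uint64 }

// slabCache recycles default-length slabs through a sync.Pool and lets
// other lengths bypass it.
type slabCache struct {
	pool sync.Pool
}

// get returns a slab of length n; n <= 0 means "use the default length".
func (c *slabCache) get(n int) []completion {
	if n <= 0 {
		n = defaultSlabLen
	}
	if n == defaultSlabLen {
		if s, ok := c.pool.Get().(*[]completion); ok {
			return (*s)[:defaultSlabLen] // recycled slab, contents not cleared
		}
	}
	return make([]completion, n)
}

// release recycles default-capacity slabs and leaves the rest to the GC.
func (c *slabCache) release(buf []completion) {
	if cap(buf) != defaultSlabLen {
		return
	}
	buf = buf[:defaultSlabLen]
	c.pool.Put(&buf)
}

func main() {
	cache := &slabCache{} // share by pointer; copying would copy the pool
	def := cache.get(0)
	big := cache.get(10)
	fmt.Println(len(def), len(big))
	cache.release(def) // eligible for reuse
	cache.release(big) // dropped: non-default capacity
	fmt.Println(len(cache.get(0)))
}
```

Because `sync.Pool` may discard cached items at any time, callers can rely on the returned length but not on getting the same backing array back.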
type Loop ¶
Loop drives Expr computations through a Backend. It owns pending `*kont.Suspension` frontiers keyed by Token, so one live token refers to at most one pending suspension at a time.
Loop is not safe for concurrent use. Callers must serialize methods that share a Loop instance, including SubmitExpr, Submit, Poll, Run, Drain, Pending, and Failed.
func NewLoop ¶
NewLoop creates an event loop with the given backend. Behavior is configured via functional [Option]s:
- WithMemory installs a custom CompletionMemory provider; the default is a fresh HeapMemory (sync.Pool-backed default-sized slabs).
- WithMaxCompletions caps the completion slab length per poll; omitting it lets the CompletionMemory provider choose (typically DefaultCompletionBufSize).
Use BoundedMemory via WithMemory when completion buffers should come from a single bounded pool of default-sized 128 KiB slabs. Custom providers may implement CompletionMemory for any allocation strategy without widening the Backend or Completion contracts.
func (*Loop[B, R]) Drain ¶ added in v0.1.2
func (l *Loop[B, R]) Drain() int
Drain transitions the loop into a terminal disposed state and discards every pending suspension exactly once. Drain is idempotent and safe to call after a fatal failure has already been recorded; it preserves the existing fatal error if one is set, otherwise it records ErrDisposed so subsequent SubmitExpr, Submit, and Poll calls fail fast. Drain also returns the completion slab to its CompletionMemory provider exactly once. It returns the number of suspensions drained by this call.
func (*Loop[B, R]) Failed ¶ added in v0.1.2
func (l *Loop[B, R]) Failed() error
Failed returns the loop's terminal fatal error, if any. A non-nil result means every subsequent SubmitExpr, Submit, Poll, or Run call returns the same error and all previously pending suspensions have been discarded exactly once.
func (*Loop[B, R]) Poll ¶
func (l *Loop[B, R]) Poll() ([]R, error)
Poll dispatches ready completions. A resumed suspension either completes or is submitted again under a fresh token. Multishot completions (`iox.ErrMore`) are rejected because the same backend submission remains active after the completion queue entry (CQE), while Loop only tracks one affine suspension per submitted operation. Concrete runtimes that need multishot streams should own a subscription or cancel carrier above this generic Loop.
Returns completed results when one or more computations finish in the current poll tick, a zero-length non-nil slice when work advanced without producing a completed computation, (nil, nil) when idle, or (nil, err) on infrastructure failure. Poll never returns both results and a non-nil error; if a fatal failure arrives in the same tick as ready results, the failure is reported on the next call.
The returned []R aliases an internal buffer that is reused across Poll/Run calls; callers that need to retain the slice past the next Poll/Run must copy it.
When the loop enters a fatal state, whether from an infrastructure poll failure or a completion-driven failure such as ErrUnsupportedMultishot or ErrLiveTokenReuse, it discards every pending suspension exactly once before returning.
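A caller can distinguish the four result shapes described for Poll with a small helper. `describePoll` and `errPoll` are hypothetical names used only for this sketch; the helper inspects a `([]R, error)` pair and does not call takt.

```go
package main

import (
	"errors"
	"fmt"
)

// errPoll is a hypothetical infrastructure error.
var errPoll = errors.New("poll failed")

// describePoll names the shape of a ([]R, error) poll result: a non-nil
// error is a failure, a nil slice is idle, an empty non-nil slice means
// work advanced without finishing, and anything else carries results.
func describePoll[R any](results []R, err error) string {
	switch {
	case err != nil:
		return "failure"
	case results == nil:
		return "idle"
	case len(results) == 0:
		return "advanced"
	default:
		return "completed"
	}
}

func main() {
	fmt.Println(describePoll([]int{1, 2}, nil))  // completed
	fmt.Println(describePoll([]int{}, nil))      // advanced
	fmt.Println(describePoll[int](nil, nil))     // idle
	fmt.Println(describePoll[int](nil, errPoll)) // failure
}
```

The nil check must come before the length check, since a nil slice also has length zero.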
func (*Loop[B, R]) Run ¶
func (l *Loop[B, R]) Run() ([]R, error)
Run polls until every pending operation completes or the loop transitions to a fatal state. The returned []R is freshly allocated and owned by the caller, so it does not alias any internal buffer (unlike Loop.Poll). On a fatal failure Run returns the partial results accumulated so far together with the fatal error; subsequent SubmitExpr/Submit/Poll/Run calls return the same stored error.
func (*Loop[B, R]) SubmitExpr ¶
func (l *Loop[B, R]) SubmitExpr(m kont.Expr[R]) (R, bool, error)
SubmitExpr steps a kont.Expr computation and transfers its first suspension to the backend. The loop stores the resulting frontier under one token, so later completions are expected to continue that same affine suspension lineage.
type Option ¶ added in v0.1.2
type Option interface {
// contains filtered or unexported methods
}
Option configures a Loop at construction. Pass options to NewLoop.
Available options:
- WithMemory supplies a custom CompletionMemory provider (defaults to a fresh HeapMemory).
- WithMaxCompletions caps the per-poll completion slab length (defaults to the CompletionMemory provider's own choice; see DefaultCompletionBufSize for the HeapMemory and BoundedMemory default size).
func WithMaxCompletions ¶ added in v0.1.2
WithMaxCompletions caps the per-poll completion slab length. If omitted, the CompletionMemory provider chooses the size (typically DefaultCompletionBufSize). Panics if n <= 0.
func WithMemory ¶ added in v0.1.2
func WithMemory(m CompletionMemory) Option
WithMemory installs a custom CompletionMemory provider on the Loop. The provider retains ownership of the slab's lifetime: Loop.Drain calls [CompletionMemory.Release] exactly once.
type RouteID ¶ added in v0.2.0
type RouteID struct {
// contains filtered or unexported fields
}
RouteID identifies one stream/subscription route. It pairs a backend token with an explicit generation so a backend may reuse a token after finalization without aliasing a previous live route.
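The token-plus-generation pairing can be illustrated with a comparable struct used as a map key. The field layout of `routeID` below is an assumption made for this sketch (the real RouteID keeps its fields unexported), and `routes` is a hypothetical helper type.

```go
package main

import "fmt"

// routeID is a hypothetical mirror of takt.RouteID: a token plus a generation.
type routeID struct {
	token      uint64
	generation uint64
}

// routes tracks live routes by their full identifier.
type routes map[routeID]string

// isLive reports whether id names a route that has not been retired.
func (r routes) isLive(id routeID) bool {
	_, ok := r[id]
	return ok
}

func main() {
	live := routes{}
	first := routeID{token: 3, generation: 1}
	live[first] = "subscription A"
	delete(live, first) // route finalized; the backend may reuse token 3

	second := routeID{token: 3, generation: 2}
	live[second] = "subscription B"

	// A late observation for the first route does not match the second.
	fmt.Println(live.isLive(first), live.isLive(second)) // false true
}
```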
func NewRouteID ¶ added in v0.2.0
NewRouteID constructs a route identifier for SubscriptionBackend implementations. The zero RouteID is reserved and must not be returned by a backend.
func (RouteID) Generation ¶ added in v0.2.0
Generation returns the generation component of id.
type StreamCompletion ¶ added in v0.2.0
StreamCompletion carries backend evidence for one stream route.
More is route-liveness evidence: when true, the same route remains active after this observation. HasValue and EventErr describe payload evidence at this boundary and are independent of More. EventErr is payload evidence, not route control; use More to report the live frontier.
func (StreamCompletion[A]) RouteOutcome ¶ added in v0.2.0
func (c StreamCompletion[A]) RouteOutcome() iox.Outcome
RouteOutcome projects the route-level completion onto iox's endpoint outcome vocabulary. The projection forgets route identity and successor ownership: More maps to iox.OutcomeMore, EventErr maps to iox.OutcomeFailure, and all other observations map to iox.OutcomeOK.
type StreamEvent ¶ added in v0.2.0
type StreamEvent[A any] struct {
	Subscription Subscription[A]
	Value        A
	HasValue     bool
	Final        bool
	EventErr     error
}
StreamEvent is the user-visible event emitted by SubscriptionLoop.Poll. Final reports that the route has retired after this observation.
type Subscription ¶ added in v0.2.0
type Subscription[A any] struct {
// contains filtered or unexported fields
}
Subscription is an opaque handle for one abstract stream route. It carries route identity only; payload delivery and finalization are observed through SubscriptionLoop.Poll.
func (Subscription[A]) ID ¶ added in v0.2.0
func (s Subscription[A]) ID() RouteID
ID returns the route identifier associated with the subscription.
func (Subscription[A]) IsZero ¶ added in v0.2.0
func (s Subscription[A]) IsZero() bool
IsZero reports whether s is the zero, non-live subscription handle.
type SubscriptionBackend ¶ added in v0.2.0
type SubscriptionBackend[B SubscriptionBackend[B, A], A any] interface {
	// Subscribe starts op as a route-producing operation and returns its route
	// identity. Returned routes must be unique among routes still live in the
	// [SubscriptionLoop], and the zero [RouteID] is invalid.
	Subscribe(op kont.Operation) (RouteID, error)

	// Poll writes ready stream completions into completions. Poll-level errors
	// report failures of the polling operation itself; per-route payload errors
	// are reported in [StreamCompletion.EventErr]. A backend must not return
	// n > 0 and err != nil together; [SubscriptionLoop] handles poll errors
	// separately from route completions. A poll-level [iox.ErrWouldBlock] means
	// the stream loop is idle.
	Poll(completions []StreamCompletion[A]) (int, error)

	// Cancel requests cancellation of the live route. A successful Cancel moves
	// the route into a canceling state; the route is retired by a later terminal
	// completion, by Drain, or by a fatal loop transition.
	Cancel(id RouteID) error
}
SubscriptionBackend is the abstract backend contract for route-indexed stream/subscription execution.
type SubscriptionLoop ¶ added in v0.2.0
type SubscriptionLoop[B SubscriptionBackend[B, A], A any] struct {
// contains filtered or unexported fields
}
SubscriptionLoop drives route-indexed stream completions through a SubscriptionBackend.
SubscriptionLoop is separate from Loop: Loop owns one-shot suspended computations, while SubscriptionLoop owns same-route successor observations. A StreamCompletion with More set emits a non-final StreamEvent and keeps route ownership live; a completion without More emits a final event and retires the route.
SubscriptionLoop is not safe for concurrent use. Callers must serialize Subscribe, Poll, Cancel, Drain, Pending, and Failed calls on one loop.
func NewSubscriptionLoop ¶ added in v0.2.0
func NewSubscriptionLoop[B SubscriptionBackend[B, A], A any](b B, opts ...SubscriptionOption) *SubscriptionLoop[B, A]
NewSubscriptionLoop creates a route-indexed stream runner over b. Use WithMaxStreamCompletions to cap the per-poll completion buffer length; omitted options use DefaultCompletionBufSize.
func (*SubscriptionLoop[B, A]) Cancel ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Cancel(sub Subscription[A]) error
Cancel requests cancellation of sub. A successful cancel request preserves the route until a terminal completion, Drain, or fatal loop transition retires it. Cancel returns ErrUnknownSubscription for a zero or non-live subscription handle.
func (*SubscriptionLoop[B, A]) Drain ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Drain() int
Drain transitions the stream loop into a disposed state and retires every owned route. Active routes receive one cancel request; routes already in the canceling state are retired without a duplicate cancel request. Drain is idempotent and returns the number of routes retired by this call.
func (*SubscriptionLoop[B, A]) Failed ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Failed() error
Failed returns the loop's terminal fatal error, if any.
func (*SubscriptionLoop[B, A]) Pending ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Pending() int
Pending returns the count of live stream routes.
func (*SubscriptionLoop[B, A]) Poll ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Poll() ([]StreamEvent[A], error)
Poll emits ready stream events.
Poll-level iox.ErrWouldBlock is treated as an idle tick. The backend poll contract excludes ready completions paired with a non-nil poll error. Unknown-route completions are stale observations and are ignored. The returned slice aliases an internal buffer that is reused on the next Poll call.
func (*SubscriptionLoop[B, A]) Subscribe ¶ added in v0.2.0
func (l *SubscriptionLoop[B, A]) Subscribe(op kont.Operation) (Subscription[A], error)
Subscribe starts op through the backend and stores the returned route. It returns ErrInvalidRouteID when the backend returns the reserved zero route and ErrLiveRouteReuse when the backend aliases a currently live route. Either condition is fatal to the loop.
type SubscriptionOption ¶ added in v0.2.0
type SubscriptionOption interface {
// contains filtered or unexported methods
}
SubscriptionOption configures a SubscriptionLoop at construction.
func WithMaxStreamCompletions ¶ added in v0.2.0
func WithMaxStreamCompletions(n int) SubscriptionOption
WithMaxStreamCompletions caps the per-poll stream completion buffer length. If omitted, DefaultCompletionBufSize is used. Panics if n <= 0.
type SuspensionLike ¶ added in v0.1.2
type SuspensionLike[S any, R any] interface {
	Op() kont.Operation
	Resume(value kont.Resumed) (R, S)
}
SuspensionLike is any resumable value that exposes one pending operation and can accept the corresponding resumption while preserving its outer type. Packages such as `cove` can use AdvanceSuspension by providing `Op` and `Resume` methods without requiring extra adapters in `takt`.