takt

v0.2.0 Latest
Published: May 5, 2026 | License: MIT | Imports: 8 | Imported by: 0

README

takt

An abstract completion-driven dispatch engine for non-blocking I/O stacks.

Overview

In a proactor model, I/O operations are submitted to the kernel and their completions arrive asynchronously. The application must correlate each completion back to the computation that requested it, resume that computation, and handle success, progress with a live frontier, no-progress readiness/backpressure, and failure.

takt provides this execution model as an abstract layer over the kont effect system. A Dispatcher evaluates one algebraic effect at a time, classifying the result according to the iox outcome algebra. A Backend submits operations to an asynchronous engine, for example io_uring, and polls for completions. The Loop ties them together: it submits computations, polls the backend, correlates completions by token, and resumes suspended continuations.

Two equivalent APIs are available: kont.Eff (closure-based, straightforward to compose) and kont.Expr (frame-based, with lower allocation overhead on hot paths).

The event-loop path stores one pending suspension per live token. Each token tracks a suspension produced by kont.StepExpr (or by reifying kont.Eff first), so Backend.Submit must not reuse a token while the older submission carrying it remains live in the loop.

For stream or multishot-style integrations, SubscriptionLoop provides a separate abstract route runner. It tracks Subscription handles by RouteID (Token plus generation), polls StreamCompletion values, emits StreamEvent observations, and keeps More as route-liveness evidence independent of payload value or payload error. This keeps generic Loop affine and one-shot while still giving concrete runtimes a principled place to represent same-operation successor observations.

Composition Boundary

takt owns execution movement, not context meaning or outcome vocabulary. iox classifies nil, ErrWouldBlock, ErrMore, and failure; kont owns the suspension/resumption carrier; cove may wrap a suspension with explicit context through SuspensionView; takt advances any value that satisfies SuspensionLike without interpreting that context.

Installation

go get code.hybscloud.com/takt

Requires Go 1.26 or later.

Outcome Classification

Each dispatched operation yields an iox outcome. Dispatcher.Dispatch reports that outcome as (value, error); blocking runners and the stepping API interpret it as follows:

| Outcome | Meaning | Dispatcher return | Blocking / stepping behavior |
|---|---|---|---|
| nil | completed | (value, nil) | resume |
| ErrMore | progress with a live frontier | (value, ErrMore) | resume; stepping returns ErrMore |
| ErrWouldBlock | no progress | (nil, ErrWouldBlock) | blocking waits; stepping returns the suspension |
| other | infrastructure failure | (nil, error) | blocking panics; stepping returns the error |
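The four rows can be read as a single switch over the dispatch error. The following is a minimal sketch, not takt or iox code: errWouldBlock, errMore, errIO, and the outcome type are local stand-ins invented for illustration, and the real sentinels live in the iox package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for iox.ErrWouldBlock and iox.ErrMore (hypothetical names).
var (
	errWouldBlock = errors.New("would block")
	errMore       = errors.New("more")
	errIO         = errors.New("EIO") // an arbitrary infrastructure failure
)

// outcome names the four rows of the classification table.
type outcome int

const (
	completed  outcome = iota // nil: resume with the value
	progress                  // ErrMore: resume, frontier still live
	noProgress                // ErrWouldBlock: wait, or hand the suspension back
	failure                   // anything else: infrastructure failure
)

func (o outcome) String() string {
	return [...]string{"completed", "progress", "no progress", "failure"}[o]
}

// classify maps a dispatch error to its table row.
func classify(err error) outcome {
	switch {
	case err == nil:
		return completed
	case errors.Is(err, errMore):
		return progress
	case errors.Is(err, errWouldBlock):
		return noProgress
	default:
		return failure
	}
}

func main() {
	for _, err := range []error{nil, errMore, errWouldBlock, errIO} {
		fmt.Printf("%v -> %v\n", err, classify(err))
	}
}
```

Using errors.Is rather than == is a choice of this sketch so that wrapped sentinels still land in the right row.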

Usage

Dispatcher

A Dispatcher maps each algebraic effect to a concrete I/O operation and returns the result together with an iox outcome.

type myDispatcher struct{ /* ... */ }

func (d *myDispatcher) Dispatch(op kont.Operation) (kont.Resumed, error) {
	// dispatch op; return (value, nil), (value, iox.ErrMore), or (nil, iox.ErrWouldBlock)
}

Blocking Evaluation

Exec and ExecExpr run a computation to completion, synchronously waiting when the dispatcher yields iox.ErrWouldBlock.

result := takt.Exec(d, computation)             // kont.Eff
exprResult := takt.ExecExpr(d, exprComputation) // kont.Expr

Stepping

For proactor event loops, for example io_uring, Step and Advance evaluate one effect at a time. When the dispatcher yields iox.ErrWouldBlock, the suspension is returned to the caller so the event loop can reschedule it.

The manual stepping outcome law is explicit: if d.Dispatch(susp.Op()) returns nil or iox.ErrMore, Advance resumes the suspension with the returned value and returns the next suspension plus the original error; if it returns iox.ErrWouldBlock or an ordinary failure, Advance returns the zero result, the original suspension, and that error. ErrMore is therefore a progress signal in manual stepping, while completion-level ErrMore remains unsupported by generic Loop because the backend operation is still live.

result, susp := takt.Step[int](exprComputation)
if susp != nil {
	var err error
	result, susp, err = takt.Advance(d, susp)
	if iox.IsWouldBlock(err) {
		return susp // yield to the event loop and reschedule when ready
	}
}
// result holds the final value

Error Handling

Compose dispatcher operations with error effects. Throw short-circuits the computation eagerly and discards the pending suspension.

either := takt.ExecError[string](d, computation)
// Right on success, Left on Throw

// Stepping with errors
either, susp := takt.StepError[string, int](exprComputation)
if susp != nil {
	var err error
	either, susp, err = takt.AdvanceError[string](d, susp)
	if iox.IsWouldBlock(err) {
		return susp // yield to the event loop and reschedule when ready
	}
}

Event Loop

A Loop drives computations through a Backend. It submits operations, polls for completions, correlates them by Token, and resumes suspended continuations. NewLoop accepts functional Options. WithMaxCompletions(n) panics when n <= 0 with the message "takt: WithMaxCompletions requires n > 0"; WithMemory(nil) panics with the message "takt: WithMemory requires a non-nil CompletionMemory".

Backend.Poll([]Completion) (int, error) reports both the number of ready completions and any infrastructure poll failure; a backend must not return ready completions together with a non-nil poll-level error. Loop treats iox.ErrWouldBlock returned by Poll as an idle tick rather than a terminal error.

Loop is a single-owner runner. Serialize calls that share the same Loop, including SubmitExpr, Submit, Poll, Run, Drain, Pending, and Failed.

Backend.Submit must return a token that is unique among all submissions still live in the loop. Tokens are correlation keys, not sequence numbers; a concrete backend may use a kernel user_data value directly. If a backend reuses a live token, the loop records ErrLiveTokenReuse, drains every pending suspension exactly once, and every subsequent SubmitExpr / Submit / Poll / Run call returns that fatal error.

When a completion carries iox.ErrWouldBlock, the loop resubmits the same operation. If a completion carries iox.ErrMore (multishot), the loop records ErrUnsupportedMultishot, drains every pending suspension exactly once, and every subsequent SubmitExpr / Submit / Poll / Run call returns that fatal error. ErrMore means the submitted backend operation remains active after the CQE, while generic Loop has no subscription or cancel carrier for later same-token completions.

Loop.Failed() reports the recorded fatal error (or nil). Loop.Drain() forces the loop into a disposed state, discards every pending suspension exactly once, and records ErrDisposed only if no fatal was previously set; it is idempotent and preserves the original fatal error when one already exists.
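The token and fatal-state rules above can be modelled with a map and one error field. This is a sketch under stated assumptions, not takt's implementation: miniLoop, track, retire, and drain are hypothetical names, and the sentinels are local copies of the error names used in the text.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local sentinels mirroring the names used in the text.
var (
	errLiveTokenReuse = errors.New("backend reused a live token")
	errDisposed       = errors.New("loop disposed")
)

// miniLoop models only the bookkeeping: live tokens and one fatal error.
type miniLoop struct {
	pending map[uint64]string // token -> label of the suspended computation
	fatal   error
}

func newMiniLoop() *miniLoop { return &miniLoop{pending: map[uint64]string{}} }

// track registers a token returned by a backend submission. A token that is
// still live is fatal: everything pending is discarded once.
func (l *miniLoop) track(tok uint64, label string) error {
	if l.fatal != nil {
		return l.fatal
	}
	if _, live := l.pending[tok]; live {
		l.fatal = errLiveTokenReuse
		l.discardAll()
		return l.fatal
	}
	l.pending[tok] = label
	return nil
}

// retire removes a token once its completion has been consumed; after that
// the backend may hand the same token out again.
func (l *miniLoop) retire(tok uint64) { delete(l.pending, tok) }

// drain disposes the loop, keeps an earlier fatal error if one exists, and
// returns how many pending entries this call discarded.
func (l *miniLoop) drain() int {
	n := l.discardAll()
	if l.fatal == nil {
		l.fatal = errDisposed
	}
	return n
}

func (l *miniLoop) discardAll() int {
	n := len(l.pending)
	for tok := range l.pending {
		delete(l.pending, tok)
	}
	return n
}

func main() {
	l := newMiniLoop()
	_ = l.track(7, "read")
	l.retire(7)
	fmt.Println(l.track(7, "write")) // reuse after retirement is accepted
	fmt.Println(l.track(7, "again")) // reuse while live is fatal
	fmt.Println(l.drain(), l.fatal)  // the earlier fatal error is preserved
}
```

Because the pending map is emptied at the moment the fatal error is recorded, a later drain finds nothing left to discard, which is one way to get the "exactly once" behaviour described above.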

loop := takt.NewLoop[*myBackend, int](backend, takt.WithMaxCompletions(64))

// Submit computations
loop.SubmitExpr(exprComputation1)
loop.SubmitExpr(exprComputation2)
loop.Submit(contComputation) // kont.Eff

// Drive all to completion
results, err := loop.Run()

Stream / Subscription Runner

SubscriptionLoop is the sibling runner for route-indexed stream observations. A SubscriptionBackend starts an operation with Subscribe, polls StreamCompletion values, and accepts Cancel requests for live routes. RouteID pairs a Token with a generation so token reuse after finalization does not alias an older live route.

StreamCompletion.More means the same route remains live after the observation. HasValue and EventErr describe payload evidence at that boundary and are independent of More: a completion can report a value and more to come, no value and more to come, or a payload error while the route remains live. A completion with More == false emits a final StreamEvent and retires the route.
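The independence of More from the payload fields can be sketched as a pure mapping from a completion to an event. The completion and event structs below are local stand-ins shaped after the documented field lists, with a plain uint64 in place of RouteID and Subscription; toEvent and errShortRead are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var errShortRead = errors.New("short read") // hypothetical payload error

// completion and event are local stand-ins for the documented field lists
// {ID, Value, HasValue, EventErr, More} and {..., HasValue, Final, EventErr}.
type completion struct {
	ID       uint64
	Value    int
	HasValue bool
	EventErr error
	More     bool
}

type event struct {
	ID       uint64
	Value    int
	HasValue bool
	EventErr error
	Final    bool
}

// toEvent copies payload evidence through untouched and derives Final from
// More alone: the payload never decides route liveness.
func toEvent(c completion) event {
	return event{
		ID:       c.ID,
		Value:    c.Value,
		HasValue: c.HasValue,
		EventErr: c.EventErr,
		Final:    !c.More,
	}
}

func main() {
	cases := []completion{
		{ID: 1, Value: 10, HasValue: true, More: true}, // value, more to come
		{ID: 1, More: true},                            // empty boundary, still live
		{ID: 1, EventErr: errShortRead, More: true},    // payload error, still live
		{ID: 1, Value: 11, HasValue: true},             // last observation
	}
	for _, c := range cases {
		e := toEvent(c)
		fmt.Printf("hasValue=%t err=%v final=%t\n", e.HasValue, e.EventErr, e.Final)
	}
}
```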

Subscribe rejects the zero RouteID with ErrInvalidRouteID and live-route aliasing with ErrLiveRouteReuse; both conditions put the stream loop in a fatal state. Poll-level iox.ErrWouldBlock is an idle tick. A SubscriptionBackend must not return ready stream completions together with a non-nil poll-level error; payload errors belong in StreamCompletion.EventErr. Unknown or already retired route completions are stale observations and are ignored. Cancel requests route cancellation without retiring the route immediately; a terminal completion, Drain, or a fatal loop transition retires it.
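A small route table makes these rules concrete. The sketch below is illustrative only: routeID is a local two-field struct, not takt.RouteID, and the routes type with its admit and observe methods is hypothetical. It assumes that a fatal transition retires every live route, as the paragraph above states.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical local sentinels mirroring the names used in the text.
var (
	errInvalidRouteID = errors.New("invalid route id")
	errLiveRouteReuse = errors.New("live route reuse")
)

// routeID is a local stand-in: a token plus a generation. The zero value is
// reserved as invalid.
type routeID struct {
	token, generation uint64
}

type routes struct {
	live  map[routeID]bool
	fatal error
}

func newRoutes() *routes { return &routes{live: map[routeID]bool{}} }

// fail records the fatal error and retires every live route.
func (r *routes) fail(err error) error {
	r.fatal = err
	for id := range r.live {
		delete(r.live, id)
	}
	return err
}

// admit registers the id a backend returned from its Subscribe call.
func (r *routes) admit(id routeID) error {
	switch {
	case r.fatal != nil:
		return r.fatal
	case id == (routeID{}):
		return r.fail(errInvalidRouteID)
	case r.live[id]:
		return r.fail(errLiveRouteReuse)
	}
	r.live[id] = true
	return nil
}

// observe applies one completion and reports whether an event is emitted.
// Unknown or retired ids are stale; more == false retires the route.
func (r *routes) observe(id routeID, more bool) bool {
	if !r.live[id] {
		return false
	}
	if !more {
		delete(r.live, id)
	}
	return true
}

func main() {
	r := newRoutes()
	old := routeID{token: 5, generation: 1}
	next := routeID{token: 5, generation: 2}
	fmt.Println(r.admit(old))          // first route on token 5
	fmt.Println(r.observe(old, false)) // final observation retires it
	fmt.Println(r.admit(next))         // same token, new generation
	fmt.Println(r.observe(old, true))  // late completion for generation 1
	fmt.Println(r.admit(next))         // aliasing a live route
}
```

Keying the table on the (token, generation) pair is what lets a late completion for generation 1 be recognised as stale even though token 5 is live again under generation 2.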

type myStreamBackend struct{ /* ... */ }

func (b *myStreamBackend) Subscribe(op kont.Operation) (takt.RouteID, error) {
	return takt.NewRouteID(tok, generation), nil
}

func (b *myStreamBackend) Poll(out []takt.StreamCompletion[int]) (int, error) {
	// Fill out with route-indexed observations.
	return n, nil
}

func (b *myStreamBackend) Cancel(id takt.RouteID) error {
	// Request cancellation of the live route.
	return nil
}

stream := takt.NewSubscriptionLoop[*myStreamBackend, int](
	backend,
	takt.WithMaxStreamCompletions(64),
)

sub, err := stream.Subscribe(op)
events, err := stream.Poll()
_ = sub
_ = events
_ = err

NewLoop uses HeapMemory as the default completion-buffer provider. Pass BoundedMemory via WithMemory when completion buffers should come from a bounded steady-state pool of default-sized 128 KiB slabs, or supply a custom CompletionMemory implementation to control the allocation strategy without widening the Backend or Completion contracts. Custom providers must return exclusive, non-overlapping live slabs and may treat Release as an ownership transfer back to the provider:

// Default: HeapMemory + default-sized completion slab.
loop := takt.NewLoop[*myBackend, int](backend)

// Cap the per-poll completion slab length (the provider still chooses the slab shape; Loop trims the visible length back to this cap).
loop = takt.NewLoop[*myBackend, int](backend, takt.WithMaxCompletions(64))

// Share one HeapMemory's sync.Pool across several Loops by passing the same address to WithMemory; copying a HeapMemory value would not share recycled slabs.
heap := &takt.HeapMemory{}
loopA := takt.NewLoop[*myBackend, int](backend, takt.WithMemory(heap))
loopB := takt.NewLoop[*myBackend, int](backend, takt.WithMemory(heap))

// BoundedMemory: one bounded pool of default-sized 128 KiB slabs. WithPoolCapacity tunes that pool's capacity (rounded up to the next power of two by iobuf).
bounded := takt.NewBoundedMemory(takt.WithPoolCapacity(4))
loop = takt.NewLoop[*myBackend, int](
	backend,
	takt.WithMemory(bounded),
	takt.WithMaxCompletions(64),
)
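For a custom provider, the contract amounts to "return at least the requested number of slots, never alias live slabs, and take ownership back on release". The sketch below illustrates that shape with local stand-ins only: the memory interface takes a plain int instead of takt's functional CompletionBufOption, and roundedMemory, acquire, and defaultSlots are hypothetical.

```go
package main

import "fmt"

// completion and memory are local stand-ins for takt.Completion and the
// two-method provider shape; the size hint is a plain int in this sketch.
type completion struct {
	Token uint64
	Value any
	Err   error
}

type memory interface {
	CompletionBuf(n int) []completion // n <= 0 asks for the default length
	Release(buf []completion)
}

const defaultSlots = 64 // hypothetical default, not takt's

// roundedMemory rounds every request up to a multiple of 16 slots, the kind
// of internal storage boundary a provider may apply.
type roundedMemory struct {
	live int // slabs handed out and not yet released
}

func (m *roundedMemory) CompletionBuf(n int) []completion {
	if n <= 0 {
		n = defaultSlots
	}
	m.live++
	return make([]completion, (n+15)/16*16) // fresh storage, never aliased
}

// Release takes ownership back; this sketch only updates the live count.
func (m *roundedMemory) Release(buf []completion) { m.live-- }

// acquire is the loop-side half: trim the visible length back to the cap so
// the cap stays a true upper bound per poll.
func acquire(m memory, maxCompletions int) []completion {
	buf := m.CompletionBuf(maxCompletions)
	if maxCompletions > 0 && len(buf) > maxCompletions {
		buf = buf[:maxCompletions]
	}
	return buf
}

func main() {
	m := &roundedMemory{}
	buf := acquire(m, 20)
	fmt.Println(len(buf), cap(buf)) // visible length versus provider slab
	m.Release(buf)
	fmt.Println(m.live)
}
```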

API Overview

Dispatch
  • Dispatcher[D Dispatcher[D]]: non-blocking dispatch interface
  • Exec[D, R](d D, m kont.Eff[R]) R: blocking evaluation of kont.Eff
  • ExecExpr[D, R](d D, m kont.Expr[R]) R: blocking evaluation of kont.Expr
Stepping
  • SuspensionLike[S, R]: resumable interface (Op + Resume), implemented by cove.SuspensionView
  • Step[R](m kont.Expr[R]) (R, *kont.Suspension[R]): evaluate until the first suspension
  • AdvanceSuspension[D, S, R](d D, susp S) (R, S, error): dispatch one operation through any SuspensionLike value
  • Advance[D, R](d D, susp *kont.Suspension[R]) (R, *kont.Suspension[R], error): dispatch one operation
Error Handling
  • ExecError[E, D, R](d D, m kont.Eff[R]) kont.Either[E, R]: blocking evaluation with errors
  • ExecErrorExpr[E, D, R](d D, m kont.Expr[R]) kont.Either[E, R]: error-aware evaluation of kont.Expr
  • StepError[E, R](m kont.Expr[R]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]]): stepping with errors
  • AdvanceError[E, D, R](d D, susp *kont.Suspension[kont.Either[E, R]]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]], error): advance one step with errors
Backend and Event Loop
  • Backend[B Backend[B]]: asynchronous submit/poll interface
  • CompletionMemory: loop-local completion-buffer provider
  • HeapMemory: default implementation (sync.Pool-backed typed slabs of the default size)
  • BoundedMemory: iobuf-backed implementation with a single bounded pool of default-sized 128 KiB slabs
  • Option: functional options for NewLoop (WithMemory, WithMaxCompletions)
  • CompletionBufOption: functional options for CompletionMemory.CompletionBuf (WithSize)
  • BoundedMemoryOption: functional options for NewBoundedMemory (WithPoolCapacity)
  • Token: submission-completion correlation (uint64)
  • Completion: {Token, Value kont.Resumed, Err error}
  • NewLoop[B, R](b B, opts ...Option) *Loop[B, R]: create an event loop (default HeapMemory, default-sized slab)
  • (*Loop[B, R]).SubmitExpr(m kont.Expr[R]) (R, bool, error): step and submit Expr
  • (*Loop[B, R]).Submit(m kont.Eff[R]) (R, bool, error): step and submit Eff
  • (*Loop[B, R]).Poll() ([]R, error): poll and dispatch completions
  • (*Loop[B, R]).Run() ([]R, error): drive all to completion
  • (*Loop[B, R]).Pending() int: count pending operations
  • (*Loop[B, R]).Failed() error: terminal fatal error, or nil
  • (*Loop[B, R]).Drain() int: discard pending suspensions and dispose the loop
  • ErrLiveTokenReuse: backend reused a token that was still live in the loop
  • ErrUnsupportedMultishot: multishot completion is unsupported by generic Loop
  • ErrDisposed: loop has been disposed via Drain
Stream Routes
  • RouteID: stream route identity (Token plus generation)
  • NewRouteID(token Token, generation uint64) RouteID: construct a route identifier for stream backends
  • RouteID.Token() Token: token component
  • RouteID.Generation() uint64: generation component
  • RouteID.IsZero() bool: reserved invalid-route test
  • Subscription[A]: opaque live stream handle
  • Subscription[A].ID() RouteID: route identity carried by the handle
  • Subscription[A].IsZero() bool: zero, non-live handle test
  • StreamCompletion[A]: {ID, Value, HasValue, EventErr, More}
  • StreamCompletion[A].RouteOutcome() iox.Outcome: projection to iox outcome vocabulary
  • StreamEvent[A]: {Subscription, Value, HasValue, Final, EventErr}
  • SubscriptionBackend[B, A]: abstract stream backend interface (Subscribe, Poll, Cancel)
  • SubscriptionOption: functional options for NewSubscriptionLoop
  • WithMaxStreamCompletions(n): cap the per-poll stream completion buffer
  • NewSubscriptionLoop[B, A](b B, opts ...SubscriptionOption) *SubscriptionLoop[B, A]: create a stream runner
  • (*SubscriptionLoop[B, A]).Subscribe(op kont.Operation) (Subscription[A], error): start a route-producing operation
  • (*SubscriptionLoop[B, A]).Poll() ([]StreamEvent[A], error): poll route-indexed events
  • (*SubscriptionLoop[B, A]).Cancel(sub Subscription[A]) error: request route cancellation
  • (*SubscriptionLoop[B, A]).Pending() int: count live stream routes
  • (*SubscriptionLoop[B, A]).Failed() error: terminal fatal error, or nil
  • (*SubscriptionLoop[B, A]).Drain() int: cancel or retire owned routes and dispose the stream loop
  • ErrLiveRouteReuse: backend reused a route that was still live
  • ErrInvalidRouteID: backend returned the reserved zero RouteID
  • ErrUnknownSubscription: subscription handle is not live in the stream loop
Bridge
  • Reify[A](kont.Eff[A]) kont.Expr[A]: Eff → Expr
  • Reflect[A](kont.Expr[A]) kont.Eff[A]: Expr → Eff

Practical Recipes

A complete event-loop integration combines a Dispatcher (the synchronous semantics) with a Backend (the proactor) under one Loop:

// 1. Define the dispatcher: maps an effect operation to an iox outcome.
type myDispatcher struct{ /* ... */ }

func (d *myDispatcher) Dispatch(op kont.Operation) (kont.Resumed, error) {
	// Return (value, nil), (value, iox.ErrMore), or (nil, iox.ErrWouldBlock).
}

// 2. Define the backend: submits ops to the OS proactor and polls completions.
type myBackend struct{ /* ... */ }
func (b *myBackend) Submit(op kont.Operation) (takt.Token, error) { /* ... */ }
func (b *myBackend) Poll(out []takt.Completion) (int, error)      { /* ... */ }

// 3. Drive: submit one or more computations, then Run to completion.
loop := takt.NewLoop[*myBackend, int](backend, takt.WithMaxCompletions(64))
loop.SubmitExpr(prog1)
loop.SubmitExpr(prog2)
results, err := loop.Run()
_ = results; _ = err

For error-aware composition use ExecError / StepError / AdvanceError in place of their non-error counterparts; Throw short-circuits the in-flight computation while leaving sibling computations on the same loop unaffected. The fused dispatcher+backend pattern shown here is the one used by sess to attach a session endpoint to a real I/O runtime.

References

  • Tarmo Uustalu and Varmo Vene. 2008. Comonadic Notions of Computation. Electronic Notes in Theoretical Computer Science 203, 5 (June 2008), 263–284. https://doi.org/10.1016/j.entcs.2008.05.029
  • Gordon D. Plotkin and Matija Pretnar. 2009. Handlers of Algebraic Effects. In Proc. 18th European Symposium on Programming (ESOP '09). LNCS 5502, 80–94. https://doi.org/10.1007/978-3-642-00590-9_7
  • Andrej Bauer and Matija Pretnar. 2015. Programming with Algebraic Effects and Handlers. Journal of Logical and Algebraic Methods in Programming 84, 1 (Jan. 2015), 108–123. https://arxiv.org/abs/1203.1539
  • Daniel Leijen. 2017. Type Directed Compilation of Row-Typed Algebraic Effects. In Proc. 44th ACM SIGPLAN Symposium on Principles of Programming Languages (POPL '17). 486–499. https://doi.org/10.1145/3009837.3009872
  • Danel Ahman and Andrej Bauer. 2020. Runners in Action. In Proc. 29th European Symposium on Programming (ESOP '20). LNCS 12075, 29–55. https://arxiv.org/abs/1910.11629
  • Daniel Hillerström, Sam Lindley, and Robert Atkey. 2020. Effect Handlers via Generalised Continuations. Journal of Functional Programming 30 (2020), e5. https://bentnib.org/handlers-cps-journal.pdf

License

MIT License. See LICENSE for details.

©2026 Hayabusa Cloud Co., Ltd.

Documentation

Overview

Package takt provides an abstract proactor runner for code.hybscloud.com/kont computations.

Responsibility remains split by layer:

takt dispatches suspended operations through Dispatcher and uses `iox` classification at the execution boundary instead of redefining it.

Primary Operational Surface

Convenience Surface

iox Classification

Event-loop backends report infrastructure poll failures directly from [Backend.Poll]. Loop.Poll and Loop.Run keep poll failures separate from completion classification: poll-level code.hybscloud.com/iox.ErrWouldBlock is treated as idle, while completion-level code.hybscloud.com/iox.ErrWouldBlock triggers a fresh submission for the same suspension without marking that tick as progress. [Backend.Submit] must return a token that is unique among all submissions still live in the loop; if a backend reuses a live token, Loop.Poll and Loop.Run surface ErrLiveTokenReuse after draining every pending suspension exactly once. Loop.Poll and Loop.Run return ErrUnsupportedMultishot for completion-level code.hybscloud.com/iox.ErrMore: a completion was received from the backend, but its [Completion.Value] is not resumed into the suspended operation because the submitted backend operation remains active and may produce later same-token completions. Generic Loop has no subscription or cancel carrier for that still-live operation, so multishot stream ownership belongs in a concrete layer above takt.
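The split between poll-level and completion-level classification can be sketched as two independent switches. Everything here is illustrative: the sentinels are local stand-ins for the iox errors, the action strings are invented, treating a (0, nil) poll as idle is an assumption, and the handling of other completion errors is not described in the text above, so the sketch only labels it.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for iox.ErrWouldBlock and iox.ErrMore.
var (
	errWouldBlock = errors.New("would block")
	errMore       = errors.New("more")
	errBroken     = errors.New("ring broken") // hypothetical infrastructure failure
)

type action string

const (
	idle        action = "idle tick"
	consume     action = "consume completions"
	pollFailure action = "return poll failure"
	resume      action = "resume suspension"
	resubmit    action = "resubmit, no progress this tick"
	fatalMulti  action = "fatal: unsupported multishot"
	otherFail   action = "completion failure (handling not modelled)"
)

// onPoll classifies what the poll call itself returned. Treating (0, nil)
// as idle is an assumption of this sketch.
func onPoll(n int, err error) action {
	switch {
	case errors.Is(err, errWouldBlock):
		return idle
	case err != nil:
		return pollFailure
	case n == 0:
		return idle
	default:
		return consume
	}
}

// onCompletion classifies the error carried by a single completion.
func onCompletion(err error) action {
	switch {
	case err == nil:
		return resume
	case errors.Is(err, errWouldBlock):
		return resubmit
	case errors.Is(err, errMore):
		return fatalMulti
	default:
		return otherFail
	}
}

func main() {
	fmt.Println(onPoll(0, errWouldBlock), "|", onCompletion(errWouldBlock))
	fmt.Println(onPoll(0, errBroken), "|", onCompletion(errMore))
	fmt.Println(onPoll(2, nil), "|", onCompletion(nil))
}
```

The same sentinel lands on different actions depending on the level at which it appears, which is the separation the paragraph above describes.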

SubscriptionLoop is the abstract stream counterpart to Loop. It uses a RouteID made from a Token and generation, returns opaque Subscription handles, and polls StreamCompletion values. [StreamCompletion.More] is route-liveness evidence: when it is true, SubscriptionLoop.Poll emits a non-final StreamEvent and keeps the route live; when it is false, Poll emits a final event and retires the route. [StreamCompletion.HasValue] and [StreamCompletion.EventErr] describe payload evidence at that boundary and are independent of More, so an empty live boundary and a payload error on a still-live route are both representable without collapsing them into terminal success or failure. Poll-level code.hybscloud.com/iox.ErrWouldBlock is an idle tick and mutates no route state. A SubscriptionBackend must not return ready stream completions together with a non-nil poll-level error; payload errors belong in [StreamCompletion.EventErr]. Stale completions for already retired routes are ignored.

CompletionMemory supplies the Completion slice that a Loop passes to [Backend.Poll]. Use NewLoop with functional options: WithMemory installs a custom provider, WithMaxCompletions caps the visible slice length, HeapMemory is the default, and BoundedMemory provides a single bounded pool of default-sized 128 KiB slabs. Loop.Drain releases that slice exactly once through [CompletionMemory.Release]. Custom providers must hand each live loop an exclusive non-overlapping slab and may treat Release as ownership transfer back to the provider.

Error Handling

ExecError/ExecErrorExpr/StepError/AdvanceError combine Dispatcher with `kont` Error effects. Error operations are handled before dispatcher operations: Throw yields Left without reaching the dispatcher, and other operations go through the dispatcher. Results are code.hybscloud.com/kont.Either: Right on success, Left on Throw.

code.hybscloud.com/cove.SuspensionView already satisfies SuspensionLike through its `Op` and `Resume` methods, so it can be passed directly to AdvanceSuspension.

Stepping behavior

Each AdvanceSuspension call handles exactly one suspended operation. If resuming that operation produces another suspension, the caller receives it back and decides how to continue.

AdvanceSuspension resumes on code.hybscloud.com/iox.ErrMore because `iox` classifies ErrMore as progress with a live frontier. It returns the original suspension unchanged on code.hybscloud.com/iox.ErrWouldBlock or ordinary failure.
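The one-operation-per-call rule and the resume-on-ErrMore rule can be seen in a toy model. This is a sketch only: suspension, dispatcher, advance, and drive are local stand-ins written for illustration, not the kont or takt types, and the sentinels stand in for the iox errors.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for iox.ErrWouldBlock and iox.ErrMore.
var (
	errWouldBlock = errors.New("would block")
	errMore       = errors.New("more")
)

// suspension is a toy frontier: one pending operation plus a resume function
// that yields either a final result (next == nil) or the next suspension.
type suspension struct {
	op     string
	resume func(v int) (result int, next *suspension)
}

type dispatcher func(op string) (int, error)

// advance handles exactly one suspended operation.
func advance(d dispatcher, s *suspension) (int, *suspension, error) {
	v, err := d(s.op)
	if err != nil && !errors.Is(err, errMore) {
		return 0, s, err // would-block or failure: original suspension back
	}
	result, next := s.resume(v)
	return result, next, err // nil or errMore travels with the new frontier
}

// program reads "a", then "b", and returns their sum.
func program() *suspension {
	return &suspension{op: "a", resume: func(a int) (int, *suspension) {
		return 0, &suspension{op: "b", resume: func(b int) (int, *suspension) {
			return a + b, nil
		}}
	}}
}

// flaky answers from a fixed table but reports would-block once for "b".
func flaky() dispatcher {
	values := map[string]int{"a": 40, "b": 2}
	blocked := false
	return func(op string) (int, error) {
		if op == "b" && !blocked {
			blocked = true
			return 0, errWouldBlock
		}
		return values[op], nil
	}
}

// drive keeps advancing and counts how often the frontier had to be retried.
func drive(d dispatcher, s *suspension) (result, retries int, err error) {
	for s != nil {
		result, s, err = advance(d, s)
		switch {
		case errors.Is(err, errWouldBlock):
			retries++ // an event loop would park s here instead of spinning
		case err != nil && !errors.Is(err, errMore):
			return 0, retries, err
		}
	}
	return result, retries, nil
}

func main() {
	result, retries, err := drive(flaky(), program())
	fmt.Println(result, retries, err)
}
```

The drive helper spins on would-block purely to keep the sketch short; a proactor integration would hand the returned suspension back to its scheduler at that point.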

Loop stores pending `*kont.Suspension` frontiers produced by code.hybscloud.com/kont.StepExpr (or by reifying code.hybscloud.com/kont.Eff into Expr form first). This gives the backend one pending suspension per token while that submission remains live.

Loop instances are single-owner runners: callers must serialize SubmitExpr, Submit, Poll, Run, Drain, Pending, and Failed calls on the same loop. The internal pending map uses sharding for token lookup, not as a public concurrent-use guarantee.

Execution styles

For deterministic dispatchers, Exec, a loop built from Step/Advance, and Loop.Run produce the same externally visible results. Choose the style that fits your integration: blocking execution, manual stepping, or a backend-driven event loop.

Index

Constants

const DefaultCompletionBufBytes = iobuf.BufferSizeLarge

DefaultCompletionBufBytes anchors the default completion slab byte budget to iobuf's canonical "register buffer" size class (iobuf.BufferSizeLarge, 128 KiB).

const DefaultCompletionBufSize = defaultCompletionBufSize

DefaultCompletionBufSize is the number of Completion entries that fit in one DefaultCompletionBufBytes block. This is the slab length HeapMemory returns from HeapMemory.CompletionBuf when no WithSize option is supplied, and the implicit NewLoop choice when WithMaxCompletions is not given.

const DefaultPoolCapacity = 8

DefaultPoolCapacity is the default bounded-pool capacity used by BoundedMemory when WithPoolCapacity is not supplied. `iobuf` rounds the configured capacity up to the next power of two. With DefaultCompletionBufBytes set to iobuf.BufferSizeLarge, the default bounded pool keeps a contiguous 1 MiB backing block of 8 slabs.
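The capacity arithmetic can be checked with a few lines. This sketch assumes that "rounds up to the next power of two" leaves exact powers of two unchanged, which is consistent with the default of 8 yielding 8 slabs; nextPow2, poolBytes, and slabBytes are local names, not iobuf or takt identifiers.

```go
package main

import (
	"fmt"
	"math/bits"
)

const slabBytes = 128 << 10 // 128 KiB, the slab size class named above

// nextPow2 rounds n up to a power of two for n >= 1; exact powers of two
// are returned unchanged.
func nextPow2(n int) int {
	return 1 << bits.Len(uint(n-1))
}

// poolBytes is the backing budget implied by a requested pool capacity.
func poolBytes(capacity int) int {
	return nextPow2(capacity) * slabBytes
}

func main() {
	for _, c := range []int{1, 4, 5, 8} {
		fmt.Printf("capacity %d -> %d slabs, %d KiB\n", c, nextPow2(c), poolBytes(c)>>10)
	}
}
```

With the default capacity of 8 this gives 8 × 128 KiB = 1024 KiB, the 1 MiB backing block mentioned above.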

Variables

var ErrDisposed = errors.New("takt: loop disposed")

ErrDisposed reports that a loop has been explicitly disposed via Loop.Drain or SubscriptionLoop.Drain and can no longer accept submissions or produce completions.

var ErrInvalidRouteID = errors.New("takt: backend returned an invalid route id")

ErrInvalidRouteID reports that a SubscriptionBackend returned the reserved zero RouteID.

var ErrLiveRouteReuse = errors.New("takt: backend reused a live route")

ErrLiveRouteReuse reports that a SubscriptionBackend reused a route that is still live in the current SubscriptionLoop.

var ErrLiveTokenReuse = errors.New("takt: backend reused a live token")

ErrLiveTokenReuse reports that a Backend reused a token that is still live in the current Loop. Reusing a live token would alias two pending frontiers under one correlation key and break token-to-suspension tracking.

var ErrUnknownSubscription = errors.New("takt: unknown subscription")

ErrUnknownSubscription reports that a Subscription is not live in the current SubscriptionLoop.

var ErrUnsupportedMultishot = errors.New("takt: unsupported multishot completion")

ErrUnsupportedMultishot reports that a backend produced a multishot completion for Loop. iox.ErrMore means the submitted backend operation remains active after the CQE; generic Loop tracks affine one-shot suspensions and has no subscription or cancel carrier for later same-token completions.

Functions

func Advance

func Advance[D Dispatcher[D], R any](d D, susp *kont.Suspension[R]) (R, *kont.Suspension[R], error)

Advance dispatches one suspended operation via a Dispatcher. Use AdvanceSuspension when the suspension also carries external context. A nil dispatch error and iox.ErrMore resume the suspension; iox.ErrWouldBlock and ordinary failure return it unchanged.

func AdvanceError

func AdvanceError[E any, D Dispatcher[D], R any](d D, susp *kont.Suspension[kont.Either[E, R]]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]], error)

AdvanceError dispatches the suspended operation. Throw discards the suspension and returns Left; other operations go through the dispatcher with the usual `iox` classification. A nil dispatch error and iox.ErrMore resume the suspension; iox.ErrWouldBlock and ordinary failure return it unchanged.

func AdvanceSuspension added in v0.1.2

func AdvanceSuspension[D Dispatcher[D], S SuspensionLike[S, R], R any](d D, susp S) (R, S, error)

AdvanceSuspension dispatches one suspended operation through a Dispatcher. Exec blocks over the same operation sequence, while Loop drives it through submit/poll calls. A nil dispatch error and iox.ErrMore resume the suspension and return the next frontier. iox.ErrWouldBlock and ordinary failure return the original suspension value unchanged.

func Exec

func Exec[D Dispatcher[D], R any](d D, m kont.Eff[R]) R

Exec runs a kont.Eff computation to completion via a Dispatcher.

func ExecError

func ExecError[E any, D Dispatcher[D], R any](d D, m kont.Eff[R]) kont.Either[E, R]

ExecError runs a kont.Eff computation with error handling. It returns Right on success and Left on Throw.

func ExecErrorExpr

func ExecErrorExpr[E any, D Dispatcher[D], R any](d D, m kont.Expr[R]) kont.Either[E, R]

ExecErrorExpr is the kont.Expr form of ExecError. It waits on iox.ErrWouldBlock via adaptive backoff.

func ExecExpr

func ExecExpr[D Dispatcher[D], R any](d D, m kont.Expr[R]) R

ExecExpr runs a kont.Expr computation to completion via a Dispatcher.

func Reflect

func Reflect[A any](m kont.Expr[A]) kont.Eff[A]

Reflect converts kont.Expr to kont.Eff. Prefer package-owned runners such as Exec, Advance, or Loop when executing the result.

func Reify

func Reify[A any](m kont.Eff[A]) kont.Expr[A]

Reify converts kont.Eff to kont.Expr. Prefer package-owned runners such as ExecExpr, Advance, or Loop when executing the result.

func Step

func Step[R any](m kont.Expr[R]) (R, *kont.Suspension[R])

Step evaluates until the first effect suspension. Prefer AdvanceSuspension or Advance once a suspension has already been observed.

func StepError

func StepError[E, R any](m kont.Expr[R]) (kont.Either[E, R], *kont.Suspension[kont.Either[E, R]])

StepError evaluates a computation with error support until the first suspension. It returns (Either[E, R], nil) on completion or error, or (zero, suspension) if pending.

Types

type Backend

type Backend[B Backend[B]] interface {
	// Submit sends an operation and returns a correlation token.
	// Returned tokens must be unique among all submissions that are still live in
	// the [Loop]; once a submission has retired, the backend may reuse its token.
	// Tokens are correlation keys, not sequence numbers, so a concrete backend may
	// use a kernel user_data value directly.
	Submit(op kont.Operation) (Token, error)

	// Poll writes ready completions into completions and reports any
	// infrastructure wait failure. Per-completion outcomes are reported in
	// [Completion.Err]; poll-level errors are reserved for failures of the
	// polling operation itself.
	Poll(completions []Completion) (int, error)
}

Backend is the interface for asynchronous submit/poll execution. Poll reports whether the poll call itself succeeded separately from which token-correlated completions were written into the provided buffer. A backend must not return n > 0 and err != nil together; Loop handles poll errors separately from the per-completion outcome recorded in each Completion.

type BoundedMemory added in v0.1.2

type BoundedMemory struct {
	// contains filtered or unexported fields
}

BoundedMemory is an `iobuf`-backed CompletionMemory provider for steady-state workloads.

It keeps a single bounded pool of default-sized 128 KiB slabs backed by iobuf's lock-free MPMC pool (iobuf.BoundedPool). Requests up to DefaultCompletionBufSize reuse the default slab shape; larger requests fall back to fresh exact-size allocations. Overflow slabs are discarded in [Release] so the bounded memory budget stays anchored to the steady-state default slab.
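The reuse and overflow policy can be approximated with a buffered channel. This is a rough model, not the iobuf-backed implementation: boundedSlabs, get, put, and defaultSlots are hypothetical, int slices stand in for Completion slabs, allocating a fresh slab when the pool is empty is an assumption, and checking cap is only a proxy for "originally allocated by this provider".

```go
package main

import "fmt"

const defaultSlots = 8 // hypothetical default slab length for the sketch

// boundedSlabs keeps at most cap(free) default-sized slabs for reuse.
type boundedSlabs struct {
	free chan []int
}

func newBoundedSlabs(capacity int) *boundedSlabs {
	return &boundedSlabs{free: make(chan []int, capacity)}
}

// get serves requests up to the default size from the pool when possible and
// gives larger requests a fresh exact-size slab.
func (b *boundedSlabs) get(n int) []int {
	if n <= 0 {
		n = defaultSlots
	}
	if n > defaultSlots {
		return make([]int, n) // overflow: exact size, never pooled
	}
	select {
	case slab := <-b.free:
		return slab[:n]
	default:
		// Assumption: allocate when the pool has nothing to hand out.
		return make([]int, defaultSlots)[:n]
	}
}

// put returns default-shaped slabs to the pool and drops everything else,
// including default slabs that arrive while the pool is full.
func (b *boundedSlabs) put(slab []int) {
	if cap(slab) != defaultSlots {
		return
	}
	select {
	case b.free <- slab[:defaultSlots]:
	default:
	}
}

func main() {
	b := newBoundedSlabs(2)
	s := b.get(4)
	s[0] = 99
	b.put(s)
	again := b.get(8)
	fmt.Println(again[0] == 99) // the same backing slab came back
	big := b.get(100)
	b.put(big)
	fmt.Println(len(big), len(b.free)) // overflow slab was not pooled
}
```

Dropping overflow slabs on release keeps the retained memory tied to the pool capacity times the default slab size, which is the budget property the paragraph above describes.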

func NewBoundedMemory added in v0.1.2

func NewBoundedMemory(opts ...BoundedMemoryOption) *BoundedMemory

NewBoundedMemory constructs a BoundedMemory with the given options. The zero value `&BoundedMemory{}` is also valid and equivalent to `NewBoundedMemory()`; the single bounded default-slab pool defaults to DefaultPoolCapacity.

func (*BoundedMemory) CompletionBuf added in v0.1.2

func (b *BoundedMemory) CompletionBuf(opts ...CompletionBufOption) []Completion

CompletionBuf returns a Completion slab of length n (defaulting to DefaultCompletionBufSize when WithSize is omitted). Requests up to the default size reuse a bounded pool of default-sized slabs and are sliced back to the requested length; larger requests allocate exact-size overflow slabs.

func (*BoundedMemory) Release added in v0.1.2

func (b *BoundedMemory) Release(buf []Completion)

Release returns a default-sized slab to the bounded pool when it was originally allocated by this BoundedMemory; exact-size overflow slabs are dropped.

type BoundedMemoryOption added in v0.1.2

type BoundedMemoryOption interface {
	// contains filtered or unexported methods
}

BoundedMemoryOption configures a BoundedMemory at construction.

func WithPoolCapacity added in v0.1.2

func WithPoolCapacity(n int) BoundedMemoryOption

WithPoolCapacity sets the bounded-pool capacity for BoundedMemory's single default-sized slab pool. The capacity is rounded up to the next power of two by iobuf. Panics if n < 1.

type Completion

type Completion struct {
	Token Token
	Value kont.Resumed
	Err   error
}

Completion carries token-correlated backend evidence together with an `iox` outcome. Value is valid resumption input for the correlated suspension only when Err is nil.

type CompletionBufOption added in v0.1.2

type CompletionBufOption interface {
	// contains filtered or unexported methods
}

CompletionBufOption tunes a single [CompletionMemory.CompletionBuf] call.

func WithSize added in v0.1.2

func WithSize(n int) CompletionBufOption

WithSize requests an explicit completion-slab length. Providers may round up to an internal storage boundary; the returned slice still satisfies `len >= n`. Panics if n <= 0; omit WithSize to request the provider's default slab length.

type CompletionMemory added in v0.1.2

type CompletionMemory interface {
	CompletionBuf(opts ...CompletionBufOption) []Completion
	Release(buf []Completion)
}

CompletionMemory provides the Completion buffer used by a Loop.

It is consulted twice per Loop lifetime: once at construction ([CompletionMemory.CompletionBuf]) and once at termination ([CompletionMemory.Release]). It is a loop-level resource rather than a per-dispatch callback, so implementations can choose any allocation strategy that fits their workload.

CompletionMemory only manages storage for observed completions. It does not change Backend, Token, Completion, or the outcome semantics reported by the backend.

takt ships with two built-ins: HeapMemory (the implicit default of NewLoop) and BoundedMemory (a single bounded pool of default-sized 128 KiB slabs). Custom providers may implement CompletionMemory directly.

Requirements:

  • when WithSize is supplied, CompletionBuf must return at least that many slots; Loop trims the visible length back to the requested size so WithMaxCompletions remains a true cap.
  • when no size hint is supplied, CompletionBuf must return a non-empty default slab.
  • live slabs must not overlap: a provider must not hand out aliased storage to two loops (or two live CompletionBuf calls) at the same time.

The slice passed to Release must have been obtained from CompletionBuf on the same CompletionMemory. Release transfers ownership of that slab back to the provider: the caller must stop reading, mutating, or retaining aliases to buf after Release returns.
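The three requirements above can be illustrated with a minimal sketch built on local stand-in types. Everything here is hypothetical: `completion` stands in for Completion, `freshMemory` is an invented provider, and a plain `int` replaces the variadic CompletionBufOption (whose methods are unexported), with 0 meaning "no size hint".

```go
package main

import "fmt"

// completion is a local stand-in for takt.Completion; the real type also
// carries a kont.Resumed value.
type completion struct {
	Token uint64
	Err   error
}

// defaultSlots is a hypothetical default slab length for this sketch only.
const defaultSlots = 8

// freshMemory allocates a new slab on every call, so two live slabs can
// never share storage.
type freshMemory struct {
	live int // slabs handed out and not yet released
}

// CompletionBuf returns exactly n slots, or the non-empty default slab when
// n == 0 (no size hint).
func (m *freshMemory) CompletionBuf(n int) []completion {
	if n < 0 {
		panic("negative size hint")
	}
	if n == 0 {
		n = defaultSlots
	}
	m.live++
	return make([]completion, n)
}

// Release takes ownership back; the caller must not touch buf afterwards.
func (m *freshMemory) Release(buf []completion) {
	if buf != nil {
		m.live--
	}
}

func main() {
	m := &freshMemory{}
	a := m.CompletionBuf(0) // default slab
	b := m.CompletionBuf(3) // explicit size hint
	fmt.Println(len(a), len(b), m.live)
	m.Release(a)
	m.Release(b)
	fmt.Println(m.live)
}
```

A pooling provider would trade this simplicity for fewer allocations, but it would then have to track which slabs are live to keep the no-aliasing guarantee.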

type Dispatcher

type Dispatcher[D Dispatcher[D]] interface {
	Dispatch(op kont.Operation) (kont.Resumed, error)
}

Dispatcher is the non-blocking execution boundary for one suspended operation. Dispatch returns (value, nil) on completion, (value, iox.ErrMore) when progress was made and the operation reports a live frontier, (nil, iox.ErrWouldBlock) when no progress is currently possible, or (nil, error) on infrastructure failure.

type HeapMemory added in v0.1.2

type HeapMemory struct {
	// contains filtered or unexported fields
}

HeapMemory is the default CompletionMemory: a sync.Pool-backed cache of default-sized typed Completion slabs. Slabs of non-default length bypass the pool and are freshly allocated each call.

Pass `&takt.HeapMemory{}` (or the address of an existing HeapMemory variable) to WithMemory so the underlying pool is shared across Loop instances. Copying a HeapMemory value copies the embedded sync.Pool state and therefore does not share recycled slabs.

func (*HeapMemory) CompletionBuf added in v0.1.2

func (h *HeapMemory) CompletionBuf(opts ...CompletionBufOption) []Completion

CompletionBuf returns a Completion slab of the requested length. The default length is DefaultCompletionBufSize; only default-sized slabs participate in the internal sync.Pool.

func (*HeapMemory) Release added in v0.1.2

func (h *HeapMemory) Release(buf []Completion)

Release returns the slab to the internal pool when its capacity matches DefaultCompletionBufSize; non-default slabs are left for the GC to reclaim.
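The pooling rule described for HeapMemory can be approximated as follows. This is a sketch of the general sync.Pool technique under assumed names (`slabCache`, `completion`, `defaultSlots`), not HeapMemory's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// completion is a local stand-in for takt.Completion.
type completion struct{ Token uint64 }

// defaultSlots is a hypothetical default slab length.
const defaultSlots = 8

// slabCache recycles only default-sized slabs through a sync.Pool.
type slabCache struct{ pool sync.Pool }

// get returns a slab of length n, consulting the pool only when n is the
// default length. A recycled slab may still hold stale entries.
func (c *slabCache) get(n int) []completion {
	if n == defaultSlots {
		if p, ok := c.pool.Get().(*[]completion); ok {
			return (*p)[:defaultSlots]
		}
	}
	return make([]completion, n)
}

// put recycles default-capacity slabs and leaves everything else to the GC.
func (c *slabCache) put(buf []completion) {
	if cap(buf) != defaultSlots {
		return
	}
	buf = buf[:defaultSlots]
	c.pool.Put(&buf)
}

func main() {
	shared := &slabCache{} // share by pointer; a value copy would copy the pool
	a := shared.get(defaultSlots)
	shared.put(a)
	b := shared.get(defaultSlots) // may or may not reuse a's storage
	c := shared.get(3)            // bypasses the pool
	fmt.Println(len(b), len(c))
}
```

Because sync.Pool is free to drop cached items at any time, callers should treat reuse as an optimization rather than a guarantee.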

type Loop

type Loop[B Backend[B], R any] struct {
	// contains filtered or unexported fields
}

Loop drives Expr computations through a Backend. It owns pending `*kont.Suspension` frontiers keyed by Token, so one live token refers to at most one pending suspension at a time.

Loop is not safe for concurrent use. Callers must serialize methods that share a Loop instance, including SubmitExpr, Submit, Poll, Run, Drain, Pending, and Failed.

func NewLoop

func NewLoop[B Backend[B], R any](b B, opts ...Option) *Loop[B, R]

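NewLoop creates an event loop with the given backend. Behavior is configured via functional [Option]s:

  • WithMaxCompletions caps the per-poll completion slab length.
  • WithMemory installs a custom CompletionMemory provider.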

Use BoundedMemory via WithMemory when completion buffers should come from a single bounded pool of default-sized 128 KiB slabs. Custom providers may implement CompletionMemory for any allocation strategy without widening the Backend or Completion contracts.

func (*Loop[B, R]) Drain added in v0.1.2

func (l *Loop[B, R]) Drain() int

Drain transitions the loop into a terminal disposed state and discards every pending suspension exactly once. Drain is idempotent and safe to call after a fatal failure has already been recorded; it preserves the existing fatal error if one is set, otherwise it records ErrDisposed so subsequent SubmitExpr, Submit, and Poll calls fail fast. Drain also returns the completion slab to its CompletionMemory provider exactly once. It returns the number of suspensions drained by this call.

func (*Loop[B, R]) Failed added in v0.1.2

func (l *Loop[B, R]) Failed() error

Failed returns the loop's terminal fatal error, if any. A non-nil result means every subsequent SubmitExpr, Submit, Poll, or Run call returns the same error and all previously pending suspensions have been discarded exactly once.

func (*Loop[B, R]) Pending

func (l *Loop[B, R]) Pending() int

Pending returns the count of in-flight operations.

func (*Loop[B, R]) Poll

func (l *Loop[B, R]) Poll() ([]R, error)

Poll dispatches ready completions. A resumed suspension either completes or is submitted again under a fresh token. Multishot completions (`iox.ErrMore`) are rejected because the same backend submission remains active after its completion queue entry (CQE) is delivered, while Loop only tracks one affine suspension per submitted operation. Concrete runtimes that need multishot streams should own a subscription or cancel carrier above this generic Loop.

Returns completed results when one or more computations finish in the current poll tick, a zero-length non-nil slice when work advanced without producing a completed computation, (nil, nil) when idle, or (nil, err) on infrastructure failure. Poll never returns both results and a non-nil error; if a fatal failure arrives in the same tick as ready results, the failure is reported on the next call.

The returned []R aliases an internal buffer that is reused across Poll/Run calls; callers that need to retain the slice past the next Poll/Run must copy it.

When the loop enters a fatal state, whether from an infrastructure poll failure or a completion-driven failure such as ErrUnsupportedMultishot or ErrLiveTokenReuse, it discards every pending suspension exactly once before returning.

func (*Loop[B, R]) Run

func (l *Loop[B, R]) Run() ([]R, error)

Run polls until every pending operation completes or the loop transitions to a fatal state. The returned []R is freshly allocated and owned by the caller, so it does not alias any internal buffer (unlike Loop.Poll). On a fatal failure Run returns the partial results accumulated so far together with the fatal error; subsequent SubmitExpr/Submit/Poll/Run calls return the same stored error.

func (*Loop[B, R]) Submit

func (l *Loop[B, R]) Submit(m kont.Eff[R]) (R, bool, error)

Submit reifies a kont.Eff computation and submits it.

func (*Loop[B, R]) SubmitExpr

func (l *Loop[B, R]) SubmitExpr(m kont.Expr[R]) (R, bool, error)

SubmitExpr steps a kont.Expr computation and transfers its first suspension to the backend. The loop stores the resulting frontier under one token, so later completions are expected to continue that same affine suspension lineage.

type Option added in v0.1.2

type Option interface {
	// contains filtered or unexported methods
}

Option configures a Loop at construction. Pass options to NewLoop.

Available options: WithMaxCompletions and WithMemory.

func WithMaxCompletions added in v0.1.2

func WithMaxCompletions(n int) Option

WithMaxCompletions caps the per-poll completion slab length. If omitted, the CompletionMemory provider chooses the size (typically DefaultCompletionBufSize). Panics if n <= 0.

func WithMemory added in v0.1.2

func WithMemory(m CompletionMemory) Option

WithMemory installs a custom CompletionMemory provider on the Loop. The provider retains ownership of the slab's lifetime: Loop.Drain calls [CompletionMemory.Release] exactly once.
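Option exposes only unexported methods, which is the usual shape of a sealed functional-option interface in Go. The sketch below shows that general pattern with invented names (`config`, `option`, `optionFunc`, `newConfig`); it makes no claim about how takt implements its options internally.

```go
package main

import "fmt"

// config holds the settings a constructor would read; both fields are
// hypothetical names for this sketch.
type config struct {
	maxCompletions int    // 0 means "let the memory provider choose"
	memory         string // name of the provider, standing in for a real value
}

// option is sealed by its unexported method, so other packages can pass
// options around but cannot define new ones.
type option interface{ apply(*config) }

type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withMaxCompletions panics on a non-positive n, matching the documented
// behavior of WithMaxCompletions.
func withMaxCompletions(n int) option {
	if n <= 0 {
		panic("max completions must be positive")
	}
	return optionFunc(func(c *config) { c.maxCompletions = n })
}

// withMemory selects a provider by name in this sketch.
func withMemory(name string) option {
	return optionFunc(func(c *config) { c.memory = name })
}

// newConfig applies options in order over the defaults.
func newConfig(opts ...option) config {
	c := config{memory: "heap"}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withMaxCompletions(64), withMemory("bounded")))
}
```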

type RouteID added in v0.2.0

type RouteID struct {
	// contains filtered or unexported fields
}

RouteID identifies one stream/subscription route. It pairs a backend token with an explicit generation so a backend may reuse a token after finalization without aliasing a previous live route.

func NewRouteID added in v0.2.0

func NewRouteID(token Token, generation uint64) RouteID

NewRouteID constructs a route identifier for SubscriptionBackend implementations. The zero RouteID is reserved and must not be returned by a backend.
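To see why the generation matters, consider a minimal stand-in for RouteID used as a map key. The `routeID` struct and its field layout are assumptions made for illustration; the real type keeps its fields unexported.

```go
package main

import "fmt"

// routeID is a local stand-in for takt.RouteID: a token plus a generation.
type routeID struct {
	token      uint64
	generation uint64
}

// isZero reports whether id is the reserved zero value (assumed here to be
// the Go zero value of the struct).
func (id routeID) isZero() bool { return id == routeID{} }

func main() {
	live := map[routeID]string{}

	first := routeID{token: 7, generation: 1}
	live[first] = "first route"
	delete(live, first) // the route finalizes and the backend frees token 7

	second := routeID{token: 7, generation: 2} // same token, new generation
	live[second] = "second route"

	// A late completion addressed to the first route no longer matches.
	_, stale := live[first]
	fmt.Println(stale, live[second])
}
```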

func (RouteID) Generation added in v0.2.0

func (id RouteID) Generation() uint64

Generation returns the generation component of id.

func (RouteID) IsZero added in v0.2.0

func (id RouteID) IsZero() bool

IsZero reports whether id is the reserved invalid route.

func (RouteID) Token added in v0.2.0

func (id RouteID) Token() Token

Token returns the token component of id.

type StreamCompletion added in v0.2.0

type StreamCompletion[A any] struct {
	ID       RouteID
	Value    A
	HasValue bool
	EventErr error
	More     bool
}

StreamCompletion carries backend evidence for one stream route.

More is route-liveness evidence: when true, the same route remains active after this observation. HasValue and EventErr describe payload evidence at this boundary and are independent of More. EventErr is payload evidence, not route control; use More to report the live frontier.

func (StreamCompletion[A]) RouteOutcome added in v0.2.0

func (c StreamCompletion[A]) RouteOutcome() iox.Outcome

RouteOutcome projects the route-level completion onto iox's endpoint outcome vocabulary. The projection forgets route identity and successor ownership: More maps to iox.OutcomeMore, EventErr maps to iox.OutcomeFailure, and all other observations map to iox.OutcomeOK.

type StreamEvent added in v0.2.0

type StreamEvent[A any] struct {
	Subscription Subscription[A]
	Value        A
	HasValue     bool
	Final        bool
	EventErr     error
}

StreamEvent is the user-visible event emitted by SubscriptionLoop.Poll. Final reports that the route has retired after this observation.

type Subscription added in v0.2.0

type Subscription[A any] struct {
	// contains filtered or unexported fields
}

Subscription is an opaque handle for one abstract stream route. It carries route identity only; payload delivery and finalization are observed through SubscriptionLoop.Poll.

func (Subscription[A]) ID added in v0.2.0

func (s Subscription[A]) ID() RouteID

ID returns the route identifier associated with the subscription.

func (Subscription[A]) IsZero added in v0.2.0

func (s Subscription[A]) IsZero() bool

IsZero reports whether s is the zero, non-live subscription handle.

type SubscriptionBackend added in v0.2.0

type SubscriptionBackend[B SubscriptionBackend[B, A], A any] interface {
	// Subscribe starts op as a route-producing operation and returns its route
	// identity. Returned routes must be unique among routes still live in the
	// [SubscriptionLoop], and the zero [RouteID] is invalid.
	Subscribe(op kont.Operation) (RouteID, error)

	// Poll writes ready stream completions into completions. Poll-level errors
	// report failures of the polling operation itself; per-route payload errors
	// are reported in [StreamCompletion.EventErr]. A backend must not return
	// n > 0 and err != nil together; [SubscriptionLoop] handles poll errors
	// separately from route completions. A poll-level [iox.ErrWouldBlock] means
	// the stream loop is idle.
	Poll(completions []StreamCompletion[A]) (int, error)

	// Cancel requests cancellation of the live route. A successful Cancel moves
	// the route into a canceling state; the route is retired by a later terminal
	// completion, by Drain, or by a fatal loop transition.
	Cancel(id RouteID) error
}

SubscriptionBackend is the abstract backend contract for route-indexed stream/subscription execution.

type SubscriptionLoop added in v0.2.0

type SubscriptionLoop[B SubscriptionBackend[B, A], A any] struct {
	// contains filtered or unexported fields
}

SubscriptionLoop drives route-indexed stream completions through a SubscriptionBackend.

SubscriptionLoop is separate from Loop: Loop owns one-shot suspended computations, while SubscriptionLoop owns same-route successor observations. A StreamCompletion with More set emits a non-final StreamEvent and keeps route ownership live; a completion without More emits a final event and retires the route.

SubscriptionLoop is not safe for concurrent use. Callers must serialize Subscribe, Poll, Cancel, Drain, Pending, and Failed calls on one loop.

func NewSubscriptionLoop added in v0.2.0

func NewSubscriptionLoop[B SubscriptionBackend[B, A], A any](b B, opts ...SubscriptionOption) *SubscriptionLoop[B, A]

NewSubscriptionLoop creates a route-indexed stream runner over b. Use WithMaxStreamCompletions to cap the per-poll completion buffer length; when the option is omitted, the buffer length is DefaultCompletionBufSize.

func (*SubscriptionLoop[B, A]) Cancel added in v0.2.0

func (l *SubscriptionLoop[B, A]) Cancel(sub Subscription[A]) error

Cancel requests cancellation of sub. A successful cancel request preserves the route until a terminal completion, Drain, or fatal loop transition retires it. Cancel returns ErrUnknownSubscription for a zero or non-live subscription handle.

func (*SubscriptionLoop[B, A]) Drain added in v0.2.0

func (l *SubscriptionLoop[B, A]) Drain() int

Drain transitions the stream loop into a disposed state and retires every owned route. Active routes receive one cancel request; routes already in the canceling state are retired without a duplicate cancel request. Drain is idempotent and returns the number of routes retired by this call.
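The cancel accounting during Drain can be sketched with a small route table. `miniStreams`, `routeState`, and the `cancels` log are hypothetical; a real loop would call the backend's Cancel instead of appending to a slice.

```go
package main

import "fmt"

// routeState distinguishes routes that still need a cancel request from
// routes that already received one.
type routeState int

const (
	active routeState = iota
	canceling
)

// miniStreams models only route ownership and cancel requests.
type miniStreams struct {
	routes  map[uint64]routeState
	cancels []uint64 // cancel requests that would go to the backend
}

// drain retires every owned route. Active routes get exactly one cancel
// request; canceling routes get none. It returns the number retired.
func (s *miniStreams) drain() int {
	n := len(s.routes)
	for id, st := range s.routes {
		if st == active {
			s.cancels = append(s.cancels, id)
		}
	}
	s.routes = nil
	return n
}

func main() {
	s := &miniStreams{routes: map[uint64]routeState{1: active, 2: canceling}}
	fmt.Println(s.drain(), s.cancels)
	fmt.Println(s.drain(), s.cancels) // idempotent: nothing left to retire
}
```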

func (*SubscriptionLoop[B, A]) Failed added in v0.2.0

func (l *SubscriptionLoop[B, A]) Failed() error

Failed returns the loop's terminal fatal error, if any.

func (*SubscriptionLoop[B, A]) Pending added in v0.2.0

func (l *SubscriptionLoop[B, A]) Pending() int

Pending returns the count of live stream routes.

func (*SubscriptionLoop[B, A]) Poll added in v0.2.0

func (l *SubscriptionLoop[B, A]) Poll() ([]StreamEvent[A], error)

Poll emits ready stream events.

Poll-level iox.ErrWouldBlock is treated as an idle tick. The backend poll contract excludes ready completions paired with a non-nil poll error. Unknown-route completions are stale observations and are ignored. The returned slice aliases an internal buffer that is reused on the next Poll call.
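The per-completion rules (stale observations ignored, More keeps the route live, absence of More retires it) can be sketched as a single pass over a batch. The `completion` and `event` structs and the `process` function are simplified stand-ins that key routes by a bare `uint64` instead of a RouteID.

```go
package main

import "fmt"

// completion and event are trimmed stand-ins for StreamCompletion and
// StreamEvent.
type completion struct {
	ID       uint64
	Value    string
	HasValue bool
	More     bool
}

type event struct {
	ID       uint64
	Value    string
	HasValue bool
	Final    bool
}

// process turns a batch of completions into events, updating the live set.
func process(live map[uint64]bool, batch []completion) []event {
	var events []event
	for _, c := range batch {
		if !live[c.ID] {
			continue // unknown route: stale observation, ignored
		}
		final := !c.More
		if final {
			delete(live, c.ID) // route retires after this observation
		}
		events = append(events, event{ID: c.ID, Value: c.Value, HasValue: c.HasValue, Final: final})
	}
	return events
}

func main() {
	live := map[uint64]bool{1: true}
	batch := []completion{
		{ID: 1, Value: "a", HasValue: true, More: true},
		{ID: 9, Value: "stale", HasValue: true},
		{ID: 1, Value: "b", HasValue: true},
		{ID: 1, Value: "late", HasValue: true, More: true},
	}
	for _, e := range process(live, batch) {
		fmt.Printf("%+v\n", e)
	}
	fmt.Println(len(live))
}
```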

func (*SubscriptionLoop[B, A]) Subscribe added in v0.2.0

func (l *SubscriptionLoop[B, A]) Subscribe(op kont.Operation) (Subscription[A], error)

Subscribe starts op through the backend and stores the returned route. It returns ErrInvalidRouteID when the backend returns the reserved zero route and ErrLiveRouteReuse when the backend aliases a currently live route. Either condition is fatal to the loop.

type SubscriptionOption added in v0.2.0

type SubscriptionOption interface {
	// contains filtered or unexported methods
}

SubscriptionOption configures a SubscriptionLoop at construction.

func WithMaxStreamCompletions added in v0.2.0

func WithMaxStreamCompletions(n int) SubscriptionOption

WithMaxStreamCompletions caps the per-poll stream completion buffer length. If omitted, DefaultCompletionBufSize is used. Panics if n <= 0.

type SuspensionLike added in v0.1.2

type SuspensionLike[S any, R any] interface {
	Op() kont.Operation
	Resume(value kont.Resumed) (R, S)
}

SuspensionLike is any resumable value that exposes one pending operation and can accept the corresponding resumption while preserving its outer type. Packages such as `cove` can use AdvanceSuspension by providing `Op` and `Resume` methods without requiring extra adapters in `takt`.
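A type satisfies this shape by returning its own type from Resume. The sketch below uses stand-ins throughout: `string` for kont.Operation, `int` for kont.Resumed, a hypothetical `countdown` suspension, and a hypothetical `drive` helper that is not AdvanceSuspension. It also assumes that a nil successor signals completion, which the text above does not state.

```go
package main

import "fmt"

// suspensionLike mirrors the documented method set with local stand-ins:
// string for kont.Operation and int for kont.Resumed.
type suspensionLike[S any, R any] interface {
	Op() string
	Resume(value int) (R, S)
}

// countdown needs `left` more resumptions and sums the values it receives.
type countdown struct{ left, sum int }

func (c *countdown) Op() string { return "read" }

// Resume returns (result, nil) when done, or (0, next) when suspended again.
func (c *countdown) Resume(v int) (int, *countdown) {
	sum := c.sum + v
	if c.left <= 1 {
		return sum, nil
	}
	return 0, &countdown{left: c.left - 1, sum: sum}
}

// drive resumes s until the successor is the zero value of S (assumed to
// mean "completed") and returns the final result.
func drive[S interface {
	comparable
	suspensionLike[S, R]
}, R any](s S, dispatch func(op string) int) R {
	var done S
	for {
		r, next := s.Resume(dispatch(s.Op()))
		if next == done {
			return r
		}
		s = next
	}
}

func main() {
	total := drive[*countdown, int](&countdown{left: 3}, func(string) int { return 5 })
	fmt.Println(total)
}
```

Keeping S as the outer type lets a wrapper carry its own state across resumptions without an adapter around each step.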

type Token

type Token uint64

Token correlates a submitted operation with its completion. A backend may reuse a token only after the older submission carrying it has retired from the Loop.
