capnweb

package module
v0.0.1
Published: Apr 6, 2026 License: MIT Imports: 16 Imported by: 0

README

capnweb-go

Go implementation of Cloudflare's Cap'n Web RPC protocol.

What is Cap'n Web?

Cap'n Web is a JSON-based, bidirectional RPC protocol designed for the web. It enables structured communication between endpoints over WebSockets, HTTP batch requests, or any message-passing transport. The protocol supports:

  • Bidirectional RPC — either side can call the other; there is no fixed "client" or "server"
  • Promise pipelining — chain dependent calls without waiting for intermediate results, collapsing multiple round trips into one
  • Pass-by-reference — objects can be exported as stubs and called remotely, with automatic reference counting
  • Streaming — multiplexed ReadableStream/WritableStream over a single connection
  • Server-side .map() — send transformation recipes to run on remote collections without per-element round trips

The reference implementation is in TypeScript and runs on Cloudflare Workers.

Cap'n Web vs Cap'n Proto

Both protocols were designed by Kenton Varda, who created Cap'n Proto and later joined Cloudflare, where he built the Workers runtime and designed Cap'n Web as its RPC layer. Despite the shared author and similar name, Cap'n Web is not Cap'n Proto. They share philosophical DNA (both draw on the capability-based security model and on promise pipelining, pioneered by the E language and CapTP) but differ significantly in design:

Aspect | Cap'n Proto | Cap'n Web
Wire format | Custom binary schema (zero-copy) | JSON arrays
Schema | Required .capnp schema files + code generation | Schema-free; methods discovered at runtime
Target environment | Systems programming (C++, Rust, Go) | Web platforms (browsers, Workers, Deno)
Transport | Custom TCP framing | WebSocket, HTTP batch, postMessage
Serialization | Schema-defined structs with pointer arithmetic | JSON with typed expression arrays (["bytes", "..."], ["date", 123])
Complexity | High: full type system, generics, schema evolution | Moderate: intentionally simple wire format
Ecosystem | Mature, used in Sandstorm, Cloudflare internals | New, designed for Cloudflare Workers RPC

In short: Cap'n Proto is a high-performance binary RPC system with mandatory schemas. Cap'n Web is a lightweight JSON-based RPC protocol that trades raw throughput for web-native simplicity and zero code generation.

Both support promise pipelining and capability-based object references — the features that make them more powerful than typical REST or gRPC patterns.

Goals of this project

  1. Spec-complete Go implementation of the Cap'n Web protocol, suitable for use in Go services that need to interoperate with Cloudflare Workers or any other Cap'n Web endpoint
  2. Idiomatic Go API — no code generation required; export any struct with methods as an RPC target, call remote methods through typed stubs
  3. WebSocket and HTTP batch transports out of the box, with a pluggable Transport interface for custom framing
  4. Interoperability with the TypeScript reference implementation, validated by an automated cross-language test suite

Install

go get github.com/flaticols/capnweb-go

Examples

Go Server (WebSocket)
package main

import (
	"context"
	"log"
	"net/http"

	capnweb "github.com/flaticols/capnweb-go"
)

type Greeter struct {
	capnweb.RpcTargetBase
}

func (g *Greeter) Greet(_ context.Context, name string) (string, error) {
	return "Hello, " + name + "!", nil
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		tr, err := capnweb.WSAccept(w, r, &capnweb.WSAcceptOptions{Origins: []string{"*"}})
		if err != nil {
			log.Printf("websocket upgrade failed: %v", err)
			return
		}
		// Serve Greeter as the bootstrap object for this connection.
		sess := capnweb.NewSession(tr, &Greeter{})
		if err := sess.Run(r.Context()); err != nil {
			log.Printf("session ended: %v", err)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", mux))
}
Go Server (HTTP Batch)
mux.Handle("/rpc", capnweb.BatchHandler(&Greeter{}))

Each HTTP POST carries a batch of NDJSON messages — all calls and their responses fit in one round trip.
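
As a rough illustration of what such a batch body looks like, the sketch below builds the two NDJSON lines for a Greet call using only the standard library. The helper name buildBatch is hypothetical and not part of capnweb; the message shapes are copied from the wire formats shown elsewhere in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// buildBatch encodes each message as one JSON line (NDJSON).
// Messages are plain Go slices that mirror the wire arrays.
// Hypothetical helper, not part of the capnweb package.
func buildBatch(msgs ...[]any) (string, error) {
	var sb strings.Builder
	for _, m := range msgs {
		line, err := json.Marshal(m)
		if err != nil {
			return "", err
		}
		sb.Write(line)
		sb.WriteByte('\n')
	}
	return sb.String(), nil
}

func main() {
	// Push a call to Greet("World") on the bootstrap object (import 0),
	// then pull the result, which the sender numbers as import 1.
	call := []any{"import", 0, []string{"Greet"}, []string{"World"}}
	body, err := buildBatch(
		[]any{"push", call},
		[]any{"pull", 1},
	)
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```

The resulting string could be sent as the body of a single POST to the batch endpoint.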

Go Client (WebSocket)
ctx := context.Background()
tr, _ := capnweb.WSDial(ctx, "ws://localhost:8080/ws", nil)

client := capnweb.NewSession(tr, nil)
go client.Run(ctx)
defer client.Close()

main := client.Main()
result, _ := capnweb.Call[string](ctx, main, "Greet", "World")
// result == "Hello, World!"
Go Client (HTTP Batch)
bc := &capnweb.BatchClient{URL: "http://localhost:8080/rpc"}
msgs, _ := bc.Do(ctx, []capnweb.Message{
	capnweb.PushMsg{Expr: json.RawMessage(`["import",0,["Greet"],["World"]]`)},
	capnweb.PullMsg{ImportID: 1},
})
TypeScript Client (WebSocket)
import { newWebSocketRpcSession } from "capnweb";

const stub = newWebSocketRpcSession("ws://localhost:8080/ws");
const result = await stub.Greet("World");
// result === "Hello, World!"
TypeScript Server + Go Client
// server.ts
import { RpcTarget, newWebSocketRpcSession } from "capnweb";

class Greeter extends RpcTarget {
  greet(name: string) { return `Hello, ${name}!`; }
}

// ... set up WebSocketServer, call newWebSocketRpcSession(ws, new Greeter())
// client.go
tr, _ := capnweb.WSDial(ctx, "ws://localhost:3000", nil)
client := capnweb.NewSession(tr, nil)
go client.Run(ctx)

main := client.Main()
result, _ := capnweb.Call[string](ctx, main, "greet", "World")
Pass-by-Reference

Methods that return an RpcTarget are automatically passed by reference:

type Calculator struct{ capnweb.RpcTargetBase }
func (c *Calculator) Add(_ context.Context, a, b float64) (float64, error) {
	return a + b, nil
}

type MathService struct{ capnweb.RpcTargetBase }
func (s *MathService) GetCalculator(_ context.Context) (*Calculator, error) {
	return &Calculator{}, nil
}
// Client
main := client.Main()
calc, _ := capnweb.Call[*capnweb.Stub](ctx, main, "GetCalculator")
result, _ := capnweb.Call[float64](ctx, calc, "Add", 3.0, 4.0)
defer calc.Release(ctx)
Promise Pipelining

Chain calls without waiting for intermediate results:

main := client.Main()
calc, _ := main.Pipeline(ctx, "GetCalculator")   // push only, no wait
result, _ := capnweb.Call[float64](ctx, calc, "Add", 3.0, 4.0) // push + pull
defer calc.Release(ctx)
Streaming
// Client: create pipe, write chunks
writer, readable, _ := client.CreatePipe(ctx)
go func() {
	main := client.Main()
	capnweb.Call[string](ctx, main, "Collect", readable)
}()
writer.Write(ctx, "chunk1")
writer.Write(ctx, "chunk2")
writer.Close(ctx)

// Server: read stream
func (s *Service) Collect(_ context.Context, reader *capnweb.StreamReader) (string, error) {
	var buf strings.Builder
	for {
		chunk, err := reader.Read(context.Background())
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err // e.g. the writer aborted the stream
		}
		buf.WriteString(chunk.(string))
	}
	return buf.String(), nil
}

Status

All core and advanced protocol features are implemented. See the issue tracker for details.

License

MIT

Documentation

Overview

Package capnweb implements Cloudflare's Cap'n Web RPC protocol in Go.

Cap'n Web is a JSON-based, bidirectional RPC protocol that supports promise pipelining, pass-by-reference objects with automatic reference counting, multiplexed streaming, and server-side remap expressions.

A session is established over a Transport (WebSocket, HTTP batch, or custom). Local objects with exported methods are served as RPC targets (see RpcTarget), and remote objects are called through Stub.

Functions

func BatchHandler

func BatchHandler(main any) http.Handler

BatchHandler returns an http.Handler that processes batched RPC requests. Each HTTP request creates an ephemeral session: the request body is NDJSON (one message per line), all messages are processed in order, and outbound messages are written back as NDJSON.

Example

ExampleBatchHandler demonstrates the HTTP batch transport.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"

	capnweb "github.com/flaticols/capnweb-go"
)

// A Greeter is a simple RPC service. Embed RpcTargetBase to mark it
// as pass-by-reference when returned from other methods.
type Greeter struct {
	capnweb.RpcTargetBase
}

func (g *Greeter) Greet(_ context.Context, name string) (string, error) {
	return "Hello, " + name + "!", nil
}

func (g *Greeter) Add(_ context.Context, a, b float64) (float64, error) {
	return a + b, nil
}

func (g *Greeter) Fail(_ context.Context) (any, error) {
	return nil, fmt.Errorf("something went wrong")
}

func main() {
	handler := capnweb.BatchHandler(&Greeter{})
	srv := httptest.NewServer(handler)
	defer srv.Close()

	// Send NDJSON batch request.
	body := `["push",["import",0,["Greet"],["World"]]]` + "\n" +
		`["pull",1]` + "\n"

	resp, err := http.Post(srv.URL, "application/x-ndjson", strings.NewReader(body))
	if err != nil {
		return
	}
	defer resp.Body.Close()

	msgs, _ := capnweb.ReadNDJSON(resp.Body)
	for _, msg := range msgs {
		if rm, ok := msg.(capnweb.ResolveMsg); ok {
			fmt.Println(string(rm.Expr))
		}
	}
}
Output:
"Hello, World!"

func Call

func Call[T any](ctx context.Context, stub *Stub, method string, args ...any) (T, error)

Call invokes a method on the stub and converts the result to type T. JSON numbers (float64) are automatically coerced to the target numeric type.
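
One way such a coercion could work is sketched below. The function coerce is a hypothetical stand-in written for illustration; it is not the package's implementation, and the real Call may handle more cases (or different ones). Range checks are omitted for brevity.

```go
package main

import (
	"fmt"
	"math"
	"reflect"
)

// coerce converts a decoded JSON value to T.
// Hypothetical helper: a sketch of float64-to-numeric coercion only.
func coerce[T any](v any) (T, error) {
	var zero T
	// Fast path: the value already has the requested type.
	if t, ok := v.(T); ok {
		return t, nil
	}
	target := reflect.TypeOf((*T)(nil)).Elem()
	f, ok := v.(float64)
	if !ok {
		return zero, fmt.Errorf("cannot convert %T to %v", v, target)
	}
	switch target.Kind() {
	case reflect.Float32:
		// Narrowing to float32 is accepted as-is.
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		// Refuse values that would silently lose a fractional part.
		if math.IsInf(f, 0) || f != math.Trunc(f) {
			return zero, fmt.Errorf("%v is not an integer", f)
		}
	default:
		return zero, fmt.Errorf("cannot convert float64 to %v", target)
	}
	return reflect.ValueOf(f).Convert(target).Interface().(T), nil
}

func main() {
	n, err := coerce[int](7.0)
	fmt.Println(n, err) // 7 <nil>

	_, err = coerce[int](7.5)
	fmt.Println(err)
}
```

Rejecting fractional values is a design choice of this sketch; a more permissive version could truncate instead.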

Example

ExampleCall demonstrates the generic Call helper with type coercion.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"

	capnweb "github.com/flaticols/capnweb-go"
)

// A Greeter is a simple RPC service. Embed RpcTargetBase to mark it
// as pass-by-reference when returned from other methods.
type Greeter struct {
	capnweb.RpcTargetBase
}

func (g *Greeter) Greet(_ context.Context, name string) (string, error) {
	return "Hello, " + name + "!", nil
}

func (g *Greeter) Add(_ context.Context, a, b float64) (float64, error) {
	return a + b, nil
}

func (g *Greeter) Fail(_ context.Context) (any, error) {
	return nil, fmt.Errorf("something went wrong")
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tr, _ := capnweb.WSAccept(w, r, &capnweb.WSAcceptOptions{Origins: []string{"*"}})
		sess := capnweb.NewSession(tr, &Greeter{})
		sess.Run(r.Context())
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tr, _ := capnweb.WSDial(ctx, "ws"+strings.TrimPrefix(srv.URL, "http"), nil)
	client := capnweb.NewSession(tr, nil)
	go client.Run(ctx)
	defer client.Close()

	main := client.Main()

	// Call[int] automatically coerces JSON float64 to int.
	sum, _ := capnweb.Call[int](ctx, main, "Add", 3.0, 4.0)
	fmt.Println(sum)
}
Output:
7

func EncodeExpr

func EncodeExpr(e Expr) (json.RawMessage, error)

EncodeExpr serializes an Expr to its JSON wire representation.

func MarshalMessage

func MarshalMessage(m Message) ([]byte, error)

MarshalMessage encodes a Message to its JSON wire format.

func WriteNDJSON

func WriteNDJSON(w io.Writer, msgs []Message) error

WriteNDJSON writes messages as newline-delimited JSON to a writer.

Types

type AbortMsg

type AbortMsg struct {
	Expr json.RawMessage
}

AbortMsg is a fatal error that terminates the session. No further messages are sent or received after this.

Wire format: ["abort", expression]

type ArrayExpr

type ArrayExpr struct{ Elements []Expr }

ArrayExpr wraps a plain array of expressions. On the wire actual arrays are encoded as [elem0, elem1, ...] — the encoder handles distinguishing them from typed expression arrays.

type BatchClient

type BatchClient struct {
	URL        string
	HTTPClient *http.Client
}

BatchClient sends batches of RPC messages to an HTTP batch endpoint. Set URL to the batch endpoint. HTTPClient is optional (defaults to http.DefaultClient).

func (*BatchClient) Do

func (c *BatchClient) Do(ctx context.Context, messages []Message) ([]Message, error)

Do sends a batch of messages and returns the response messages.

type BigIntExpr

type BigIntExpr struct{ Value *big.Int }

BigIntExpr represents an arbitrary-precision integer — ["bigint", decimal].

type BytesExpr

type BytesExpr struct{ Data []byte }

BytesExpr represents a byte slice — ["bytes", base64].
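
The sketch below shows what a round trip through this wire shape could look like with the standard library alone. The helper names are hypothetical, and the use of standard padded base64 is an assumption; the package may use a different base64 variant.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// encodeBytes renders data as a ["bytes", base64] expression.
// Assumption: standard, padded base64.
func encodeBytes(data []byte) (string, error) {
	out, err := json.Marshal([]any{"bytes", base64.StdEncoding.EncodeToString(data)})
	return string(out), err
}

// decodeBytes parses a ["bytes", base64] expression back into raw bytes.
func decodeBytes(expr string) ([]byte, error) {
	var parts []string
	if err := json.Unmarshal([]byte(expr), &parts); err != nil {
		return nil, err
	}
	if len(parts) != 2 || parts[0] != "bytes" {
		return nil, fmt.Errorf("not a bytes expression: %s", expr)
	}
	return base64.StdEncoding.DecodeString(parts[1])
}

func main() {
	wire, err := encodeBytes([]byte("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(wire) // ["bytes","aGVsbG8="]

	data, err := decodeBytes(wire)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // hello
}
```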

type DateExpr

type DateExpr struct{ Time time.Time }

DateExpr represents a timestamp — ["date", unixMillis].
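
A minimal sketch of this mapping, assuming the timestamp is a JSON number of milliseconds since the Unix epoch (as the name unixMillis suggests). The helpers encodeDate and decodeDate are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// encodeDate renders t as a ["date", unixMillis] expression.
func encodeDate(t time.Time) (string, error) {
	out, err := json.Marshal([]any{"date", t.UnixMilli()})
	return string(out), err
}

// decodeDate parses a ["date", unixMillis] expression into a UTC time.
func decodeDate(expr string) (time.Time, error) {
	var parts []json.RawMessage
	if err := json.Unmarshal([]byte(expr), &parts); err != nil {
		return time.Time{}, err
	}
	var tag string
	if len(parts) != 2 || json.Unmarshal(parts[0], &tag) != nil || tag != "date" {
		return time.Time{}, fmt.Errorf("not a date expression: %s", expr)
	}
	// Decode as float64 so fractional milliseconds are tolerated (and truncated).
	var ms float64
	if err := json.Unmarshal(parts[1], &ms); err != nil {
		return time.Time{}, err
	}
	return time.UnixMilli(int64(ms)).UTC(), nil
}

func main() {
	wire, err := encodeDate(time.Date(2026, time.April, 6, 0, 0, 0, 0, time.UTC))
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)

	t, err := decodeDate(wire)
	if err != nil {
		panic(err)
	}
	fmt.Println(t.Format(time.RFC3339))
}
```

Sub-millisecond precision is lost in this representation, so a time.Time with nanoseconds will not round-trip exactly.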

type ErrorExpr

type ErrorExpr struct {
	Type    string // "Error", "TypeError", "RangeError", etc.
	Message string
	Stack   string // optional
}

ErrorExpr represents a remote error — ["error", type, message, stack?].

func NewRangeError

func NewRangeError(message string) *ErrorExpr

NewRangeError creates an ErrorExpr with type "RangeError".

func NewReferenceError

func NewReferenceError(message string) *ErrorExpr

NewReferenceError creates an ErrorExpr with type "ReferenceError".

func NewTypeError

func NewTypeError(message string) *ErrorExpr

NewTypeError creates an ErrorExpr with type "TypeError".

func (ErrorExpr) Error

func (e ErrorExpr) Error() string

Error implements the error interface, returning "Type: Message".
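
To illustrate how an error value of this shape can be inspected on the calling side, here is a sketch using a local stand-in type. remoteError and isRangeError are hypothetical names; with the real package the target type passed to errors.As would depend on whether errors are returned as ErrorExpr or *ErrorExpr, which this sketch does not assume.

```go
package main

import (
	"errors"
	"fmt"
)

// remoteError is a hypothetical stand-in for ErrorExpr.
type remoteError struct {
	Type    string // "Error", "TypeError", "RangeError", etc.
	Message string
}

// Error formats the error as "Type: Message".
func (e remoteError) Error() string { return e.Type + ": " + e.Message }

// isRangeError reports whether err is, or wraps, a remoteError
// whose type is "RangeError".
func isRangeError(err error) bool {
	var re remoteError
	return errors.As(err, &re) && re.Type == "RangeError"
}

func main() {
	base := remoteError{Type: "RangeError", Message: "index out of bounds"}
	err := fmt.Errorf("call failed: %w", base)

	fmt.Println(err)               // call failed: RangeError: index out of bounds
	fmt.Println(isRangeError(err)) // true
}
```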

type ExportEntry

type ExportEntry struct {
	ID       int64
	Target   any
	RefCount int64 // times we've exported this ID
}

ExportEntry represents a single export table slot.

type ExportExpr

type ExportExpr struct{ ExportID int64 }

ExportExpr exports a local object — ["export", id].

type ExportTable

type ExportTable struct {
	// contains filtered or unexported fields
}

ExportTable tracks objects exported to the remote endpoint. The exporting side allocates negative IDs starting from -1.

func NewExportTable

func NewExportTable(main any) *ExportTable

NewExportTable creates an export table. The bootstrap object is placed at ID zero.

func (*ExportTable) Export

func (t *ExportTable) Export(target any) *ExportEntry

Export exports a target object. If the same target was already exported, its existing ID is reused and the refcount is incremented. Returns the export entry.

func (*ExportTable) ExportWithID

func (t *ExportTable) ExportWithID(id int64, target any) *ExportEntry

ExportWithID registers an export at a specific ID (used for result exports where the remote chose the positive ID via push/stream/pipe).

func (*ExportTable) Get

func (t *ExportTable) Get(id int64) *ExportEntry

Get returns the entry for the given export ID, or nil if not found.

func (*ExportTable) HandleRelease

func (t *ExportTable) HandleRelease(id, refcount int64) bool

HandleRelease decrements the refcount for an export by the given amount. If the refcount reaches zero, the entry is removed and true is returned.
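
The bookkeeping described for ExportTable can be pictured with the simplified sketch below. All names are hypothetical, the table is not safe for concurrent use, and targets are assumed to be pointers (so they can serve as map keys); the real implementation may differ in each of these respects.

```go
package main

import "fmt"

type entry struct {
	id       int64
	target   any
	refCount int64
}

// exportTable is a hypothetical, single-goroutine model of an export table.
type exportTable struct {
	byID     map[int64]*entry
	byTarget map[any]*entry
	nextID   int64 // next negative ID to hand out
}

// newExportTable places the bootstrap object at ID 0.
func newExportTable(main any) *exportTable {
	t := &exportTable{byID: map[int64]*entry{}, byTarget: map[any]*entry{}, nextID: -1}
	t.byID[0] = &entry{id: 0, target: main, refCount: 1}
	return t
}

// export reuses the existing ID for a known target and bumps its refcount;
// otherwise it allocates the next negative ID.
func (t *exportTable) export(target any) *entry {
	if e, ok := t.byTarget[target]; ok {
		e.refCount++
		return e
	}
	e := &entry{id: t.nextID, target: target, refCount: 1}
	t.nextID--
	t.byID[e.id] = e
	t.byTarget[target] = e
	return e
}

// handleRelease subtracts n from the refcount and removes the entry
// once it reaches zero, reporting whether it was removed.
func (t *exportTable) handleRelease(id, n int64) bool {
	e, ok := t.byID[id]
	if !ok {
		return false
	}
	e.refCount -= n
	if e.refCount > 0 {
		return false
	}
	delete(t.byID, id)
	if id != 0 {
		delete(t.byTarget, e.target)
	}
	return true
}

type service struct{ name string }

func main() {
	tbl := newExportTable(&service{name: "main"})
	calc := &service{name: "calc"}

	first := tbl.export(calc)
	second := tbl.export(calc) // same target: same ID, refcount 2
	fmt.Println(first.id, second.refCount)

	fmt.Println(tbl.handleRelease(first.id, 1)) // false: one reference left
	fmt.Println(tbl.handleRelease(first.id, 1)) // true: entry removed
}
```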

type Expr

type Expr interface {
	// contains filtered or unexported methods
}

Expr represents a capnweb expression value. All values transmitted over the protocol are expressions: either literal JSON values or typed arrays like ["bytes", "..."], ["date", 123], ["import", 1, ["method"], [args]], etc.

The array wrapping rule: non-array JSON values are interpreted literally. Arrays where element 0 is a recognized string tag are typed expressions. Actual plain arrays must be wrapped: [elem0, elem1] on the wire represents a two-element array, distinguished from expressions by the first element not being a known tag string (or by being an ArrayExpr during encoding).
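
Read literally, the rule above amounts to a three-way classification, sketched here with the standard library. The function classify and the tag list are assumptions assembled from the expression types named in this documentation; they are not taken from the package source.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// knownTags lists the expression tags mentioned in this documentation.
var knownTags = map[string]bool{
	"bytes": true, "date": true, "bigint": true, "error": true,
	"export": true, "import": true, "pipeline": true, "promise": true,
	"remap": true, "headers": true, "request": true, "response": true,
	"readable": true, "writable": true, "undefined": true,
	"inf": true, "-inf": true, "nan": true,
}

// classify reports whether raw is a literal, a typed expression, or a plain array.
// Hypothetical helper that follows the rule as stated in the text.
func classify(raw json.RawMessage) (string, error) {
	trimmed := bytes.TrimSpace(raw)
	if len(trimmed) == 0 || trimmed[0] != '[' {
		if !json.Valid(trimmed) {
			return "", fmt.Errorf("invalid JSON")
		}
		return "literal", nil
	}
	var elems []json.RawMessage
	if err := json.Unmarshal(trimmed, &elems); err != nil {
		return "", err
	}
	if len(elems) > 0 {
		var tag string
		// A non-string first element simply fails to decode into tag.
		if json.Unmarshal(elems[0], &tag) == nil && knownTags[tag] {
			return "typed:" + tag, nil
		}
	}
	return "array", nil
}

func main() {
	for _, in := range []string{`"hi"`, `["date",123]`, `[1,2]`, `{"a":1}`} {
		kind, err := classify(json.RawMessage(in))
		fmt.Println(in, "->", kind, err)
	}
}
```

A plain array whose first element happens to be a tag string is ambiguous under this simple rule; the sketch does not attempt to resolve that case.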

func DecodeExpr

func DecodeExpr(data json.RawMessage) (Expr, error)

DecodeExpr deserializes a JSON wire value into an Expr.

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future represents a pending result that will be resolved or rejected.

func NewFuture

func NewFuture() *Future

NewFuture creates a new unresolved future.

func (*Future) Await

func (f *Future) Await(ctx context.Context) (any, error)

Await blocks until the future is settled or the context is cancelled.

func (*Future) Done

func (f *Future) Done() <-chan struct{}

Done returns a channel that is closed when the future is settled.

func (*Future) Reject

func (f *Future) Reject(err error)

Reject sets the error and unblocks all waiters.

func (*Future) Resolve

func (f *Future) Resolve(val any)

Resolve sets the value and unblocks all waiters.

func (*Future) Settled

func (f *Future) Settled() bool

Settled reports whether the future has been resolved or rejected.
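
For readers curious how a type with these methods can be built, the sketch below shows one common pattern: a channel closed exactly once by sync.Once. The lowercase names are hypothetical, and nothing here is taken from the package's actual Future.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// future is a hypothetical settle-once result holder.
type future struct {
	once sync.Once
	done chan struct{}
	val  any
	err  error
}

func newFuture() *future { return &future{done: make(chan struct{})} }

// settle stores the outcome and wakes all waiters; later calls are ignored.
func (f *future) settle(val any, err error) {
	f.once.Do(func() {
		f.val, f.err = val, err
		close(f.done)
	})
}

func (f *future) resolve(val any)  { f.settle(val, nil) }
func (f *future) reject(err error) { f.settle(nil, err) }

// await blocks until the future is settled or ctx is cancelled.
func (f *future) await(ctx context.Context) (any, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// settled reports whether resolve or reject has already been called.
func (f *future) settled() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

func main() {
	f := newFuture()
	go f.resolve(42)

	v, err := f.await(context.Background())
	fmt.Println(v, err) // 42 <nil>

	f.reject(errors.New("too late")) // ignored: already settled
	fmt.Println(f.settled())         // true
}
```

Closing the channel after writing the fields gives every waiter a consistent view of the value without an extra mutex.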

type HeadersExpr

type HeadersExpr struct{ Header http.Header }

HeadersExpr represents HTTP headers — ["headers", [[name, value], ...]].
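
A sketch of flattening an http.Header into that pair list follows. The helper encodeHeaders is hypothetical; sorting the names is only there to make the output deterministic, and whether the package normalizes header-name casing is not something this sketch assumes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
)

// encodeHeaders renders h as ["headers", [[name, value], ...]].
// Multi-valued headers produce one pair per value.
func encodeHeaders(h http.Header) (string, error) {
	names := make([]string, 0, len(h))
	for name := range h {
		names = append(names, name)
	}
	sort.Strings(names) // map order is random; sort for stable output

	pairs := make([][2]string, 0, len(h))
	for _, name := range names {
		for _, v := range h[name] {
			pairs = append(pairs, [2]string{name, v})
		}
	}
	out, err := json.Marshal([]any{"headers", pairs})
	return string(out), err
}

func main() {
	h := http.Header{}
	h.Add("Accept", "text/html")
	h.Add("Accept", "application/json")
	h.Set("Content-Type", "text/plain")

	wire, err := encodeHeaders(h)
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)
}
```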

type ImportEntry

type ImportEntry struct {
	ID       int64
	RefCount int64 // times this import was "introduced" to us
	Resolved bool
	Value    any // resolved value, or nil if pending
}

ImportEntry represents a single import table slot.

type ImportExpr

type ImportExpr struct {
	ImportID int64
	Path     []string
	Args     []Expr // nil = no call; empty = call with zero args
}

ImportExpr references an import table entry — ["import", id, path?, args?]. Evaluates to a stub.

type ImportTable

type ImportTable struct {
	// contains filtered or unexported fields
}

ImportTable tracks objects imported from the remote endpoint. The importing side allocates positive IDs starting from 1.

func NewImportTable

func NewImportTable() *ImportTable

NewImportTable creates an import table. ID zero is pre-populated as the remote's bootstrap (main) interface.

func (*ImportTable) AddRef

func (t *ImportTable) AddRef(id int64)

AddRef increments the refcount for an import (called when we receive the same ID again via an export/promise expression from the remote).

func (*ImportTable) Allocate

func (t *ImportTable) Allocate() *ImportEntry

Allocate reserves the next positive import ID and returns the entry. Used when sending push/stream/pipe.

func (*ImportTable) Get

func (t *ImportTable) Get(id int64) *ImportEntry

Get returns the entry for the given import ID, or nil if not found.

func (*ImportTable) Insert

func (t *ImportTable) Insert(id int64) *ImportEntry

Insert adds an entry for a remotely-chosen (negative) import ID. Used when the remote exports an object to us via ["export", negativeId].

func (*ImportTable) Release

func (t *ImportTable) Release(id, count int64) bool

Release decrements the refcount for an import by count. If the refcount reaches zero, the entry is removed and true is returned.

func (*ImportTable) Remove

func (t *ImportTable) Remove(id int64)

Remove deletes an entry from the table. Called after we send a release message for this import.

type InfExpr

type InfExpr struct{}

InfExpr represents +Infinity — ["inf"].

type LiteralExpr

type LiteralExpr struct{ Value any }

LiteralExpr wraps a plain JSON value (string, number, bool, null, or object).

type Message

type Message interface {
	// contains filtered or unexported methods
}

Message is a capnweb protocol message. Each concrete type corresponds to one of the 8 wire message types.

func ReadNDJSON

func ReadNDJSON(r io.Reader) ([]Message, error)

ReadNDJSON reads newline-delimited JSON messages from a reader.
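
The general technique behind reading NDJSON is a line scanner feeding a JSON decoder. The sketch below extracts only the message tag (the first array element) from each line; readTags and parseBatch are hypothetical helpers and do not reproduce what ReadNDJSON returns.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// readTags scans NDJSON input and returns the tag of each message.
// Note: bufio.Scanner rejects lines longer than its buffer (64 KiB by
// default); call sc.Buffer to raise the limit for large messages.
func readTags(r io.Reader) ([]string, error) {
	var tags []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue // tolerate blank lines
		}
		var parts []json.RawMessage
		if err := json.Unmarshal([]byte(line), &parts); err != nil {
			return nil, fmt.Errorf("bad message %q: %w", line, err)
		}
		if len(parts) == 0 {
			return nil, fmt.Errorf("empty message array")
		}
		var tag string
		if err := json.Unmarshal(parts[0], &tag); err != nil {
			return nil, fmt.Errorf("message tag is not a string: %w", err)
		}
		tags = append(tags, tag)
	}
	return tags, sc.Err()
}

// parseBatch is a convenience wrapper for string input.
func parseBatch(s string) ([]string, error) {
	return readTags(strings.NewReader(s))
}

func main() {
	tags, err := parseBatch("[\"resolve\",1,\"Hello, World!\"]\n[\"release\",1,1]\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(tags) // [resolve release]
}
```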

func UnmarshalMessage

func UnmarshalMessage(data []byte) (Message, error)

UnmarshalMessage decodes a JSON wire message into a Message.

type NaNExpr

type NaNExpr struct{}

NaNExpr represents NaN — ["nan"].

type NegInfExpr

type NegInfExpr struct{}

NegInfExpr represents -Infinity — ["-inf"].
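
These three tags exist because JSON has no literal for non-finite numbers; Go's encoding/json, for instance, returns an error when asked to marshal NaN or an infinity. A hypothetical encoder that falls back to the tagged forms could look like this:

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// encodeNumber renders f as a JSON number, or as ["nan"], ["inf"],
// or ["-inf"] when f is not finite. Hypothetical helper.
func encodeNumber(f float64) (string, error) {
	switch {
	case math.IsNaN(f):
		return `["nan"]`, nil
	case math.IsInf(f, 1):
		return `["inf"]`, nil
	case math.IsInf(f, -1):
		return `["-inf"]`, nil
	}
	out, err := json.Marshal(f)
	return string(out), err
}

func main() {
	for _, f := range []float64{1.5, math.NaN(), math.Inf(1), math.Inf(-1)} {
		wire, err := encodeNumber(f)
		if err != nil {
			panic(err)
		}
		fmt.Println(wire)
	}
}
```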

type PipeMsg

type PipeMsg struct{}

PipeMsg creates a bidirectional pipe on the remote end. The sender implicitly assigns the next positive import ID, usable as a WritableStream.

Wire format: ["pipe"]

type PipelineExpr

type PipelineExpr struct {
	ImportID int64
	Path     []string
	Args     []Expr
}

PipelineExpr is like ImportExpr but evaluates to a promise — ["pipeline", id, path?, args?].
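
At the wire level, pipelining means the second call can reference the first call's import ID before any result has arrived. The sketch below builds such a message sequence by hand, assuming (from the wire formats documented here) that each push takes the next positive import ID. The batch type is hypothetical, and the exact messages that Stub.Pipeline and Call emit may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batch collects wire messages and numbers pushes 1, 2, 3, ...
// Hypothetical helper for illustration only.
type batch struct {
	lines  []string
	nextID int
}

func (b *batch) add(msg []any) error {
	line, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	b.lines = append(b.lines, string(line))
	return nil
}

// push queues a push message and returns the import ID of its result.
func (b *batch) push(expr []any) (int, error) {
	if err := b.add([]any{"push", expr}); err != nil {
		return 0, err
	}
	b.nextID++
	return b.nextID, nil
}

// pull asks the remote to send a resolve or reject for the given import.
func (b *batch) pull(id int) error { return b.add([]any{"pull", id}) }

func main() {
	var b batch

	// Call GetCalculator on the bootstrap object (import 0).
	calc, err := b.push([]any{"import", 0, []string{"GetCalculator"}, []any{}})
	if err != nil {
		panic(err)
	}
	// Call Add on the not-yet-resolved result, without waiting.
	sum, err := b.push([]any{"pipeline", calc, []string{"Add"}, []any{3, 4}})
	if err != nil {
		panic(err)
	}
	// Only the final result is pulled.
	if err := b.pull(sum); err != nil {
		panic(err)
	}
	for _, line := range b.lines {
		fmt.Println(line)
	}
}
```

All three messages can be sent back to back, which is how two dependent calls collapse into a single round trip.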

type PromiseExpr

type PromiseExpr struct{ ExportID int64 }

PromiseExpr exports a promise — ["promise", id].

type PullMsg

type PullMsg struct {
	ImportID int64
}

PullMsg signals that the sender wants a resolve/reject for a promise import.

Wire format: ["pull", importId]

type PushMsg

type PushMsg struct {
	Expr json.RawMessage
}

PushMsg requests the recipient to evaluate an expression. The sender implicitly assigns the next positive import ID to the result.

Wire format: ["push", expression]

type ReadableExpr

type ReadableExpr struct{ ImportID int64 }

ReadableExpr references the readable end of a pipe — ["readable", importId].

type RejectMsg

type RejectMsg struct {
	ExportID int64
	Expr     json.RawMessage
}

RejectMsg delivers a rejection for a promise export. The expression must not contain stubs — it typically evaluates to an error.

Wire format: ["reject", exportId, expression]

type ReleaseMsg

type ReleaseMsg struct {
	ImportID int64
	RefCount int64
}

ReleaseMsg releases an import table entry.

Wire format: ["release", importId, refcount]

type RemapExpr

type RemapExpr struct {
	ImportID     int64
	Path         []string
	Captures     []Expr
	Instructions []Expr
}

RemapExpr represents a server-side .map() — ["remap", importId, path, captures, instructions].

type RequestExpr

type RequestExpr struct {
	URL     string
	Method  string
	Headers http.Header
	Body    Expr // may be nil
}

RequestExpr represents an HTTP request — ["request", url, init].

type ResolveMsg

type ResolveMsg struct {
	ExportID int64
	Expr     json.RawMessage
}

ResolveMsg delivers the resolution of a promise export.

Wire format: ["resolve", exportId, expression]

type ResponseExpr

type ResponseExpr struct {
	Status     int
	StatusText string
	Headers    http.Header
	Body       Expr // may be nil
}

ResponseExpr represents an HTTP response — ["response", body, init].

type RpcTarget

type RpcTarget interface {
	IsRpcTarget()
}

RpcTarget is implemented by types that should be passed by reference over the wire. Instead of being serialized as JSON, RpcTarget values are exported into the session's export table and sent as ["export", id] expressions.

Embed RpcTargetBase in your struct to implement this interface:

type MyService struct {
    capnweb.RpcTargetBase
}

type RpcTargetBase

type RpcTargetBase struct{}

RpcTargetBase is embedded in structs to mark them as pass-by-reference RPC targets. This is the Go equivalent of extending the RpcTarget class in the TypeScript reference implementation.

func (RpcTargetBase) IsRpcTarget

func (RpcTargetBase) IsRpcTarget()

IsRpcTarget implements RpcTarget.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session manages a single capnweb RPC connection. It is fully bidirectional — either side can call methods on objects exported by the other.

func NewSession

func NewSession(transport Transport, main any) *Session

NewSession creates a session. The main object is exported at ID 0 (the bootstrap interface). Pass nil if this endpoint has no bootstrap.

Example

ExampleNewSession demonstrates a basic RPC call over an in-process WebSocket connection.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"

	capnweb "github.com/flaticols/capnweb-go"
)

// A Greeter is a simple RPC service. Embed RpcTargetBase to mark it
// as pass-by-reference when returned from other methods.
type Greeter struct {
	capnweb.RpcTargetBase
}

func (g *Greeter) Greet(_ context.Context, name string) (string, error) {
	return "Hello, " + name + "!", nil
}

func (g *Greeter) Add(_ context.Context, a, b float64) (float64, error) {
	return a + b, nil
}

func (g *Greeter) Fail(_ context.Context) (any, error) {
	return nil, fmt.Errorf("something went wrong")
}

func main() {
	// Start a Go server with a Greeter service.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tr, _ := capnweb.WSAccept(w, r, &capnweb.WSAcceptOptions{Origins: []string{"*"}})
		sess := capnweb.NewSession(tr, &Greeter{})
		sess.Run(r.Context())
	}))
	defer srv.Close()

	wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")

	// Connect a Go client.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tr, _ := capnweb.WSDial(ctx, wsURL, nil)
	client := capnweb.NewSession(tr, nil)
	go client.Run(ctx)
	defer client.Close()

	// Call the Greet method.
	main := client.Main()
	result, _ := capnweb.Call[string](ctx, main, "Greet", "World")
	fmt.Println(result)
}
Output:
Hello, World!

func (*Session) Abort

func (s *Session) Abort(reason error) error

Abort sends an abort message and terminates the session.

func (*Session) Call

func (s *Session) Call(ctx context.Context, targetID int64, method string, args ...any) (any, error)

Call sends a method call to a remote object and blocks until the result is available. targetID is the import ID of the remote object (0 for the bootstrap interface).

func (*Session) Close

func (s *Session) Close() error

Close gracefully shuts down the session.

func (*Session) CreatePipe

func (s *Session) CreatePipe(ctx context.Context) (*StreamWriter, Expr, error)

CreatePipe sends a ["pipe"] message to the remote, creating a pipe. Returns a StreamWriter for sending chunks and a ReadableExpr that can be passed as a method argument so the remote can read from the pipe.

Example

ExampleSession_CreatePipe demonstrates streaming data from client to server.

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"

	capnweb "github.com/flaticols/capnweb-go"
)

// StreamService collects chunks from a stream into a single string.
type StreamService struct {
	capnweb.RpcTargetBase
}

func (s *StreamService) Collect(_ context.Context, reader *capnweb.StreamReader) (string, error) {
	var sb strings.Builder
	for {
		chunk, err := reader.Read(context.Background())
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err
		}
		sb.WriteString(chunk.(string))
	}
	return sb.String(), nil
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tr, _ := capnweb.WSAccept(w, r, &capnweb.WSAcceptOptions{Origins: []string{"*"}})
		sess := capnweb.NewSession(tr, &StreamService{})
		sess.Run(r.Context())
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tr, _ := capnweb.WSDial(ctx, "ws"+strings.TrimPrefix(srv.URL, "http"), nil)
	client := capnweb.NewSession(tr, nil)
	go client.Run(ctx)
	defer client.Close()

	// Create a pipe and pass the readable end to the server.
	writer, readable, _ := client.CreatePipe(ctx)

	resultCh := make(chan string, 1)
	go func() {
		main := client.Main()
		r, _ := capnweb.Call[string](ctx, main, "Collect", readable)
		resultCh <- r
	}()

	// Write chunks through the pipe.
	writer.Write(ctx, "Hello")
	writer.Write(ctx, ", ")
	writer.Write(ctx, "World!")
	writer.Close(ctx)

	fmt.Println(<-resultCh)
}
Output:
Hello, World!

func (*Session) Done

func (s *Session) Done() <-chan struct{}

Done returns a channel that is closed when the session ends.

func (*Session) Err

func (s *Session) Err() error

Err returns the terminal error, if any.

func (*Session) Main

func (s *Session) Main() *Stub

Main returns a Stub for the remote's bootstrap (main) interface. This is the entry point for calling methods on the remote endpoint.

func (*Session) Release

func (s *Session) Release(ctx context.Context, importID, refCount int64) error

Release sends a release message for the given import ID and removes it from the import table. Used when the caller is done with a remote object obtained via pass-by-reference.

func (*Session) Run

func (s *Session) Run(ctx context.Context) error

Run starts the message processing loop. It blocks until the context is cancelled, the transport closes, or an abort is received.

type StreamMsg

type StreamMsg struct {
	Expr json.RawMessage
}

StreamMsg is like PushMsg but optimized for streaming: no pipelining on the result, auto-pulled, and implicitly released with refcount 1 on resolve/reject.

Wire format: ["stream", expression]

type StreamReader

type StreamReader struct {
	// contains filtered or unexported fields
}

StreamReader reads chunks from a pipe. Obtained from a ReadableExpr when a method receives a stream argument.

func (*StreamReader) Read

func (r *StreamReader) Read(ctx context.Context) (any, error)

Read returns the next chunk from the stream. Returns io.EOF when the writer closes the stream, or an error if the writer aborts.

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter writes chunks to a remote pipe via stream messages. Each write sends a ["stream", ...] message and waits for the resolve (backpressure).

func (*StreamWriter) Abort

func (w *StreamWriter) Abort(ctx context.Context, reason error) error

Abort terminates the stream with an error.

func (*StreamWriter) Close

func (w *StreamWriter) Close(ctx context.Context) error

Close signals the end of the stream. The remote reader will receive io.EOF.

func (*StreamWriter) Write

func (w *StreamWriter) Write(ctx context.Context, chunk any) error

Write sends a chunk to the remote pipe. Blocks until the remote acknowledges the write (backpressure).

type Stub

type Stub struct {
	// contains filtered or unexported fields
}

Stub represents a remote object accessible through a session. It wraps an import table entry and provides methods to call remote methods and release the reference.

Stubs are created via Session.Main (for the bootstrap interface) or returned from Stub.Call when the remote returns a pass-by-reference object.

When done with a stub, call Stub.Release to send a release message to the remote. A runtime finalizer is set as a safety net, but explicit release is preferred for deterministic cleanup.

func (*Stub) Call

func (s *Stub) Call(ctx context.Context, method string, args ...any) (any, error)

Call invokes a method on the remote object and blocks until the result is available. If the remote returns a pass-by-reference object, it is automatically wrapped as a *Stub.

func (*Stub) ID

func (s *Stub) ID() int64

ID returns the import ID of the remote object.

func (*Stub) Pipeline

func (s *Stub) Pipeline(ctx context.Context, method string, args ...any) (*Stub, error)

Pipeline sends a method call without waiting for the result, returning a pipeline stub that can be used as the target of subsequent calls. This enables promise pipelining — chaining dependent calls without waiting for intermediate results.

auth, _ := main.Pipeline(ctx, "Authenticate", token)  // push only
data, _ := Call[string](ctx, auth, "GetData")          // push + pull + await
defer auth.Release(ctx)
Example

ExampleStub_Pipeline demonstrates promise pipelining — chaining calls without waiting for intermediate results.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"

	capnweb "github.com/flaticols/capnweb-go"
)

// Calculator is an RpcTarget returned by reference from MathService.
type Calculator struct {
	capnweb.RpcTargetBase
}

func (c *Calculator) Multiply(_ context.Context, a, b float64) (float64, error) {
	return a * b, nil
}

type MathService struct {
	capnweb.RpcTargetBase
}

func (s *MathService) GetCalculator(_ context.Context) (*Calculator, error) {
	return &Calculator{}, nil
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tr, _ := capnweb.WSAccept(w, r, &capnweb.WSAcceptOptions{Origins: []string{"*"}})
		sess := capnweb.NewSession(tr, &MathService{})
		sess.Run(r.Context())
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	tr, _ := capnweb.WSDial(ctx, "ws"+strings.TrimPrefix(srv.URL, "http"), nil)
	client := capnweb.NewSession(tr, nil)
	go client.Run(ctx)
	defer client.Close()

	main := client.Main()

	// Pipeline: GetCalculator returns a stub, then Multiply is called
	// on it — both calls are sent before waiting for any result.
	calc, _ := main.Pipeline(ctx, "GetCalculator")
	result, _ := capnweb.Call[float64](ctx, calc, "Multiply", 6.0, 7.0)
	calc.Release(ctx)

	fmt.Println(result)
}
Output:
42

func (*Stub) Release

func (s *Stub) Release(ctx context.Context) error

Release sends a release message for this stub's import and marks it as released. Subsequent calls to Release are no-ops.

func (*Stub) String

func (s *Stub) String() string

String implements fmt.Stringer.

type Transport

type Transport interface {
	// Send sends a message to the remote endpoint.
	// Must be safe for concurrent use from multiple goroutines.
	Send(ctx context.Context, msg Message) error

	// Recv receives the next message from the remote endpoint.
	// Blocks until a message is available, the context is cancelled, or the
	// transport is closed.
	Recv(ctx context.Context) (Message, error)

	// Close closes the transport. Any blocked Recv call returns an error.
	Close() error
}

Transport is a bidirectional message stream. Implementations handle framing and JSON serialization for a specific underlying protocol (WebSocket, HTTP batch, etc.).
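
As an illustration of the contract, here is a sketch of an in-memory transport built on channels. To stay self-contained it defines its own transport interface and a message type of raw bytes; both are stand-ins, so this code would need adapting (in particular to capnweb.Message) before it could be plugged into NewSession.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// message and transport are local stand-ins for the package's types.
type message []byte

type transport interface {
	Send(ctx context.Context, msg message) error
	Recv(ctx context.Context) (message, error)
	Close() error
}

var errClosed = errors.New("transport closed")

// chanTransport is one end of an in-memory, channel-backed connection.
type chanTransport struct {
	in   <-chan message
	out  chan<- message
	done chan struct{}
	once sync.Once
}

var _ transport = (*chanTransport)(nil)

// newPipe returns two connected ends; what one sends, the other receives.
func newPipe() (*chanTransport, *chanTransport) {
	ab, ba := make(chan message, 16), make(chan message, 16)
	a := &chanTransport{in: ba, out: ab, done: make(chan struct{})}
	b := &chanTransport{in: ab, out: ba, done: make(chan struct{})}
	return a, b
}

// Send is safe for concurrent use because channel sends are.
func (t *chanTransport) Send(ctx context.Context, msg message) error {
	select {
	case <-t.done:
		return errClosed // refuse to send after Close
	default:
	}
	select {
	case t.out <- msg:
		return nil
	case <-t.done:
		return errClosed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Recv blocks until a message arrives, ctx is cancelled, or Close is called.
func (t *chanTransport) Recv(ctx context.Context) (message, error) {
	select {
	case msg := <-t.in:
		return msg, nil
	case <-t.done:
		return nil, errClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Close is idempotent and unblocks any pending Recv on this end.
func (t *chanTransport) Close() error {
	t.once.Do(func() { close(t.done) })
	return nil
}

// roundTrip sends one payload from a to b and returns what b received.
func roundTrip(payload string) (string, error) {
	a, b := newPipe()
	defer a.Close()
	defer b.Close()
	ctx := context.Background()
	if err := a.Send(ctx, message(payload)); err != nil {
		return "", err
	}
	got, err := b.Recv(ctx)
	return string(got), err
}

func main() {
	got, err := roundTrip(`["pull",1]`)
	fmt.Println(got, err)
}
```

In this simplified version, closing one end does not notify the peer; a fuller implementation would propagate the close to the other side.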

type UndefinedExpr

type UndefinedExpr struct{}

UndefinedExpr represents JavaScript undefined — ["undefined"].

type WSAcceptOptions

type WSAcceptOptions struct {
	Origins []string // allowed origins; nil allows any
}

WSAcceptOptions configures server-side WebSocket upgrade.

type WSDialOptions

type WSDialOptions struct {
	HTTPClient *http.Client
}

WSDialOptions configures a client WebSocket connection.

type WSTransport

type WSTransport struct {
	// contains filtered or unexported fields
}

WSTransport implements Transport over a WebSocket connection. One WebSocket text message = one capnweb message.

func NewWSTransport

func NewWSTransport(conn *websocket.Conn) *WSTransport

NewWSTransport wraps an existing websocket.Conn as a Transport.

func WSAccept

func WSAccept(w http.ResponseWriter, r *http.Request, opts *WSAcceptOptions) (*WSTransport, error)

WSAccept upgrades an HTTP request to a WebSocket connection and returns a transport.

func WSDial

func WSDial(ctx context.Context, url string, opts *WSDialOptions) (*WSTransport, error)

WSDial creates a client-side WebSocket transport by connecting to the given URL.

func (*WSTransport) Close

func (t *WSTransport) Close() error

Close sends a normal close frame and closes the connection.

func (*WSTransport) Recv

func (t *WSTransport) Recv(ctx context.Context) (Message, error)

Recv reads the next WebSocket text message and decodes it as a Message.

func (*WSTransport) Send

func (t *WSTransport) Send(ctx context.Context, msg Message) error

Send sends a message as a WebSocket text message. Safe for concurrent use.

type WritableExpr

type WritableExpr struct{ ExportID int64 }

WritableExpr references a writable stream — ["writable", exportId].
