transport

package
v1.0.13
Published: May 12, 2026 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package transport defines the dependency-free observer contract used by Loom's generated HTTP, JSON-RPC, and Loom-MCP transports.

Enablement

Enablement is context-based. Applications attach an Observer to the request context via HTTPMiddleware (or WithObserver for non-HTTP entry points), and generated code resolves it transparently. Generated constructor signatures remain source-compatible with prior Loom releases — turning observability on or off is purely a wiring choice at the application boundary.

A minimal observer that prints reasons to stderr:

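// Kind is a uint8 with no documented String method, so it is formatted with %d.
mw := transport.HTTPMiddleware(transport.ObserverFunc(func(_ context.Context, e transport.Event) {
    log.Printf("%s/%d %s reason=%s status=%d", e.Transport, e.Kind, e.Method, e.Reason, e.StatusCode)
}))
// ListenAndServe always returns a non-nil error; log it instead of dropping it.
log.Fatal(http.ListenAndServe(":8080", mw(generatedServer)))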

For non-HTTP entry points (e.g. a JSON-RPC consumer that reads frames from a Kafka topic), use WithObserver directly to inject the observer into the request-scoped context before invoking the generated handler.
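The non-HTTP wiring can be sketched as follows. This is a self-contained illustration, not the package's source: Event, Observer, ObserverFunc, WithObserver, and Observe are local stand-ins that mirror the documented signatures, the context-key implementation is an assumption, and handleFrame and consume are hypothetical names for a generated handler and a frame-reading loop.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a trimmed local stand-in for transport.Event.
type Event struct {
	Transport string
	Method    string
	Reason    string
}

// Observer mirrors the documented interface.
type Observer interface {
	ObserveEvent(ctx context.Context, e Event)
}

// ObserverFunc adapts a function into an Observer; a nil func is a no-op.
type ObserverFunc func(ctx context.Context, e Event)

func (f ObserverFunc) ObserveEvent(ctx context.Context, e Event) {
	if f != nil {
		f(ctx, e)
	}
}

// observerKey is an assumed, unexported context key.
type observerKey struct{}

// WithObserver returns a copy of ctx that carries o.
func WithObserver(ctx context.Context, o Observer) context.Context {
	return context.WithValue(ctx, observerKey{}, o)
}

// Observe delivers e to the observer on ctx, or does nothing if none is set.
func Observe(ctx context.Context, e Event) {
	if o, _ := ctx.Value(observerKey{}).(Observer); o != nil {
		o.ObserveEvent(ctx, e)
	}
}

// handleFrame is a hypothetical stand-in for a generated JSON-RPC handler.
func handleFrame(ctx context.Context, frame []byte) {
	Observe(ctx, Event{Transport: "jsonrpc", Method: "hypothetical.method", Reason: "ok"})
}

// consume attaches the observer to each frame's context before dispatch.
func consume(ctx context.Context, o Observer, frames [][]byte) {
	for _, f := range frames {
		handleFrame(WithObserver(ctx, o), f)
	}
}

// demo feeds two frames through consume and returns what the observer saw.
func demo() []string {
	var seen []string
	o := ObserverFunc(func(_ context.Context, e Event) {
		seen = append(seen, e.Transport+"/"+e.Reason)
	})
	consume(context.Background(), o, [][]byte{[]byte(`{}`), []byte(`{}`)})
	return seen
}

func main() {
	fmt.Println(demo()) // [jsonrpc/ok jsonrpc/ok]
}
```

The handler never receives the observer as a parameter; it only calls Observe with the context it was given, which is why enabling or disabling observation stays a wiring decision at the entry point.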

Redaction rules

Generated code never emits raw bodies, JSON-RPC params, MCP tool arguments, credentials, or result payloads. Event carries only classified, low-cardinality fields that are safe for metric labeling and structured-log enrichment:

  • Transport (http/jsonrpc/mcp), Kind (start/finish/failure/stream*), and Reason are stable enumerations and may be used as metric labels.
  • Service, Method, Route, HTTPMethod identify the operation.
  • StatusCode, BytesWritten, Duration measure the response.
  • JSONRPCMethod, JSONRPCID, BatchCount, Notification are populated only after the JSON-RPC envelope has been decoded; pre-decode rejection events leave them empty by design.
  • SafeMessage carries operator-redacted text, never raw user input.

Observer implementations that need to extract more detail (request id, trace id, principal) should read it from the request context inside Observer.ObserveEvent instead of expecting it on Event.
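A minimal sketch of that pattern, assuming the application stores its own request id on the context. Event is a trimmed local stand-in, and requestIDKey, withRequestID, and logLine are hypothetical application-side names, not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a trimmed local stand-in for transport.Event.
type Event struct {
	Method string
	Reason string
}

// requestIDKey is a hypothetical context key owned by the application.
type requestIDKey struct{}

// withRequestID stores an application-level request id on ctx.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// logLine is what an ObserveEvent implementation might build: classified
// fields come from the event, correlation detail comes from ctx.
func logLine(ctx context.Context, e Event) string {
	id, ok := ctx.Value(requestIDKey{}).(string)
	if !ok {
		id = "unknown"
	}
	return fmt.Sprintf("method=%s reason=%s request_id=%s", e.Method, e.Reason, id)
}

// demo builds one log line for a context that carries a request id.
func demo() string {
	ctx := withRequestID(context.Background(), "req-123")
	return logLine(ctx, Event{Method: "Get", Reason: "ok"})
}

func main() {
	fmt.Println(demo()) // method=Get reason=ok request_id=req-123
}
```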

Boundary with observability/otel

This package handles request-level classification only. OpenTelemetry span attributes, propagators, metric mode, and provider bootstrap remain in github.com/CaliLuke/loom/observability/otel. The two are composable: `otel.HTTPMiddleware` traces the request, `transport.HTTPMiddleware` classifies it. Stack them in any order; neither package depends on the other.
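The order-independence claim can be illustrated with two stand-in middlewares. This sketch models both as plain context injection, which is a simplifying assumption; inject, serve, and the "trace" and "observer" keys are hypothetical and exist only for the example.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ctxKey is a private key type so the example keys cannot collide.
type ctxKey string

// inject returns a middleware that stores value under key on the request
// context before calling the next handler.
func inject(key ctxKey, value string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			ctx := context.WithValue(r.Context(), key, value)
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}

// serve runs one request through h and returns the response body.
func serve(h http.Handler) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Body.String()
}

// demo stacks the two middlewares in both orders around the same handler.
func demo() (string, string) {
	tracing := inject("trace", "on")     // stand-in for otel.HTTPMiddleware
	classify := inject("observer", "on") // stand-in for transport.HTTPMiddleware
	inner := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		fmt.Fprintf(w, "trace=%v observer=%v", ctx.Value(ctxKey("trace")), ctx.Value(ctxKey("observer")))
	})
	return serve(tracing(classify(inner))), serve(classify(tracing(inner)))
}

func main() {
	a, b := demo()
	fmt.Println(a) // trace=on observer=on
	fmt.Println(b) // trace=on observer=on
}
```

Because each middleware only adds its own value to the context, the innermost handler sees both values whichever one wraps the other.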

MCP release dependency

`loom-mcp` consumes this package through its `go.mod` `replace github.com/CaliLuke/loom => ../loom` directive during local development. A non-local `loom-mcp` release that drops the replace must bump `github.com/CaliLuke/loom` in `loom-mcp/go.mod` to a Loom tag that contains this package — otherwise generated MCP server code will not compile against the public Loom module.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HTTPMiddleware

func HTTPMiddleware(observer Observer) func(http.Handler) http.Handler

HTTPMiddleware returns an HTTP middleware that injects observer into the request context so generated handlers can resolve it via ObserverFromContext. A nil observer disables event delivery without affecting the wrapped handler chain.

HTTPMiddleware is intentionally narrow: it only performs context injection. Span/trace setup, propagation, and metric recording remain the responsibility of `loom/observability/otel`, which can be composed on the outside of this middleware.

func Observe

func Observe(ctx context.Context, e Event)

Observe delivers e to the Observer carried by ctx, if any. Observe is the single entry point generated code uses to emit events so that a missing observer remains a cheap no-op.

func WithObserver

func WithObserver(ctx context.Context, o Observer) context.Context

WithObserver returns a copy of ctx that carries o. A nil observer clears any observer already attached to ctx.

func WithRequestObserver

func WithRequestObserver(ctx context.Context, obs *RequestObserver) context.Context

WithRequestObserver returns a copy of ctx that carries obs so deeper generated layers (JSON-RPC method dispatch, MCP tool handlers) can resolve the lifecycle observer without threading a new parameter through every signature.

Types

type CaptureResponseWriter

type CaptureResponseWriter struct {
	// ResponseWriter is the wrapped writer. It is intentionally exported so
	// generated code can pass the wrapper directly to helpers that perform
	// type assertions (for example, hijack and flush probes).
	ResponseWriter http.ResponseWriter
	// contains filtered or unexported fields
}

CaptureResponseWriter wraps an http.ResponseWriter and records the final HTTP status code and number of bytes written by the handler. Generated HTTP handlers use it to populate the StatusCode and BytesWritten fields of terminal transport events without retaining the raw response body.

CaptureResponseWriter forwards http.Flusher, http.Hijacker, and http.Pusher when the underlying writer implements them so streaming and WebSocket upgrade paths continue to work after wrapping.
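The capture behavior can be sketched with a simplified wrapper. captureWriter below is a local stand-in written for illustration, not the package's implementation; it reproduces only the documented status and byte accounting plus Unwrap, and relies on the standard library's http.ResponseController to reach the wrapped writer.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// captureWriter is a simplified local stand-in for CaptureResponseWriter.
type captureWriter struct {
	http.ResponseWriter
	status int
	bytes  int64
}

// WriteHeader keeps only the first status code and forwards the call.
func (c *captureWriter) WriteHeader(status int) {
	if c.status == 0 {
		c.status = status
	}
	c.ResponseWriter.WriteHeader(status)
}

// Write records an implicit 200 when no status was set, then counts bytes.
func (c *captureWriter) Write(b []byte) (int, error) {
	if c.status == 0 {
		c.status = http.StatusOK
	}
	n, err := c.ResponseWriter.Write(b)
	c.bytes += int64(n)
	return n, err
}

// Unwrap lets http.ResponseController reach the wrapped writer.
func (c *captureWriter) Unwrap() http.ResponseWriter { return c.ResponseWriter }

// demo writes a body without an explicit status, then flushes through
// a ResponseController to show that Unwrap exposes the recorder's Flush.
func demo() (status int, written int64, flushErr error) {
	rec := httptest.NewRecorder()
	cw := &captureWriter{ResponseWriter: rec}
	cw.Write([]byte("hello"))
	flushErr = http.NewResponseController(cw).Flush()
	return cw.status, cw.bytes, flushErr
}

func main() {
	status, written, err := demo()
	fmt.Println(status, written, err) // 200 5 <nil>
}
```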

func NewCaptureResponseWriter

func NewCaptureResponseWriter(w http.ResponseWriter) *CaptureResponseWriter

NewCaptureResponseWriter returns a wrapper that records status and byte counts for w.

func (*CaptureResponseWriter) BytesWritten

func (c *CaptureResponseWriter) BytesWritten() int64

BytesWritten returns the number of bytes the handler has written to the underlying response writer through Write.

func (*CaptureResponseWriter) Header

func (c *CaptureResponseWriter) Header() http.Header

Header returns the wrapped writer's header map.

func (*CaptureResponseWriter) StatusCode

func (c *CaptureResponseWriter) StatusCode() int

StatusCode returns the HTTP status code recorded by WriteHeader or Write, or 0 if the handler wrote no response.

func (*CaptureResponseWriter) Unwrap

func (c *CaptureResponseWriter) Unwrap() http.ResponseWriter

Unwrap returns the wrapped http.ResponseWriter so that http.ResponseController can locate underlying interfaces such as Flusher and Hijacker.

func (*CaptureResponseWriter) Write

func (c *CaptureResponseWriter) Write(b []byte) (int, error)

Write writes b to the wrapped writer and accumulates the byte count. If no status has been written yet, an implicit 200 is recorded to match net/http semantics.

func (*CaptureResponseWriter) WriteHeader

func (c *CaptureResponseWriter) WriteHeader(status int)

WriteHeader records the first status code observed and forwards the call to the wrapped writer. Subsequent calls are forwarded but only the first status is retained, matching net/http behavior.

type Event

type Event struct {
	// Kind is the lifecycle position of the event.
	Kind EventKind
	// Transport identifies the generated transport (HTTP, JSON-RPC, MCP).
	Transport TransportKind
	// Reason is the stable classification for this event.
	Reason Reason

	// Service is the Loom service name owning the endpoint.
	Service string
	// Method is the operation name (HTTP handler, JSON-RPC method, or MCP
	// tool) as known to the generator.
	Method string
	// Route is the HTTP route pattern when available.
	Route string
	// HTTPMethod is the HTTP verb when the underlying transport is HTTP.
	HTTPMethod string
	// StatusCode is the final HTTP status code when known.
	StatusCode int
	// Duration is the wall-clock duration between request start and the
	// emission of the terminal event.
	Duration time.Duration
	// BytesWritten counts bytes flushed to the response writer when known.
	BytesWritten int64

	// JSONRPCMethod is the JSON-RPC method name decoded from the request
	// envelope.
	JSONRPCMethod string
	// JSONRPCID is the decoded JSON-RPC request id rendered as a string.
	JSONRPCID string
	// BatchCount is the number of envelopes in a JSON-RPC batch request,
	// zero for non-batch requests.
	BatchCount int
	// Notification reports whether the JSON-RPC request had no id and is a
	// notification.
	Notification bool

	// SessionID is the transport session identifier when known. For MCP
	// this is the Mcp-Session-Id; for WebSocket and SSE streams it is the
	// stream session id.
	SessionID string
	// RequestID is the generator-assigned request correlation id when
	// known.
	RequestID string
	// TraceID is the propagated trace id when known.
	TraceID string

	// ErrorClass categorizes the error using a short, redaction-safe
	// label.
	ErrorClass string
	// SafeMessage is a redaction-safe error message. Generated code never
	// places raw error messages from untrusted sources in this field.
	SafeMessage string

	// Attrs carries additional redaction-safe key/value attributes. Keys
	// and values are intended to be safe for log enrichment and metric
	// labeling; raw payloads must not be placed here.
	Attrs map[string]string
}

Event is the payload delivered to an Observer. All fields are optional; generators populate the fields that are well defined at the point where the event is emitted and leave the rest zero. Generated code never populates Event with raw bodies, JSON-RPC params, MCP tool arguments, credentials, or result payloads.

type EventKind

type EventKind uint8

EventKind identifies the lifecycle position a transport event reports.

const (
	// EventKindRequestStart marks the beginning of a request lifecycle.
	EventKindRequestStart EventKind = iota + 1
	// EventKindRequestFinish marks a successful terminal request event.
	EventKindRequestFinish
	// EventKindRequestFailure marks a failed terminal request event.
	EventKindRequestFailure
	// EventKindStreamOpen marks the opening of a streaming response.
	EventKindStreamOpen
	// EventKindStreamClose marks the closing of a streaming response.
	EventKindStreamClose
	// EventKindStreamFailure marks a failed stream write or flush.
	EventKindStreamFailure
)
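EventKind is a uint8 and this documentation lists no String method for it, so an observer that wants a readable label needs its own mapping. The sketch below redeclares the enumeration locally so it compiles on its own; kindLabel and its label strings are hypothetical and are not defined by the package.

```go
package main

import "fmt"

// EventKind mirrors the documented uint8 enumeration.
type EventKind uint8

const (
	EventKindRequestStart EventKind = iota + 1
	EventKindRequestFinish
	EventKindRequestFailure
	EventKindStreamOpen
	EventKindStreamClose
	EventKindStreamFailure
)

// kindLabel maps a kind to an illustrative label. Unknown values, including
// the zero value, fall back to "unknown" instead of panicking.
func kindLabel(k EventKind) string {
	switch k {
	case EventKindRequestStart:
		return "request_start"
	case EventKindRequestFinish:
		return "request_finish"
	case EventKindRequestFailure:
		return "request_failure"
	case EventKindStreamOpen:
		return "stream_open"
	case EventKindStreamClose:
		return "stream_close"
	case EventKindStreamFailure:
		return "stream_failure"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(kindLabel(EventKindRequestFailure), kindLabel(0)) // request_failure unknown
}
```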

type Observer

type Observer interface {
	// ObserveEvent is invoked with a context that is alive for the
	// request producing the event. Implementations must not retain ctx
	// past the call, and must treat e as read-only.
	ObserveEvent(ctx context.Context, e Event)
}

Observer receives transport lifecycle events emitted by generated code. Observer implementations must be safe for concurrent use.
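One way to meet the concurrency requirement is to guard shared state with a mutex. The sketch below uses a trimmed local Event stand-in; countingObserver and labelKey are hypothetical names, and the in-memory map stands in for whatever metrics backend an application would actually use.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Event is a trimmed local stand-in for transport.Event.
type Event struct {
	Transport string
	Reason    string
}

// labelKey groups events by low-cardinality fields only.
type labelKey struct {
	transport string
	reason    string
}

// countingObserver tallies events per label set under a mutex so that
// concurrent handlers can call ObserveEvent safely.
type countingObserver struct {
	mu     sync.Mutex
	counts map[labelKey]int
}

func (c *countingObserver) ObserveEvent(_ context.Context, e Event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts == nil {
		c.counts = make(map[labelKey]int)
	}
	c.counts[labelKey{e.Transport, e.Reason}]++
}

// count returns the tally for one label set.
func (c *countingObserver) count(transport, reason string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[labelKey{transport, reason}]
}

// demo delivers 50 events from 50 goroutines and returns the final tally.
func demo() int {
	obs := &countingObserver{}
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			obs.ObserveEvent(context.Background(), Event{Transport: "http", Reason: "ok"})
		}()
	}
	wg.Wait()
	return obs.count("http", "ok")
}

func main() {
	fmt.Println(demo()) // 50
}
```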

func ObserverFromContext

func ObserverFromContext(ctx context.Context) Observer

ObserverFromContext returns the Observer attached to ctx, or nil if none has been injected.

type ObserverFunc

type ObserverFunc func(ctx context.Context, e Event)

ObserverFunc adapts an ordinary function into an Observer.

func (ObserverFunc) ObserveEvent

func (f ObserverFunc) ObserveEvent(ctx context.Context, e Event)

ObserveEvent calls f with the supplied event. A nil ObserverFunc is a no-op.

type Reason

type Reason string

Reason is a stable, low-cardinality classification of why an event was emitted. Reason values are part of the package's public contract and are safe to use as metric labels.

const (
	// ReasonOK marks a terminal event with no error classification.
	ReasonOK Reason = "ok"
	// ReasonRequestDecodeFailed marks failure to decode the HTTP request body
	// or query parameters into the generated request type.
	ReasonRequestDecodeFailed Reason = "request_decode_failed"
	// ReasonInvalidJSONRPCEnvelope marks failure to parse the JSON-RPC
	// envelope.
	ReasonInvalidJSONRPCEnvelope Reason = "invalid_jsonrpc_envelope"
	// ReasonInvalidJSONRPCBatch marks rejection of a malformed JSON-RPC batch
	// request.
	ReasonInvalidJSONRPCBatch Reason = "invalid_jsonrpc_batch"
	// ReasonInvalidJSONRPCMethod marks rejection of a JSON-RPC envelope whose
	// method field is missing or malformed.
	ReasonInvalidJSONRPCMethod Reason = "invalid_jsonrpc_method"
	// ReasonInvalidJSONRPCParams marks failure to decode JSON-RPC params into
	// the generated parameter type.
	ReasonInvalidJSONRPCParams Reason = "invalid_jsonrpc_params"
	// ReasonUnsupportedMethod marks rejection of a JSON-RPC method the
	// generated handler does not implement.
	ReasonUnsupportedMethod Reason = "unsupported_method"
	// ReasonMissingCredentials marks rejection because required credentials
	// were not presented.
	ReasonMissingCredentials Reason = "missing_credentials"
	// ReasonInvalidCredentials marks rejection because credentials were
	// presented but could not be validated.
	ReasonInvalidCredentials Reason = "invalid_credentials"
	// ReasonPermissionRejected marks rejection because authorization checks
	// denied the principal.
	ReasonPermissionRejected Reason = "permission_rejected"
	// ReasonPrincipalMismatch marks rejection because the principal did not
	// match the session owner.
	ReasonPrincipalMismatch Reason = "principal_mismatch"
	// ReasonHandlerError marks a handler returning a non-nil error.
	ReasonHandlerError Reason = "handler_error"
	// ReasonPanic marks a recovered panic in the generated handler.
	ReasonPanic Reason = "panic"
	// ReasonResponseWriteFailed marks a failure while writing the HTTP
	// response body.
	ReasonResponseWriteFailed Reason = "response_write_failed"
	// ReasonStreamWriteFailed marks a failed write to a streaming response.
	ReasonStreamWriteFailed Reason = "stream_write_failed"
	// ReasonStreamFlushFailed marks a failed flush of a streaming response.
	ReasonStreamFlushFailed Reason = "stream_flush_failed"
	// ReasonMCPSessionMissing marks an MCP events-stream request that omitted
	// the Mcp-Session-Id header.
	ReasonMCPSessionMissing Reason = "mcp_session_missing"
	// ReasonMCPSessionNotFound marks an MCP events-stream request whose
	// session id is unknown to the server.
	ReasonMCPSessionNotFound Reason = "mcp_session_not_found"
	// ReasonMCPSessionPrincipalMismatch marks an MCP events-stream request
	// whose authenticated principal differs from the session owner.
	ReasonMCPSessionPrincipalMismatch Reason = "mcp_session_principal_mismatch"
	// ReasonMCPEventsStreamWriteFailed marks a failure writing an MCP
	// notification frame to the events-stream response.
	ReasonMCPEventsStreamWriteFailed Reason = "mcp_events_stream_write_failed"
)

type RequestObserver

type RequestObserver struct {
	// contains filtered or unexported fields
}

RequestObserver tracks a single transport request lifecycle and emits start, terminal, and panic events through an Observer attached to the request context. Generated HTTP, JSON-RPC, and MCP transports use the observer to consolidate per-request boilerplate into a small surface:

obs, w := transport.BeginHTTPRequest(ctx, w, "Service", "Method", r)
defer obs.End()
// ...
obs.Fail(transport.ReasonRequestDecodeFailed) // before an error return

End must be deferred so panics propagate through it as EventKindRequestFailure with ReasonPanic and are re-panicked; if End is reached without a recorded failure, EventKindRequestFinish is emitted.
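The reason End has to be the deferred call itself is that Go's recover only intercepts a panic when it is called directly by a deferred function. The sketch below shows the three outcomes with a simplified stand-in; lifecycle, begin, and handle are hypothetical names, and events are recorded as strings in a slice instead of being delivered to an Observer.

```go
package main

import "fmt"

// lifecycle is a simplified local stand-in for RequestObserver.
type lifecycle struct {
	reason string
	ended  bool
	events *[]string
}

// begin records the start event and returns the tracker.
func begin(events *[]string) *lifecycle {
	*events = append(*events, "start")
	return &lifecycle{events: events}
}

// Fail keeps the first reason so later cleanup cannot overwrite it.
func (l *lifecycle) Fail(reason string) {
	if l != nil && l.reason == "" {
		l.reason = reason
	}
}

// End emits exactly one terminal event. Called via defer, it can recover a
// panic, report it, and re-panic so outer recovery still sees it.
func (l *lifecycle) End() {
	if l == nil || l.ended {
		return
	}
	l.ended = true
	if p := recover(); p != nil {
		*l.events = append(*l.events, "failure:panic")
		panic(p)
	}
	if l.reason != "" {
		*l.events = append(*l.events, "failure:"+l.reason)
		return
	}
	*l.events = append(*l.events, "finish")
}

// handle is a hypothetical handler with three outcomes selected by mode.
func handle(events *[]string, mode string) {
	obs := begin(events)
	defer obs.End()
	switch mode {
	case "decode-error":
		obs.Fail("request_decode_failed")
		obs.Fail("handler_error") // ignored: the first call wins
	case "panic":
		panic("boom")
	}
}

// demo runs the success, failure, and panic paths in sequence.
func demo() []string {
	var events []string
	handle(&events, "ok")
	handle(&events, "decode-error")
	func() {
		defer func() { _ = recover() }() // swallow the re-panic for the demo
		handle(&events, "panic")
	}()
	return events
}

func main() {
	fmt.Println(demo())
	// [start finish start failure:request_decode_failed start failure:panic]
}
```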

func BeginHTTPRequest

func BeginHTTPRequest(ctx context.Context, w http.ResponseWriter, service, method string, r *http.Request) (*RequestObserver, http.ResponseWriter)

BeginHTTPRequest wraps w with a CaptureResponseWriter, records the request start time, emits an EventKindRequestStart event, and returns the observer together with the wrapped writer. Generated HTTP handlers reassign their response writer parameter to the returned writer so all subsequent writes are captured.

func BeginJSONRPCRequest

func BeginJSONRPCRequest(ctx context.Context, w http.ResponseWriter, service string, r *http.Request) (*RequestObserver, http.ResponseWriter)

BeginJSONRPCRequest wraps w with a CaptureResponseWriter and starts a request lifecycle classified as TransportJSONRPC. service identifies the Loom service; the JSON-RPC envelope's method, id, batch count, and notification flag are filled in later through RequestObserver.SetJSONRPC once the envelope has been decoded. Pre-decode rejection events therefore leave the JSON-RPC fields empty by design.

func BeginRequest

func BeginRequest(ctx context.Context, kind TransportKind, service, method string) *RequestObserver

BeginRequest starts a transport request lifecycle without an HTTP response writer. JSON-RPC and MCP generators that need to emit start events outside the HTTP capture pipeline use this entry point. End and Fail behave identically to the HTTP variant; StatusCode and BytesWritten remain zero unless updated through ApplyHTTPStatus.

func RequestObserverFromContext

func RequestObserverFromContext(ctx context.Context) *RequestObserver

RequestObserverFromContext returns the RequestObserver injected by WithRequestObserver, or nil if none is attached. Generated code uses nil-safe methods on the returned observer so a missing observer remains a cheap no-op.

func (*RequestObserver) EmitStreamClose

func (o *RequestObserver) EmitStreamClose()

EmitStreamClose reports the natural close of a streaming response.

func (*RequestObserver) EmitStreamFailure

func (o *RequestObserver) EmitStreamFailure(reason Reason)

EmitStreamFailure reports a failed write or flush during streaming. The terminal request event is unaffected; callers that want the failure to also classify the request must call Fail with the same reason.

func (*RequestObserver) EmitStreamOpen

func (o *RequestObserver) EmitStreamOpen()

EmitStreamOpen reports the opening of a streaming response within the current request. Generated SSE and WebSocket servers call EmitStreamOpen once the stream headers have been negotiated.

func (*RequestObserver) End

func (o *RequestObserver) End()

End emits the terminal request event. End must be deferred so a recovered panic emits EventKindRequestFailure with ReasonPanic before re-panicking. End is a no-op on a nil observer or after the first call, allowing it to be safely composed with explicit failure emission.

func (*RequestObserver) Fail

func (o *RequestObserver) Fail(reason Reason)

Fail records reason as the terminal classification for the request. The first call wins so the originating failure is not overwritten by downstream cleanup classifications. End will emit EventKindRequestFailure with the recorded reason.

func (*RequestObserver) FailWithMessage

func (o *RequestObserver) FailWithMessage(reason Reason, safeMessage string)

FailWithMessage records reason and a redaction-safe message for the terminal event. The message is reported through Event.SafeMessage and must not contain raw user input, credentials, or payload data.

func (*RequestObserver) SetJSONRPC

func (o *RequestObserver) SetJSONRPC(method, id string, batchCount int, notification bool)

SetJSONRPC populates JSON-RPC-specific fields on the terminal event. It is safe to call once the JSON-RPC envelope has been decoded. When the observer's operation name was not provided at Begin time, method is also used as the observer's Method so the terminal event reports the JSON-RPC method as the operation name.

func (*RequestObserver) SetSession

func (o *RequestObserver) SetSession(sessionID string)

SetSession records the transport session identifier carried by the request, when known.

type TransportKind

type TransportKind string

TransportKind identifies the generated transport that produced an event.

const (
	// TransportHTTP identifies generic HTTP-generated transport events.
	TransportHTTP TransportKind = "http"
	// TransportJSONRPC identifies JSON-RPC-generated transport events.
	TransportJSONRPC TransportKind = "jsonrpc"
	// TransportMCP identifies Loom-MCP-generated transport events.
	TransportMCP TransportKind = "mcp"
)
