package stream

Version: v1.1.0-beta.5
Published: Mar 16, 2026 | License: Apache-2.0 | Imports: 10 | Imported by: 0

Documentation

Overview

Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.

Constants

This section is empty.

Variables

var ErrHijackNotSupported = errors.New("celeris: hijack not supported by this engine")

ErrHijackNotSupported is returned when the engine does not support connection takeover.

Functions

func ParsePriorityFromHeaders

func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)

ParsePriorityFromHeaders extracts priority information from a HEADERS frame.
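
For orientation, here is a minimal sketch of how the priority field of a HEADERS frame is laid out on the wire (RFC 7540, Section 6.2). It is not this package's implementation: parsePriorityField is a hypothetical helper that works on raw bytes rather than on *http2.HeadersFrame.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// parsePriorityField is a hypothetical helper, not part of package stream.
// It decodes the 5-byte priority field: 1 exclusive bit, a 31-bit stream
// dependency, and an 8-bit weight. The wire weight is the effective
// weight minus one, so 15 on the wire means an effective weight of 16.
func parsePriorityField(b []byte) (dependency uint32, weight uint8, exclusive bool, ok bool) {
	if len(b) < 5 {
		return 0, 0, false, false
	}
	v := binary.BigEndian.Uint32(b[:4])
	exclusive = v&0x80000000 != 0
	dependency = v & 0x7fffffff
	weight = b[4]
	return dependency, weight, exclusive, true
}

func main() {
	dep, w, excl, ok := parsePriorityField([]byte{0x80, 0x00, 0x00, 0x03, 0x0f})
	fmt.Println(dep, w, excl, ok) // prints: 3 15 true true
}
```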

func ResetH1Stream added in v1.1.0

func ResetH1Stream(s *Stream)

ResetH1Stream performs a lightweight per-request reset for H1 stream reuse. Unlike Release(), it does NOT return the stream to the pool. It clears only the fields that change between requests, retaining header slice capacity and the context reference. Called between requests on keep-alive connections to avoid sync.Pool Get/Put overhead.
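
The idea of a reset that keeps slice capacity can be sketched as follows. The h1Stream type and its fields are hypothetical stand-ins; the real Stream has unexported fields that this sketch does not try to reproduce.

```go
package main

import "fmt"

// h1Stream is a hypothetical stand-in for a stream reused across
// keep-alive requests on one connection.
type h1Stream struct {
	headers   [][2]string
	endStream bool
	cachedCtx any
}

// reset clears per-request fields but keeps the headers backing array
// and the cached context, so the next request reuses both.
func (s *h1Stream) reset() {
	s.headers = s.headers[:0]
	s.endStream = false
}

func main() {
	s := &h1Stream{headers: make([][2]string, 0, 16), cachedCtx: "ctx"}
	s.headers = append(s.headers, [2]string{"host", "example.com"})
	s.endStream = true
	s.reset()
	fmt.Println(len(s.headers), cap(s.headers), s.endStream, s.cachedCtx) // prints: 0 16 false ctx
}
```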

Types

type ContinuationState

type ContinuationState struct {
	// contains filtered or unexported fields
}

ContinuationState tracks the state of CONTINUATION frames.

type FrameWriter

type FrameWriter interface {
	WriteSettings(settings ...http2.Setting) error
	WriteSettingsAck() error
	WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
	WriteData(streamID uint32, endStream bool, data []byte) error
	WriteWindowUpdate(streamID uint32, increment uint32) error
	WriteRSTStream(streamID uint32, code http2.ErrCode) error
	WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
	WritePing(ack bool, data [8]byte) error
	WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}

FrameWriter is an interface for writing HTTP/2 frames.

type Handler

type Handler interface {
	HandleStream(ctx context.Context, stream *Stream) error
}

Handler is the interface for processing streams.

type HandlerFunc

type HandlerFunc func(ctx context.Context, stream *Stream) error

HandlerFunc is an adapter to use functions as stream handlers.

func (HandlerFunc) HandleStream

func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error

HandleStream calls the handler function.

type Hijacker added in v1.1.0

type Hijacker interface {
	Hijack(stream *Stream) (net.Conn, error)
}

Hijacker is implemented by ResponseWriters that support connection takeover. This enables protocols like WebSocket that require raw TCP access.

type Manager

type Manager struct {
	RemoteAddr string
	// contains filtered or unexported fields
}

Manager manages multiple HTTP/2 streams.

func NewManager

func NewManager() *Manager

NewManager creates a new stream manager.

func (*Manager) AccumulateWindowUpdate

func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)

AccumulateWindowUpdate accumulates window credits without sending immediately.

func (*Manager) ConsumeSendWindow

func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)

ConsumeSendWindow decrements connection and stream windows after sending DATA.

func (*Manager) ConsumeSendWindowFast

func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)

ConsumeSendWindowFast decrements connection and stream windows after sending DATA. It avoids the Manager lock.

func (*Manager) CountActiveStreams

func (m *Manager) CountActiveStreams() int

CountActiveStreams returns the number of streams considered active for concurrency limits.

func (*Manager) CreateStream

func (m *Manager) CreateStream(id uint32) *Stream

CreateStream creates a new stream with the given ID.

func (*Manager) DeleteStream

func (m *Manager) DeleteStream(id uint32)

DeleteStream removes a stream and releases its pooled buffers.

func (*Manager) FlushWindowUpdates

func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool

FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if the threshold is met. It returns true if any updates were sent.
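
The accumulate-then-flush pattern can be sketched as below. Everything here is an assumption made for illustration: the creditBatcher type, the threshold of 4096, and the reading of force as "send regardless of threshold" are not taken from the package source.

```go
package main

import "fmt"

// creditBatcher is a hypothetical accumulator of unsent window credit,
// keyed by stream ID.
type creditBatcher struct {
	pending   map[uint32]uint32
	threshold uint32 // assumed value, chosen for the example
}

// accumulate records credit without sending anything.
func (b *creditBatcher) accumulate(streamID, increment uint32) {
	b.pending[streamID] += increment
}

// flush calls send for each entry that reached the threshold, or for
// every non-zero entry when force is set. It reports whether anything
// was sent.
func (b *creditBatcher) flush(send func(streamID, increment uint32), force bool) bool {
	sent := false
	for id, credit := range b.pending {
		if credit == 0 || (!force && credit < b.threshold) {
			continue
		}
		send(id, credit)
		delete(b.pending, id)
		sent = true
	}
	return sent
}

func main() {
	b := &creditBatcher{pending: map[uint32]uint32{}, threshold: 4096}
	send := func(id, inc uint32) { fmt.Println("WINDOW_UPDATE", id, inc) }

	b.accumulate(1, 1000)
	fmt.Println(b.flush(send, false)) // below threshold: prints false

	b.accumulate(1, 4000)
	fmt.Println(b.flush(send, false)) // sends 5000 for stream 1, then prints true
}
```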

func (*Manager) GetConnectionWindow

func (m *Manager) GetConnectionWindow() int32

GetConnectionWindow atomically returns the current connection window size.

func (*Manager) GetLastClientStreamID

func (m *Manager) GetLastClientStreamID() uint32

GetLastClientStreamID returns the highest client-initiated stream ID observed.

func (*Manager) GetLastStreamID

func (m *Manager) GetLastStreamID() uint32

GetLastStreamID returns the highest stream ID.

func (*Manager) GetMaxConcurrentStreams

func (m *Manager) GetMaxConcurrentStreams() uint32

GetMaxConcurrentStreams returns the currently configured max concurrent streams value.

func (*Manager) GetOrCreateStream

func (m *Manager) GetOrCreateStream(id uint32) *Stream

GetOrCreateStream gets an existing stream or creates a new one.

func (*Manager) GetSendWindowsAndMaxFrame

func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.

func (*Manager) GetSendWindowsAndMaxFrameFast

func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)

GetSendWindowsAndMaxFrameFast returns current connection window, stream window, and max frame size. It avoids the Manager lock by using atomics and direct stream access.
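
A sender would typically combine these three values to size the next DATA frame and then deduct what it sent from both windows. The sketch below assumes that usage; windows, sendBudget, and consume are hypothetical names, and the numbers are examples only.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// windows is a hypothetical pair of lock-free send windows.
type windows struct {
	conn   atomic.Int32
	stream atomic.Int32
}

// sendBudget returns how many pending bytes fit in the next DATA frame:
// the minimum of both windows, the max frame size, and the payload left.
func sendBudget(connWindow, streamWindow int32, maxFrame uint32, pending int) int {
	n := int64(pending)
	if int64(connWindow) < n {
		n = int64(connWindow)
	}
	if int64(streamWindow) < n {
		n = int64(streamWindow)
	}
	if int64(maxFrame) < n {
		n = int64(maxFrame)
	}
	if n < 0 {
		n = 0 // a window can go negative after a SETTINGS change
	}
	return int(n)
}

// consume deducts sent bytes from both windows.
func (w *windows) consume(n int32) {
	w.conn.Add(-n)
	w.stream.Add(-n)
}

func main() {
	var w windows
	w.conn.Store(65535)
	w.stream.Store(20000)

	n := sendBudget(w.conn.Load(), w.stream.Load(), 16384, 50000)
	w.consume(int32(n))
	fmt.Println(n, w.conn.Load(), w.stream.Load()) // prints: 16384 49151 3616
}
```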

func (*Manager) GetStream

func (m *Manager) GetStream(id uint32) (*Stream, bool)

GetStream gets a stream by ID.

func (*Manager) MarkStreamBuffered

func (m *Manager) MarkStreamBuffered(id uint32)

MarkStreamBuffered adds a stream to the set of streams with buffered data.

func (*Manager) MarkStreamEmpty

func (m *Manager) MarkStreamEmpty(id uint32)

MarkStreamEmpty removes a stream from the set of streams with buffered data.

func (*Manager) SetMaxConcurrentStreams

func (m *Manager) SetMaxConcurrentStreams(n uint32)

SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.

func (*Manager) StreamCount

func (m *Manager) StreamCount() int

StreamCount returns the number of streams in the manager.

func (*Manager) TryOpenStream

func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)

TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.
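
The check-and-open step has to happen under one critical section so that two concurrent opens cannot both slip under the limit. A minimal sketch of that idea, using a hypothetical limiter type and a mutex (the package may use a different mechanism):

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical stand-in that tracks active stream IDs.
type limiter struct {
	mu     sync.Mutex
	active map[uint32]struct{}
	max    uint32
}

// tryOpen checks the limit and records the stream in one locked step.
func (l *limiter) tryOpen(id uint32) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if uint32(len(l.active)) >= l.max {
		return false
	}
	l.active[id] = struct{}{}
	return true
}

func main() {
	l := &limiter{active: map[uint32]struct{}{}, max: 2}
	fmt.Println(l.tryOpen(1), l.tryOpen(3), l.tryOpen(5)) // prints: true true false
}
```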

func (*Manager) UpdateConnectionWindow

func (m *Manager) UpdateConnectionWindow(delta int32)

UpdateConnectionWindow atomically updates the connection-level flow control window.

type Phase

type Phase int

Phase represents the response phase for a stream to ensure proper write ordering.

const (
	PhaseInit Phase = iota
	PhaseHeadersSent
	PhaseBody
)

Response phase constants for ensuring correct write ordering.
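
A sketch of how phase tracking can enforce write ordering. The Phase constants are redeclared locally; the response type, its methods, and the error value are hypothetical and do not describe how the package performs the check.

```go
package main

import (
	"errors"
	"fmt"
)

type Phase int

const (
	PhaseInit Phase = iota
	PhaseHeadersSent
	PhaseBody
)

var errOrder = errors.New("write out of order")

// response is a hypothetical writer that tracks its phase.
type response struct{ phase Phase }

// writeHeaders is only valid once, before any body data.
func (r *response) writeHeaders() error {
	if r.phase != PhaseInit {
		return errOrder
	}
	r.phase = PhaseHeadersSent
	return nil
}

// writeBody requires that headers were already sent.
func (r *response) writeBody() error {
	if r.phase == PhaseInit {
		return errOrder
	}
	r.phase = PhaseBody
	return nil
}

func main() {
	r := &response{}
	fmt.Println(r.writeBody())    // prints: write out of order
	fmt.Println(r.writeHeaders()) // prints: <nil>
	fmt.Println(r.writeBody())    // prints: <nil>
}
```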

func (Phase) String

func (p Phase) String() string

type Priority

type Priority struct {
	StreamDependency uint32
	Weight           uint8
	Exclusive        bool
}

Priority defines stream dependency and weight for HTTP/2 prioritization.

type PriorityTree

type PriorityTree struct {
	// contains filtered or unexported fields
}

PriorityTree manages stream priorities and dependencies.

func NewPriorityTree

func NewPriorityTree() *PriorityTree

NewPriorityTree creates a new priority tree.

func (*PriorityTree) CalculateStreamPriority

func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int

CalculateStreamPriority computes a priority score for stream scheduling.

func (*PriorityTree) GetChildren

func (pt *PriorityTree) GetChildren(streamID uint32) []uint32

GetChildren returns the streams that depend on the given stream.

func (*PriorityTree) GetPriority

func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)

GetPriority retrieves priority information for a stream.

func (*PriorityTree) GetWeight

func (pt *PriorityTree) GetWeight(streamID uint32) uint8

GetWeight returns the weight of a stream, defaulting to 16.

func (*PriorityTree) RemoveStream

func (pt *PriorityTree) RemoveStream(streamID uint32)

RemoveStream removes a stream and reorganizes its dependencies.

func (*PriorityTree) SetPriority

func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)

SetPriority assigns or updates priority information for a stream.

func (*PriorityTree) UpdateFromFrame

func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)

UpdateFromFrame updates stream priority from frame parameters.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor processes incoming HTTP/2 frames and manages streams.

func NewProcessor

func NewProcessor(handler Handler, writer FrameWriter, conn ResponseWriter) *Processor

NewProcessor creates a new stream processor.

func (*Processor) FlushBufferedData

func (p *Processor) FlushBufferedData(_ uint32)

FlushBufferedData exposes flushBufferedData for external callers. The uint32 argument is unused.

func (*Processor) GetConnection

func (p *Processor) GetConnection() ResponseWriter

GetConnection returns the permanent connection writer.

func (*Processor) GetCurrentConn

func (p *Processor) GetCurrentConn() ResponseWriter

GetCurrentConn returns the current connection.

func (*Processor) GetExpectedContinuationStreamID

func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)

GetExpectedContinuationStreamID returns the stream ID on which CONTINUATION frames are expected.

func (*Processor) GetManager

func (p *Processor) GetManager() *Manager

GetManager returns the stream manager.

func (*Processor) GetStreamPriority

func (p *Processor) GetStreamPriority(streamID uint32) int

GetStreamPriority returns the priority score for a stream.

func (*Processor) IsExpectingContinuation

func (p *Processor) IsExpectingContinuation() bool

IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.

func (*Processor) ProcessFrame

func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error

ProcessFrame processes an incoming HTTP/2 frame.

func (*Processor) ProcessFrameWithConn

func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error

ProcessFrameWithConn processes a frame with a connection context.

func (*Processor) SendGoAway

func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error

SendGoAway sends a GOAWAY frame.

type ResponseWriter

type ResponseWriter interface {
	WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
	SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
	MarkStreamClosed(streamID uint32)
	IsStreamClosed(streamID uint32) bool
	WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
	CloseConn() error
}

ResponseWriter is an interface for writing responses.

type State

type State int

State represents the state of an HTTP/2 stream as defined in RFC 7540.

const (
	StateIdle State = iota
	StateOpen
	StateHalfClosedLocal
	StateHalfClosedRemote
	StateClosed
)

HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.
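
The END_STREAM transitions from RFC 7540 Section 5.1 can be sketched with these constants as follows. onEndStream is a hypothetical helper; it covers only the END_STREAM flag and ignores RST_STREAM and the reserved states.

```go
package main

import "fmt"

type State int

const (
	StateIdle State = iota
	StateOpen
	StateHalfClosedLocal
	StateHalfClosedRemote
	StateClosed
)

// onEndStream returns the next state after END_STREAM is sent
// (local == true) or received (local == false).
func onEndStream(s State, local bool) State {
	switch s {
	case StateOpen:
		if local {
			return StateHalfClosedLocal
		}
		return StateHalfClosedRemote
	case StateHalfClosedLocal:
		if !local {
			return StateClosed
		}
	case StateHalfClosedRemote:
		if local {
			return StateClosed
		}
	}
	return s
}

func main() {
	s := onEndStream(StateOpen, false) // peer finished sending its request
	fmt.Println(s == StateHalfClosedRemote)
	s = onEndStream(s, true) // we finished sending the response
	fmt.Println(s == StateClosed)
}
```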

func (State) String

func (s State) String() string

type Stream

type Stream struct {
	ID    uint32
	State State

	Headers           [][2]string
	Trailers          [][2]string
	Data              *bytes.Buffer
	OutboundBuffer    *bytes.Buffer
	OutboundEndStream bool
	HeadersSent       bool
	EndStream         bool
	IsStreaming       bool
	HandlerStarted    bool
	DeferResponse     bool
	WindowSize        int32
	ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes

	ResponseWriter         ResponseWriter
	RemoteAddr             string
	ReceivedDataLen        int
	ReceivedInitialHeaders bool
	ClosedByReset          bool
	IsHEAD                 bool

	CachedCtx any // per-connection cached context (avoids pool Get/Put per request)
	// contains filtered or unexported fields
}

Stream represents an HTTP/2 stream with its associated state and data.

func NewH1Stream added in v1.1.0

func NewH1Stream(id uint32) *Stream

NewH1Stream creates a lightweight stream optimized for H1 requests. It uses context.Background() directly (no WithCancel), leaves Data and OutboundBuffer nil (lazily allocated only when body data arrives), and skips H2-specific window initialization. This eliminates at least two allocations per H1 request.

func NewStream

func NewStream(id uint32) *Stream

NewStream creates a new stream with full H2 initialization (context, buffers). Uses context.Background() directly (no WithCancel) to avoid allocation; cancel semantics are lazily created via EnsureCancel only when needed.

func (*Stream) AddData

func (s *Stream) AddData(data []byte) error

AddData adds data to the stream buffer.

func (*Stream) AddHeader

func (s *Stream) AddHeader(name, value string)

AddHeader adds a header to the stream.

func (*Stream) AddHeadersBatch added in v1.1.0

func (s *Stream) AddHeadersBatch(headers [][2]string)

AddHeadersBatch adds multiple headers under a single lock acquisition.

func (*Stream) BufferOutbound added in v0.2.0

func (s *Stream) BufferOutbound(data []byte, endStream bool)

BufferOutbound stores data that couldn't be sent due to flow control.

func (*Stream) Cancel

func (s *Stream) Cancel()

Cancel cancels the stream's context.

func (*Stream) Context

func (s *Stream) Context() context.Context

Context returns the stream's context.

func (*Stream) DeductWindow added in v0.2.0

func (s *Stream) DeductWindow(n int32)

DeductWindow subtracts n from the flow control window.

func (*Stream) EnsureCancel

func (s *Stream) EnsureCancel()

EnsureCancel lazily creates a cancellable context for this stream. Called when the stream needs cancel semantics (e.g., RST_STREAM).

func (*Stream) ForEachHeader

func (s *Stream) ForEachHeader(fn func(name, value string))

ForEachHeader calls fn for each header under a read lock.

func (*Stream) GetBuf added in v1.1.0

func (s *Stream) GetBuf() *bytes.Buffer

GetBuf lazily allocates the Data buffer and returns it.

func (*Stream) GetData

func (s *Stream) GetData() []byte

GetData returns the buffered data.

func (*Stream) GetHeaders

func (s *Stream) GetHeaders() [][2]string

GetHeaders returns the headers. For single-threaded H1 streams, returns the slice directly (no lock, no copy). For H2 streams, returns a safe copy under lock.

func (*Stream) GetPhase

func (s *Stream) GetPhase() Phase

GetPhase returns the current stream phase.

func (*Stream) GetState

func (s *Stream) GetState() State

GetState returns the current stream state.

func (*Stream) GetWindowSize

func (s *Stream) GetWindowSize() int32

GetWindowSize returns the current flow control window size.

func (*Stream) HeadersLen

func (s *Stream) HeadersLen() int

HeadersLen returns the number of headers on the stream.

func (*Stream) MarkBuffered

func (s *Stream) MarkBuffered()

MarkBuffered marks the stream as having buffered data.

func (*Stream) MarkEmpty

func (s *Stream) MarkEmpty()

MarkEmpty marks the stream as having no buffered data.

func (*Stream) Release added in v0.3.2

func (s *Stream) Release()

Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.

func (*Stream) SetPhase

func (s *Stream) SetPhase(p Phase)

SetPhase sets the stream phase.

func (*Stream) SetState

func (s *Stream) SetState(state State)

SetState sets the stream state.

func (*Stream) WriteLock

func (s *Stream) WriteLock()

WriteLock acquires the per-stream write lock.

func (*Stream) WriteUnlock

func (s *Stream) WriteUnlock()

WriteUnlock releases the per-stream write lock.

type Streamer added in v1.1.0

type Streamer interface {
	// WriteHeader sends the status line and headers. Must be called once before Write.
	WriteHeader(stream *Stream, status int, headers [][2]string) error
	// Write sends a chunk of the response body. May be called multiple times.
	Write(stream *Stream, data []byte) error
	// Flush ensures buffered data is sent to the network.
	Flush(stream *Stream) error
	// Close signals end of the response body.
	Close(stream *Stream) error
}

Streamer supports incremental response writing. Engines that support streaming implement this interface on their ResponseWriter. The existing WriteResponse path is preserved for non-streaming responses (hot path).
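
A handler that wants to stream might probe its writer for Streamer and then follow the WriteHeader, Write, Flush, Close sequence. The sketch below redeclares Stream and Streamer locally; recorder and streamChunks are hypothetical and exist only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins mirroring the documented shapes.
type Stream struct{ ID uint32 }

type Streamer interface {
	WriteHeader(stream *Stream, status int, headers [][2]string) error
	Write(stream *Stream, data []byte) error
	Flush(stream *Stream) error
	Close(stream *Stream) error
}

// recorder is a hypothetical in-memory Streamer.
type recorder struct {
	status int
	body   strings.Builder
	closed bool
}

func (r *recorder) WriteHeader(_ *Stream, status int, _ [][2]string) error {
	r.status = status
	return nil
}
func (r *recorder) Write(_ *Stream, data []byte) error { r.body.Write(data); return nil }
func (r *recorder) Flush(*Stream) error                { return nil }
func (r *recorder) Close(*Stream) error                { r.closed = true; return nil }

// streamChunks sends headers once, then each chunk followed by a flush,
// and finally closes the body.
func streamChunks(w any, s *Stream, chunks []string) error {
	st, ok := w.(Streamer)
	if !ok {
		return errors.New("writer does not support streaming")
	}
	if err := st.WriteHeader(s, 200, [][2]string{{"content-type", "text/plain"}}); err != nil {
		return err
	}
	for _, c := range chunks {
		if err := st.Write(s, []byte(c)); err != nil {
			return err
		}
		if err := st.Flush(s); err != nil {
			return err
		}
	}
	return st.Close(s)
}

func main() {
	r := &recorder{}
	err := streamChunks(r, &Stream{ID: 1}, []string{"a", "b"})
	fmt.Println(r.status, r.body.String(), r.closed, err) // prints: 200 ab true <nil>
}
```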
