Documentation ¶
Overview ¶
Package stream manages HTTP/2 stream lifecycle, state transitions, flow control, and frame processing.
Index ¶
- Variables
- func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
- func ResetH1Stream(s *Stream)
- type ContinuationState
- type FrameWriter
- type Handler
- type HandlerFunc
- type Hijacker
- type Manager
- func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32)
- func (m *Manager) ConsumeSendWindow(streamID uint32, n int32)
- func (m *Manager) ConsumeSendWindowFast(s *Stream, n int32)
- func (m *Manager) CountActiveStreams() int
- func (m *Manager) CreateStream(id uint32) *Stream
- func (m *Manager) DeleteStream(id uint32)
- func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
- func (m *Manager) GetConnectionWindow() int32
- func (m *Manager) GetLastClientStreamID() uint32
- func (m *Manager) GetLastStreamID() uint32
- func (m *Manager) GetMaxConcurrentStreams() uint32
- func (m *Manager) GetOrCreateStream(id uint32) *Stream
- func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
- func (m *Manager) GetStream(id uint32) (*Stream, bool)
- func (m *Manager) MarkStreamBuffered(id uint32)
- func (m *Manager) MarkStreamEmpty(id uint32)
- func (m *Manager) SetMaxConcurrentStreams(n uint32)
- func (m *Manager) StreamCount() int
- func (m *Manager) TryOpenStream(id uint32) (*Stream, bool)
- func (m *Manager) UpdateConnectionWindow(delta int32)
- type Phase
- type Priority
- type PriorityTree
- func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
- func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
- func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
- func (pt *PriorityTree) GetWeight(streamID uint32) uint8
- func (pt *PriorityTree) RemoveStream(streamID uint32)
- func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
- func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
- type Processor
- func (p *Processor) FlushBufferedData(_ uint32)
- func (p *Processor) GetConnection() ResponseWriter
- func (p *Processor) GetCurrentConn() ResponseWriter
- func (p *Processor) GetExpectedContinuationStreamID() (uint32, bool)
- func (p *Processor) GetManager() *Manager
- func (p *Processor) GetStreamPriority(streamID uint32) int
- func (p *Processor) IsExpectingContinuation() bool
- func (p *Processor) ProcessFrame(ctx context.Context, frame http2.Frame) error
- func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
- func (p *Processor) SendGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
- type ResponseWriter
- type State
- type Stream
- func (s *Stream) AddData(data []byte) error
- func (s *Stream) AddHeader(name, value string)
- func (s *Stream) AddHeadersBatch(headers [][2]string)
- func (s *Stream) BufferOutbound(data []byte, endStream bool)
- func (s *Stream) Cancel()
- func (s *Stream) Context() context.Context
- func (s *Stream) DeductWindow(n int32)
- func (s *Stream) EnsureCancel()
- func (s *Stream) ForEachHeader(fn func(name, value string))
- func (s *Stream) GetBuf() *bytes.Buffer
- func (s *Stream) GetData() []byte
- func (s *Stream) GetHeaders() [][2]string
- func (s *Stream) GetPhase() Phase
- func (s *Stream) GetState() State
- func (s *Stream) GetWindowSize() int32
- func (s *Stream) HeadersLen() int
- func (s *Stream) MarkBuffered()
- func (s *Stream) MarkEmpty()
- func (s *Stream) Release()
- func (s *Stream) SetPhase(p Phase)
- func (s *Stream) SetState(state State)
- func (s *Stream) WriteLock()
- func (s *Stream) WriteUnlock()
- type Streamer
Constants ¶
This section is empty.
Variables ¶
var ErrHijackNotSupported = errors.New("celeris: hijack not supported by this engine")
ErrHijackNotSupported is returned when the engine does not support connection takeover.
Functions ¶
func ParsePriorityFromHeaders ¶
func ParsePriorityFromHeaders(f *http2.HeadersFrame) (dependency uint32, weight uint8, exclusive bool, hasPriority bool)
ParsePriorityFromHeaders extracts priority information from a HEADERS frame.
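As a rough illustration of what extracting priority information involves at the wire level, the sketch below decodes the 5-byte priority section that a HEADERS frame carries when its PRIORITY flag is set (RFC 7540, Section 6.2). The helper parsePriorityFields is hypothetical and works on raw bytes; it is not the package's implementation. Whether the package reports the raw wire weight or the effective weight (wire value plus one) is not stated here, so the sketch returns the raw byte.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// parsePriorityFields is a hypothetical helper, not part of package stream.
// It decodes the priority section of a HEADERS payload: a 1-bit exclusive
// flag, a 31-bit stream dependency, and an 8-bit weight.
func parsePriorityFields(b []byte) (dependency uint32, weight uint8, exclusive bool, ok bool) {
	if len(b) < 5 {
		return 0, 0, false, false
	}
	v := binary.BigEndian.Uint32(b[:4])
	exclusive = v&0x80000000 != 0 // top bit is the E flag
	dependency = v & 0x7fffffff   // remaining 31 bits are the stream dependency
	weight = b[4]                 // raw wire value; the effective weight is this value plus one
	return dependency, weight, exclusive, true
}

func main() {
	dep, w, excl, ok := parsePriorityFields([]byte{0x80, 0x00, 0x00, 0x03, 0x0f})
	fmt.Println(dep, w, excl, ok)
}
```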
func ResetH1Stream ¶ added in v1.1.0
func ResetH1Stream(s *Stream)
ResetH1Stream performs a lightweight per-request reset for H1 stream reuse. Unlike Release(), it does NOT return the stream to the pool. It clears only the fields that change between requests, retaining header slice capacity and the context reference. Called between requests on keep-alive connections to avoid sync.Pool Get/Put overhead.
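The idea of clearing per-request fields while keeping the header slice capacity can be sketched with a stand-in type. The h1Stream type and its reset method below are hypothetical and only mirror a few of the exported Stream fields; which fields ResetH1Stream actually clears is not specified here.

```go
package main

import "fmt"

// h1Stream is a hypothetical stand-in for the fields a per-request reset touches.
type h1Stream struct {
	Headers     [][2]string
	HeadersSent bool
	EndStream   bool
}

// reset clears per-request state but keeps the backing array of Headers,
// so the next request can append without reallocating.
func (s *h1Stream) reset() {
	s.Headers = s.Headers[:0]
	s.HeadersSent = false
	s.EndStream = false
}

func main() {
	s := &h1Stream{Headers: make([][2]string, 0, 8)}
	s.Headers = append(s.Headers, [2]string{"host", "example.com"})
	s.HeadersSent = true
	s.reset()
	fmt.Println(len(s.Headers), cap(s.Headers), s.HeadersSent)
}
```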
Types ¶
type ContinuationState ¶
type ContinuationState struct {
// contains filtered or unexported fields
}
ContinuationState tracks the state of CONTINUATION frames.
type FrameWriter ¶
type FrameWriter interface {
WriteSettings(settings ...http2.Setting) error
WriteSettingsAck() error
WriteHeaders(streamID uint32, endStream bool, headerBlock []byte, maxFrameSize uint32) error
WriteData(streamID uint32, endStream bool, data []byte) error
WriteWindowUpdate(streamID uint32, increment uint32) error
WriteRSTStream(streamID uint32, code http2.ErrCode) error
WriteGoAway(lastStreamID uint32, code http2.ErrCode, debugData []byte) error
WritePing(ack bool, data [8]byte) error
WritePushPromise(streamID uint32, promiseID uint32, endHeaders bool, headerBlock []byte) error
}
FrameWriter is an interface for writing HTTP/2 frames.
type HandlerFunc ¶
HandlerFunc is an adapter to use functions as stream handlers.
func (HandlerFunc) HandleStream ¶
func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error
HandleStream calls the handler function.
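The adapter follows the same pattern as http.HandlerFunc in the standard library. The sketch below redeclares minimal stand-ins locally so it compiles on its own: the Stream struct is reduced to an ID, and the Handler interface and the underlying type of HandlerFunc are assumptions inferred from the HandleStream signature above. The serve and record helpers are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Stream is a reduced stand-in for the package's Stream type.
type Stream struct{ ID uint32 }

// Handler is assumed to have this single method, based on HandlerFunc.HandleStream.
type Handler interface {
	HandleStream(ctx context.Context, stream *Stream) error
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(ctx context.Context, stream *Stream) error

// HandleStream calls the handler function.
func (f HandlerFunc) HandleStream(ctx context.Context, stream *Stream) error {
	return f(ctx, stream)
}

// handled records the stream IDs seen by record (illustration only).
var handled []uint32

// record is a plain function that becomes a Handler via HandlerFunc.
func record(_ context.Context, s *Stream) error {
	handled = append(handled, s.ID)
	return nil
}

// serve accepts any Handler, including an adapted function.
func serve(h Handler, s *Stream) error {
	return h.HandleStream(context.Background(), s)
}

func main() {
	if err := serve(HandlerFunc(record), &Stream{ID: 1}); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(handled)
}
```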
type Hijacker ¶ added in v1.1.0
Hijacker is implemented by ResponseWriters that support connection takeover. This enables protocols like WebSocket that require raw TCP access.
type Manager ¶
type Manager struct {
RemoteAddr string
// contains filtered or unexported fields
}
Manager manages multiple HTTP/2 streams.
func (*Manager) AccumulateWindowUpdate ¶
AccumulateWindowUpdate accumulates window credits without sending immediately.
func (*Manager) ConsumeSendWindow ¶
ConsumeSendWindow decrements connection and stream windows after sending DATA.
func (*Manager) ConsumeSendWindowFast ¶
ConsumeSendWindowFast decrements connection and stream windows after sending DATA. It operates on the given stream directly and avoids the Manager lock.
func (*Manager) CountActiveStreams ¶
CountActiveStreams returns the number of streams considered active for concurrency limits.
func (*Manager) CreateStream ¶
CreateStream creates a new stream with the given ID.
func (*Manager) DeleteStream ¶
DeleteStream removes a stream and releases its pooled buffers.
func (*Manager) FlushWindowUpdates ¶
func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool
FlushWindowUpdates sends accumulated WINDOW_UPDATE frames if the threshold is met. It returns true if any updates were sent.
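The accumulate-then-flush pattern behind AccumulateWindowUpdate and FlushWindowUpdates can be sketched as follows. Everything here is an assumption made for illustration: the windowBatcher type, its per-entry threshold, the use of stream ID 0 for the connection, and the reading of force as "send regardless of threshold". The real method writes through a FrameWriter; the sketch takes a callback instead.

```go
package main

import "fmt"

// windowBatcher is a hypothetical sketch of batching WINDOW_UPDATE credit.
type windowBatcher struct {
	pending   map[uint32]uint32 // stream ID -> accumulated credit (0 = connection)
	threshold uint32            // assumed minimum credit before an update is worth sending
}

func newWindowBatcher(threshold uint32) *windowBatcher {
	return &windowBatcher{pending: make(map[uint32]uint32), threshold: threshold}
}

// accumulate records credit without sending anything.
func (b *windowBatcher) accumulate(streamID, increment uint32) {
	b.pending[streamID] += increment
}

// flush calls send for every entry that reached the threshold, or for every
// non-zero entry when force is set. It reports whether anything was sent.
func (b *windowBatcher) flush(send func(streamID, increment uint32), force bool) bool {
	sent := false
	for id, credit := range b.pending {
		if credit == 0 || (!force && credit < b.threshold) {
			continue
		}
		send(id, credit)
		delete(b.pending, id) // deleting during range is safe in Go
		sent = true
	}
	return sent
}

func main() {
	b := newWindowBatcher(1000)
	send := func(id, inc uint32) { fmt.Println("WINDOW_UPDATE", id, inc) }

	b.accumulate(1, 100)
	fmt.Println(b.flush(send, false)) // below threshold, nothing sent

	b.accumulate(1, 1000)
	fmt.Println(b.flush(send, false)) // threshold reached, one update sent
}
```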
func (*Manager) GetConnectionWindow ¶
GetConnectionWindow atomically returns the current connection window size.
func (*Manager) GetLastClientStreamID ¶
GetLastClientStreamID returns the highest client-initiated stream ID observed.
func (*Manager) GetLastStreamID ¶
GetLastStreamID returns the highest stream ID.
func (*Manager) GetMaxConcurrentStreams ¶
GetMaxConcurrentStreams returns the currently configured max concurrent streams value.
func (*Manager) GetOrCreateStream ¶
GetOrCreateStream gets an existing stream or creates a new one.
func (*Manager) GetSendWindowsAndMaxFrame ¶
func (m *Manager) GetSendWindowsAndMaxFrame(streamID uint32) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrame returns current connection window, stream window, and max frame size.
func (*Manager) GetSendWindowsAndMaxFrameFast ¶
func (m *Manager) GetSendWindowsAndMaxFrameFast(s *Stream) (connWindow int32, streamWindow int32, maxFrame uint32)
GetSendWindowsAndMaxFrameFast returns the current connection window, stream window, and max frame size. It avoids the Manager lock by using atomics and direct stream access.
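A caller typically combines the three returned values to decide how much of a pending body fits in the next DATA frame, then deducts that amount from both windows. The sendable helper below is hypothetical and shows only the arithmetic; it clamps to zero because HTTP/2 flow-control windows can become negative.

```go
package main

import "fmt"

// sendable is a hypothetical helper: it returns how many of the pending bytes
// may be sent in one DATA frame given both send windows and the max frame size.
func sendable(pending int, connWindow, streamWindow int32, maxFrame uint32) int {
	n := int64(pending)
	for _, limit := range []int64{int64(connWindow), int64(streamWindow), int64(maxFrame)} {
		if limit < n {
			n = limit
		}
	}
	if n < 0 {
		n = 0 // a negative window means nothing can be sent yet
	}
	return int(n)
}

func main() {
	conn, strm := int32(65535), int32(30000)
	n := sendable(100000, conn, strm, 16384)

	// After sending n bytes, both windows shrink by the same amount.
	conn -= int32(n)
	strm -= int32(n)
	fmt.Println(n, conn, strm)
}
```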
func (*Manager) MarkStreamBuffered ¶
MarkStreamBuffered adds a stream to the set of streams with buffered data.
func (*Manager) MarkStreamEmpty ¶
MarkStreamEmpty removes a stream from the set of streams with buffered data.
func (*Manager) SetMaxConcurrentStreams ¶
SetMaxConcurrentStreams sets the maximum number of concurrent peer-initiated streams allowed.
func (*Manager) StreamCount ¶
StreamCount returns the number of streams in the manager.
func (*Manager) TryOpenStream ¶
TryOpenStream attempts to atomically open a new stream and mark it active. Returns the opened stream and true on success; returns false if the MAX_CONCURRENT_STREAMS limit would be exceeded.
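One way to make "check the limit and mark active" a single atomic step is a compare-and-swap loop on the active counter. The limiter type below is a hypothetical sketch of that technique only; how Manager actually synchronizes TryOpenStream is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter is a hypothetical concurrency gate, not the package's Manager.
type limiter struct {
	active atomic.Uint32
	max    atomic.Uint32
}

// tryOpen increments the active count only if the limit is not yet reached.
// The CAS loop ensures two concurrent callers cannot both take the last slot.
func (l *limiter) tryOpen() bool {
	for {
		cur := l.active.Load()
		if cur >= l.max.Load() {
			return false
		}
		if l.active.CompareAndSwap(cur, cur+1) {
			return true
		}
	}
}

// release gives a slot back; adding ^uint32(0) subtracts one.
func (l *limiter) release() {
	l.active.Add(^uint32(0))
}

func main() {
	l := &limiter{}
	l.max.Store(2)
	fmt.Println(l.tryOpen(), l.tryOpen(), l.tryOpen())
	l.release()
	fmt.Println(l.tryOpen())
}
```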
func (*Manager) UpdateConnectionWindow ¶
UpdateConnectionWindow atomically updates the connection-level flow control window.
type Phase ¶
type Phase int
Phase represents the response phase for a stream to ensure proper write ordering.
Response phase constants for ensuring correct write ordering.
type PriorityTree ¶
type PriorityTree struct {
// contains filtered or unexported fields
}
PriorityTree manages stream priorities and dependencies.
func NewPriorityTree ¶
func NewPriorityTree() *PriorityTree
NewPriorityTree creates a new priority tree.
func (*PriorityTree) CalculateStreamPriority ¶
func (pt *PriorityTree) CalculateStreamPriority(streamID uint32) int
CalculateStreamPriority computes a priority score for stream scheduling.
func (*PriorityTree) GetChildren ¶
func (pt *PriorityTree) GetChildren(streamID uint32) []uint32
GetChildren returns the streams that depend on the given stream.
func (*PriorityTree) GetPriority ¶
func (pt *PriorityTree) GetPriority(streamID uint32) (*Priority, bool)
GetPriority retrieves priority information for a stream.
func (*PriorityTree) GetWeight ¶
func (pt *PriorityTree) GetWeight(streamID uint32) uint8
GetWeight returns the weight of a stream, defaulting to 16.
func (*PriorityTree) RemoveStream ¶
func (pt *PriorityTree) RemoveStream(streamID uint32)
RemoveStream removes a stream and reorganizes its dependencies.
func (*PriorityTree) SetPriority ¶
func (pt *PriorityTree) SetPriority(streamID uint32, priority Priority)
SetPriority assigns or updates priority information for a stream.
func (*PriorityTree) UpdateFromFrame ¶
func (pt *PriorityTree) UpdateFromFrame(streamID uint32, dependency uint32, weight uint8, exclusive bool)
UpdateFromFrame updates stream priority from frame parameters.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor processes incoming HTTP/2 frames and manages streams.
func NewProcessor ¶
func NewProcessor(handler Handler, writer FrameWriter, conn ResponseWriter) *Processor
NewProcessor creates a new stream processor.
func (*Processor) FlushBufferedData ¶
FlushBufferedData exposes flushBufferedData for external callers.
func (*Processor) GetConnection ¶
func (p *Processor) GetConnection() ResponseWriter
GetConnection returns the permanent connection writer.
func (*Processor) GetCurrentConn ¶
func (p *Processor) GetCurrentConn() ResponseWriter
GetCurrentConn returns the current connection.
func (*Processor) GetExpectedContinuationStreamID ¶
GetExpectedContinuationStreamID returns the ID of the stream on which CONTINUATION frames are currently expected.
func (*Processor) GetManager ¶
GetManager returns the stream manager.
func (*Processor) GetStreamPriority ¶
GetStreamPriority returns the priority score for a stream.
func (*Processor) IsExpectingContinuation ¶
IsExpectingContinuation reports whether the processor is in the middle of receiving a header block.
func (*Processor) ProcessFrame ¶
ProcessFrame processes an incoming HTTP/2 frame.
func (*Processor) ProcessFrameWithConn ¶
func (p *Processor) ProcessFrameWithConn(ctx context.Context, frame http2.Frame, conn ResponseWriter) error
ProcessFrameWithConn processes a frame with a connection context.
type ResponseWriter ¶
type ResponseWriter interface {
WriteResponse(stream *Stream, status int, headers [][2]string, body []byte) error
SendGoAway(lastStreamID uint32, code http2.ErrCode, debug []byte) error
MarkStreamClosed(streamID uint32)
IsStreamClosed(streamID uint32) bool
WriteRSTStreamPriority(streamID uint32, code http2.ErrCode) error
CloseConn() error
}
ResponseWriter is an interface for writing responses.
type State ¶
type State int
State represents the state of an HTTP/2 stream as defined in RFC 7540.
HTTP/2 stream state constants as defined in RFC 7540 Section 5.1.
type Stream ¶
type Stream struct {
ID uint32
State State
Headers [][2]string
Trailers [][2]string
Data *bytes.Buffer
OutboundBuffer *bytes.Buffer
OutboundEndStream bool
HeadersSent bool
EndStream bool
IsStreaming bool
HandlerStarted bool
DeferResponse bool
WindowSize int32
ReceivedWindowUpd chan int32 // buffered; consumed by engine layer during DATA writes
ResponseWriter ResponseWriter
RemoteAddr string
ReceivedDataLen int
ReceivedInitialHeaders bool
ClosedByReset bool
IsHEAD bool
CachedCtx any // per-connection cached context (avoids pool Get/Put per request)
// contains filtered or unexported fields
}
Stream represents an HTTP/2 stream with its associated state and data.
func NewH1Stream ¶ added in v1.1.0
NewH1Stream creates a lightweight stream optimized for H1 requests. It uses context.Background() directly (no WithCancel), leaves Data and OutboundBuffer nil (lazily allocated only when body data arrives), and skips H2-specific window initialization. This eliminates at least two allocations per H1 request.
func NewStream ¶
NewStream creates a new stream with full H2 initialization (context, buffers). Uses context.Background() directly (no WithCancel) to avoid allocation; cancel semantics are lazily created via EnsureCancel only when needed.
func (*Stream) AddHeadersBatch ¶ added in v1.1.0
AddHeadersBatch adds multiple headers under a single lock acquisition.
func (*Stream) BufferOutbound ¶ added in v0.2.0
BufferOutbound stores data that couldn't be sent due to flow control.
func (*Stream) DeductWindow ¶ added in v0.2.0
DeductWindow subtracts n from the flow control window.
func (*Stream) EnsureCancel ¶
func (s *Stream) EnsureCancel()
EnsureCancel lazily creates a cancellable context for this stream. Called when the stream needs cancel semantics (e.g., RST_STREAM).
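The lazy-cancel idea can be sketched as follows: start from context.Background(), which costs no allocation, and wrap it with context.WithCancel only on first demand. The lazyCtx type and its method names are hypothetical; in particular, whether the package's Cancel calls EnsureCancel internally is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// lazyCtx is a hypothetical holder that defers context.WithCancel until needed.
type lazyCtx struct {
	mu     sync.Mutex
	ctx    context.Context
	cancel context.CancelFunc
}

func newLazyCtx() *lazyCtx {
	return &lazyCtx{ctx: context.Background()}
}

// ensureCancel upgrades the context to a cancellable one exactly once.
func (l *lazyCtx) ensureCancel() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.cancel == nil {
		l.ctx, l.cancel = context.WithCancel(l.ctx)
	}
}

// cancelNow makes sure a cancel function exists, then invokes it.
func (l *lazyCtx) cancelNow() {
	l.ensureCancel()
	l.mu.Lock()
	cancel := l.cancel
	l.mu.Unlock()
	cancel()
}

// cancelled reports whether the current context has been cancelled.
func (l *lazyCtx) cancelled() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.ctx.Err() != nil
}

func main() {
	l := newLazyCtx()
	fmt.Println(l.cancelled())
	l.cancelNow()
	fmt.Println(l.cancelled())
}
```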
func (*Stream) ForEachHeader ¶
ForEachHeader calls fn for each header under a read lock.
func (*Stream) GetHeaders ¶
GetHeaders returns the headers. For single-threaded H1 streams, returns the slice directly (no lock, no copy). For H2 streams, returns a safe copy under lock.
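The trade-off described above (direct slice for single-threaded use, defensive copy under lock otherwise) can be illustrated with a stand-in type. The headerStore type and its singleThreaded flag are hypothetical; how the package distinguishes H1 from H2 streams internally is not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// headerStore is a hypothetical stand-in for a stream's header storage.
type headerStore struct {
	mu             sync.RWMutex
	headers        [][2]string
	singleThreaded bool // assumed marker for the H1 fast path
}

// get returns the slice directly on the single-threaded path, and a copy
// taken under a read lock otherwise.
func (h *headerStore) get() [][2]string {
	if h.singleThreaded {
		return h.headers
	}
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make([][2]string, len(h.headers))
	copy(out, h.headers)
	return out
}

func main() {
	h2 := &headerStore{headers: [][2]string{{":method", "GET"}}}
	got := h2.get()
	got[0][1] = "POST" // mutates only the copy
	fmt.Println(h2.headers[0][1])

	h1 := &headerStore{headers: [][2]string{{"host", "a"}}, singleThreaded: true}
	alias := h1.get()
	alias[0][1] = "b" // mutates the shared slice
	fmt.Println(h1.headers[0][1])
}
```

The direct path is only safe when no other goroutine touches the headers, so callers on that path should treat the result as read-only unless they own the stream.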
func (*Stream) GetWindowSize ¶
GetWindowSize returns the current flow control window size.
func (*Stream) HeadersLen ¶
HeadersLen returns the number of headers on the stream.
func (*Stream) MarkBuffered ¶
func (s *Stream) MarkBuffered()
MarkBuffered marks the stream as having buffered data.
func (*Stream) MarkEmpty ¶
func (s *Stream) MarkEmpty()
MarkEmpty marks the stream as having no buffered data.
func (*Stream) Release ¶ added in v0.3.2
func (s *Stream) Release()
Release returns pooled buffers, cancels the context, and returns the stream to its pool. Safe to call multiple times; subsequent calls are no-ops.
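An idempotent release is commonly built on an atomic flag so that only the first call returns resources to the pool. The sketch below shows that technique with a pooled bytes.Buffer. The pooled type, bufPool, and the boolean return value are hypothetical and exist only to make the behavior observable; the real Release returns nothing and also handles the context and the stream itself.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// bufPool is a hypothetical buffer pool for the sketch.
var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// pooled is a hypothetical object that owns one pooled buffer.
type pooled struct {
	buf      *bytes.Buffer
	released atomic.Bool
}

func acquire() *pooled {
	return &pooled{buf: bufPool.Get().(*bytes.Buffer)}
}

// release returns the buffer to the pool on the first call only.
// It reports whether this call performed the release.
func (p *pooled) release() bool {
	if !p.released.CompareAndSwap(false, true) {
		return false // already released; later calls are no-ops
	}
	p.buf.Reset()
	bufPool.Put(p.buf)
	p.buf = nil
	return true
}

func main() {
	p := acquire()
	p.buf.WriteString("body")
	fmt.Println(p.release(), p.release())
}
```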
func (*Stream) WriteLock ¶
func (s *Stream) WriteLock()
WriteLock acquires the per-stream write lock.
func (*Stream) WriteUnlock ¶
func (s *Stream) WriteUnlock()
WriteUnlock releases the per-stream write lock.
type Streamer ¶ added in v1.1.0
type Streamer interface {
// WriteHeader sends the status line and headers. Must be called once before Write.
WriteHeader(stream *Stream, status int, headers [][2]string) error
// Write sends a chunk of the response body. May be called multiple times.
Write(stream *Stream, data []byte) error
// Flush ensures buffered data is sent to the network.
Flush(stream *Stream) error
// Close signals end of the response body.
Close(stream *Stream) error
}
Streamer supports incremental response writing. Engines that support streaming implement this interface on their ResponseWriter. The existing WriteResponse path is preserved for non-streaming responses (hot path).
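The call order implied by the interface comments (WriteHeader once, then Write any number of times, Flush as needed, Close at the end) can be exercised against an in-memory implementation. The sketch redeclares Streamer and a reduced Stream locally so it compiles on its own; memStreamer and streamChunks are hypothetical helpers, not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
)

// Stream is a reduced stand-in for the package's Stream type.
type Stream struct{ ID uint32 }

// Streamer is copied from the interface shown above.
type Streamer interface {
	WriteHeader(stream *Stream, status int, headers [][2]string) error
	Write(stream *Stream, data []byte) error
	Flush(stream *Stream) error
	Close(stream *Stream) error
}

// memStreamer is a hypothetical in-memory Streamer used for illustration.
type memStreamer struct {
	status int
	body   bytes.Buffer
	closed bool
}

func (m *memStreamer) WriteHeader(_ *Stream, status int, _ [][2]string) error {
	m.status = status
	return nil
}

func (m *memStreamer) Write(_ *Stream, data []byte) error {
	_, err := m.body.Write(data)
	return err
}

func (m *memStreamer) Flush(_ *Stream) error { return nil }

func (m *memStreamer) Close(_ *Stream) error {
	m.closed = true
	return nil
}

// streamChunks sends headers once, then each chunk followed by a flush,
// and finally signals the end of the body.
func streamChunks(w Streamer, s *Stream, chunks [][]byte) error {
	if err := w.WriteHeader(s, 200, [][2]string{{"content-type", "text/plain"}}); err != nil {
		return err
	}
	for _, c := range chunks {
		if err := w.Write(s, c); err != nil {
			return err
		}
		if err := w.Flush(s); err != nil {
			return err
		}
	}
	return w.Close(s)
}

func main() {
	m := &memStreamer{}
	err := streamChunks(m, &Stream{ID: 1}, [][]byte{[]byte("hello, "), []byte("world")})
	fmt.Println(err, m.status, m.body.String(), m.closed)
}
```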