Documentation
¶
Overview ¶
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
SPDX-FileCopyrightText: © 2025 DSLab - Fondazione Bruno Kessler
SPDX-License-Identifier: Apache-2.0
Index ¶
- Constants
- func Serve(port int, processor RequestProcessor)
- func ServeWithOptions(port int, serverOpts ExtProcServerOptions, processor RequestProcessor)
- type AbstractProcessor
- func (s *AbstractProcessor) Finish()
- func (s *AbstractProcessor) GetOptions() *ProcessingOptions
- func (s *AbstractProcessor) Init(opts *ProcessingOptions, nonFlagArgs []string, handler EventHandler) error
- func (s *AbstractProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
- func (s *AbstractProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
- func (s *AbstractProcessor) ProcessRequestTrailers(ctx *RequestContext, trailers AllHeaders) error
- func (s *AbstractProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
- func (s *AbstractProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
- func (s *AbstractProcessor) ProcessResponseTrailers(ctx *RequestContext, trailers AllHeaders) error
- type AllHeaders
- func (h *AllHeaders) Clone() *AllHeaders
- func (h *AllHeaders) DropHeaderNamed(name string) bool
- func (h *AllHeaders) DropHeadersNamed(names []string)
- func (h *AllHeaders) DropHeadersNamedEndingWith(suffix string)
- func (h *AllHeaders) DropHeadersNamedStartingWith(prefix string)
- func (h *AllHeaders) FilterHeaders(exclude HeaderNameFilter)
- func (h *AllHeaders) GetHeaderValue(name string) (*string, []byte, bool)
- func (h *AllHeaders) GetHeaderValueAsString(name string) (string, error)
- func (h *AllHeaders) Stringify() map[string]string
- type BodyHandler
- type BodyType
- type Configuration
- type EncodedBody
- type Event
- func (e *Event) GetBody() []byte
- func (e *Event) GetContentType() string
- func (e *Event) GetFieldByteSlice(key string) []byte
- func (e *Event) GetFieldInt(key string) (int, error)
- func (e *Event) GetFieldString(key string) string
- func (e *Event) GetFields() map[string]interface{}
- func (e *Event) GetHeader(key string) interface{}
- func (e *Event) GetHeaderByteSlice(key string) []byte
- func (e *Event) GetHeaderString(key string) string
- func (e *Event) GetHeaders() map[string]interface{}
- func (e *Event) GetMethod() string
- func (e *Event) GetPath() string
- func (e *Event) GetTimestamp() time.Time
- type EventHandler
- type EventResponse
- type ExtProcServerOptions
- type GenericExtProcServer
- type HeaderNameFilter
- type HeaderValue
- type HealthServer
- func (s *HealthServer) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error)
- func (h *HealthServer) List(ctx context.Context, req *pb.HealthListRequest) (*pb.HealthListResponse, error)
- func (s *HealthServer) Watch(req *pb.HealthCheckRequest, srv pb.Health_WatchServer) error
- type ObserveProcessor
- func (s *ObserveProcessor) GetName() string
- func (s *ObserveProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
- func (s *ObserveProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
- func (s *ObserveProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
- func (s *ObserveProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
- type OperatorType
- type PhaseResponse
- type PostProcessor
- type PreProcessor
- type ProcessingOptions
- type RequestContext
- func (rc *RequestContext) AddHeader(name string, hv HeaderValue) error
- func (rc *RequestContext) AddHeaders(headers map[string]HeaderValue) error
- func (rc *RequestContext) AppendHeader(name string, hv HeaderValue) error
- func (rc *RequestContext) AppendHeaders(headers map[string]HeaderValue) error
- func (rc *RequestContext) CancelRequest(status int32, headers map[string]HeaderValue, body []byte) error
- func (rc *RequestContext) ClearBodyChunk() error
- func (rc *RequestContext) ContinueRequest() error
- func (rc *RequestContext) CurrentBodyBytes() []byte
- func (rc *RequestContext) GetBodyType() BodyType
- func (rc *RequestContext) GetResponse(phase int) (*extprocv3.ProcessingResponse, error)
- func (rc *RequestContext) GetStoredValue(name string) (any, error)
- func (rc *RequestContext) GetValue(name string) (any, error)
- func (rc *RequestContext) HasBody() bool
- func (rc *RequestContext) HasCompleteBody() bool
- func (rc *RequestContext) HasDecompressedBody() bool
- func (rc *RequestContext) HasStoredValue(name string) bool
- func (rc *RequestContext) HasValue(name string) bool
- func (rc *RequestContext) OverwriteHeader(name string, hv HeaderValue) error
- func (rc *RequestContext) OverwriteHeaders(headers map[string]HeaderValue) error
- func (rc *RequestContext) RemoveHeader(name string) error
- func (rc *RequestContext) RemoveHeaders(headers []string) error
- func (rc *RequestContext) RemoveHeadersVariadic(headers ...string) error
- func (rc *RequestContext) ReplaceBodyChunk(body []byte) error
- func (rc *RequestContext) ResetPhase() error
- func (rc *RequestContext) SetStoredValue(name string, val any) error
- func (rc *RequestContext) SetValue(name string, val any) error
- func (rc *RequestContext) UpdateHeader(name string, hv HeaderValue, action string) error
- func (rc *RequestContext) UpdateHeaders(headers map[string]HeaderValue, action string) error
- type RequestProcessor
- type WrapProcessor
- func (s *WrapProcessor) GetName() string
- func (s *WrapProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
- func (s *WrapProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
- func (s *WrapProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
- func (s *WrapProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
Constants ¶
const ( REQUEST_PHASE_UNDETERMINED = 0 REQUEST_PHASE_REQUEST_HEADERS = 1 REQUEST_PHASE_REQUEST_BODY = 2 REQUEST_PHASE_REQUEST_TRAILERS = 3 REQUEST_PHASE_RESPONSE_HEADERS = 4 REQUEST_PHASE_RESPONSE_BODY = 5 REQUEST_PHASE_RESPONSE_TRAILERS = 6 )
const (
ProcessingPhaseHeader string = "processing-phase"
)
Variables ¶
This section is empty.
Functions ¶
func Serve ¶
func Serve(port int, processor RequestProcessor)
Wrapper for running gRPC ExternalProcessor service with a given RequestProcessor implementation. Includes the standard gRPC Health service as well as reflection.
Uses a default 15s shutdown timeout. It is up to the caller to execute shutdown behaviors after this shutdown completes, likely using defer processor.Finish()
Using this wrapper is not required, users can run their own gRPC server implementation with this SDK.
func ServeWithOptions ¶
func ServeWithOptions(port int, serverOpts ExtProcServerOptions, processor RequestProcessor)
Wrapper for running gRPC ExternalProcessor service with a given RequestProcessor implementation, with a declared shutdown timeout. It is still up to the caller to execute shutdown behaviors after this shutdown completes, likely using defer. Note that any deferred actions to "finalize" processing occur _after_ the server shutdown so plan accordingly. The reason for this is we should probably expect to need to drain existing streams _before_ any finalization of actions taken in external processing.
Using this wrapper is not required, users can run their own gRPC server implementation with this SDK.
Types ¶
type AbstractProcessor ¶
type AbstractProcessor struct {
RequestProcessor
Handler EventHandler
// contains filtered or unexported fields
}
func (*AbstractProcessor) Finish ¶
func (s *AbstractProcessor) Finish()
func (*AbstractProcessor) GetOptions ¶
func (s *AbstractProcessor) GetOptions() *ProcessingOptions
func (*AbstractProcessor) Init ¶
func (s *AbstractProcessor) Init(opts *ProcessingOptions, nonFlagArgs []string, handler EventHandler) error
func (*AbstractProcessor) ProcessRequestBody ¶
func (s *AbstractProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
func (*AbstractProcessor) ProcessRequestHeaders ¶
func (s *AbstractProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
func (*AbstractProcessor) ProcessRequestTrailers ¶
func (s *AbstractProcessor) ProcessRequestTrailers(ctx *RequestContext, trailers AllHeaders) error
func (*AbstractProcessor) ProcessResponseBody ¶
func (s *AbstractProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
func (*AbstractProcessor) ProcessResponseHeaders ¶
func (s *AbstractProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
func (*AbstractProcessor) ProcessResponseTrailers ¶
func (s *AbstractProcessor) ProcessResponseTrailers(ctx *RequestContext, trailers AllHeaders) error
type AllHeaders ¶
Internal structure for storing headers received from envoy, as either multi-string-valued lists or raw bytes.
func NewAllHeadersFromEnvoyHeaderMap ¶
func NewAllHeadersFromEnvoyHeaderMap(headerMap *corev3.HeaderMap) (headers AllHeaders, err error)
Create an `AllHeaders` struct from envoy-formatted headers.
func (*AllHeaders) Clone ¶
func (h *AllHeaders) Clone() *AllHeaders
Clone a set of headers, convenience for copying in case in-place methods above are too destructive for use in a given implementation.
func (*AllHeaders) DropHeaderNamed ¶
func (h *AllHeaders) DropHeaderNamed(name string) bool
Drop, in-place, the header with a given name if it exists
func (*AllHeaders) DropHeadersNamed ¶
func (h *AllHeaders) DropHeadersNamed(names []string)
Drop, in-place, the headers with given names if they exists
func (*AllHeaders) DropHeadersNamedEndingWith ¶
func (h *AllHeaders) DropHeadersNamedEndingWith(suffix string)
Convenience method for dropping headers with names matching a suffix.
func (*AllHeaders) DropHeadersNamedStartingWith ¶
func (h *AllHeaders) DropHeadersNamedStartingWith(prefix string)
Convenience method for dropping headers with names matching a prefix.
func (*AllHeaders) FilterHeaders ¶
func (h *AllHeaders) FilterHeaders(exclude HeaderNameFilter)
Filter headers, meaning drop them in place, using a generic filter strategy specified by a `HeaderNameFilter` method. That is, iterate over all headers and remove them if the method evaluates `true`.
func (*AllHeaders) GetHeaderValue ¶
func (h *AllHeaders) GetHeaderValue(name string) (*string, []byte, bool)
Get header values by name as either list of strings or raw bytes
func (*AllHeaders) GetHeaderValueAsString ¶
func (h *AllHeaders) GetHeaderValueAsString(name string) (string, error)
Get header values by name, if it exists, as a single string joining multivalues if they exist for the name
func (*AllHeaders) Stringify ¶
func (h *AllHeaders) Stringify() map[string]string
"Stringify" all headers, meaning convert the headers to a simplified map[string]string joining (in CSV-style) multi-string-valued headers if they exist
type BodyHandler ¶
type BodyHandler func(*RequestContext, []byte) error
Definition for DRY body handling
type BodyType ¶
type BodyType struct {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
ContentType string // the body content type, if applicable, but almost always present
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
ContentEncoding string // the body content encoding (compression), if applicable
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
TransferEncoding string // The HTTP/1.1 transfer encoding used, not valid in HTTP/2
}
func NewBodyTypeFromHeaders ¶
func NewBodyTypeFromHeaders(headers *AllHeaders) BodyType
New "type" of a request/response body inferring from related headers
func (*BodyType) IsChunked ¶
Reply true if the body bytes should be interpretted as chunked. This is valid for HTTP/1.1 only, and transfer-encoding: chunked can cause other issues.
func (*BodyType) IsCompressed ¶
Reply true if the body bytes should be interpretted as compressed data with strategies defined by the type's stored headers.
type Configuration ¶
type Configuration struct {
trigger.Configuration
Type OperatorType `json:"type"`
Port int `json:"port"`
GracefulShutdownTimeout int `json:"gracefulShutdownTimeout,omitempty"`
MaxConcurrentStreams uint32 `json:"maxConcurrentStreams,omitempty"`
ProcessingOptions *ProcessingOptions `json:"processingOptions"`
}
func NewConfiguration ¶
func NewConfiguration(id string, triggerConfiguration *functionconfig.Trigger, runtimeConfiguration *runtime.Configuration) (*Configuration, error)
type EncodedBody ¶
type EncodedBody struct {
Type BodyType // the "type" of this body (according to headers)
Value []byte // body bytes, potentially accumulated in streaming/chunking
MaxSize int64 // maximum allowable size of a buffer; -1 for no limit
Complete bool // flag to identify if the body is complete
Decompressed bool // flag to identify if decompression was successful
}
Type for "wrapping" bodies that may involve streaming and/or encoding, particularly concerning body compression with standard strategies.
func NewEncodedBodyFromHeaders ¶
func NewEncodedBodyFromHeaders(headers *AllHeaders) *EncodedBody
Initializer for an `EncodedBody` when headers are known, and thus the intended type, encoding, and (in HTTP/1.1) transfer style is known.
func (*EncodedBody) AppendChunk ¶
func (b *EncodedBody) AppendChunk(chunk []byte) error
Append a "chunk" of bytes to stored bytes. Intended to be used when the SDK is accumulating streaming body bytes on behalf of an implementation to simplify access to a full body. Envoy can do this as well, of course, with BUFFERED or BUFFERED_PARTIAL body modes. It might, though, be of interest to reduce memory pressure on the actual proxy, allowing the external processor to customize behavior around handling of streaming bodies
func (*EncodedBody) CurrentContentLength ¶
func (b *EncodedBody) CurrentContentLength() uint32
Return the current "content length" of an encoded body, in bytes. This is not necessarily a request/response's full content length when streaming over multiple messages.
Note results are valid up to about 4GB with uint32.
func (*EncodedBody) DecompressBody ¶
func (b *EncodedBody) DecompressBody() error
Decompress the received body according to the stored header values describing the compression strategy. No-op when not compressed, returns an error if incomplete, or returns an error when decompression fails.
Sets the struct flag `Decompressed` to identify success/failure for SDK users to identify if the stored bytes can be interpreted as "real" content.
func (*EncodedBody) IsChunked ¶
func (b *EncodedBody) IsChunked() bool
Reply true if the body bytes should be interpretted as chunked. This is valid for HTTP/1.1 only, and transfer-encoding: chunked can cause other issues.
func (*EncodedBody) IsCompressed ¶
func (b *EncodedBody) IsCompressed() bool
Reply true if the body bytes should be interpretted as compressed data with strategies defined by the type's stored headers.
type Event ¶
type Event struct {
nuclio.AbstractEvent
Body []byte
// contains filtered or unexported fields
}
allows accessing fasthttp.RequestCtx as a event.Sync
func (*Event) GetContentType ¶
GetContentType returns the content type of the body
func (*Event) GetFieldByteSlice ¶
GetFieldByteSlice returns the field by name as a byte slice
func (*Event) GetFieldString ¶
GetFieldString returns the field by name as a string
func (*Event) GetHeaderByteSlice ¶
GetHeaderByteSlice returns the header by name as a byte slice
func (*Event) GetHeaderString ¶
GetHeaderString returns the header by name as a string
func (*Event) GetHeaders ¶
GetHeaders loads all headers into a map of string / interface{}
func (*Event) GetTimestamp ¶
GetTimestamp returns when the event originated
type EventHandler ¶
type EventHandler interface {
HandleEvent(ctx *RequestContext, body []byte) (*EventResponse, error)
}
type EventResponse ¶
type EventResponse struct {
Status int32
Headers map[string]HeaderValue
Body []byte
}
type ExtProcServerOptions ¶
Supported gRPC service options in the Serve* helpers.
func DefaultServerOptions ¶
func DefaultServerOptions() ExtProcServerOptions
Default gRPC service options in the Serve* helpers.
type GenericExtProcServer ¶
type GenericExtProcServer struct {
// contains filtered or unexported fields
}
Generic type for an external processor to which we can attach a gRPC bidi stream implementation.
func (*GenericExtProcServer) Process ¶
func (s *GenericExtProcServer) Process(srv extprocv3.ExternalProcessor_ProcessServer) error
Implementation of the bidi stream `Process` in an external processor. Given the type data `processor` and `options`, this intends to manage construction and updating of a `RequestContext` and calls to the `processor`'s phase-specific methods.
type HeaderNameFilter ¶
The type required of a method to filter headers in-place
type HeaderValue ¶
type HealthServer ¶
type HealthServer struct{}
func (*HealthServer) Check ¶
func (s *HealthServer) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error)
func (*HealthServer) List ¶
func (h *HealthServer) List(ctx context.Context, req *pb.HealthListRequest) (*pb.HealthListResponse, error)
func (*HealthServer) Watch ¶
func (s *HealthServer) Watch(req *pb.HealthCheckRequest, srv pb.Health_WatchServer) error
type ObserveProcessor ¶
type ObserveProcessor struct {
AbstractProcessor
}
*
- ObserveProcessor pattern:
- - handles request response without modifying them
*
func (*ObserveProcessor) GetName ¶
func (s *ObserveProcessor) GetName() string
func (*ObserveProcessor) ProcessRequestBody ¶
func (s *ObserveProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
func (*ObserveProcessor) ProcessRequestHeaders ¶
func (s *ObserveProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
func (*ObserveProcessor) ProcessResponseBody ¶
func (s *ObserveProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
func (*ObserveProcessor) ProcessResponseHeaders ¶
func (s *ObserveProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
type OperatorType ¶
type OperatorType string
const ( OperatorTypePre OperatorType = "preprocessor" OperatorTypePost OperatorType = "postprocessor" OperatorTypeWrap OperatorType = "wrapprocessor" OperatorTypeObserve OperatorType = "observeprocessor" )
type PhaseResponse ¶
type PhaseResponse struct {
// contains filtered or unexported fields
}
Internal structure to manage construction of responses to streaming external processing requests from envoy
type PostProcessor ¶
type PostProcessor struct {
AbstractProcessor
}
*
- PostProcessor pattern:
- - modifies response body or leave it unchanged
- - if response with Status > 0 is returned, it is sent as immediate response
- - in case of error, logs it and leaves body unchanged
*
func (*PostProcessor) GetName ¶
func (s *PostProcessor) GetName() string
func (*PostProcessor) ProcessResponseBody ¶
func (s *PostProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
func (*PostProcessor) ProcessResponseHeaders ¶
func (s *PostProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
type PreProcessor ¶
type PreProcessor struct {
AbstractProcessor
}
*
- PreProcessor pattern:
- - modifies request body or leave it unchanged
- - if response with Status > 0 is returned, it is sent as immediate response
- - in case of error, logs it and leaves body unchanged
*
func (*PreProcessor) GetName ¶
func (s *PreProcessor) GetName() string
func (*PreProcessor) ProcessRequestBody ¶
func (s *PreProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
func (*PreProcessor) ProcessRequestHeaders ¶
func (s *PreProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
type ProcessingOptions ¶
type ProcessingOptions struct {
LogStream bool // Log "stream" events, i.e. Process calls
LogPhases bool // Log "phase" events, i.e. specific stream messages. Unsafe for production, prints all data.
UpdateExtProcHeader bool // Update a `x-extproc-names` header with the extproc name
UpdateDurationHeader bool // Update a `x-extproc-duration-ns` header with extproc duration (not request duration)
RequestIdHeaderName string // Header name to use for request ID's
RequestIdFallback string // Fallback value for a request id that does not exist (default empty string)
BufferStreamedBodies bool // Whether to buffer request/response bodies internally, instead of in envoy
PerRequestBodyBufferBytes int64 // Maximum allowed size of body buffers, ignored if not buffering (-1 for no limit); cast to a uint32
DecompressBodies bool // Flag to denote if the SDK itself should decompress bodies for processing, if possible and applicable
}
Processing options specific to the external processor.
func NewDefaultOptions ¶
func NewDefaultOptions() *ProcessingOptions
Return default options, as not all the zero values are "correct".
type RequestContext ¶
type RequestContext struct {
Scheme string // from envoy's `:scheme` header
Authority string // from envoy's `:authority` header
Method string // from envoy's `:method` header
FullPath string // from envoy's ':path' header
Path string // from envoy's `:path` header, parsed without query
Query map[string][]string // from envoy's `:path` header, parsed without path
RequestID string // from `x-request-id` header, if present
AllHeaders *AllHeaders // all request/response headers
Status uint16 // response status code, when available, from envoy's `:status` header
Started time.Time // stores when processing a specific request started
Duration time.Duration // appoximate, cumulative duration of external processing (not request)
EndOfStream bool // flag declaring when request/response processing is complete
// contains filtered or unexported fields
}
RequestContext helps manage and pass data related to a given request being processed. During request header processing basic data is initialized and populated, thus skipping request headers is not feasible. There should be one context per request, and it should not be shared across requests.
func (*RequestContext) AddHeader ¶
func (rc *RequestContext) AddHeader(name string, hv HeaderValue) error
Method to call to add a request/response header in an external processor (if absent)
func (*RequestContext) AddHeaders ¶
func (rc *RequestContext) AddHeaders(headers map[string]HeaderValue) error
Method to call to batch add request/response headers in an external processor, if absent
func (*RequestContext) AppendHeader ¶
func (rc *RequestContext) AppendHeader(name string, hv HeaderValue) error
Method to call to append a request/response header in an external processor
func (*RequestContext) AppendHeaders ¶
func (rc *RequestContext) AppendHeaders(headers map[string]HeaderValue) error
Method to call to batch append request/response header values in an external processor
func (*RequestContext) CancelRequest ¶
func (rc *RequestContext) CancelRequest(status int32, headers map[string]HeaderValue, body []byte) error
Signal that request processing should be stopped with a client response consisting of the supplied status, headers, and body. This is useful in the request headers or body processing phase to send a proxied response directly to the client without calling the upstream. Cancelling does not mean "failure", the response sent back can signal a successful request.
func (*RequestContext) ClearBodyChunk ¶
func (rc *RequestContext) ClearBodyChunk() error
Method to call to clear an entire request/response body chunk
func (*RequestContext) ContinueRequest ¶
func (rc *RequestContext) ContinueRequest() error
Method to use to signal request processing should continue, without a direct response or mode changes by the external processor. This does not imply the request isn't _modified_ by other changes made during processing.
func (*RequestContext) CurrentBodyBytes ¶
func (rc *RequestContext) CurrentBodyBytes() []byte
Return current body byte buffer, complete or incomplete, decompressed or not.
func (*RequestContext) GetBodyType ¶
func (rc *RequestContext) GetBodyType() BodyType
Return a body's declared content type, encoding, and transfer style.
func (*RequestContext) GetResponse ¶
func (rc *RequestContext) GetResponse(phase int) (*extprocv3.ProcessingResponse, error)
Internal method to get/form the formal envoy external processor service response to a streaming processing request. Returns the response for envoy, a flag denoting if the stream can be considered finished, and possibly an error. The "finished" flag in particular is so the server can cancel the stream with envoy, which envoy itself may not do.
func (*RequestContext) GetStoredValue ¶
func (rc *RequestContext) GetStoredValue(name string) (any, error)
Retreive a stored value having the given name from a context. Returns a non-nil error in the case the value does not exist (a deviation from standard map behavior, which may be changed).
func (*RequestContext) GetValue ¶
func (rc *RequestContext) GetValue(name string) (any, error)
@deprecate: migrate to clearer name "GetStoredValue"
func (*RequestContext) HasBody ¶
func (rc *RequestContext) HasBody() bool
func (*RequestContext) HasCompleteBody ¶
func (rc *RequestContext) HasCompleteBody() bool
Return whether context (thinks it) has "complete" body bytes.
func (*RequestContext) HasDecompressedBody ¶
func (rc *RequestContext) HasDecompressedBody() bool
Return whether context (thinks it) has decompressed body bytes.
func (*RequestContext) HasStoredValue ¶
func (rc *RequestContext) HasStoredValue(name string) bool
Check whether a context has a stored value of the given name
func (*RequestContext) HasValue ¶
func (rc *RequestContext) HasValue(name string) bool
@deprecate: migrate to clearer name "HasStoredValue"
func (*RequestContext) OverwriteHeader ¶
func (rc *RequestContext) OverwriteHeader(name string, hv HeaderValue) error
Method to call to overwrite-or-add a request/response header in an external processor
func (*RequestContext) OverwriteHeaders ¶
func (rc *RequestContext) OverwriteHeaders(headers map[string]HeaderValue) error
Method to call to batch overwrite request/response headers in an external processor
func (*RequestContext) RemoveHeader ¶
func (rc *RequestContext) RemoveHeader(name string) error
Method to call to remove a request/response header in an external processor
func (*RequestContext) RemoveHeaders ¶
func (rc *RequestContext) RemoveHeaders(headers []string) error
Method to call to batch remove request/response headers in an external processor
func (*RequestContext) RemoveHeadersVariadic ¶
func (rc *RequestContext) RemoveHeadersVariadic(headers ...string) error
Method to call to batch remove request/response headers in an external processor, using variadic arguments
func (*RequestContext) ReplaceBodyChunk ¶
func (rc *RequestContext) ReplaceBodyChunk(body []byte) error
Method to call to replace a request/response body chunk
func (*RequestContext) ResetPhase ¶
func (rc *RequestContext) ResetPhase() error
Internal method to "reset" the phase of a context, clearing internal data to be ready to handle the next phase.
func (*RequestContext) SetStoredValue ¶
func (rc *RequestContext) SetStoredValue(name string, val any) error
Set a stored value having under the given name in a context. Returns a non-nil error in the case of an error, but currently always nil.
func (*RequestContext) SetValue ¶
func (rc *RequestContext) SetValue(name string, val any) error
@deprecate: migrate to clearer name "SetStoredValue"
func (*RequestContext) UpdateHeader ¶
func (rc *RequestContext) UpdateHeader(name string, hv HeaderValue, action string) error
Method to call to update a request/response header in an external processor
func (*RequestContext) UpdateHeaders ¶
func (rc *RequestContext) UpdateHeaders(headers map[string]HeaderValue, action string) error
Method to call to batch update request/response headers in an external processor, with all having the same action
type RequestProcessor ¶
type RequestProcessor interface {
GetName() string
GetOptions() *ProcessingOptions
ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
ProcessRequestTrailers(ctx *RequestContext, trailers AllHeaders) error
ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error
ProcessResponseTrailers(ctx *RequestContext, trailers AllHeaders) error
ProcessResponseBody(ctx *RequestContext, body []byte) error
ProcessRequestBody(ctx *RequestContext, body []byte) error
}
Primary interface for supported request processing that SDK users must implement, passing a complying type to `GenericExtProcServer` or `Serve`.
TODO: Passing through health check calls would help support better reasoning about dependencies for external processing (e.g., DB or kafka availability)
type WrapProcessor ¶
type WrapProcessor struct {
AbstractProcessor
}
*
- WrapProcessor pattern:
- - modifies request body or leave it unchanged and decide whether return response or continue
- - in case of error, logs it and leaves body unchanged
*
func (*WrapProcessor) GetName ¶
func (s *WrapProcessor) GetName() string
func (*WrapProcessor) ProcessRequestBody ¶
func (s *WrapProcessor) ProcessRequestBody(ctx *RequestContext, body []byte) error
func (*WrapProcessor) ProcessRequestHeaders ¶
func (s *WrapProcessor) ProcessRequestHeaders(ctx *RequestContext, headers AllHeaders) error
func (*WrapProcessor) ProcessResponseBody ¶
func (s *WrapProcessor) ProcessResponseBody(ctx *RequestContext, body []byte) error
func (*WrapProcessor) ProcessResponseHeaders ¶
func (s *WrapProcessor) ProcessResponseHeaders(ctx *RequestContext, headers AllHeaders) error