http

package
v1.0.0-beta.74 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2026 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package http provides a REST polling input component for ingesting data from HTTP endpoints into SemStreams.

Overview

The http input polls a configurable REST endpoint on a fixed schedule, decodes the response body, and publishes each decoded record as a BaseMessage envelope to a NATS subject. It is the generic counterpart to the typed-input components (UDP, WebSocket, GitHub webhook, …) — anything reachable over HTTP that emits JSON, JSON Lines, or a single JSON object is in scope.

Authentication via bearer token or HTTP Basic, custom request headers, configurable timeout, and exponential-backoff retry on transient failures are first-class config options.

Quick start

config := http.Config{
    URL:      "https://api.example.org/sensors/telemetry",
    Interval: "30s",
    Auth: &http.AuthConfig{
        Type:  "bearer",
        Token: "<token>",
    },
    Decoder: &http.DecoderConfig{Mode: "json"},
    Ports: &component.PortConfig{
        Outputs: []component.PortDefinition{
            {Name: "out", Type: "jetstream", Subject: "sensors.telemetry"},
        },
    },
}
rawConfig, _ := json.Marshal(config)
input, err := http.CreateInput(rawConfig, deps)

Decoder modes

  • "json" — parse the response body as a single JSON object; publish one BaseMessage per poll cycle.
  • "jsonl" — parse the response body as JSON Lines; publish one BaseMessage per non-empty line.

Both modes wrap the decoded JSON in a message.GenericJSONPayload ("core.json.v1") and route through the payload registry — every publish carries the BaseMessage envelope so downstream processors decode through the registry without bespoke handlers. Raw byte passthrough is intentionally NOT a mode here: anything not decodable as JSON should flow through a dedicated processor rather than skipping the registry contract.

Authentication

  • "none" — no auth header is set (default).
  • "bearer" — Authorization: Bearer <Token>.
  • "basic" — Authorization: Basic base64(Username:Password).

Tokens and passwords live in the operator config today; an env-var substitution layer is on the roadmap. Treat config files containing AuthConfig material as secrets.

Retry and backoff

Retries apply to network errors and 5xx responses. 4xx responses are NOT retried by default — they indicate misconfiguration, not transience. Backoff is exponential with a configurable cap. MaxAttempts == 1 disables retry entirely.

Scope (and what this component is NOT)

This package ships REST polling. Streaming HTTP (Server-Sent Events) and WebSocket subscription are deliberately deferred:

  • Plain WebSocket endpoints — use input/websocket.
  • SSE / event-stream endpoints — a follow-up extension to this package or a sibling input/sse.
  • GraphQL subscriptions — out of scope; clients construct POST requests with bodies, which the Method/Body config supports, but the persistent-stream semantics do not.

See [ADR-044] for the framework / sister-repo split rationale and the dependency chain that places this component in Phase 4.

[ADR-044]: ../../docs/adr/044-ogc-connected-systems-framework-split.md message.GenericJSONPayload: ../../message

Package http implements the REST polling input component. See doc.go for the full operator guide.

Index

Constants

View Source
const (
	AuthNone   = "none"
	AuthBearer = "bearer"
	AuthBasic  = "basic"
)

Auth types. Strings rather than typed enums so the JSON wire shape stays human-editable; Validate coerces and rejects.

View Source
const (
	DecoderJSON  = "json"
	DecoderJSONL = "jsonl"
)

Decoder modes.

View Source
const (
	MethodGET  = "GET"
	MethodPOST = "POST"
)

HTTP methods supported by this component. POST is included so callers can hit query-shaped APIs that require a request body; streaming POST (chunked uploads) is out of scope.

Variables

This section is empty.

Functions

func CreateInput

func CreateInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

CreateInput is the factory function the component registry uses to construct an http input from raw JSON config. Validates the merged config before handing off to NewInput.

func Register

func Register(registry *component.Registry) error

Register wires the http input into the component registry.

Types

type AuthConfig

type AuthConfig struct {
	Type     string `json:"type" schema:"type:string,description:Auth type (none, bearer, basic),default:none"`
	Token    string `json:"token,omitempty" schema:"type:string,description:Bearer token,category:security"`
	Username string `json:"username,omitempty" schema:"type:string,description:Basic auth username,category:security"`
	Password string `json:"password,omitempty" schema:"type:string,description:Basic auth password,category:security"`
}

AuthConfig is the Authorization-header configuration.

type Config

type Config struct {
	// URL is the endpoint to poll. Required. Must be absolute and
	// use the http or https scheme.
	URL string `json:"url" schema:"type:string,description:Endpoint URL to poll,required:true,category:basic"`

	// Method is the HTTP method (GET or POST). Default GET.
	Method string `json:"method,omitempty" schema:"type:string,description:HTTP method (GET or POST),default:GET,category:basic"`

	// Body is the optional request body sent with POST requests.
	// Ignored for GET.
	Body string `json:"body,omitempty" schema:"type:string,description:Request body (POST only),category:basic"`

	// Interval is the duration between consecutive polls.
	// Default 30s. Minimum 100ms to prevent runaway clients.
	Interval string `json:"interval,omitempty" schema:"type:string,description:Delay between polls,default:30s,category:basic"`

	// Timeout is the per-attempt HTTP request timeout. Retries get
	// a fresh clock — total cycle time is bounded by
	// MaxAttempts * (Timeout + MaxBackoff). Default 10s.
	Timeout string `` /* 137-byte string literal not displayed */

	// Headers is a map of additional headers to send with every
	// request. Overrides any default headers but does not override
	// the Authorization header set by Auth.
	Headers map[string]string `json:"headers,omitempty" schema:"type:object,description:Additional request headers,category:advanced"`

	// Auth configures the Authorization header.
	Auth *AuthConfig `json:"auth,omitempty" schema:"type:object,description:Authentication configuration,category:security"`

	// Retry configures retry/backoff behavior.
	Retry *RetryConfig `json:"retry,omitempty" schema:"type:object,description:Retry / backoff configuration,category:reliability"`

	// Decoder configures how response bodies are interpreted.
	Decoder *DecoderConfig `json:"decoder,omitempty" schema:"type:object,description:Response decoder configuration,category:basic"`

	// Ports configures input / output ports. Required: at least
	// one NATS or JetStream output port.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
}

Config holds the operator-facing configuration for the http input component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with field defaults filled in. The URL and Ports fields remain zero — callers must supply them.

func (*Config) Validate

func (c *Config) Validate() error

Validate enforces config invariants. Called by the registry before the component is constructed.

type DecoderConfig

type DecoderConfig struct {
	// Mode selects the decoder: "json" (single object per poll) or
	// "jsonl" (line-delimited; one publish per line). Default
	// "json".
	Mode string `json:"mode,omitempty" schema:"type:string,description:Decoder mode (json or jsonl),default:json"`
}

DecoderConfig is the response-body decoding configuration.

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input implements an HTTP polling input.

func NewInput

func NewInput(deps InputDeps) *Input

NewInput constructs a new HTTP polling input from the given dependencies. Caller is expected to have called Config.Validate already; the constructor does not re-validate.

func (*Input) ConfigSchema

func (h *Input) ConfigSchema() component.ConfigSchema

ConfigSchema returns the component config schema.

func (*Input) DataFlow

func (h *Input) DataFlow() component.FlowMetrics

DataFlow returns current data-flow metrics for the input.

func (*Input) Health

func (h *Input) Health() component.HealthStatus

Health returns the current health status.

func (*Input) Initialize

func (h *Input) Initialize() error

Initialize prepares the http input component. Currently a no-op past Validate; reserved for future DNS pre-warm or auth probe hooks.

func (*Input) InputPorts

func (h *Input) InputPorts() []component.Port

InputPorts returns this component's input ports.

func (*Input) Meta

func (h *Input) Meta() component.Metadata

Meta returns the component metadata.

func (*Input) OutputPorts

func (h *Input) OutputPorts() []component.Port

OutputPorts returns this component's output ports.

func (*Input) Start

func (h *Input) Start(ctx context.Context) error

Start launches the polling loop. Safe to call multiple times; subsequent calls are no-ops until Stop has been called.

func (*Input) Stop

func (h *Input) Stop(timeout time.Duration) error

Stop signals shutdown and waits up to timeout for the polling loop to exit.

type InputDeps

type InputDeps struct {
	Name            string
	Config          Config
	NATSClient      *natsclient.Client
	MetricsRegistry *metric.MetricsRegistry
	Logger          *slog.Logger
}

InputDeps holds runtime dependencies for constructing an Input.

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the total number of attempts (1 = no retry).
	// Default 3.
	MaxAttempts int `json:"max_attempts,omitempty" schema:"type:int,description:Total attempts (1 = no retry),default:3"`

	// InitialBackoff is the first delay before retry. Default 1s.
	InitialBackoff string `json:"initial_backoff,omitempty" schema:"type:string,description:First retry delay,default:1s"`

	// MaxBackoff caps the exponential growth. Default 30s.
	MaxBackoff string `json:"max_backoff,omitempty" schema:"type:string,description:Max retry delay,default:30s"`

	// Multiplier grows the backoff exponentially. Default 2.0.
	Multiplier float64 `json:"multiplier,omitempty" schema:"type:float,description:Backoff multiplier,default:2.0"`
}

RetryConfig is the retry / backoff configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL