httppost

package
v1.0.0-alpha.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 22 Imported by: 0

README

HTTP POST Output

Sends NATS messages to HTTP/HTTPS endpoints via POST requests with automatic retry and exponential backoff.

Purpose

The HTTP POST output component bridges stream processing pipelines to external HTTP services, enabling webhook integration, API calls, and third-party service communication. It consumes messages from NATS subjects or JetStream and delivers them as HTTP POST requests with configurable retry logic, custom headers, and TLS support including mutual TLS and ACME certificate management.

Configuration

name: webhook-sender
type: output.httppost
config:
  ports:
    inputs:
      - name: webhook_input
        type: nats                    # or "jetstream"
        subject: events.webhook       # NATS subject pattern
        required: true
  url: https://api.example.com/events # Target HTTP endpoint (required)
  timeout: 30                          # Request timeout in seconds (default: 30)
  retry_count: 3                       # Number of retry attempts (default: 3, max: 10)
  content_type: application/json       # Content-Type header (default: application/json)
  headers:
    Authorization: Bearer ${API_TOKEN} # Custom headers (env var expansion supported)
    X-Source: semstreams
Configuration Fields
Field Type Required Default Description
url string Yes - HTTP/HTTPS endpoint URL
timeout int No 30 Request timeout in seconds (0-300)
retry_count int No 3 Number of retry attempts (0-10)
content_type string No application/json Content-Type header value
headers map[string]string No {} Custom HTTP headers

Input/Output Ports

Input Ports

The component accepts NATS or JetStream input ports configured in the ports.inputs section:

NATS Input:

ports:
  inputs:
    - name: events
      type: nats
      subject: system.events.>
      required: true

JetStream Input:

ports:
  inputs:
    - name: durable_events
      type: jetstream
      subject: orders.created
      stream_name: ORDERS          # Optional, derived from subject if omitted
      required: true

For JetStream inputs, the component automatically creates a durable consumer with explicit acknowledgment and a maximum of 5 delivery attempts.

Output Ports

The HTTP POST output has no NATS output ports. Messages are delivered exclusively to the configured HTTP endpoint.

Retry and Error Handling

Retry Logic

Failed HTTP requests are automatically retried with exponential backoff:

Attempt 1: Immediate
Attempt 2: After 100ms  (1*1*100)
Attempt 3: After 400ms  (2*2*100)
Attempt 4: After 900ms  (3*3*100)

Retry count configured via retry_count parameter (default: 3, maximum: 10).

Retryable Conditions

The following errors trigger automatic retry:

  • Network errors (connection refused, timeout, DNS failures)
  • 5xx server errors (500, 502, 503, 504)
  • 429 Too Many Requests
Non-Retryable Conditions

The following errors fail immediately without retry:

  • 4xx client errors (400, 401, 403, 404, except 429)
  • Invalid URL or configuration errors
  • Request body marshaling errors
  • Context cancellation
Status Code Handling
Code Range Behavior Description
2xx Success Request accepted, no retry
3xx Redirect Automatically followed (up to 10 redirects)
4xx Error Client error, not retried (except 429)
5xx Retry Server error, retried with backoff

Authentication Options

Bearer Token Authentication
headers:
  Authorization: Bearer ${API_TOKEN}  # Read from environment variable
Basic Authentication
headers:
  Authorization: Basic dXNlcjpwYXNzd29yZA==  # Base64 encoded user:password
Custom API Key
headers:
  X-API-Key: ${API_KEY}
  X-Custom-Auth: ${AUTH_SECRET}
Mutual TLS (mTLS)

Platform-level TLS configuration enables mutual TLS authentication:

security:
  tls:
    client:
      ca_files:
        - /etc/certs/ca.pem
      mtls:
        enabled: true
        cert_file: /etc/certs/client.pem
        key_file: /etc/certs/client-key.pem

The HTTP client automatically uses the configured TLS settings.

ACME Certificate Management

Automatic certificate provisioning via Let's Encrypt or custom ACME CA:

security:
  tls:
    client:
      mode: acme
      acme:
        enabled: true
        directory_url: https://acme-v02.api.letsencrypt.org/directory
        email: admin@example.com
        domains:
          - client.example.com
        challenge_type: http-01
        storage_path: /var/lib/acme/certs

Certificates are automatically renewed before expiration.

Example Use Cases

Webhook Integration

Send stream events to webhook.site for debugging:

name: debug-webhook
type: output.httppost
config:
  ports:
    inputs:
      - name: debug_events
        type: nats
        subject: debug.>
        required: true
  url: https://webhook.site/unique-id-here
  timeout: 5
  retry_count: 1
Third-Party API Integration

Post enriched events to external analytics platform:

name: analytics-export
type: output.httppost
config:
  ports:
    inputs:
      - name: analytics_events
        type: jetstream
        subject: analytics.pageview
        stream_name: ANALYTICS
        required: true
  url: https://api.analytics.com/v1/events
  timeout: 10
  retry_count: 5
  headers:
    Authorization: Bearer ${ANALYTICS_API_KEY}
    X-Source: semstreams
Metrics Ingestion

Send metrics to observability platform with high retry count for data reliability:

name: metrics-ingest
type: output.httppost
config:
  ports:
    inputs:
      - name: metrics
        type: nats
        subject: metrics.>
        required: true
  url: https://metrics.platform.io/ingest
  timeout: 15
  retry_count: 10
  content_type: application/json
  headers:
    X-Tenant-ID: ${TENANT_ID}
    Authorization: Bearer ${METRICS_TOKEN}
Multi-Endpoint Fanout

Use multiple httppost components to send the same data to different endpoints:

# Component 1: Primary endpoint
name: primary-webhook
type: output.httppost
config:
  ports:
    inputs:
      - name: orders
        type: jetstream
        subject: orders.>
        stream_name: ORDERS
        required: true
  url: https://primary.api.com/orders
  retry_count: 5

# Component 2: Backup endpoint
name: backup-webhook
type: output.httppost
config:
  ports:
    inputs:
      - name: orders_backup
        type: jetstream
        subject: orders.>
        stream_name: ORDERS
        required: true
  url: https://backup.api.com/orders
  retry_count: 3

Performance Characteristics

  • Throughput: Network-dependent, typically 100-1000 requests/second
  • Memory: O(concurrent requests), minimal overhead per message
  • Latency: Network RTT + server processing time + retry delays
  • Connections: HTTP keep-alive enabled, connections reused across requests
  • Concurrency: Thread-safe, handles multiple concurrent messages

Observability

Health Status
health := component.Health()
// Healthy: true if component is running
// ErrorCount: Total failed requests after all retries
// Uptime: Time since Start()
Data Flow Metrics
metrics := component.DataFlow()
// ErrorRate: Percentage of failed requests (errors / total)
// LastActivity: Timestamp of last message processed
Lifecycle Reporting

The component reports lifecycle stages to NATS KV bucket COMPONENT_STATUS:

  • idle: Waiting for messages
  • posting: Actively sending HTTP requests (throttled updates)

Security Considerations

  • HTTPS Required: Always use HTTPS for sensitive data (authentication tokens, PII)
  • Environment Variables: Store API keys and secrets in environment variables, never hardcode
  • Certificate Validation: SSL certificate validation enabled by default (no InsecureSkipVerify)
  • Timeouts: Always configure timeouts to prevent hanging requests and resource exhaustion
  • Header Sanitization: Sensitive headers logged at debug level only

Limitations

Current version does not support:

  • Request batching (one HTTP request per message)
  • Circuit breaker pattern
  • Rate limiting
  • Request signing (HMAC, AWS Signature v4)
  • HTTP methods other than POST (add support via config if needed)
  • Response body processing (responses are read and discarded)

Thread Safety

The component is fully thread-safe:

  • HTTP client can be used from multiple goroutines
  • Start/Stop can be called from any goroutine
  • Metrics updates use atomic operations
  • Proper mutex protection for shared state

Error Reporting

All errors use the pkg/errs package for consistent classification:

  • Invalid: Configuration errors (bad URL, invalid timeout)
  • Transient: Retryable errors (network failures, 5xx responses)
  • Fatal: Unrecoverable errors (missing dependencies)

Errors are logged with structured context including URL, status code, and retry attempt number.

Testing

Run unit tests:

go test ./output/httppost -v

Run integration tests (requires Docker):

go test ./output/httppost -tags=integration -v
  • output/file: Write messages to disk
  • output/websocket: Stream messages to WebSocket clients
  • gateway/http: HTTP server for inbound requests
  • processor/jsonmap: Transform messages before HTTP POST

Documentation

Overview

Package httppost provides an HTTP POST output component for sending messages to HTTP endpoints.

Overview

The HTTP POST output component sends incoming NATS messages to HTTP/HTTPS endpoints via POST requests, with automatic retry logic, exponential backoff, and configurable headers. It implements the StreamKit component interfaces for lifecycle management and observability.

Quick Start

Send messages to an HTTP endpoint:

config := httppost.Config{
    Ports: &component.PortConfig{
        Inputs: []component.PortDefinition{
            {Name: "input", Type: "nats", Subject: "webhooks.>", Required: true},
        },
    },
    URL:     "https://api.example.com/events",
    Timeout: 10 * time.Second,
    Headers: map[string]string{
        "Content-Type":  "application/json",
        "Authorization": "Bearer ${API_KEY}",
    },
}

rawConfig, _ := json.Marshal(config)
output, err := httppost.NewOutput(rawConfig, deps)

Configuration

The HTTPPostConfig struct controls HTTP request behavior:

  • URL: Target HTTP/HTTPS endpoint
  • Method: HTTP method (default: "POST")
  • Headers: Custom HTTP headers (map[string]string)
  • Timeout: Request timeout (default: 10s)
  • RetryCount: Number of retry attempts (default: 3)
  • RetryDelay: Initial retry delay (default: 1s)
  • RetryBackoff: Backoff multiplier (default: 2.0)

Retry Logic

Automatic retry with exponential backoff for failed requests:

RetryCount: 3
RetryDelay: 1 * time.Second
RetryBackoff: 2.0

// Retry schedule:
// Attempt 1: Immediate
// Attempt 2: After 1s
// Attempt 3: After 2s
// Attempt 4: After 4s

Retryable conditions:

  • Network errors (connection refused, timeout)
  • 5xx server errors (500, 502, 503, 504)
  • Temporary failures (429 Too Many Requests)

Non-retryable conditions:

  • 4xx client errors (except 429)
  • Invalid configuration
  • Request body marshaling errors

HTTP Headers

Custom headers support environment variable expansion:

Headers: map[string]string{
    "Content-Type":  "application/json",
    "Authorization": "Bearer ${API_KEY}",  // Reads from env
    "X-Custom":      "static-value",
}

Standard headers automatically set:

  • Content-Type: application/json (if not specified)
  • Content-Length: Calculated from request body

Message Flow

NATS Subject → Message Handler → HTTP POST → Retry (if failed) → Success/Error

Response Handling

HTTP response codes determine success/failure:

2xx: Success
3xx: Redirect (followed automatically up to 10 times)
4xx: Client error (not retried, except 429)
5xx: Server error (retried with backoff)

Response bodies are read and discarded to enable connection reuse.

Lifecycle Management

Proper HTTP client lifecycle with graceful shutdown:

// Start posting
output.Start(ctx)

// Graceful shutdown
output.Stop(5 * time.Second)

During shutdown:

  1. Stop accepting new messages
  2. Wait for in-flight requests to complete
  3. Close HTTP client connections

Observability

The component implements component.Discoverable for monitoring:

meta := output.Meta()
// Name: httppost-output
// Type: output
// Description: HTTP POST output

health := output.Health()
// Healthy: true if recent requests succeeded
// ErrorCount: Failed requests
// Uptime: Time since Start()

dataFlow := output.DataFlow()
// MessagesPerSecond: POST rate
// BytesPerSecond: Byte throughput
// ErrorRate: Failure percentage

Performance Characteristics

  • Throughput: Network-dependent (100-1000 requests/second typical)
  • Memory: O(concurrent requests)
  • Latency: Network RTT + server processing
  • Connections: Reused via HTTP keep-alive

Error Handling

The component uses streamkit/errors for consistent error classification:

  • Invalid config: errs.WrapInvalid (bad URL, invalid headers)
  • Network errors: errs.WrapTransient (retryable)
  • HTTP 4xx: errs.WrapInvalid (client error, not retried)
  • HTTP 5xx: errs.WrapTransient (server error, retried)

All errors are logged with structured context (URL, status code, retry attempt).

Common Use Cases

**Webhook Integration:**

URL: "https://webhook.site/unique-id"
Timeout: 5 * time.Second
RetryCount: 3

**API Integration:**

URL: "https://api.example.com/v1/events"
Headers: {"Authorization": "Bearer ${TOKEN}"}
Timeout: 10 * time.Second

**Third-Party Service:**

URL: "https://metrics.service.com/ingest"
Method: "POST"
RetryCount: 5  // Higher retry for critical data

Thread Safety

The component is fully thread-safe:

  • HTTP client is thread-safe (shared across goroutines)
  • Start/Stop can be called from any goroutine
  • Metrics updates use atomic operations

Testing

The package includes comprehensive test coverage:

  • Unit tests: Config validation, retry logic, status codes
  • HTTP tests: Using httptest for mocked endpoints
  • Backoff tests: Exponential backoff verification
  • Header tests: Custom header handling

Run tests:

go test ./output/httppost -v

Limitations

Current version limitations:

  • No request batching (one HTTP request per message)
  • No circuit breaker pattern
  • No rate limiting
  • No request signing (HMAC, AWS Signature v4, etc.)
  • POST only (no PUT, PATCH, etc.) - use Method field for others

Security Considerations

  • HTTPS strongly recommended for sensitive data
  • API keys should use environment variables, not hardcoded
  • Validate SSL certificates (no InsecureSkipVerify)
  • Use timeouts to prevent hanging requests

Example: Complete Configuration

{
  "ports": {
    "inputs": [
      {"name": "input", "type": "nats", "subject": "events.webhook", "required": true}
    ]
  },
  "url": "https://api.example.com/webhook",
  "method": "POST",
  "headers": {
    "Content-Type": "application/json",
    "Authorization": "Bearer ${API_TOKEN}",
    "X-Source": "streamkit"
  },
  "timeout": "10s",
  "retry_count": 3,
  "retry_delay": "1s",
  "retry_backoff": 2.0
}

Package httppost provides HTTP POST output component for sending messages to HTTP endpoints

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewOutput

func NewOutput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewOutput creates a new HTTP POST output from configuration

func Register

func Register(registry *component.Registry) error

Register registers the HTTP POST output component with the given registry

Types

type Config

type Config struct {
	Ports       *component.PortConfig `json:"ports"        schema:"type:ports,description:Port configuration,category:basic"`
	URL         string                `json:"url"          schema:"type:string,description:HTTP endpoint URL,category:basic"`
	Headers     map[string]string     `json:"headers"      schema:"type:object,description:HTTP headers,category:advanced"`
	Timeout     int                   `json:"timeout"      schema:"type:int,description:Timeout (sec),category:advanced"`
	RetryCount  int                   `json:"retry_count"  schema:"type:int,description:Retry count,category:advanced"`
	ContentType string                `json:"content_type" schema:"type:string,description:Content-Type,category:basic"`
}

Config holds configuration for HTTP POST output component

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default configuration for HTTP POST output

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for errors

type Output

type Output struct {
	// contains filtered or unexported fields
}

Output implements HTTP POST output for NATS messages

func (*Output) ConfigSchema

func (h *Output) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema

func (*Output) DataFlow

func (h *Output) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics

func (*Output) Health

func (h *Output) Health() component.HealthStatus

Health returns the current health status

func (*Output) Initialize

func (h *Output) Initialize() error

Initialize prepares the output (no-op for HTTP POST)

func (*Output) InputPorts

func (h *Output) InputPorts() []component.Port

InputPorts returns configured input port definitions

func (*Output) Meta

func (h *Output) Meta() component.Metadata

Meta returns component metadata

func (*Output) OutputPorts

func (h *Output) OutputPorts() []component.Port

OutputPorts returns configured output port definitions (none for HTTP POST)

func (*Output) Start

func (h *Output) Start(ctx context.Context) error

Start begins sending messages via HTTP POST

func (*Output) Stop

func (h *Output) Stop(timeout time.Duration) error

Stop gracefully stops the output

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL