gateway

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultInboundWorkerCount is the default number of workers processing inbound webhooks.
	DefaultInboundWorkerCount = 10
	// DefaultInboundQueueSize is the default size of the inbound work queue.
	DefaultInboundQueueSize = 100
	// MaxInboundBodySize is the maximum size of an inbound HTTP request body (10MB).
	MaxInboundBodySize = 10 * 1024 * 1024
)

Server defaults and limits

View Source
const (
	// MaxOutboundResponseSize is the maximum size of an HTTP response body to read for logging (1MB).
	MaxOutboundResponseSize = 1024 * 1024
)

Client limits and timeout constants

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerConfig

type ConsumerConfig struct {
	WorkerCount    int
	FetchBatchSize int
	FetchTimeout   time.Duration
	MaxAckPending  int
	AckWaitTimeout time.Duration
	MaxDeliver     int
}

ConsumerConfig contains JetStream consumer configuration

type InboundServer

type InboundServer struct {
	// contains filtered or unexported fields
}

InboundServer handles HTTP requests and publishes to NATS. It uses a fixed-size worker pool for bounded concurrency and backpressure.

func NewInboundServer

func NewInboundServer(
	log *slog.Logger,
	m *metrics.Metrics,
	processor *rule.Processor,
	js jetstream.JetStream,
	nc *nats.Conn,
	serverCfg *ServerConfig,
	publishCfg *config.PublishConfig,
) *InboundServer

NewInboundServer creates a new HTTP inbound server with a worker pool.

func (*InboundServer) Start

func (s *InboundServer) Start(ctx context.Context) error

Start begins the HTTP server and starts the worker pool.

func (*InboundServer) Stop

func (s *InboundServer) Stop(ctx context.Context) error

Stop gracefully shuts down the HTTP server and the worker pool.

type OutboundClient

type OutboundClient struct {
	// contains filtered or unexported fields
}

OutboundClient handles NATS messages and makes HTTP requests ACK-on-Success: ACKs message only on HTTP 200-299, NAKs on failure

func NewOutboundClient

func NewOutboundClient(
	logger *slog.Logger,
	metrics *metrics.Metrics,
	processor *rule.Processor,
	js jetstream.JetStream,
	consumerCfg *ConsumerConfig,
	httpClientCfg *config.HTTPClientConfig,
) *OutboundClient

NewOutboundClient creates a new HTTP outbound client

func (*OutboundClient) AddOutboundSubscription added in v0.0.3

func (c *OutboundClient) AddOutboundSubscription(ctx context.Context, streamName, consumerName, subject string) error

AddOutboundSubscription implements broker.OutboundSubscriber.

func (*OutboundClient) AddSubscription

func (c *OutboundClient) AddSubscription(ctx context.Context, streamName, consumerName, subject string, workers int) (*OutboundSubscription, error)

AddSubscription adds a NATS subscription for outbound HTTP.

func (*OutboundClient) GetSubscriptions

func (c *OutboundClient) GetSubscriptions() []*OutboundSubscription

GetSubscriptions returns a copy of all subscriptions

func (*OutboundClient) RemoveOutboundSubscription added in v0.0.3

func (c *OutboundClient) RemoveOutboundSubscription(subject string)

RemoveOutboundSubscription implements broker.OutboundSubscriber. Blocks until in-flight workers finish so the subsequent DeleteConsumer call does not race an in-flight Ack/Nak. Deletion errors are logged (consumer left for manual cleanup) and ErrConsumerNotFound is treated as success so callers can replay the call without spurious warnings.

func (*OutboundClient) Start

func (c *OutboundClient) Start(ctx context.Context) error

Start begins consuming messages and making HTTP requests

func (*OutboundClient) Stop

func (c *OutboundClient) Stop() error

Stop gracefully shuts down all outbound subscriptions

type OutboundSubscription

type OutboundSubscription struct {
	Subject      string
	ConsumerName string
	StreamName   string
	Consumer     jetstream.Consumer
	Workers      int
	// contains filtered or unexported fields
}

OutboundSubscription represents a NATS subscription for outbound HTTP

type ServerConfig

type ServerConfig struct {
	Address             string
	ReadTimeout         time.Duration
	WriteTimeout        time.Duration
	IdleTimeout         time.Duration
	MaxHeaderBytes      int
	ShutdownGracePeriod time.Duration

	// Number of concurrent workers processing inbound webhooks.
	// This should be a configurable value.
	InboundWorkerCount int

	// Size of the buffered channel for incoming webhooks.
	// This allows the server to absorb bursts of traffic.
	InboundQueueSize int
}

ServerConfig contains HTTP server configuration. Added worker pool configuration fields.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL