Documentation ¶
Index ¶
- Constants
- type ConsumerConfig
- type InboundServer
- type OutboundClient
- func (c *OutboundClient) AddOutboundSubscription(ctx context.Context, streamName, consumerName, subject string) error
- func (c *OutboundClient) AddSubscription(ctx context.Context, streamName, consumerName, subject string, workers int) (*OutboundSubscription, error)
- func (c *OutboundClient) GetSubscriptions() []*OutboundSubscription
- func (c *OutboundClient) RemoveOutboundSubscription(subject string)
- func (c *OutboundClient) Start(ctx context.Context) error
- func (c *OutboundClient) Stop() error
- type OutboundSubscription
- type ServerConfig
Constants ¶
const (
// DefaultInboundWorkerCount is the default number of workers processing inbound webhooks.
DefaultInboundWorkerCount = 10
// DefaultInboundQueueSize is the default size of the inbound work queue.
DefaultInboundQueueSize = 100
// MaxInboundBodySize is the maximum size of an inbound HTTP request body (10 MiB).
MaxInboundBodySize = 10 * 1024 * 1024
)
Server defaults and limits
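As an illustration of how a body-size cap such as MaxInboundBodySize is commonly enforced, the sketch below wraps the request body in the standard library's http.MaxBytesReader. The helpers readBody and checkBody are hypothetical and the constant is redeclared locally so the example stands alone; the package's actual handler is not shown in this documentation and may work differently.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Redeclared from the documented constant so the sketch is self-contained.
const MaxInboundBodySize = 10 * 1024 * 1024

// readBody reads the request body and fails once more than limit bytes arrive.
// Hypothetical helper, not part of the documented API.
func readBody(w http.ResponseWriter, r *http.Request, limit int64) ([]byte, error) {
	return io.ReadAll(http.MaxBytesReader(w, r.Body, limit))
}

// checkBody is a demo helper: it sends body through readBody using an
// in-memory request and returns the resulting error, if any.
func checkBody(body string, limit int64) error {
	req := httptest.NewRequest(http.MethodPost, "/webhook", strings.NewReader(body))
	_, err := readBody(httptest.NewRecorder(), req, limit)
	return err
}

func main() {
	// A small payload fits comfortably under the documented limit.
	fmt.Println(checkBody("small payload", MaxInboundBodySize) == nil)

	// A 10-byte payload against a 4-byte limit is rejected.
	err := checkBody("0123456789", 4)
	var tooLarge *http.MaxBytesError
	fmt.Println(errors.As(err, &tooLarge))
}
```

A handler built this way would typically answer an oversized request with 413 Request Entity Too Large, though which status the real server returns is not stated here.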
const (
// MaxOutboundResponseSize is the maximum size of an HTTP response body to read for logging (1 MiB).
MaxOutboundResponseSize = 1024 * 1024
)
Client limits and timeout constants
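A cap like MaxOutboundResponseSize is usually applied by reading the response through io.LimitReader, so that only a bounded prefix is kept for logging. The following is a minimal sketch under that assumption; readForLog and logSnippet are hypothetical names and the constant is redeclared locally.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Redeclared from the documented constant so the sketch is self-contained.
const MaxOutboundResponseSize = 1024 * 1024

// readForLog returns at most limit bytes of body; anything beyond that is
// left unread. Hypothetical helper, not part of the documented API.
func readForLog(body io.Reader, limit int64) (string, error) {
	b, err := io.ReadAll(io.LimitReader(body, limit))
	return string(b), err
}

// logSnippet is a demo helper that applies readForLog to an in-memory body.
func logSnippet(body string, limit int64) string {
	s, _ := readForLog(strings.NewReader(body), limit)
	return s
}

func main() {
	fmt.Println(logSnippet("upstream said hello", 8)) // prints "upstream"
	fmt.Println(logSnippet("ok", MaxOutboundResponseSize))
}
```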
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct {
WorkerCount int
FetchBatchSize int
FetchTimeout time.Duration
MaxAckPending int
AckWaitTimeout time.Duration
MaxDeliver int
}
ConsumerConfig contains JetStream consumer configuration
type InboundServer ¶
type InboundServer struct {
// contains filtered or unexported fields
}
InboundServer handles HTTP requests and publishes to NATS. It uses a fixed-size worker pool for bounded concurrency and backpressure.
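The fixed-size worker pool with a bounded queue can be pictured with the sketch below. It is not the InboundServer implementation: the pool type, trySubmit, and shutdown are hypothetical names, and rejecting work when the queue is full is only one possible backpressure policy (the real server may block or respond differently).

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical fixed-size worker pool with a bounded job queue.
type pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// newPool starts exactly `workers` goroutines draining a queue of `queueSize`.
func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.queue {
				job()
			}
		}()
	}
	return p
}

// trySubmit enqueues job without blocking. A false result means the queue is
// full, which is the backpressure signal to the caller.
func (p *pool) trySubmit(job func()) bool {
	select {
	case p.queue <- job:
		return true
	default:
		return false
	}
}

// shutdown stops accepting work and waits for queued jobs to finish.
func (p *pool) shutdown() {
	close(p.queue)
	p.wg.Wait()
}

// demoBackpressure uses one worker and a queue of one: the first job occupies
// the worker, the second fills the queue, and the third is rejected.
func demoBackpressure() (accepted, rejected, ran int) {
	p := newPool(1, 1)
	started := make(chan struct{})
	release := make(chan struct{})
	jobs := []func(){
		func() { close(started); <-release; ran++ },
		func() { ran++ },
		func() { ran++ },
	}
	for i, job := range jobs {
		if p.trySubmit(job) {
			accepted++
		} else {
			rejected++
		}
		if i == 0 {
			<-started // wait until the worker is busy with the first job
		}
	}
	close(release)
	p.shutdown()
	return accepted, rejected, ran
}

func main() {
	fmt.Println(demoBackpressure()) // prints "2 1 2"
}
```

In an HTTP handler, a false return from a call like trySubmit would typically be translated into an overload response so that clients can retry later.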
func NewInboundServer ¶
func NewInboundServer(
log *slog.Logger,
m *metrics.Metrics,
processor *rule.Processor,
js jetstream.JetStream,
nc *nats.Conn,
serverCfg *ServerConfig,
publishCfg *config.PublishConfig,
) *InboundServer
NewInboundServer creates a new HTTP inbound server with a worker pool.
type OutboundClient ¶
type OutboundClient struct {
// contains filtered or unexported fields
}
OutboundClient handles NATS messages and makes HTTP requests. It follows an ACK-on-success policy: a message is ACKed only when the HTTP response status is in the 200-299 range, and NAKed on failure.
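The ACK-on-success rule can be sketched as a small decision function. The names shouldAck, settle, acker, and fakeMsg are hypothetical; acker only mirrors the Ack and Nak methods of jetstream.Msg so the example runs without a NATS connection. Treating a transport error as a failure is an assumption consistent with "NAKs on failure".

```go
package main

import (
	"errors"
	"fmt"
)

// acker is the subset of jetstream.Msg this sketch needs.
type acker interface {
	Ack() error
	Nak() error
}

// shouldAck reports whether a delivery counts as successful: no transport
// error and a status code in the 200-299 range.
func shouldAck(statusCode int, err error) bool {
	return err == nil && statusCode >= 200 && statusCode <= 299
}

// settle ACKs the message on success and NAKs it otherwise.
func settle(msg acker, statusCode int, err error) error {
	if shouldAck(statusCode, err) {
		return msg.Ack()
	}
	return msg.Nak()
}

// fakeMsg records the last acknowledgement call made on it (demo only).
type fakeMsg struct{ last string }

func (m *fakeMsg) Ack() error { m.last = "ack"; return nil }
func (m *fakeMsg) Nak() error { m.last = "nak"; return nil }

func main() {
	m := &fakeMsg{}
	for _, status := range []int{200, 204, 404, 500} {
		_ = settle(m, status, nil)
		fmt.Println(status, m.last)
	}
	_ = settle(m, 0, errors.New("connection refused"))
	fmt.Println("transport error:", m.last)
}
```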
func NewOutboundClient ¶
func NewOutboundClient(
logger *slog.Logger,
metrics *metrics.Metrics,
processor *rule.Processor,
js jetstream.JetStream,
consumerCfg *ConsumerConfig,
httpClientCfg *config.HTTPClientConfig,
) *OutboundClient
NewOutboundClient creates a new HTTP outbound client
func (*OutboundClient) AddOutboundSubscription ¶ added in v0.0.3
func (c *OutboundClient) AddOutboundSubscription(ctx context.Context, streamName, consumerName, subject string) error
AddOutboundSubscription implements broker.OutboundSubscriber.
func (*OutboundClient) AddSubscription ¶
func (c *OutboundClient) AddSubscription(ctx context.Context, streamName, consumerName, subject string, workers int) (*OutboundSubscription, error)
AddSubscription adds a NATS subscription for outbound HTTP.
func (*OutboundClient) GetSubscriptions ¶
func (c *OutboundClient) GetSubscriptions() []*OutboundSubscription
GetSubscriptions returns a copy of all subscriptions
func (*OutboundClient) RemoveOutboundSubscription ¶ added in v0.0.3
func (c *OutboundClient) RemoveOutboundSubscription(subject string)
RemoveOutboundSubscription implements broker.OutboundSubscriber. Blocks until in-flight workers finish so the subsequent DeleteConsumer call does not race an in-flight Ack/Nak. Deletion errors are logged (consumer left for manual cleanup) and ErrConsumerNotFound is treated as success so callers can replay the call without spurious warnings.
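The ordering described above (stop the workers, wait for in-flight work, then delete the consumer, tolerating "not found") can be sketched as follows. This is an illustration only: removeSubscription and simulateRemove are hypothetical, errConsumerNotFound is a local stand-in for jetstream.ErrConsumerNotFound, and the sketch returns the deletion error where the documented method logs it instead.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for illustration; errConsumerNotFound plays the role of
// jetstream.ErrConsumerNotFound.
var (
	errConsumerNotFound = errors.New("consumer not found")
	errDeleteFailed     = errors.New("delete failed")
)

// removeSubscription signals workers to stop, blocks until in-flight work is
// done, and only then deletes the consumer. "Not found" counts as success so
// the call can be replayed safely.
func removeSubscription(stop func(), inFlight *sync.WaitGroup, deleteConsumer func() error) error {
	stop()
	inFlight.Wait() // no Ack/Nak can race the deletion after this point
	err := deleteConsumer()
	if errors.Is(err, errConsumerNotFound) {
		return nil
	}
	return err
}

// simulateRemove runs removeSubscription with one in-flight worker and a
// delete call that returns deleteErr. It reports whether the worker had
// finished before the delete ran.
func simulateRemove(deleteErr error) (finishedFirst bool, err error) {
	var inFlight sync.WaitGroup
	done := make(chan struct{})
	workerFinished := false

	inFlight.Add(1)
	go func() {
		defer inFlight.Done()
		<-done                // worker runs until told to stop
		workerFinished = true // stands in for the final Ack/Nak
	}()

	err = removeSubscription(
		func() { close(done) },
		&inFlight,
		func() error {
			finishedFirst = workerFinished
			return deleteErr
		},
	)
	return finishedFirst, err
}

func main() {
	fmt.Println(simulateRemove(nil))
	fmt.Println(simulateRemove(errConsumerNotFound))
	fmt.Println(simulateRemove(errDeleteFailed))
}
```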
func (*OutboundClient) Start ¶
func (c *OutboundClient) Start(ctx context.Context) error
Start begins consuming messages and making HTTP requests
func (*OutboundClient) Stop ¶
func (c *OutboundClient) Stop() error
Stop gracefully shuts down all outbound subscriptions
type OutboundSubscription ¶
type OutboundSubscription struct {
Subject string
ConsumerName string
StreamName string
Consumer jetstream.Consumer
Workers int
// contains filtered or unexported fields
}
OutboundSubscription represents a NATS subscription for outbound HTTP
type ServerConfig ¶
type ServerConfig struct {
Address string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxHeaderBytes int
ShutdownGracePeriod time.Duration
// Number of concurrent workers processing inbound webhooks.
// Configurable; DefaultInboundWorkerCount is the default number of workers.
InboundWorkerCount int
// Size of the buffered channel for incoming webhooks.
// This allows the server to absorb bursts of traffic.
InboundQueueSize int
}
ServerConfig contains HTTP server configuration, including the worker pool fields InboundWorkerCount and InboundQueueSize.
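To show how these fields might be used, the sketch below maps a ServerConfig onto the standard library's http.Server and fills in the worker pool defaults from the documented constants. The struct and constants are redeclared locally; withDefaults and newHTTPServer are hypothetical helpers, and falling back to the defaults for non-positive values is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// Redeclared from the documented constants so the sketch is self-contained.
const (
	DefaultInboundWorkerCount = 10
	DefaultInboundQueueSize   = 100
)

// ServerConfig mirrors the documented struct.
type ServerConfig struct {
	Address             string
	ReadTimeout         time.Duration
	WriteTimeout        time.Duration
	IdleTimeout         time.Duration
	MaxHeaderBytes      int
	ShutdownGracePeriod time.Duration
	InboundWorkerCount  int
	InboundQueueSize    int
}

// withDefaults returns a copy of cfg with non-positive worker pool settings
// replaced by the defaults (assumed behaviour).
func withDefaults(cfg ServerConfig) ServerConfig {
	if cfg.InboundWorkerCount <= 0 {
		cfg.InboundWorkerCount = DefaultInboundWorkerCount
	}
	if cfg.InboundQueueSize <= 0 {
		cfg.InboundQueueSize = DefaultInboundQueueSize
	}
	return cfg
}

// newHTTPServer copies the HTTP-level settings onto a net/http server.
func newHTTPServer(cfg ServerConfig, h http.Handler) *http.Server {
	return &http.Server{
		Addr:           cfg.Address,
		Handler:        h,
		ReadTimeout:    cfg.ReadTimeout,
		WriteTimeout:   cfg.WriteTimeout,
		IdleTimeout:    cfg.IdleTimeout,
		MaxHeaderBytes: cfg.MaxHeaderBytes,
	}
}

func main() {
	cfg := withDefaults(ServerConfig{
		Address:             ":8080",
		ReadTimeout:         5 * time.Second,
		WriteTimeout:        10 * time.Second,
		IdleTimeout:         60 * time.Second,
		MaxHeaderBytes:      1 << 20,
		ShutdownGracePeriod: 15 * time.Second,
	})
	srv := newHTTPServer(cfg, http.NotFoundHandler())
	fmt.Println(srv.Addr, cfg.InboundWorkerCount, cfg.InboundQueueSize)
}
```

The timeout and size values in main are illustrative only. ShutdownGracePeriod has no direct http.Server field; it would typically bound the context passed to the server's Shutdown method.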