Documentation
¶
Index ¶
- Constants
- Variables
- func AbortWithError(c *gin.Context, code int, err error)
- func AbortWithValidationError(c *gin.Context, err error)
- func AuthMiddleware(apiKey, jwtSecret string, tenantRetriever TenantRetriever, opts AuthOptions) gin.HandlerFunc
- func CursorToPtr(cursor string) *string
- func ErrorHandlerMiddleware() gin.HandlerFunc
- func GetRequestLatency(c *gin.Context) time.Duration
- func LatencyMiddleware() gin.HandlerFunc
- func LoggerMiddleware(logger *logging.Logger) gin.HandlerFunc
- func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBodySanitizer) gin.HandlerFunc
- func MetricsMiddleware() gin.HandlerFunc
- func NewRouter(cfg RouterConfig, deps RouterDeps) http.Handler
- func ParseArrayParam(c *gin.Context, fieldName string) []string
- func ParseCursors(c *gin.Context) (CursorParams, *ErrorResponse)
- func ParseDateFilter(c *gin.Context, fieldName string) (*DateFilterResult, *ErrorResponse)
- type APIAttempt
- type APIEvent
- type APIEventFull
- type APIEventSummary
- type AttemptPaginatedResult
- type AuthOptions
- type BufferedReader
- type CreateDestinationRequest
- type CursorParams
- type DateFilterResult
- type DestinationHandlers
- func (h *DestinationHandlers) Create(c *gin.Context)
- func (h *DestinationHandlers) Delete(c *gin.Context)
- func (h *DestinationHandlers) Disable(c *gin.Context)
- func (h *DestinationHandlers) Enable(c *gin.Context)
- func (h *DestinationHandlers) List(c *gin.Context)
- func (h *DestinationHandlers) ListProviderMetadata(c *gin.Context)
- func (h *DestinationHandlers) Retrieve(c *gin.Context)
- func (h *DestinationHandlers) RetrieveProviderMetadata(c *gin.Context)
- func (h *DestinationHandlers) Update(c *gin.Context)
- type ErrorResponse
- type EventPaginatedResult
- type IncludeOptions
- type JWTClaims
- type LogHandlers
- type PublishHandlers
- type PublishedEvent
- type RequestBodySanitizer
- type RetryHandlers
- type RouteDefinition
- type RouterConfig
- type RouterDeps
- type SeekPagination
- type TenantHandlers
- func (h *TenantHandlers) Delete(c *gin.Context)
- func (h *TenantHandlers) List(c *gin.Context)
- func (h *TenantHandlers) Retrieve(c *gin.Context)
- func (h *TenantHandlers) RetrievePortal(c *gin.Context)
- func (h *TenantHandlers) RetrieveToken(c *gin.Context)
- func (h *TenantHandlers) Upsert(c *gin.Context)
- type TenantRetriever
- type TopicHandlers
- type UpdateDestinationRequest
Constants ¶
const (
// Role values
RoleAdmin = "admin"
RoleTenant = "tenant"
)
const (
// MaxRequestBodySize limits the size of request bodies we'll buffer for logging
// Set to 10KB to prevent excessive log volumes
MaxRequestBodySize = 10 * 1024
// SensitiveFieldMask is used to replace sensitive values
SensitiveFieldMask = "[REDACTED]"
)
Variables ¶
var (
ErrMissingAuthHeader = errors.New("missing authorization header")
ErrInvalidBearerToken = errors.New("invalid bearer token format")
)
var (
ErrInvalidToken = errors.New("invalid token")
)
var JWT = jsonwebtoken{}
Functions ¶
func AuthMiddleware ¶ added in v0.13.0
func AuthMiddleware(apiKey, jwtSecret string, tenantRetriever TenantRetriever, opts AuthOptions) gin.HandlerFunc
AuthMiddleware returns a single gin.HandlerFunc that handles authentication, authorization, and tenant resolution for every route.
Flow:
- VPC mode (apiKey=""): grant admin, resolve tenant if RequireTenant, done.
- Validate auth header → 401 if missing/malformed.
- token == apiKey → admin, resolve tenant if RequireTenant, done.
- JWT.Extract(token) → 401 if invalid.
- AdminOnly? → 403.
- :tenant_id param mismatch? → 403.
- Set tenantID + RoleTenant, always resolve tenant for JWT → 401 if missing/deleted.
func CursorToPtr ¶ added in v0.13.0
func CursorToPtr(cursor string) *string
CursorToPtr returns nil for an empty cursor string, or a pointer to the string otherwise, so that empty cursors are represented as null rather than as an empty string.
func ErrorHandlerMiddleware ¶
func ErrorHandlerMiddleware() gin.HandlerFunc
func GetRequestLatency ¶
func GetRequestLatency(c *gin.Context) time.Duration
GetRequestLatency returns the request latency from the context.
func LatencyMiddleware ¶
func LatencyMiddleware() gin.HandlerFunc
func LoggerMiddleware ¶
func LoggerMiddleware(logger *logging.Logger) gin.HandlerFunc
func LoggerMiddlewareWithSanitizer ¶
func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBodySanitizer) gin.HandlerFunc
func MetricsMiddleware ¶
func MetricsMiddleware() gin.HandlerFunc
func NewRouter ¶
func NewRouter(cfg RouterConfig, deps RouterDeps) http.Handler
func ParseArrayParam ¶ added in v0.13.0
func ParseArrayParam(c *gin.Context, fieldName string) []string
ParseArrayParam parses bracket notation array parameters (e.g., field[0]=a&field[1]=b). Returns the parsed array in index order. Supports both numeric indices [0], [1] and simple repeated params field[]=a&field[]=b.
func ParseCursors ¶ added in v0.13.0
func ParseCursors(c *gin.Context) (CursorParams, *ErrorResponse)
ParseCursors parses the "next" and "prev" query parameters. Returns an error if both are provided (mutually exclusive).
func ParseDateFilter ¶ added in v0.13.0
func ParseDateFilter(c *gin.Context, fieldName string) (*DateFilterResult, *ErrorResponse)
ParseDateFilter parses bracket notation date filters (e.g., field[gte], field[lte], field[gt], field[lt]). Returns 400 for unsupported operators (any). Accepts both RFC3339 format (2024-01-01T00:00:00Z) and date-only format (2024-01-01).
Types ¶
type APIAttempt ¶ added in v0.13.0
type APIAttempt struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Status string `json:"status"`
Time time.Time `json:"time"`
Code string `json:"code,omitempty"`
ResponseData map[string]interface{} `json:"response_data,omitempty"`
AttemptNumber int `json:"attempt_number"`
Manual bool `json:"manual"`
EventID string `json:"event_id"`
DestinationID string `json:"destination_id"`
Event interface{} `json:"event,omitempty"`
}
APIAttempt is the API response for an attempt
type APIEvent ¶ added in v0.13.0
type APIEvent struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
DestinationID string `json:"destination_id"`
Topic string `json:"topic"`
Time time.Time `json:"time"`
EligibleForRetry bool `json:"eligible_for_retry"`
Metadata map[string]string `json:"metadata,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
}
APIEvent is the API response for retrieving a single event
type APIEventFull ¶ added in v0.13.0
type APIEventFull struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
DestinationID string `json:"destination_id"`
Topic string `json:"topic"`
Time time.Time `json:"time"`
EligibleForRetry bool `json:"eligible_for_retry"`
Metadata map[string]string `json:"metadata,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
}
APIEventFull is the event object when expand=event.data
type APIEventSummary ¶ added in v0.13.0
type APIEventSummary struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
DestinationID string `json:"destination_id"`
Topic string `json:"topic"`
Time time.Time `json:"time"`
EligibleForRetry bool `json:"eligible_for_retry"`
Metadata map[string]string `json:"metadata,omitempty"`
}
APIEventSummary is the event object when expand=event (without data)
type AttemptPaginatedResult ¶ added in v0.13.0
type AttemptPaginatedResult struct {
Models []APIAttempt `json:"models"`
Pagination SeekPagination `json:"pagination"`
}
AttemptPaginatedResult is the paginated response for listing attempts.
type AuthOptions ¶ added in v0.13.0
AuthOptions configures the behaviour of AuthMiddleware.
type BufferedReader ¶
type BufferedReader struct {
// contains filtered or unexported fields
}
BufferedReader holds buffered content from which multiple independent readers can be created, so the same content can be read more than once.
func NewBufferedReader ¶
func NewBufferedReader(r io.Reader) (*BufferedReader, error)
NewBufferedReader creates a new BufferedReader from an io.Reader
func (*BufferedReader) Bytes ¶
func (br *BufferedReader) Bytes() []byte
Bytes returns a copy of the buffered content
func (*BufferedReader) NewReadCloser ¶
func (br *BufferedReader) NewReadCloser() io.ReadCloser
func (*BufferedReader) NewReader ¶
func (br *BufferedReader) NewReader() io.Reader
NewReader creates a new reader from the buffered content
type CreateDestinationRequest ¶
type CreateDestinationRequest struct {
ID string `json:"id" binding:"-"`
Type string `json:"type" binding:"required"`
Topics models.Topics `json:"topics" binding:"required"`
Filter models.Filter `json:"filter,omitempty" binding:"-"`
Config models.Config `json:"config" binding:"-"`
Credentials models.Credentials `json:"credentials" binding:"-"`
DeliveryMetadata models.DeliveryMetadata `json:"delivery_metadata,omitempty" binding:"-"`
Metadata models.Metadata `json:"metadata,omitempty" binding:"-"`
}
func (*CreateDestinationRequest) ToDestination ¶
func (r *CreateDestinationRequest) ToDestination(tenantID string) models.Destination
type CursorParams ¶ added in v0.13.0
CursorParams holds the parsed cursor values for pagination.
type DateFilterResult ¶ added in v0.13.0
type DateFilterResult struct {
GTE *time.Time // Greater than or equal (>=)
LTE *time.Time // Less than or equal (<=)
GT *time.Time // Greater than (>)
LT *time.Time // Less than (<)
}
DateFilterResult holds the parsed date filter values.
type DestinationHandlers ¶
type DestinationHandlers struct {
// contains filtered or unexported fields
}
func NewDestinationHandlers ¶
func NewDestinationHandlers(logger *logging.Logger, telemetry telemetry.Telemetry, tenantStore tenantstore.TenantStore, topics []string, registry destregistry.Registry) *DestinationHandlers
func (*DestinationHandlers) Create ¶
func (h *DestinationHandlers) Create(c *gin.Context)
func (*DestinationHandlers) Delete ¶
func (h *DestinationHandlers) Delete(c *gin.Context)
func (*DestinationHandlers) Disable ¶
func (h *DestinationHandlers) Disable(c *gin.Context)
func (*DestinationHandlers) Enable ¶
func (h *DestinationHandlers) Enable(c *gin.Context)
func (*DestinationHandlers) List ¶
func (h *DestinationHandlers) List(c *gin.Context)
func (*DestinationHandlers) ListProviderMetadata ¶
func (h *DestinationHandlers) ListProviderMetadata(c *gin.Context)
func (*DestinationHandlers) Retrieve ¶
func (h *DestinationHandlers) Retrieve(c *gin.Context)
func (*DestinationHandlers) RetrieveProviderMetadata ¶
func (h *DestinationHandlers) RetrieveProviderMetadata(c *gin.Context)
func (*DestinationHandlers) Update ¶
func (h *DestinationHandlers) Update(c *gin.Context)
type ErrorResponse ¶
type ErrorResponse struct {
Err error `json:"-"`
Code int `json:"-"`
Status int `json:"status"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
func NewErrBadRequest ¶
func NewErrBadRequest(err error) ErrorResponse
func NewErrInternalServer ¶
func NewErrInternalServer(err error) ErrorResponse
func NewErrNotFound ¶
func NewErrNotFound(resource string) ErrorResponse
func ParseDir ¶ added in v0.13.0
func ParseDir(c *gin.Context) (string, *ErrorResponse)
ParseDir parses the "dir" query parameter for sort direction. Returns the direction (empty string if not provided) and any validation error response. Valid values are "asc" and "desc". Caller/store should apply default if empty.
func ParseOrderBy ¶ added in v0.13.0
func ParseOrderBy(c *gin.Context, allowedValues []string) (string, *ErrorResponse)
ParseOrderBy parses the "order_by" query parameter and validates against allowed values. Returns the order_by value (empty string if not provided) and any validation error response. Caller/store should apply default if empty.
func (ErrorResponse) Error ¶
func (e ErrorResponse) Error() string
func (*ErrorResponse) Parse ¶
func (e *ErrorResponse) Parse(err error)
type EventPaginatedResult ¶ added in v0.13.0
type EventPaginatedResult struct {
Models []APIEvent `json:"models"`
Pagination SeekPagination `json:"pagination"`
}
EventPaginatedResult is the paginated response for listing events.
type IncludeOptions ¶ added in v0.13.0
IncludeOptions represents which fields to include in the response
type LogHandlers ¶
type LogHandlers struct {
// contains filtered or unexported fields
}
func NewLogHandlers ¶
func NewLogHandlers( logger *logging.Logger, logStore logstore.LogStore, ) *LogHandlers
func (*LogHandlers) ListAttempts ¶ added in v0.13.0
func (h *LogHandlers) ListAttempts(c *gin.Context)
ListAttempts handles GET /attempts. Query params: tenant_id, event_id, destination_id, status, topic[], time[gte], time[lte], time[gt], time[lt], limit, next, prev, include, order_by, dir.
func (*LogHandlers) ListDestinationAttempts ¶ added in v0.13.0
func (h *LogHandlers) ListDestinationAttempts(c *gin.Context)
ListDestinationAttempts handles GET /:tenant_id/destinations/:destination_id/attempts. Same as ListAttempts, but scoped to a specific destination via the URL param.
func (*LogHandlers) ListEvents ¶ added in v0.13.0
func (h *LogHandlers) ListEvents(c *gin.Context)
ListEvents handles GET /events. Query params: tenant_id, destination_id, topic[], time[gte], time[lte], time[gt], time[lt], limit, next, prev, include, order_by, dir.
func (*LogHandlers) RetrieveAttempt ¶ added in v0.13.0
func (h *LogHandlers) RetrieveAttempt(c *gin.Context)
RetrieveAttempt handles GET /attempts/:attempt_id
func (*LogHandlers) RetrieveEvent ¶
func (h *LogHandlers) RetrieveEvent(c *gin.Context)
RetrieveEvent handles GET /events/:event_id
type PublishHandlers ¶
type PublishHandlers struct {
// contains filtered or unexported fields
}
func NewPublishHandlers ¶
func NewPublishHandlers( logger *logging.Logger, eventHandler eventHandler, ) *PublishHandlers
func (*PublishHandlers) Ingest ¶
func (h *PublishHandlers) Ingest(c *gin.Context)
type PublishedEvent ¶
type PublishedEvent struct {
ID string `json:"id"`
TenantID string `json:"tenant_id" binding:"required"`
DestinationID string `json:"destination_id"`
Topic string `json:"topic"`
EligibleForRetry *bool `json:"eligible_for_retry"`
Time time.Time `json:"time"`
Metadata map[string]string `json:"metadata"`
Data map[string]interface{} `json:"data" binding:"required"`
}
type RequestBodySanitizer ¶
type RequestBodySanitizer struct {
// contains filtered or unexported fields
}
RequestBodySanitizer handles sanitization of request bodies for logging
func NewRequestBodySanitizer ¶
func NewRequestBodySanitizer(registry destregistry.Registry) *RequestBodySanitizer
NewRequestBodySanitizer creates a new sanitizer instance
func (*RequestBodySanitizer) SanitizeRequestBody ¶
func (s *RequestBodySanitizer) SanitizeRequestBody(body io.Reader) ([]byte, error)
SanitizeRequestBody reads and sanitizes a request body for safe logging
type RetryHandlers ¶
type RetryHandlers struct {
// contains filtered or unexported fields
}
func NewRetryHandlers ¶
func NewRetryHandlers( logger *logging.Logger, tenantStore tenantstore.TenantStore, logStore logstore.LogStore, deliveryPublisher deliveryPublisher, ) *RetryHandlers
func (*RetryHandlers) Retry ¶
func (h *RetryHandlers) Retry(c *gin.Context)
Retry handles POST /retry. Accepts { event_id, destination_id } in the request body. Looks up the event, verifies that the destination exists and is enabled, then publishes a manual delivery task.
type RouteDefinition ¶
type RouteDefinition struct {
Method string
Path string
Handler gin.HandlerFunc
AdminOnly bool
RequireTenant bool
Middlewares []gin.HandlerFunc
}
type RouterConfig ¶
type RouterConfig struct {
ServiceName string
APIKey string
JWTSecret string
DeploymentID string
Topics []string
Registry destregistry.Registry
PortalConfig portal.PortalConfig
GinMode string
}
type RouterDeps ¶ added in v0.13.0
type RouterDeps struct {
TenantStore tenantstore.TenantStore
LogStore logstore.LogStore
Logger *logging.Logger
DeliveryPublisher deliveryPublisher
EventHandler eventHandler
Telemetry telemetry.Telemetry
}
type SeekPagination ¶ added in v0.13.0
type SeekPagination struct {
OrderBy string `json:"order_by"`
Dir string `json:"dir"`
Limit int `json:"limit"`
Next *string `json:"next"`
Prev *string `json:"prev"`
}
SeekPagination represents cursor-based pagination metadata for list responses.
type TenantHandlers ¶
type TenantHandlers struct {
// contains filtered or unexported fields
}
func NewTenantHandlers ¶
func NewTenantHandlers( logger *logging.Logger, telemetry telemetry.Telemetry, jwtSecret string, deploymentID string, tenantStore tenantstore.TenantStore, ) *TenantHandlers
func (*TenantHandlers) Delete ¶
func (h *TenantHandlers) Delete(c *gin.Context)
func (*TenantHandlers) List ¶ added in v0.12.0
func (h *TenantHandlers) List(c *gin.Context)
func (*TenantHandlers) Retrieve ¶
func (h *TenantHandlers) Retrieve(c *gin.Context)
func (*TenantHandlers) RetrievePortal ¶
func (h *TenantHandlers) RetrievePortal(c *gin.Context)
func (*TenantHandlers) RetrieveToken ¶
func (h *TenantHandlers) RetrieveToken(c *gin.Context)
func (*TenantHandlers) Upsert ¶
func (h *TenantHandlers) Upsert(c *gin.Context)
type TenantRetriever ¶ added in v0.13.0
type TenantRetriever interface {
RetrieveTenant(ctx context.Context, tenantID string) (*models.Tenant, error)
}
TenantRetriever is satisfied by tenantstore.TenantStore. Defined here to avoid coupling the router/middleware to the full store interface.
type TopicHandlers ¶
type TopicHandlers struct {
// contains filtered or unexported fields
}
func NewTopicHandlers ¶
func NewTopicHandlers(logger *logging.Logger, topics []string) *TopicHandlers
func (*TopicHandlers) List ¶
func (h *TopicHandlers) List(c *gin.Context)
type UpdateDestinationRequest ¶
type UpdateDestinationRequest struct {
Type string `json:"type" binding:"-"`
Topics models.Topics `json:"topics" binding:"-"`
Filter models.Filter `json:"filter,omitempty" binding:"-"`
Config models.Config `json:"config" binding:"-"`
Credentials models.Credentials `json:"credentials" binding:"-"`
DeliveryMetadata models.DeliveryMetadata `json:"delivery_metadata,omitempty" binding:"-"`
Metadata models.Metadata `json:"metadata,omitempty" binding:"-"`
}