Documentation ¶
Overview ¶
Package service implements the surface-agnostic business logic that the CLI and Twirp layers both call into. Each surface translates its own shape to and from these methods and never touches the broker or manager directly.
Index ¶
- type DLQItemSummary
- type DLQListing
- type KeyListing
- type KeySummary
- type MgmtTokenResult
- type PublishAck
- type RPCAdapter
- func (a *RPCAdapter) CreatePrivate(ctx context.Context, in *rpc.CreatePrivateRequest) (*rpc.Credentials, error)
- func (a *RPCAdapter) DeleteChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.Empty, error)
- func (a *RPCAdapter) DlqCount(ctx context.Context, in *rpc.DlqCountRequest) (*rpc.DlqCountResponse, error)
- func (a *RPCAdapter) DlqDiscard(ctx context.Context, in *rpc.DlqDiscardRequest) (*rpc.Empty, error)
- func (a *RPCAdapter) DlqList(ctx context.Context, in *rpc.DlqListRequest) (*rpc.DlqListResponse, error)
- func (a *RPCAdapter) DlqReplay(ctx context.Context, in *rpc.DlqReplayRequest) (*rpc.Empty, error)
- func (a *RPCAdapter) GenerateKey(ctx context.Context, in *rpc.GenerateKeyRequest) (*rpc.KeySummary, error)
- func (a *RPCAdapter) GetChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.ChannelResponse, error)
- func (a *RPCAdapter) IssueMgmt(ctx context.Context, in *rpc.IssueMgmtRequest) (*rpc.IssueMgmtResponse, error)
- func (a *RPCAdapter) ListChannels(ctx context.Context, _ *rpc.Empty) (*rpc.ListChannelsResponse, error)
- func (a *RPCAdapter) ListKeys(ctx context.Context, _ *rpc.Empty) (*rpc.ListKeysResponse, error)
- func (a *RPCAdapter) Manifest(ctx context.Context, _ *rpc.Empty) (*rpc.JSONResponse, error)
- func (a *RPCAdapter) OpenPublic(ctx context.Context, in *rpc.OpenPublicRequest) (*rpc.ChannelResponse, error)
- func (a *RPCAdapter) Presence(ctx context.Context, in *rpc.ChannelRef) (*rpc.PresenceResponse, error)
- func (a *RPCAdapter) PromoteKey(ctx context.Context, in *rpc.KeyRef) (*rpc.Empty, error)
- func (a *RPCAdapter) Publish(ctx context.Context, in *rpc.PublishRequest) (*rpc.PublishResponse, error)
- func (a *RPCAdapter) RefreshToken(ctx context.Context, in *rpc.RefreshTokenRequest) (*rpc.RefreshTokenResponse, error)
- func (a *RPCAdapter) ReloadKeys(ctx context.Context, _ *rpc.Empty) (*rpc.Empty, error)
- func (a *RPCAdapter) RetireKey(ctx context.Context, in *rpc.KeyRef) (*rpc.Empty, error)
- func (a *RPCAdapter) RevokeToken(ctx context.Context, in *rpc.RevokeTokenRequest) (*rpc.Empty, error)
- func (a *RPCAdapter) RevokeUser(ctx context.Context, in *rpc.RevokeUserRequest) (*rpc.Empty, error)
- type Service
- func (s *Service) CountDLQ(ctx context.Context, sink string) (int, error)
- func (s *Service) CreatePrivateChannel(ctx context.Context, subjectID, name string, ttl time.Duration, ...) (parsec.Credentials, error)
- func (s *Service) DeleteChannel(ctx context.Context, name string) error
- func (s *Service) DiscardDLQ(ctx context.Context, id string) error
- func (s *Service) GenerateKey(ctx context.Context, alg auth.Alg) (KeySummary, error)
- func (s *Service) GetChannel(ctx context.Context, name string) (*channels.Channel, error)
- func (s *Service) IssueMgmt(ctx context.Context, subject string, ttl time.Duration) (MgmtTokenResult, error)
- func (s *Service) ListChannels(ctx context.Context) []channels.Channel
- func (s *Service) ListDLQ(ctx context.Context, sink string, limit int) (DLQListing, error)
- func (s *Service) ListKeys(ctx context.Context) KeyListing
- func (s *Service) Manifest(ctx context.Context) descriptor.Envelope
- func (s *Service) OpenPublicChannel(ctx context.Context, name string, ttl time.Duration) (*channels.Channel, error)
- func (s *Service) Parsec() *parsec.Parsec
- func (s *Service) Presence(ctx context.Context, name string) (int, error)
- func (s *Service) PromoteKey(ctx context.Context, id string) error
- func (s *Service) Publish(ctx context.Context, name string, data []byte) (PublishAck, error)
- func (s *Service) RefreshAccess(ctx context.Context, refreshToken string) (parsec.RefreshResult, error)
- func (s *Service) ReloadKeys(ctx context.Context) error
- func (s *Service) ReplayDLQ(ctx context.Context, id string) error
- func (s *Service) RetireKey(ctx context.Context, id string) error
- func (s *Service) RevokeToken(ctx context.Context, tokenID, userID, reason string) error
- func (s *Service) RevokeUser(ctx context.Context, userID, reason string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DLQItemSummary ¶
type DLQItemSummary struct {
ID string `json:"id"`
Sink string `json:"sink"`
At string `json:"at"`
Recipient json.RawMessage `json:"recipient,omitempty"`
Subject string `json:"subject,omitempty"`
Body string `json:"body,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
Attempts int `json:"attempts"`
LastError string `json:"last_error,omitempty"`
}
DLQItemSummary is the surface-shape of a sinks.DLQItem. Recipient is rendered back as raw JSON so the wire and CLI views stay sink-agnostic.
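As a minimal sketch of what "rendered back as raw JSON" means on the wire, the example below copies the struct from this page and marshals one item. The sample values (`dlq-1`, `email`, the recipient object) are made up for illustration; only the field names and tags come from the documentation above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DLQItemSummary mirrors the struct documented above.
type DLQItemSummary struct {
	ID        string            `json:"id"`
	Sink      string            `json:"sink"`
	At        string            `json:"at"`
	Recipient json.RawMessage   `json:"recipient,omitempty"`
	Subject   string            `json:"subject,omitempty"`
	Body      string            `json:"body,omitempty"`
	Metadata  map[string]string `json:"metadata,omitempty"`
	Attempts  int               `json:"attempts"`
	LastError string            `json:"last_error,omitempty"`
}

// encode marshals an item. The Recipient bytes are embedded as JSON,
// not as a quoted string, so each sink can keep its own recipient shape.
func encode(item DLQItemSummary) string {
	b, err := json.Marshal(item)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	item := DLQItemSummary{
		ID:        "dlq-1", // illustrative values only
		Sink:      "email",
		At:        "2024-01-01T00:00:00Z",
		Recipient: json.RawMessage(`{"to":"ops@example.com"}`),
		Attempts:  3,
	}
	fmt.Println(encode(item))
}
```

Fields tagged `omitempty` (Subject, Body, Metadata, LastError, and an empty Recipient) are dropped from the output when unset, while `attempts` is always present.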
type DLQListing ¶
type DLQListing struct {
Items []DLQItemSummary `json:"items"`
}
DLQListing is the result of ListDLQ.
type KeyListing ¶
type KeyListing struct {
ActiveKeyID string `json:"active_key_id"`
Keys []KeySummary `json:"keys"`
}
KeyListing is the result of ListKeys.
type KeySummary ¶
type KeySummary struct {
ID string `json:"id"`
Alg string `json:"alg,omitempty"`
Role string `json:"role"`
CreatedAt string `json:"created_at"`
RetiredAt string `json:"retired_at,omitempty"`
}
KeySummary is the surface-shape of an auth.Key.
type MgmtTokenResult ¶
type MgmtTokenResult struct {
Token string `json:"token"`
Expires time.Time `json:"expires"`
SignedByKeyID string `json:"signed_by_key_id"`
}
MgmtTokenResult is what IssueMgmt returns.
type PublishAck ¶
PublishAck is the surface-shape of a publish result.
type RPCAdapter ¶
type RPCAdapter struct{ S *Service }
RPCAdapter implements rpc.ParsecService over a Service. Surface code wires this at boot — internal/server passes it to rpc.NewParsecServiceServer.
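The adapter shape described above can be sketched with stand-in types. Everything in this example is hypothetical except the pattern itself: `service`, `channelRef`, and `presenceResponse` are local substitutes for `*Service`, `rpc.ChannelRef`, and `rpc.PresenceResponse`, whose real fields are not shown on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// service stands in for *Service; only a Presence-like method is sketched.
type service struct{ subscribers map[string]int }

func (s *service) Presence(ctx context.Context, name string) (int, error) {
	n, ok := s.subscribers[name]
	if !ok {
		return 0, errors.New("channel not found")
	}
	return n, nil
}

// channelRef and presenceResponse are hypothetical stand-ins for the
// generated RPC messages; the field names are assumptions.
type channelRef struct{ Name string }
type presenceResponse struct{ Count int }

// adapter mirrors the shape of RPCAdapter: it holds a service and only
// translates between the RPC envelope and the service method.
type adapter struct{ S *service }

func (a *adapter) Presence(ctx context.Context, in *channelRef) (*presenceResponse, error) {
	n, err := a.S.Presence(ctx, in.Name)
	if err != nil {
		return nil, err
	}
	return &presenceResponse{Count: n}, nil
}

// lookup wires an adapter over a fixed service and performs one call.
func lookup(name string) (int, error) {
	a := &adapter{S: &service{subscribers: map[string]int{"news": 2}}}
	out, err := a.Presence(context.Background(), &channelRef{Name: name})
	if err != nil {
		return 0, err
	}
	return out.Count, nil
}

func main() {
	fmt.Println(lookup("news"))
	fmt.Println(lookup("missing"))
}
```

Keeping the adapter free of logic means a second surface, such as the CLI, can call the same service method and get the same behavior.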
func (*RPCAdapter) CreatePrivate ¶
func (a *RPCAdapter) CreatePrivate(ctx context.Context, in *rpc.CreatePrivateRequest) (*rpc.Credentials, error)
CreatePrivate mints a private channel and returns the token pair. Pattern scopes from the request are stamped into both tokens; an empty list produces an exact-match-only grant on the channel.
func (*RPCAdapter) DeleteChannel ¶
func (a *RPCAdapter) DeleteChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.Empty, error)
DeleteChannel removes a channel.
func (*RPCAdapter) DlqCount ¶
func (a *RPCAdapter) DlqCount(ctx context.Context, in *rpc.DlqCountRequest) (*rpc.DlqCountResponse, error)
DlqCount returns the live size of the DLQ for sink.
func (*RPCAdapter) DlqDiscard ¶
func (a *RPCAdapter) DlqDiscard(ctx context.Context, in *rpc.DlqDiscardRequest) (*rpc.Empty, error)
DlqDiscard removes a DLQ entry by id. A missing id is not an error.
func (*RPCAdapter) DlqList ¶
func (a *RPCAdapter) DlqList(ctx context.Context, in *rpc.DlqListRequest) (*rpc.DlqListResponse, error)
DlqList returns up to in.Limit DLQ items for the supplied sink (empty sink → all sinks).
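The filtering rule (a named sink selects that sink, an empty sink selects all, and the result is capped at the limit) can be sketched over an in-memory slice. The `item` type and the treatment of a non-positive limit as "no cap" are assumptions of this sketch, not documented behavior.

```go
package main

import "fmt"

// item is a hypothetical, trimmed-down DLQ entry.
type item struct {
	ID   string
	Sink string
}

// list returns up to limit items for sink; an empty sink selects every sink.
// Treating limit <= 0 as "no cap" is an assumption of this sketch.
func list(all []item, sink string, limit int) []item {
	out := []item{}
	for _, it := range all {
		if sink != "" && it.Sink != sink {
			continue
		}
		if limit > 0 && len(out) >= limit {
			break
		}
		out = append(out, it)
	}
	return out
}

func main() {
	all := []item{{"a", "email"}, {"b", "sms"}, {"c", "email"}}
	fmt.Println(list(all, "", 10))     // every sink
	fmt.Println(list(all, "email", 1)) // one sink, capped
}
```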
func (*RPCAdapter) DlqReplay ¶
func (a *RPCAdapter) DlqReplay(ctx context.Context, in *rpc.DlqReplayRequest) (*rpc.Empty, error)
DlqReplay re-runs the stored item through the original sink. The DLQ entry remains in place — if replay fails again the Retrier pushes a new item, and the operator can compare attempt counts.
func (*RPCAdapter) GenerateKey ¶
func (a *RPCAdapter) GenerateKey(ctx context.Context, in *rpc.GenerateKeyRequest) (*rpc.KeySummary, error)
GenerateKey mints a fresh key (verify-only). Algorithm is selected via the request's Alg field; empty defaults to HS256.
func (*RPCAdapter) GetChannel ¶
func (a *RPCAdapter) GetChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.ChannelResponse, error)
GetChannel returns one channel snapshot.
func (*RPCAdapter) IssueMgmt ¶
func (a *RPCAdapter) IssueMgmt(ctx context.Context, in *rpc.IssueMgmtRequest) (*rpc.IssueMgmtResponse, error)
IssueMgmt mints a fresh mgmt bearer.
func (*RPCAdapter) ListChannels ¶
func (a *RPCAdapter) ListChannels(ctx context.Context, _ *rpc.Empty) (*rpc.ListChannelsResponse, error)
ListChannels returns every managed channel.
func (*RPCAdapter) ListKeys ¶
func (a *RPCAdapter) ListKeys(ctx context.Context, _ *rpc.Empty) (*rpc.ListKeysResponse, error)
ListKeys mirrors Service.ListKeys onto the RPC envelope.
func (*RPCAdapter) Manifest ¶
func (a *RPCAdapter) Manifest(ctx context.Context, _ *rpc.Empty) (*rpc.JSONResponse, error)
Manifest returns the manifest envelope as a JSONResponse.
func (*RPCAdapter) OpenPublic ¶
func (a *RPCAdapter) OpenPublic(ctx context.Context, in *rpc.OpenPublicRequest) (*rpc.ChannelResponse, error)
OpenPublic opens or re-opens a public channel.
func (*RPCAdapter) Presence ¶
func (a *RPCAdapter) Presence(ctx context.Context, in *rpc.ChannelRef) (*rpc.PresenceResponse, error)
Presence returns the live subscriber count.
func (*RPCAdapter) PromoteKey ¶
func (a *RPCAdapter) PromoteKey(ctx context.Context, in *rpc.KeyRef) (*rpc.Empty, error)
PromoteKey transitions id to active.
func (*RPCAdapter) Publish ¶
func (a *RPCAdapter) Publish(ctx context.Context, in *rpc.PublishRequest) (*rpc.PublishResponse, error)
Publish ships a message.
func (*RPCAdapter) RefreshToken ¶
func (a *RPCAdapter) RefreshToken(ctx context.Context, in *rpc.RefreshTokenRequest) (*rpc.RefreshTokenResponse, error)
RefreshToken exchanges a refresh token for a new access token.
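func (*RPCAdapter) ReloadKeys ¶
func (a *RPCAdapter) ReloadKeys(ctx context.Context, _ *rpc.Empty) (*rpc.Empty, error)
ReloadKeys triggers a re-read of the keyring file from disk.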
func (*RPCAdapter) RevokeToken ¶ added in v0.3.0
func (a *RPCAdapter) RevokeToken(ctx context.Context, in *rpc.RevokeTokenRequest) (*rpc.Empty, error)
RevokeToken adapts the RPC envelope onto Service.RevokeToken.
func (*RPCAdapter) RevokeUser ¶ added in v0.3.0
func (a *RPCAdapter) RevokeUser(ctx context.Context, in *rpc.RevokeUserRequest) (*rpc.Empty, error)
RevokeUser adapts the RPC envelope onto Service.RevokeUser.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the shared business-logic layer.
func (*Service) CreatePrivateChannel ¶
func (s *Service) CreatePrivateChannel(ctx context.Context, subjectID, name string, ttl time.Duration, scopes []auth.Scope) (parsec.Credentials, error)
CreatePrivateChannel mints a private channel with access + refresh tokens. subjectID is recorded in the token's sub claim; pass empty for anonymous. scopes stamps pattern grants into both tokens; pass nil for an exact-match-only grant on name.
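To illustrate the difference between a nil scope list and a pattern grant, here is a rough sketch of how a check against such a grant might look. The `grant` type is hypothetical, glob matching through `path.Match` is an assumption (this page does not define the pattern syntax of `auth.Scope`), and so is the choice to keep the exact channel name valid when patterns are present.

```go
package main

import (
	"fmt"
	"path"
)

// grant is a hypothetical stand-in for the claims stamped into a token.
type grant struct {
	Channel  string   // the channel the credentials were minted for
	Patterns []string // pattern scopes; nil means exact-match only
}

// allows reports whether the grant covers channel.
// Glob matching via path.Match is an assumption of this sketch.
func (g grant) allows(channel string) bool {
	if channel == g.Channel {
		return true
	}
	for _, p := range g.Patterns {
		if ok, err := path.Match(p, channel); err == nil && ok {
			return true
		}
	}
	return false
}

func main() {
	exact := grant{Channel: "orders.eu"}
	scoped := grant{Channel: "orders.eu", Patterns: []string{"orders.*"}}

	fmt.Println(exact.allows("orders.eu"), exact.allows("orders.us"))
	fmt.Println(scoped.allows("orders.us"), scoped.allows("billing.eu"))
}
```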
func (*Service) DeleteChannel ¶
func (s *Service) DeleteChannel(ctx context.Context, name string) error
DeleteChannel removes a channel from the manager.
func (*Service) DiscardDLQ ¶
func (s *Service) DiscardDLQ(ctx context.Context, id string) error
DiscardDLQ removes the DLQ entry identified by id.
func (*Service) GenerateKey ¶
func (s *Service) GenerateKey(ctx context.Context, alg auth.Alg) (KeySummary, error)
GenerateKey adds a new key to the ring as verify-only and returns it. The alg argument selects HS256 (default), RS256, or EdDSA.
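The defaulting rule can be sketched as a small normalizer. Plain strings are used here in place of `auth.Alg`, whose definition is not shown on this page, and the error for an unknown value is an assumption.

```go
package main

import "fmt"

// normalizeAlg applies the documented default: an empty value selects HS256.
// Strings stand in for auth.Alg; rejecting unknown values is an assumption.
func normalizeAlg(alg string) (string, error) {
	switch alg {
	case "":
		return "HS256", nil
	case "HS256", "RS256", "EdDSA":
		return alg, nil
	default:
		return "", fmt.Errorf("unsupported algorithm %q", alg)
	}
}

func main() {
	for _, in := range []string{"", "RS256", "EdDSA", "none"} {
		alg, err := normalizeAlg(in)
		fmt.Printf("%q -> %q, err=%v\n", in, alg, err)
	}
}
```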
func (*Service) GetChannel ¶
func (s *Service) GetChannel(ctx context.Context, name string) (*channels.Channel, error)
GetChannel returns one channel by name.
func (*Service) IssueMgmt ¶
func (s *Service) IssueMgmt(ctx context.Context, subject string, ttl time.Duration) (MgmtTokenResult, error)
IssueMgmt mints a fresh mgmt token signed by the active key. Used by operators during rotation to get a bearer signed by the new key before retiring the old one.
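The rotation order implied here (generate, promote, issue a new bearer, then retire the old key) might look like the following sketch. The `keyOps` interface is a simplified, hypothetical view of the Service methods: the real ones take and return `auth.Alg`, `KeySummary`, and `MgmtTokenResult`. The one-hour TTL and the `fakeOps` recorder are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// keyOps is a simplified, hypothetical view of the Service methods used
// during rotation.
type keyOps interface {
	GenerateKey(ctx context.Context, alg string) (id string, err error)
	PromoteKey(ctx context.Context, id string) error
	IssueMgmt(ctx context.Context, subject string, ttl time.Duration) (token string, err error)
	RetireKey(ctx context.Context, id string) error
}

// rotate obtains a bearer signed by the new key before retiring oldID.
func rotate(ctx context.Context, svc keyOps, oldID, subject string) (string, error) {
	newID, err := svc.GenerateKey(ctx, "") // new key starts verify-only
	if err != nil {
		return "", err
	}
	if err := svc.PromoteKey(ctx, newID); err != nil { // now the active signer
		return "", err
	}
	token, err := svc.IssueMgmt(ctx, subject, time.Hour) // signed by newID
	if err != nil {
		return "", err
	}
	if err := svc.RetireKey(ctx, oldID); err != nil {
		return "", err
	}
	return token, nil
}

// fakeOps records the order of calls so the sequence can be inspected.
type fakeOps struct{ calls []string }

func (f *fakeOps) GenerateKey(context.Context, string) (string, error) {
	f.calls = append(f.calls, "generate")
	return "k2", nil
}

func (f *fakeOps) PromoteKey(_ context.Context, id string) error {
	f.calls = append(f.calls, "promote "+id)
	return nil
}

func (f *fakeOps) IssueMgmt(_ context.Context, subject string, _ time.Duration) (string, error) {
	f.calls = append(f.calls, "issue "+subject)
	return "token-signed-by-k2", nil
}

func (f *fakeOps) RetireKey(_ context.Context, id string) error {
	f.calls = append(f.calls, "retire "+id)
	return nil
}

// demoRotate runs rotate against the fake and returns the call order.
func demoRotate() string {
	f := &fakeOps{}
	if _, err := rotate(context.Background(), f, "k1", "ops"); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(f.calls, " > ")
}

func main() {
	fmt.Println(demoRotate())
}
```

Issuing the bearer before retiring the old key means the operator is never left holding only a token signed by a retired key.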
func (*Service) ListChannels ¶
func (s *Service) ListChannels(ctx context.Context) []channels.Channel
ListChannels returns a snapshot of all managed channels.
func (*Service) ListKeys ¶
func (s *Service) ListKeys(ctx context.Context) KeyListing
ListKeys returns every key currently in the ring along with the active id.
func (*Service) Manifest ¶
func (s *Service) Manifest(ctx context.Context) descriptor.Envelope
Manifest returns the self-describing instance manifest.
func (*Service) OpenPublicChannel ¶
func (s *Service) OpenPublicChannel(ctx context.Context, name string, ttl time.Duration) (*channels.Channel, error)
OpenPublicChannel opens (or re-opens) a public channel.
func (*Service) Parsec ¶
func (s *Service) Parsec() *parsec.Parsec
Parsec returns the underlying instance. Used by surface code that needs to reach for Verifier / Issuer directly (e.g. the bearer middleware).
func (*Service) PromoteKey ¶
func (s *Service) PromoteKey(ctx context.Context, id string) error
PromoteKey makes id the active signing key.
func (*Service) Publish ¶
func (s *Service) Publish(ctx context.Context, name string, data []byte) (PublishAck, error)
Publish ships a message and returns the broker ack. The mgmt bearer's subject (from the request context) is the rate-limit key; when no subject is in context (e.g. tests without bearer enforcement) the remote IP is the fallback; when neither is available the call proceeds without gating.
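The fallback chain for the rate-limit key can be sketched as a pure function. How the subject and remote IP are actually read from the context is not shown on this page, so they are plain parameters here, and the `sub:` and `ip:` prefixes are assumptions added to keep the two key spaces apart.

```go
package main

import "fmt"

// rateLimitKey picks the key in the documented order: bearer subject first,
// then remote IP. When neither is known, gated is false and the call would
// proceed without rate limiting. The key prefixes are assumptions.
func rateLimitKey(subject, remoteIP string) (key string, gated bool) {
	switch {
	case subject != "":
		return "sub:" + subject, true
	case remoteIP != "":
		return "ip:" + remoteIP, true
	default:
		return "", false
	}
}

func main() {
	fmt.Println(rateLimitKey("ops", "203.0.113.7"))
	fmt.Println(rateLimitKey("", "203.0.113.7"))
	fmt.Println(rateLimitKey("", ""))
}
```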
func (*Service) RefreshAccess ¶
func (s *Service) RefreshAccess(ctx context.Context, refreshToken string) (parsec.RefreshResult, error)
RefreshAccess exchanges a refresh token for a new access token. The refresh endpoint is not authenticated by a mgmt bearer, so the rate-limit gate keys off the request's remote IP. Operators behind a trusted proxy that terminates TLS should run a complementary L7 rate limit at the proxy.
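For readers unfamiliar with IP-keyed gating, the sketch below shows one simple way to do it: a fixed-window counter per key. This is a technique swapped in for illustration; the page does not say which algorithm the package uses, and the limit, window, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// window tracks how many calls a key has made since start.
type window struct {
	start time.Time
	count int
}

// gate is a fixed-window counter keyed by a string (here, a remote IP).
type gate struct {
	mu      sync.Mutex
	limit   int
	period  time.Duration
	windows map[string]window
}

func newGate(limit int, period time.Duration) *gate {
	return &gate{limit: limit, period: period, windows: map[string]window{}}
}

// allow reports whether key may proceed at time now. A new window starts
// once a full period has elapsed since the current window began.
func (g *gate) allow(key string, now time.Time) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	w, ok := g.windows[key]
	if !ok || now.Sub(w.start) >= g.period {
		w = window{start: now}
	}
	if w.count >= g.limit {
		g.windows[key] = w
		return false
	}
	w.count++
	g.windows[key] = w
	return true
}

// demoGate makes three calls from one IP inside a window and one after it.
func demoGate() [4]bool {
	g := newGate(2, time.Minute)
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return [4]bool{
		g.allow("203.0.113.7", t0),
		g.allow("203.0.113.7", t0.Add(time.Second)),
		g.allow("203.0.113.7", t0.Add(2*time.Second)),
		g.allow("203.0.113.7", t0.Add(time.Minute)),
	}
}

func main() {
	fmt.Println(demoGate())
}
```

Behind a proxy, every request may share the proxy's address, which is why a per-IP gate like this cannot replace limiting at the proxy itself.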
func (*Service) ReloadKeys ¶
func (s *Service) ReloadKeys(ctx context.Context) error
ReloadKeys re-reads the keyring file (no-op for ephemeral / programmatic rings — see parsec.Parsec.ReloadKeys for the error path).
func (*Service) RevokeToken ¶ added in v0.3.0
func (s *Service) RevokeToken(ctx context.Context, tokenID, userID, reason string) error
RevokeToken marks a single access-token jti as revoked. Subsequent subscribe attempts presenting the token are denied by the subscribe authorizer wrapper. tokenID is required; userID and reason are recorded with the revocation for audit but do not gate the call.
When parsec.Options.RevocationStore is nil the call returns PARSEC_INVALID_ARGUMENT so operators discover misconfiguration instead of silently no-op'ing.
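A rough sketch of this contract, using a map as a hypothetical revocation store: a nil store is reported as an error rather than ignored, tokenID is mandatory, and userID and reason are recorded but never checked. The error value for an empty tokenID and all type names are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidArgument stands in for the PARSEC_INVALID_ARGUMENT error.
var errInvalidArgument = errors.New("PARSEC_INVALID_ARGUMENT")

// revocation holds the audit fields recorded alongside a revoked jti.
type revocation struct{ UserID, Reason string }

// store is a hypothetical in-memory revocation store keyed by jti.
type store map[string]revocation

// revokeToken fails loudly when no store is configured.
func revokeToken(s store, tokenID, userID, reason string) error {
	if s == nil {
		return fmt.Errorf("%w: no revocation store configured", errInvalidArgument)
	}
	if tokenID == "" {
		return errors.New("tokenID is required") // error shape is an assumption
	}
	s[tokenID] = revocation{UserID: userID, Reason: reason}
	return nil
}

// denied is the lookup a subscribe authorizer could perform.
func denied(s store, tokenID string) bool {
	_, ok := s[tokenID]
	return ok
}

// isInvalidArgument reports whether err wraps errInvalidArgument.
func isInvalidArgument(err error) bool {
	return errors.Is(err, errInvalidArgument)
}

func main() {
	fmt.Println(revokeToken(nil, "jti-1", "alice", "lost device"))

	s := store{}
	fmt.Println(revokeToken(s, "jti-1", "", ""))
	fmt.Println(denied(s, "jti-1"), denied(s, "jti-2"))
}
```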
func (*Service) RevokeUser ¶ added in v0.3.0
func (s *Service) RevokeUser(ctx context.Context, userID, reason string) error
RevokeUser invalidates every token previously issued to userID. The cutoff is the wall-clock time of the call; tokens minted after the call remain valid. Use this when a user's credentials are compromised and the user re-authenticates.
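The cutoff rule can be sketched as a comparison between a token's issue time and a per-user timestamp. The `cutoffs` type is hypothetical, and treating a token issued exactly at the cutoff as revoked is an assumption; the page does not specify the boundary case.

```go
package main

import (
	"fmt"
	"time"
)

// cutoffs maps a user id to the time of the last RevokeUser-style call.
type cutoffs map[string]time.Time

// valid reports whether a token issued to userID at issuedAt is still good.
// A token issued exactly at the cutoff counts as revoked (an assumption).
func (c cutoffs) valid(userID string, issuedAt time.Time) bool {
	cut, ok := c[userID]
	if !ok {
		return true // user was never revoked
	}
	return issuedAt.After(cut)
}

// demoCutoff checks a token minted before the cutoff, one minted after it,
// and one belonging to a user with no cutoff.
func demoCutoff() (before, after, other bool) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	c := cutoffs{"alice": now}
	before = c.valid("alice", now.Add(-time.Hour))
	after = c.valid("alice", now.Add(time.Minute))
	other = c.valid("bob", now.Add(-time.Hour))
	return
}

func main() {
	fmt.Println(demoCutoff())
}
```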