service

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package service implements the surface-agnostic business logic that the CLI and Twirp layers all call into. Each surface translates its own shape to/from these methods and never touches the broker or manager directly.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DLQItemSummary

type DLQItemSummary struct {
	ID        string            `json:"id"`
	Sink      string            `json:"sink"`
	At        string            `json:"at"`
	Recipient json.RawMessage   `json:"recipient,omitempty"`
	Subject   string            `json:"subject,omitempty"`
	Body      string            `json:"body,omitempty"`
	Metadata  map[string]string `json:"metadata,omitempty"`
	Attempts  int               `json:"attempts"`
	LastError string            `json:"last_error,omitempty"`
}

DLQItemSummary is the surface-shape of a sinks.DLQItem. Recipient is rendered back as raw JSON so the wire and CLI views stay sink-agnostic.

type DLQListing

type DLQListing struct {
	Items []DLQItemSummary `json:"items"`
}

DLQListing is a list response.

type KeyListing

type KeyListing struct {
	ActiveKeyID string       `json:"active_key_id"`
	Keys        []KeySummary `json:"keys"`
}

KeyListing is the result of ListKeys.

type KeySummary

type KeySummary struct {
	ID        string `json:"id"`
	Alg       string `json:"alg,omitempty"`
	Role      string `json:"role"`
	CreatedAt string `json:"created_at"`
	RetiredAt string `json:"retired_at,omitempty"`
}

KeySummary is the surface-shape of an auth.Key.

type MgmtTokenResult

type MgmtTokenResult struct {
	Token         string    `json:"token"`
	Expires       time.Time `json:"expires"`
	SignedByKeyID string    `json:"signed_by_key_id"`
}

MgmtTokenResult is what IssueMgmt returns.

type PublishAck

type PublishAck struct {
	Offset uint64 `json:"offset"`
	Epoch  string `json:"epoch"`
}

PublishAck is the surface-shape of a publish result.

type RPCAdapter

type RPCAdapter struct{ S *Service }

RPCAdapter implements rpc.ParsecService over a Service. Surface code wires this at boot — internal/server passes it to rpc.NewParsecServiceServer.

func NewRPCAdapter

func NewRPCAdapter(svc *Service) *RPCAdapter

NewRPCAdapter wraps svc.

func (*RPCAdapter) CreatePrivate

func (a *RPCAdapter) CreatePrivate(ctx context.Context, in *rpc.CreatePrivateRequest) (*rpc.Credentials, error)

CreatePrivate mints a private channel and returns the token pair. Pattern scopes from the request are stamped into both tokens; an empty list produces an exact-match-only grant on the channel.

func (*RPCAdapter) DeleteChannel

func (a *RPCAdapter) DeleteChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.Empty, error)

DeleteChannel removes a channel.

func (*RPCAdapter) DlqCount

DlqCount returns the live size of the DLQ for sink.

func (*RPCAdapter) DlqDiscard

func (a *RPCAdapter) DlqDiscard(ctx context.Context, in *rpc.DlqDiscardRequest) (*rpc.Empty, error)

DlqDiscard removes a DLQ entry by id. Missing ids are silently OK.

func (*RPCAdapter) DlqList

DlqList returns up to in.Limit DLQ items for the supplied sink (empty sink → all sinks).

func (*RPCAdapter) DlqReplay

func (a *RPCAdapter) DlqReplay(ctx context.Context, in *rpc.DlqReplayRequest) (*rpc.Empty, error)

DlqReplay re-runs the stored item through the original sink. The DLQ entry remains in place — if replay fails again the Retrier pushes a new item, and the operator can compare attempt counts.

func (*RPCAdapter) GenerateKey

func (a *RPCAdapter) GenerateKey(ctx context.Context, in *rpc.GenerateKeyRequest) (*rpc.KeySummary, error)

GenerateKey mints a fresh key (verify-only). Algorithm is selected via the request's Alg field; empty defaults to HS256.

func (*RPCAdapter) GetChannel

func (a *RPCAdapter) GetChannel(ctx context.Context, in *rpc.ChannelRef) (*rpc.ChannelResponse, error)

GetChannel returns one channel snapshot.

func (*RPCAdapter) IssueMgmt

IssueMgmt mints a fresh mgmt bearer.

func (*RPCAdapter) ListChannels

func (a *RPCAdapter) ListChannels(ctx context.Context, _ *rpc.Empty) (*rpc.ListChannelsResponse, error)

ListChannels returns every managed channel.

func (*RPCAdapter) ListKeys

func (a *RPCAdapter) ListKeys(ctx context.Context, _ *rpc.Empty) (*rpc.ListKeysResponse, error)

ListKeys mirrors Service.ListKeys onto the RPC envelope.

func (*RPCAdapter) Manifest

func (a *RPCAdapter) Manifest(ctx context.Context, _ *rpc.Empty) (*rpc.JSONResponse, error)

Manifest returns the manifest envelope as a JSONResponse.

func (*RPCAdapter) OpenPublic

OpenPublic opens or re-opens a public channel.

func (*RPCAdapter) Presence

func (a *RPCAdapter) Presence(ctx context.Context, in *rpc.ChannelRef) (*rpc.PresenceResponse, error)

Presence returns the live subscriber count.

func (*RPCAdapter) PromoteKey

func (a *RPCAdapter) PromoteKey(ctx context.Context, in *rpc.KeyRef) (*rpc.Empty, error)

PromoteKey transitions id to active.

func (*RPCAdapter) Publish

Publish ships a message.

func (*RPCAdapter) RefreshToken

RefreshToken exchanges a refresh token for a new access token.

func (*RPCAdapter) ReloadKeys

func (a *RPCAdapter) ReloadKeys(ctx context.Context, _ *rpc.Empty) (*rpc.Empty, error)

ReloadKeys triggers a disk re-read.

func (*RPCAdapter) RetireKey

func (a *RPCAdapter) RetireKey(ctx context.Context, in *rpc.KeyRef) (*rpc.Empty, error)

RetireKey marks id retired.

func (*RPCAdapter) RevokeToken added in v0.3.0

func (a *RPCAdapter) RevokeToken(ctx context.Context, in *rpc.RevokeTokenRequest) (*rpc.Empty, error)

RevokeToken adapts the RPC envelope onto Service.RevokeToken.

func (*RPCAdapter) RevokeUser added in v0.3.0

func (a *RPCAdapter) RevokeUser(ctx context.Context, in *rpc.RevokeUserRequest) (*rpc.Empty, error)

RevokeUser adapts the RPC envelope onto Service.RevokeUser.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the shared business-logic layer.

func New

func New(p *parsec.Parsec, version string) *Service

New constructs a Service over a running (or about-to-run) Parsec instance.

func (*Service) CountDLQ

func (s *Service) CountDLQ(ctx context.Context, sink string) (int, error)

CountDLQ returns the live DLQ count for sink.

func (*Service) CreatePrivateChannel

func (s *Service) CreatePrivateChannel(ctx context.Context, subjectID, name string, ttl time.Duration, scopes []auth.Scope) (parsec.Credentials, error)

CreatePrivateChannel mints a private channel with access + refresh tokens. subjectID is recorded in the token's sub claim; pass empty for anonymous. scopes stamps pattern grants into both tokens; pass nil for an exact-match-only grant on name.

func (*Service) DeleteChannel

func (s *Service) DeleteChannel(ctx context.Context, name string) error

DeleteChannel removes a channel from the manager.

func (*Service) DiscardDLQ

func (s *Service) DiscardDLQ(ctx context.Context, id string) error

DiscardDLQ removes id.

func (*Service) GenerateKey

func (s *Service) GenerateKey(ctx context.Context, alg auth.Alg) (KeySummary, error)

GenerateKey adds a new key to the ring as verify-only and returns it. The alg argument selects HS256 (default), RS256, or EdDSA.

func (*Service) GetChannel

func (s *Service) GetChannel(ctx context.Context, name string) (*channels.Channel, error)

GetChannel returns one channel by name.

func (*Service) IssueMgmt

func (s *Service) IssueMgmt(ctx context.Context, subject string, ttl time.Duration) (MgmtTokenResult, error)

IssueMgmt mints a fresh mgmt token signed by the active key. Used by operators during rotation to get a bearer signed by the new key before retiring the old one.

func (*Service) ListChannels

func (s *Service) ListChannels(ctx context.Context) []channels.Channel

ListChannels returns a snapshot of all managed channels.

func (*Service) ListDLQ

func (s *Service) ListDLQ(ctx context.Context, sink string, limit int) (DLQListing, error)

ListDLQ returns up to limit items for sink (empty sink = all).

func (*Service) ListKeys

func (s *Service) ListKeys(ctx context.Context) KeyListing

ListKeys returns every key currently in the ring along with the active id.

func (*Service) Manifest

func (s *Service) Manifest(ctx context.Context) descriptor.Envelope

Manifest returns the self-describing instance manifest.

func (*Service) OpenPublicChannel

func (s *Service) OpenPublicChannel(ctx context.Context, name string, ttl time.Duration) (*channels.Channel, error)

OpenPublicChannel opens (or re-opens) a public channel.

func (*Service) Parsec

func (s *Service) Parsec() *parsec.Parsec

Parsec returns the underlying instance. Used by surface code that needs to reach for Verifier / Issuer directly (e.g. the bearer middleware).

func (*Service) Presence

func (s *Service) Presence(ctx context.Context, name string) (int, error)

Presence returns the live subscriber count for a channel.

func (*Service) PromoteKey

func (s *Service) PromoteKey(ctx context.Context, id string) error

PromoteKey makes id the active signing key.

func (*Service) Publish

func (s *Service) Publish(ctx context.Context, name string, data []byte) (PublishAck, error)

Publish ships a message and returns the broker ack. The mgmt bearer's subject (from the request context) is the rate-limit key; when no subject is in context (e.g. tests without bearer enforcement) the remote IP is the fallback; when neither is available the call proceeds without gating.

func (*Service) RefreshAccess

func (s *Service) RefreshAccess(ctx context.Context, refreshToken string) (parsec.RefreshResult, error)

RefreshAccess exchanges a refresh token for a new access token. The refresh endpoint is unauthenticated by mgmt-bearer, so the gate keys off the request's remote IP — operators behind a trusted proxy that terminates TLS should run a complementary L7 rate-limit there.

func (*Service) ReloadKeys

func (s *Service) ReloadKeys(ctx context.Context) error

ReloadKeys re-reads the keyring file (no-op for ephemeral / programmatic rings — see parsec.Parsec.ReloadKeys for the error path).

func (*Service) ReplayDLQ

func (s *Service) ReplayDLQ(ctx context.Context, id string) error

ReplayDLQ re-runs id through the original sink.

func (*Service) RetireKey

func (s *Service) RetireKey(ctx context.Context, id string) error

RetireKey removes id from verification.

func (*Service) RevokeToken added in v0.3.0

func (s *Service) RevokeToken(ctx context.Context, tokenID, userID, reason string) error

RevokeToken marks a single access-token jti as revoked. Subsequent subscribe attempts presenting the token are denied by the subscribe authorizer wrapper. tokenID is required; userID and reason are recorded with the revocation for audit but do not gate the call.

When parsec.Options.RevocationStore is nil the call returns PARSEC_INVALID_ARGUMENT so operators discover misconfiguration instead of silently no-op'ing.

func (*Service) RevokeUser added in v0.3.0

func (s *Service) RevokeUser(ctx context.Context, userID, reason string) error

RevokeUser invalidates every token previously issued to userID. The cutoff is the call's wall-clock; tokens minted after the call remain valid (use this when a user's credentials are compromised and they re-authenticate).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL