adapter

Constants

const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"
)

SQSPartitionAction* mirror the action label values from monitoring.SQSPartitionAction*. They are re-declared here so adapter call sites do not need a monitoring import; the observer implementation validates the value at runtime, so drift between these constants and the monitoring side surfaces as a dropped observation rather than a wedged call path.
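
A minimal sketch of a call site, assuming a hypothetical no-op observer; production observers come from the monitoring package and validate the action label themselves:

// noopPartitionObserver is illustrative only; real observers are
// supplied via WithSQSPartitionObserver.
type noopPartitionObserver struct{}

func (noopPartitionObserver) ObservePartitionMessage(queue string, partition uint32, action string) {}

// recordSend sketches how a partitioned send path might observe;
// the real call sites live inside the adapter.
func recordSend(o SQSPartitionObserver, queue string, partition uint32) {
	if o == nil {
		return // unmonitored fixture: observation is skipped
	}
	o.ObservePartitionMessage(queue, partition, SQSPartitionActionSend)
}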

const (
	// SqsQueueMetaPrefix prefixes queue-metadata records.
	SqsQueueMetaPrefix = "!sqs|queue|meta|"
	// SqsQueueGenPrefix prefixes the per-queue monotonic generation counter.
	// Bumped on DeleteQueue / PurgeQueue so keys from an older incarnation of
	// the same queue name cannot leak into a newly created queue.
	SqsQueueGenPrefix = "!sqs|queue|gen|"
	// SqsQueueSeqPrefix prefixes the per-queue FIFO sequence counter. Bumped
	// on every FIFO send and embedded in the message record so consumers can
	// reconstruct the producer's strict total order.
	SqsQueueSeqPrefix = "!sqs|queue|seq|"
	// SqsMsgDedupPrefix prefixes FIFO deduplication records. Each entry
	// stores the original message id and the dedup-window expiry; the
	// receive path is unaware of these — they only gate sends.
	SqsMsgDedupPrefix = "!sqs|msg|dedup|"
	// SqsMsgGroupPrefix prefixes the FIFO group-lock records. The lock is
	// held by at most one message per group, persists across visibility
	// expiries, and is only released on DeleteMessage / DLQ redrive /
	// retention expiry.
	SqsMsgGroupPrefix = "!sqs|msg|group|"
	// SqsMsgByAgePrefix prefixes the send-age index. Each entry is
	// keyed by (queue, gen, send_timestamp, message_id) so the reaper
	// can find every record whose retention deadline has elapsed with
	// one bounded scan, without having to load every message body.
	SqsMsgByAgePrefix = "!sqs|msg|byage|"
	// SqsQueueTombstonePrefix prefixes a generation-orphan marker.
	// DeleteQueue and PurgeQueue each write one (queue, gen) tombstone
	// in the same OCC transaction that supersedes that generation —
	// DeleteQueue tombstones the gen it removes the meta row at, and
	// PurgeQueue tombstones the pre-bump gen so the reaper can find
	// pre-purge orphans even if the queue is deleted before the next
	// reaper tick. The reaper enumerates these markers to clean up
	// orphan data / vis / byage / dedup / group keys for superseded
	// generations. The tombstone is itself deleted once the reaper
	// confirms no message-keyspace state remains for that (queue, gen).
	SqsQueueTombstonePrefix = "!sqs|queue|tombstone|"
)

SQS keyspace prefixes. Kept in sync with the naming in docs/design/2026_04_24_proposed_sqs_compatible_adapter.md.
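
A sketch of how these prefixes compose into full keys, assuming a plain prefix+name layout for the queue-scoped families; the authoritative segment encoding (separators, generation embedding) is the design doc above, and these helpers are hypothetical:

// queueMetaKey returns the queue-metadata key for queue. The exact
// segment layout beyond the prefix is an assumption here.
func queueMetaKey(queue string) []byte {
	return []byte(SqsQueueMetaPrefix + queue)
}

// queueGenKey returns the per-queue generation-counter key.
func queueGenKey(queue string) []byte {
	return []byte(SqsQueueGenPrefix + queue)
}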

const (
	SqsPartitionedMsgDataPrefix  = "!sqs|msg|data|" + sqsPartitionedDiscriminator
	SqsPartitionedMsgVisPrefix   = "!sqs|msg|vis|" + sqsPartitionedDiscriminator
	SqsPartitionedMsgDedupPrefix = "!sqs|msg|dedup|" + sqsPartitionedDiscriminator
	SqsPartitionedMsgGroupPrefix = "!sqs|msg|group|" + sqsPartitionedDiscriminator
	SqsPartitionedMsgByAgePrefix = "!sqs|msg|byage|" + sqsPartitionedDiscriminator
)

The SqsPartitionedMsg*Prefix constants mirror each legacy SqsMsg*Prefix with the partitioned-keyspace discriminator inserted. They are defined as full string constants (rather than runtime concatenation in each constructor) so the byte-layout invariant is fixed at compile time: a future rename of the discriminator must touch the constants here, not 6+ scattered string concatenations.

const (
	SqsMsgDataPrefix = "!sqs|msg|data|"
	SqsMsgVisPrefix  = "!sqs|msg|vis|"
)

Message-keyspace prefixes. The data record holds the message body and state; the visibility index is a separate, visible_at-sorted key family so ReceiveMessage can find the next visible message with a single bounded prefix scan.

Variables

var (
	// ErrAdminBucketAlreadyExists signals that AdminCreateBucket
	// targeted a name already in use. Maps to 409 Conflict.
	ErrAdminBucketAlreadyExists = errors.New("s3 admin: bucket already exists")
	// ErrAdminBucketNotFound signals that AdminDeleteBucket /
	// AdminPutBucketAcl targeted a missing bucket. Maps to 404.
	ErrAdminBucketNotFound = errors.New("s3 admin: bucket not found")
	// ErrAdminBucketNotEmpty signals that AdminDeleteBucket targeted
	// a bucket that still has objects. Maps to 409 Conflict to match
	// the SigV4 path's BucketNotEmpty response (the dashboard cannot
	// force a recursive delete; the operator must clean up first).
	ErrAdminBucketNotEmpty = errors.New("s3 admin: bucket is not empty")
	// ErrAdminInvalidBucketName signals that AdminCreateBucket got
	// a name that does not satisfy validateS3BucketName. Maps to 400.
	ErrAdminInvalidBucketName = errors.New("s3 admin: invalid bucket name")
	// ErrAdminInvalidACL signals that the ACL string did not pass
	// validateS3CannedAcl. Maps to 400 (the SigV4 path returns 501
	// NotImplemented for unsupported canned ACLs, but the admin API
	// is documented as private/public-read only and rejecting other
	// values as invalid input is a more useful contract for the
	// dashboard).
	ErrAdminInvalidACL = errors.New("s3 admin: invalid ACL")
)

Sentinel errors the admin write methods return so the bridge in main_admin.go can translate them into admin-package vocabulary without sniffing strings. Named separately from ErrAdminTableAlreadyExists / ErrAdminTableNotFound on the Dynamo side so a future per-resource role / status divergence does not require renaming both packages' callers.
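
A sketch of the kind of translation the bridge performs, assuming a plain errors.Is switch onto HTTP status codes; the real bridge in main_admin.go maps into the admin package's own error vocabulary instead:

import (
	"errors"
	"net/http"
)

func s3AdminStatus(err error) int {
	switch {
	case errors.Is(err, ErrAdminBucketAlreadyExists),
		errors.Is(err, ErrAdminBucketNotEmpty):
		return http.StatusConflict // 409
	case errors.Is(err, ErrAdminBucketNotFound):
		return http.StatusNotFound // 404
	case errors.Is(err, ErrAdminInvalidBucketName),
		errors.Is(err, ErrAdminInvalidACL):
		return http.StatusBadRequest // 400
	default:
		return http.StatusInternalServerError
	}
}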

var ErrAdminForbidden = errors.New("dynamodb admin: principal lacks required role")

ErrAdminForbidden is returned when the principal lacks the role required for the operation. Admin handlers translate this to 403 "forbidden" without leaking which field of the principal failed the check.

var ErrAdminNotLeader = errors.New("dynamodb admin: this node is not the raft leader")

ErrAdminNotLeader is returned by every write entrypoint when this node is not the verified Raft leader. The admin HTTP handler translates this to 503 + Retry-After: 1 today; the future AdminForward RPC catches it as the trigger to forward to the leader instead.

var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")

ErrAdminSQSNotFound is returned by write entrypoints when the target queue does not exist. Maps to 404. The describe path uses the (nil, false, nil) tuple instead of this sentinel for the not-found signal, mirroring AdminDescribeTable.

var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")

ErrAdminSQSValidation is returned when an admin entrypoint receives a request with a missing or syntactically invalid queue name. Maps to 400 in the admin HTTP handler.

var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")

ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator supplies no token and has not opted into insecure mode.

var ErrCollectionTooLarge = errors.New("collection too large")

ErrCollectionTooLarge is returned when a collection exceeds maxWideColumnItems.

var ErrDeltaScanTruncated = errors.New("delta scan truncated: compaction required")

ErrDeltaScanTruncated is returned when the delta scan result is truncated, indicating that synchronous compaction is required before the operation can proceed.

var ErrLeaderNotFound = errors.New("leader not found")

var ErrNotLeader = errors.New("not leader")

var ErrTxnTimestampOverflow = errors.New("txn timestamp overflow")

Functions

func AdminErrorMessage

func AdminErrorMessage(err error) string

AdminErrorMessage extracts the human-readable message from a dynamoAPIError for surfacing back to the SPA. Returns "" when err is not a structured adapter error so callers fall back to a generic message instead of leaking arbitrary err.Error() output.
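
A sketch of the fallback pattern this contract enables; the generic message text is an assumption:

func spaMessage(err error) string {
	if msg := AdminErrorMessage(err); msg != "" {
		return msg // sanitised, structured adapter message
	}
	return "internal error" // never surface raw err.Error() to the SPA
}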

func AdminTokenAuth

AdminTokenAuth builds a gRPC unary+stream interceptor pair enforcing "authorization: Bearer <token>" metadata against the supplied token. An empty token disables enforcement; callers should pair that mode with a --adminInsecureNoAuth flag so operators knowingly opt in.

func AdvertisesHTFIFO

func AdvertisesHTFIFO() bool

AdvertisesHTFIFO reports whether this binary's /sqs_health endpoint lists the htfifo capability. It mirrors the package-internal htfifoCapabilityAdvertised constant, exposed for the SQS leadership-refusal hook in main.go, which uses this signal to decide whether to refuse leadership of any Raft group hosting a partitioned FIFO queue.

Stays a function (not an exported constant) so a future runtime override (env var, --no-htfifo flag for graceful degradation) can be threaded through here without changing the call site.

func IsAdminTableAlreadyExists

func IsAdminTableAlreadyExists(err error) bool

IsAdminTableAlreadyExists reports whether err is the adapter's "table already exists" failure (ResourceInUseException). The bridge in main_admin.go uses this to map the adapter's internal error vocabulary onto admin's HTTP-facing sentinels without importing the package-private dynamoAPIError type.

func IsAdminTableNotFound

func IsAdminTableNotFound(err error) bool

IsAdminTableNotFound is the ResourceNotFoundException counterpart for AdminDeleteTable / AdminDescribeTable mapped through the adapter's structured error chain.

func IsAdminValidation

func IsAdminValidation(err error) bool

IsAdminValidation reports whether err is a validation failure the adapter signalled via ValidationException. Admin handlers map this to 400 + a sanitised message.
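
Taken together, the three predicates let the bridge classify an adapter error without importing the package-private dynamoAPIError type. A sketch, using the status mapping each doc comment describes:

import "net/http"

func dynamoAdminStatus(err error) int {
	switch {
	case IsAdminTableAlreadyExists(err):
		return http.StatusConflict // ResourceInUseException → 409
	case IsAdminTableNotFound(err):
		return http.StatusNotFound // ResourceNotFoundException → 404
	case IsAdminValidation(err):
		return http.StatusBadRequest // ValidationException → 400
	default:
		return http.StatusInternalServerError
	}
}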

Types

type AdminAttribute

type AdminAttribute struct {
	Name string
	Type string
}

AdminAttribute names a single primary-key or GSI key column. Type must be one of "S", "N", "B" — DynamoDB does not allow boolean or list keys and the adapter's existing schema validation enforces the same restriction at the next layer.

type AdminBucketSummary

type AdminBucketSummary struct {
	Name         string
	ACL          string
	CreatedAtHLC uint64
	Generation   uint64
	Region       string
	Owner        string
}

AdminBucketSummary is the bucket-level information the admin dashboard surfaces. It deliberately projects only the fields the dashboard needs so the package's wire-format types (s3BucketMeta, s3ListBucketsResult) stay internal.

CreatedAtHLC is the same physical-time-bearing HLC the bucket metadata persists; the admin HTTP handler formats it for the SPA. ACL is the canned-ACL string ("private" / "public-read") — the admin layer does not expand it into the AWS ACL XML grant tree because the dashboard renders the canned form directly.

type AdminCreateGSI

type AdminCreateGSI struct {
	Name             string
	PartitionKey     AdminAttribute
	SortKey          *AdminAttribute
	ProjectionType   string
	NonKeyAttributes []string
}

AdminCreateGSI describes one global secondary index in an admin CreateTable request. SortKey is optional (hash-only GSI). When ProjectionType is "INCLUDE", NonKeyAttributes lists the projected attribute names; otherwise NonKeyAttributes is ignored.

type AdminCreateTableInput

type AdminCreateTableInput struct {
	TableName    string
	PartitionKey AdminAttribute
	SortKey      *AdminAttribute
	GSI          []AdminCreateGSI
}

AdminCreateTableInput is the admin-facing CreateTable shape. The HTTP handler maps the design's §4.2 JSON body into this struct, then AdminCreateTable converts it to the adapter's internal createTableInput. We do not pass the SigV4-flavoured wire struct directly because that struct's field names track AWS exactly and would be awkward for the admin SPA to author.
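
A sketch of authoring the shape directly; the table, attribute, and index names are illustrative:

input := AdminCreateTableInput{
	TableName:    "orders",
	PartitionKey: AdminAttribute{Name: "order_id", Type: "S"},
	SortKey:      &AdminAttribute{Name: "created_at", Type: "N"},
	GSI: []AdminCreateGSI{{
		Name:             "by-customer",
		PartitionKey:     AdminAttribute{Name: "customer_id", Type: "S"},
		ProjectionType:   "INCLUDE",
		NonKeyAttributes: []string{"status"}, // only honoured for INCLUDE
	}},
}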

type AdminGSISummary

type AdminGSISummary struct {
	Name           string
	PartitionKey   string
	SortKey        string
	ProjectionType string
}

AdminGSISummary mirrors AdminTableSummary for a single GSI.

type AdminGroup

type AdminGroup interface {
	Status() raftengine.Status
	Configuration(ctx context.Context) (raftengine.Configuration, error)
}

AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow subset of raftengine.Engine so tests can supply an in-memory fake without standing up a real Raft cluster. Configuration is polled on each GetClusterOverview to pick up scale-out / scale-in events without the operator having to restart the admin binary.
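
A sketch of the in-memory fake the interface is shaped for; zero values of raftengine's types stand in for real cluster state:

// fakeGroup satisfies AdminGroup for tests without a Raft cluster.
type fakeGroup struct {
	status raftengine.Status
	conf   raftengine.Configuration
}

func (f fakeGroup) Status() raftengine.Status { return f.status }

func (f fakeGroup) Configuration(ctx context.Context) (raftengine.Configuration, error) {
	return f.conf, nil // a real engine would consult live membership
}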

type AdminPrincipal

type AdminPrincipal struct {
	AccessKey string
	Role      AdminRole
}

AdminPrincipal is the authentication context every admin write entrypoint takes. The adapter re-evaluates authorisation against this principal *itself* — it does not trust the caller to have already enforced the role. That is the design's "authorization truth always lives on the adapter side" invariant (Section 3.2): if a follower forwards a pre-authenticated request via the future AdminForward RPC, the leader must still verify before acting.

type AdminQueueCounters

type AdminQueueCounters struct {
	Visible    int64
	NotVisible int64
	Delayed    int64
}

AdminQueueCounters matches sqsApproxCounters (int64) so the admin bridge does not have to convert between widths. Visible / NotVisible / Delayed are the AWS Approximate* triple.

type AdminQueueSummary

type AdminQueueSummary struct {
	Name       string
	IsFIFO     bool
	Generation uint64
	CreatedAt  time.Time
	Attributes map[string]string
	Counters   AdminQueueCounters
}

AdminQueueSummary is the per-queue projection the admin dashboard surfaces. It deliberately covers only the fields the SPA renders so the package's wire-format types stay internal.

Counters mirror the AWS Approximate* attribute set produced by scanApproxCounters; they are best-effort by AWS contract and stop counting once the catalog's per-call cap is reached (the SPA polls continuously, so an unbounded scan would pin the leader).

type AdminRole

type AdminRole string

AdminRole is the authorization tier the adapter checks against on every admin write entrypoint. The constants intentionally mirror internal/admin.Role string values so the wire / persisted role vocabulary stays aligned across packages, but we keep a separate type here so the adapter has zero dependency on internal/admin.

const (
	// AdminRoleReadOnly may issue list / describe but not create or delete.
	AdminRoleReadOnly AdminRole = "read_only"
	// AdminRoleFull may issue every admin operation.
	AdminRoleFull AdminRole = "full"
)

type AdminServer

type AdminServer struct {
	pb.UnimplementedAdminServer
	// contains filtered or unexported fields
}

AdminServer implements the node-side Admin gRPC service described in docs/admin_ui_key_visualizer_design.md §4 (Layer A). Phase 0 only implements GetClusterOverview and GetRaftGroups; remaining RPCs return Unimplemented so the generated client can still compile against older nodes during rollout.

func NewAdminServer

func NewAdminServer(self NodeIdentity, members []NodeIdentity) *AdminServer

NewAdminServer constructs an AdminServer. `self` identifies the local node for responses that return node identity. `members` is the static membership snapshot shipped to the admin binary; callers that already have a membership source may pass nil and let the admin binary's fan-out layer discover peers by other means.

func (*AdminServer) GetClusterOverview

GetClusterOverview returns the local node identity, the current member list, and per-group leader identity collected from the engines registered via RegisterGroup. The member list is the union of (a) the bootstrap seed supplied to NewAdminServer and (b) the live Configuration of every registered Raft group — the latter picks up scale-out nodes added after startup so the admin binary's fan-out discovery does not miss them.

func (*AdminServer) GetKeyVizMatrix

GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to) range supplied by the request, returning one KeyVizRow per tracked route or virtual bucket and a parallel column-timestamp slice.

Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from the request's KeyVizSeries enum to the matching keyviz.MatrixRow counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.

Returns codes.Unavailable when no sampler is registered (keyviz disabled) so callers can distinguish that from "no data yet" (which yields a successful empty response).

func (*AdminServer) GetRaftGroups

GetRaftGroups returns per-group state snapshots. Phase 0 wires commit/applied indices only; per-follower contact and term history land in later phases.

func (*AdminServer) RegisterGroup

func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup)

RegisterGroup binds a Raft group ID to its engine so the Admin service can report leader and log state for that group.
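
A sketch of startup wiring; the generated pb.RegisterAdminServer registration and the caller-built engines map are assumptions about the surrounding binary:

func wireAdmin(gs *grpc.Server, self NodeIdentity, members []NodeIdentity, engines map[uint64]AdminGroup) *AdminServer {
	srv := NewAdminServer(self, members)
	for id, g := range engines {
		srv.RegisterGroup(id, g) // one registration per hosted Raft group
	}
	pb.RegisterAdminServer(gs, srv)
	return srv
}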

func (*AdminServer) RegisterSampler

func (s *AdminServer) RegisterSampler(sampler KeyVizSampler)

RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix. Without this call (or with a nil sampler) the RPC returns codes.Unavailable so callers can distinguish "keyviz disabled" from "no data yet".

func (*AdminServer) SetClock

func (s *AdminServer) SetClock(now func() time.Time)

SetClock overrides the clock used by GetRaftGroups, letting tests inject a fixed time without mutating any package-global state. Concurrent RPCs on other AdminServer instances are unaffected.

type AdminTableSummary

type AdminTableSummary struct {
	Name                   string
	PartitionKey           string
	SortKey                string
	Generation             uint64
	GlobalSecondaryIndexes []AdminGSISummary
}

AdminTableSummary is the table-level information the admin dashboard surfaces for a single Dynamo-compatible table. It deliberately projects only the fields the dashboard needs so the package's wire-format types (dynamoTableSchema and friends) stay internal.

type DeltaCompactor

type DeltaCompactor struct {
	// contains filtered or unexported fields
}

DeltaCompactor folds accumulated delta keys into their corresponding base metadata keys for all wide-column collection types (List, Hash, Set, ZSet).

It runs as a background goroutine on the Raft leader. Non-leaders skip each tick silently. Compaction is performed as an OCC transaction so concurrent writers never conflict with the compactor.

func NewDeltaCompactor

func NewDeltaCompactor(st store.MVCCStore, coord kv.Coordinator, opts ...DeltaCompactorOption) *DeltaCompactor

NewDeltaCompactor creates a DeltaCompactor that operates on st using coord.

func (*DeltaCompactor) Run

func (c *DeltaCompactor) Run(ctx context.Context) error

Run starts the background compaction loop and blocks until ctx is cancelled.

func (*DeltaCompactor) SyncOnce

func (c *DeltaCompactor) SyncOnce(ctx context.Context) error

SyncOnce runs one compaction pass.

The IsLeader() guard avoids the full-prefix delta scan on followers, which would proxy cross-node on ShardStore backends. For sharded deployments where this node is the leader for a non-default shard group, the regular tick is skipped; those keys are still handled by the urgent compaction path (compactUrgentKey), which uses IsLeaderForKey for per-key routing. buildBatchElems adds an additional per-key IsLeaderForKey filter so a default-group leader never dispatches mutations for shards it does not own.

Each collection-type handler runs in its own goroutine so that a slow handler (e.g. one with many list deltas) does not delay Hash/Set/ZSet compaction. All goroutines share the same per-tick timeout context.

func (*DeltaCompactor) TriggerUrgentCompaction

func (c *DeltaCompactor) TriggerUrgentCompaction(typeName string, userKey []byte)

TriggerUrgentCompaction queues an immediate single-key compaction for a key whose delta count has exceeded MaxDeltaScanLimit. The request is dropped silently when the channel is full (the regular tick will catch it).

type DeltaCompactorOption

type DeltaCompactorOption func(*DeltaCompactor)

DeltaCompactorOption configures a DeltaCompactor.

func WithDeltaCompactorLogger

func WithDeltaCompactorLogger(l *slog.Logger) DeltaCompactorOption

WithDeltaCompactorLogger sets the logger.

func WithDeltaCompactorMaxDeltaCount

func WithDeltaCompactorMaxDeltaCount(n int) DeltaCompactorOption

WithDeltaCompactorMaxDeltaCount sets the soft threshold at which a key's deltas are folded into its base metadata. Default: 64.

func WithDeltaCompactorScanInterval

func WithDeltaCompactorScanInterval(d time.Duration) DeltaCompactorOption

WithDeltaCompactorScanInterval sets the period between compaction passes. Default: 30s.

func WithDeltaCompactorTimeout

func WithDeltaCompactorTimeout(d time.Duration) DeltaCompactorOption

WithDeltaCompactorTimeout sets the per-tick timeout. Default: 5s.
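
Putting the options together: a sketch of constructing and running the compactor with non-default tuning; the specific values are illustrative and st, coord, ctx come from the caller's wiring:

func startCompactor(ctx context.Context, st store.MVCCStore, coord kv.Coordinator) *DeltaCompactor {
	c := NewDeltaCompactor(st, coord,
		WithDeltaCompactorScanInterval(10*time.Second), // default 30s
		WithDeltaCompactorMaxDeltaCount(128),           // default 64
		WithDeltaCompactorTimeout(2*time.Second),       // default 5s
		WithDeltaCompactorLogger(slog.Default()),
	)
	go func() { _ = c.Run(ctx) }() // blocks until ctx is cancelled
	return c
}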

type DistributionServer

type DistributionServer struct {
	pb.UnimplementedDistributionServer
	// contains filtered or unexported fields
}

DistributionServer serves distribution-related gRPC APIs.

func NewDistributionServer

func NewDistributionServer(e *distribution.Engine, catalog *distribution.CatalogStore, opts ...DistributionServerOption) *DistributionServer

NewDistributionServer creates a new server.

func (*DistributionServer) GetRoute

GetRoute returns route for a key.

func (*DistributionServer) GetTimestamp

GetTimestamp returns a monotonically increasing timestamp.

func (*DistributionServer) ListRoutes

ListRoutes returns all durable routes from catalog storage.

func (*DistributionServer) SplitRange

SplitRange splits a route into two child routes in the same raft group.

func (*DistributionServer) UpdateRoute

func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64)

UpdateRoute allows updating route information.

type DistributionServerOption

type DistributionServerOption func(*DistributionServer)

DistributionServerOption configures DistributionServer behavior.

func WithCatalogReloadRetryPolicy

func WithCatalogReloadRetryPolicy(attempts int, interval time.Duration) DistributionServerOption

WithCatalogReloadRetryPolicy configures the retry policy used after split commit when waiting for the local catalog snapshot to become visible.

func WithDistributionActiveTimestampTracker

func WithDistributionActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DistributionServerOption

func WithDistributionCoordinator

func WithDistributionCoordinator(coordinator kv.Coordinator) DistributionServerOption

WithDistributionCoordinator configures the coordinator used for Raft-backed catalog mutations in SplitRange.

type DynamoDBServer

type DynamoDBServer struct {
	// contains filtered or unexported fields
}

func NewDynamoDBServer

func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...DynamoDBServerOption) *DynamoDBServer

func (*DynamoDBServer) AdminCreateTable

func (d *DynamoDBServer) AdminCreateTable(ctx context.Context, principal AdminPrincipal, in AdminCreateTableInput) (*AdminTableSummary, error)

AdminCreateTable creates a Dynamo-compatible table on the local node, after re-validating the principal's role and confirming this node is the verified Raft leader. The returned summary mirrors the shape of AdminDescribeTable on the same name so the SPA can show the freshly-created table without an extra describe round-trip.

Errors:

  • ErrAdminForbidden when the principal cannot write.
  • ErrAdminNotLeader when the node is a follower.
  • The adapter's standard dynamoAPIError chain for validation / storage failures, preserved unmodified so the HTTP handler can map the inner code (ValidationException, ResourceInUseException, etc.) to the appropriate status without re-classifying.

func (*DynamoDBServer) AdminDeleteTable

func (d *DynamoDBServer) AdminDeleteTable(ctx context.Context, principal AdminPrincipal, name string) error

AdminDeleteTable is the SigV4-bypass counterpart to deleteTable. Returns the same sentinel errors as AdminCreateTable plus the adapter's standard dynamoErrResourceNotFound when the table is absent — admin handlers should map that to 404 rather than 500.

func (*DynamoDBServer) AdminDescribeTable

func (d *DynamoDBServer) AdminDescribeTable(ctx context.Context, name string) (*AdminTableSummary, bool, error)

AdminDescribeTable returns a schema snapshot for name. The triple (result, present, error) lets admin callers distinguish a genuine "not found" from a storage error without sniffing sentinels: when the table is missing the function returns (nil, false, nil).

Unlike the SigV4 describeTable handler, AdminDescribeTable does NOT invoke ensureLegacyTableMigration. The admin dashboard is a strictly read-only surface (Gemini medium review on PR #633), so triggering Raft-coordinated key-encoding migrations as a side effect of routine polling would (a) violate the read-only contract and (b) cause every dashboard refresh to write to the cluster. Migration still runs lazily on the next SigV4 read or write of the same table — the schema we return here is just a snapshot for display, not a guarantee that the table is up-to-date for serving.
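
A sketch of consuming the (result, present, error) triple; the status mapping follows the admin handler conventions described elsewhere on this page:

import "net/http"

func describeForDashboard(ctx context.Context, d *DynamoDBServer, name string) (int, *AdminTableSummary) {
	sum, ok, err := d.AdminDescribeTable(ctx, name)
	switch {
	case err != nil:
		return http.StatusInternalServerError, nil // storage failure
	case !ok:
		return http.StatusNotFound, nil // genuine not-found, no sentinel sniffing
	default:
		return http.StatusOK, sum
	}
}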

func (*DynamoDBServer) AdminListTables

func (d *DynamoDBServer) AdminListTables(ctx context.Context) ([]string, error)

AdminListTables returns every Dynamo-style table this server knows about, in the lexicographic order the metadata index produces. Intended for the in-process admin listener as the SigV4-free counterpart to the listTables HTTP handler; both share the same underlying lookup so the two views cannot drift.

func (*DynamoDBServer) Run

func (d *DynamoDBServer) Run() error

func (*DynamoDBServer) Stop

func (d *DynamoDBServer) Stop()

type DynamoDBServerOption

type DynamoDBServerOption func(*DynamoDBServer)

func WithDynamoDBActiveTimestampTracker

func WithDynamoDBActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DynamoDBServerOption

func WithDynamoDBLeaderMap

func WithDynamoDBLeaderMap(m map[string]string) DynamoDBServerOption

WithDynamoDBLeaderMap configures the Raft-address-to-DynamoDB-address mapping used to forward requests from followers to the current leader. The format mirrors the raftRedisMap / raftS3Map convention.

func WithDynamoDBRequestObserver

func WithDynamoDBRequestObserver(observer monitoring.DynamoDBRequestObserver) DynamoDBServerOption

WithDynamoDBRequestObserver enables Prometheus-compatible request metrics.

type GRPCServer

type GRPCServer struct {
	pb.UnimplementedRawKVServer
	pb.UnimplementedTransactionalKVServer
	// contains filtered or unexported fields
}

func NewGRPCServer

func NewGRPCServer(store store.MVCCStore, coordinate kv.Coordinator, opts ...GRPCServerOption) *GRPCServer

func (*GRPCServer) Close

func (r *GRPCServer) Close() error

func (*GRPCServer) Commit

func (r *GRPCServer) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitResponse, error)

func (*GRPCServer) Delete

func (r *GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error)

func (*GRPCServer) Get

func (r *GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error)

func (*GRPCServer) PreWrite

func (*GRPCServer) Put

func (r *GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error)

func (*GRPCServer) RawDelete

func (r *GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error)

func (*GRPCServer) RawGet

func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error)

func (*GRPCServer) RawLatestCommitTS

func (*GRPCServer) RawPut

func (r *GRPCServer) RawPut(ctx context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error)

func (*GRPCServer) RawScanAt

func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*pb.RawScanAtResponse, error)

func (*GRPCServer) Rollback

func (r *GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.RollbackResponse, error)

func (*GRPCServer) Scan

func (r *GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error)

type GRPCServerOption

type GRPCServerOption func(*GRPCServer)

func WithCloseStore

func WithCloseStore() GRPCServerOption

type HTFIFOCapabilityPeerStatus

type HTFIFOCapabilityPeerStatus struct {
	// Address is the peer's host:port as supplied to the poller.
	Address string

	// HasHTFIFO is true iff the peer's /sqs_health JSON body's
	// capabilities array contained the htfifo capability string.
	HasHTFIFO bool

	// Capabilities is the parsed capabilities array. Nil on any
	// failure before JSON parsing, or non-nil but missing
	// htfifo when the peer is on an older binary.
	Capabilities []string

	// Error is empty on a clean success (HTTP 200 + parseable
	// JSON, regardless of whether HasHTFIFO is true) and non-empty
	// on any failure (transport error, non-200 status, malformed
	// JSON, or context cancellation).
	Error string
}

HTFIFOCapabilityPeerStatus is one peer's polling result.

type HTFIFOCapabilityReport

type HTFIFOCapabilityReport struct {
	// AllAdvertise is true iff every peer in the input list
	// returned a /sqs_health body whose `capabilities` array
	// contains the htfifo capability string. False on any timeout,
	// HTTP error, malformed body, or missing-capability — the
	// gate fails closed.
	//
	// Vacuously true on an empty peer list. The caller (CreateQueue
	// gate) is responsible for ensuring the peer list reflects the
	// current cluster membership before consulting this report.
	AllAdvertise bool

	// Peers is the per-peer status, indexed in input order. Each
	// entry has either HasHTFIFO=true (peer advertised the
	// capability) or a non-empty Error explaining why the peer
	// did not pass. Capabilities is the raw list returned by the
	// peer when the body was parseable.
	Peers []HTFIFOCapabilityPeerStatus
}

HTFIFOCapabilityReport summarises the result of polling each peer's /sqs_health endpoint for the htfifo capability. Used by the CreateQueue capability gate (Phase 3.D PR 5) and by operator tooling that needs to confirm a rolling upgrade has finished before enabling partitioned FIFO queues.

AllAdvertise is the binary go/no-go signal for the gate; Peers carries per-node detail for log lines and operator triage.

func PollSQSHTFIFOCapability

func PollSQSHTFIFOCapability(ctx context.Context, peers []string, cfg PollerConfig) *HTFIFOCapabilityReport

PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint concurrently and reports whether all advertise htfifo. The helper is stateless — every call dials its peers fresh, so a transient network blip on one call does not poison subsequent calls.

Per-peer behaviour:

  • GET http://<peer>/sqs_health with Accept: application/json
  • Expect HTTP 200 and a parseable JSON body matching {"status":"ok","capabilities":[...]}.
  • HasHTFIFO reports whether htfifo appears in the capabilities array.
  • Any failure (transport error, non-200, malformed JSON, timeout, context cancellation) records the reason in Error and leaves HasHTFIFO=false. The poller never returns a fatal error from PollSQSHTFIFOCapability itself; the report carries every per-peer outcome instead.

Concurrency: peers are polled in goroutines; results land via an indexed channel so the slice writes are obviously race-free.

Timeouts: each peer poll is bounded by min(ctx.Deadline(), now+cfg.PerPeerTimeout). A long ctx deadline does not extend the per-peer cap, and an absent ctx deadline still triggers fail-closed at the per-peer cap.
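
A sketch of the gate-side call; peer addresses and the timeout value are illustrative, and the report is consumed as a single go/no-go plus per-peer triage detail:

func htfifoReady(ctx context.Context, peers []string) bool {
	report := PollSQSHTFIFOCapability(ctx, peers, PollerConfig{
		PerPeerTimeout: 2 * time.Second, // zero falls back to the 3s default
	})
	for _, p := range report.Peers {
		if !p.HasHTFIFO {
			// Error is empty when the peer answered cleanly but is
			// on an older binary that lacks the capability.
			log.Printf("peer %s not ready: caps=%v err=%q", p.Address, p.Capabilities, p.Error)
		}
	}
	return report.AllAdvertise
}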

type Internal

type Internal struct {
	pb.UnimplementedInternalServer
	// contains filtered or unexported fields
}

func NewInternalWithEngine

func NewInternalWithEngine(txm kv.Transactional, leader raftengine.LeaderView, clock *kv.HLC, relay *RedisPubSubRelay) *Internal

func (*Internal) Forward

func (i *Internal) Forward(ctx context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error)

func (*Internal) RelayPublish

type KeyVizSampler

type KeyVizSampler interface {
	// Snapshot returns the matrix columns in [from, to). Either
	// bound may be the zero time meaning unbounded on that side.
	// Implementations must return rows the caller can mutate freely
	// (a deep copy) — see keyviz.MemSampler.Snapshot.
	Snapshot(from, to time.Time) []keyviz.MatrixColumn
}

KeyVizSampler is the read-side abstraction the Admin service needs from the keyviz package: a time-bounded matrix snapshot. Defined here (not in keyviz) so tests can pass an in-memory fake without constructing a full *keyviz.MemSampler. *keyviz.MemSampler satisfies this interface.

type Node

type Node struct {
	// contains filtered or unexported fields
}

type NodeIdentity

type NodeIdentity struct {
	NodeID      string
	GRPCAddress string
}

NodeIdentity is the value form of the protobuf NodeIdentity message used for AdminServer configuration. It avoids copying pb.NodeIdentity, which embeds a protoimpl.MessageState (and a mutex).

type PollerConfig

type PollerConfig struct {
	// HTTPClient is the client used for /sqs_health GETs. Nil
	// falls back to http.DefaultClient. Callers wanting connection
	// pooling, custom Transport, or shorter Client.Timeout pass
	// their own.
	HTTPClient *http.Client

	// PerPeerTimeout caps how long any single peer's poll runs
	// before being abandoned. Zero defaults to
	// defaultSQSCapabilityPollTimeout (3s). Tests pass a small
	// value (e.g. 100ms) so the per-peer cap path can be
	// exercised quickly without a parent context deadline.
	PerPeerTimeout time.Duration
}

PollerConfig tunes PollSQSHTFIFOCapability for a specific call site. All fields are optional — the zero value picks safe defaults. Tests use the explicit PerPeerTimeout to exercise the per-peer cap independently of any caller-supplied context deadline.

type RedisPubSubRelay

type RedisPubSubRelay struct {
	// contains filtered or unexported fields
}

RedisPubSubRelay lets the internal gRPC service publish into the local Redis pubsub bus without depending on RedisServer startup order.

func NewRedisPubSubRelay

func NewRedisPubSubRelay() *RedisPubSubRelay

func (*RedisPubSubRelay) Bind

func (r *RedisPubSubRelay) Bind(publish func(channel, message []byte) int64)

func (*RedisPubSubRelay) Publish

func (r *RedisPubSubRelay) Publish(channel, message []byte) int64
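
A sketch of the startup-order decoupling the relay provides; txm, leader, clock, and redisPublish are assumed to come from the caller's wiring, and the int64 follows the bound function's convention:

// The relay is created first so the internal gRPC service can hold
// it before the RedisServer exists.
relay := NewRedisPubSubRelay()
internal := NewInternalWithEngine(txm, leader, clock, relay)

// Later, once the RedisServer is up, bind its publish hook:
relay.Bind(redisPublish) // redisPublish: func(channel, message []byte) int64

// Handlers can publish regardless of startup order.
_ = relay.Publish([]byte("events"), []byte("hello"))
_ = internal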

type RedisServer

type RedisServer struct {
	// contains filtered or unexported fields
}

func NewRedisServer

func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[string]string, relay *RedisPubSubRelay, opts ...RedisServerOption) *RedisServer

func (*RedisServer) Close

func (r *RedisServer) Close() error

Close cancels the base context, signalling all in-flight handlers to abort. Idempotent. The underlying redcon listener is still owned by the caller; Close does NOT touch it so shutdown orchestration can remain with the server owner.

func (*RedisServer) Run

func (r *RedisServer) Run() error

func (*RedisServer) Stop

func (r *RedisServer) Stop()

type RedisServerOption

type RedisServerOption func(*RedisServer)

func WithLuaFastPathObserver

func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServerOption

WithLuaFastPathObserver enables per-redis.call() fast-path outcome metrics inside Lua scripts. Used to diagnose fast-path hit ratios for commands like ZRANGEBYSCORE / ZSCORE / HGET.

Resolves per-command counter handles up front so the hot path avoids CounterVec.WithLabelValues on every redis.call().

func WithLuaObserver

func WithLuaObserver(observer monitoring.LuaScriptObserver) RedisServerOption

WithLuaObserver enables per-phase Lua script metrics (VM exec, Raft commit, retries).

func WithRedisActiveTimestampTracker

func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption

func WithRedisCompactor

func WithRedisCompactor(c *DeltaCompactor) RedisServerOption

WithRedisCompactor wires a DeltaCompactor to the RedisServer so that urgent single-key compaction can be triggered when ErrDeltaScanTruncated is hit.

func WithRedisRequestObserver

func WithRedisRequestObserver(observer monitoring.RedisRequestObserver) RedisServerOption

WithRedisRequestObserver enables Prometheus-compatible request metrics.

type S3Server

type S3Server struct {
	// contains filtered or unexported fields
}

func NewS3Server

func NewS3Server(listen net.Listener, s3Addr string, st store.MVCCStore, coordinate kv.Coordinator, leaderS3 map[string]string, opts ...S3ServerOption) *S3Server

func (*S3Server) AdminCreateBucket

func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error)

AdminCreateBucket creates a bucket on behalf of the admin dashboard. The principal MUST be re-validated by the caller (the admin HTTP handler does this against the live RoleStore); this method enforces the authorisation invariant a second time so a follower-forwarded call cannot smuggle a read-only principal past the check on the leader side (Section 3.2, "authorization truth always lives on the adapter side").

The transaction is atomic: bucket meta + generation + ACL all land in a single OperationGroup, mirroring the SigV4 createBucket path. On success returns the freshly-stored summary; on conflict returns ErrAdminBucketAlreadyExists; on a non-leader / non-full-role / bad input returns the corresponding sentinel.

func (*S3Server) AdminDeleteBucket

func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error

AdminDeleteBucket removes a bucket if it is empty. Same authorisation contract as the other admin write methods. The bucket-must-be-empty rule mirrors the SigV4 deleteBucket path — the dashboard cannot force a recursive delete, by design.

The dispatch happens in two phases because the production coordinator (kv/sharded_coordinator.go: dispatchDelPrefixBroadcast) rejects DEL_PREFIX inside a transaction and rejects DEL_PREFIX mixed with Del or Put in the same OperationGroup:

Phase 1: Del BucketMetaKey in a txn (OCC-protected against
         a concurrent AdminCreateBucket landing between our
         readTS and commitTS).
Phase 2: DEL_PREFIX over every per-bucket key family in a
         non-txn broadcast — the safety net that sweeps
         orphans left by any PutObject that committed
         chunks/manifest between the empty-probe and the
         Phase-1 commit. See design doc
         2026_04_28_proposed_admin_delete_bucket_safety_net.md
         §6.2 for the original single-OperationGroup design
         and the dispatch-shape rejection that forced the
         two-phase split.

Phase 2 is best-effort: a Phase-2 failure leaves the bucket meta already deleted (Phase 1 succeeded) but per-bucket prefixes possibly still containing orphans. That state is no worse than the pre-fix behaviour on main and recovers on operator-driven re-cleanup. We log a warning rather than propagate the error so the operator-visible delete reports success — the bucket really is gone from the API surface, and a retry would 404 because loadBucketMetaAt no longer finds the meta.

BucketGenerationKey is intentionally NOT deleted. Re-creating the bucket bumps the generation; orphan blobs that escaped this delete (e.g. on an older generation) stay isolated under the old generation prefix and never surface in the new bucket. Pinned by TestS3Server_AdminDeleteBucket_BucketGenerationKeySurvives.

The contract change for clients: a PutObject that returned 200 OK during the race window can have its data swept by the concurrent delete. Operators are advised to pause writes before AdminDeleteBucket; the alternative (orphan objects that no API can enumerate or remove) is strictly worse.

The same shape is mirrored on the SigV4 path (adapter/s3.go:deleteBucket) so both delete entrypoints share the same race-window guarantees.

func (*S3Server) AdminDescribeBucket

func (s *S3Server) AdminDescribeBucket(ctx context.Context, name string) (*AdminBucketSummary, bool, error)

AdminDescribeBucket returns the bucket-level snapshot for name. The triple (result, present, error) lets admin callers distinguish a genuine "not found" from a storage error without sniffing sentinels — when the bucket is missing the function returns (nil, false, nil), mirroring AdminDescribeTable's contract on the Dynamo side.

Like AdminListBuckets this is a read-only path that bypasses SigV4. The HTTP admin handler enforces session + CSRF + role at the boundary; the adapter trusts the caller for authentication (Section 3.2's exception for read-only paths).

func (*S3Server) AdminListBuckets

func (s *S3Server) AdminListBuckets(ctx context.Context) ([]AdminBucketSummary, error)

AdminListBuckets returns every S3-style bucket this server knows about, in lexicographic order (the metadata-prefix scan natural ordering). Intended for the in-process admin listener as the SigV4-free counterpart to the listBuckets HTTP handler.

Unlike the SigV4 path (which intentionally caps each call at s3MaxKeys = 1000 because the AWS API is page-based), the admin dashboard's pagination is implemented at the handler layer, which expects this method to return the full set. We loop the per-page ScanAt until the metadata prefix is exhausted — same pattern as scanAllByPrefixAt on the Dynamo side (Codex P1 + Claude Issue 1 on PR #658).

Returns an empty slice (not nil) when no buckets exist so JSON callers see `[]` instead of `null`.

func (*S3Server) AdminPutBucketAcl

func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincipal, name, acl string) error

AdminPutBucketAcl swaps the canned ACL on an existing bucket. Same authorisation contract as AdminCreateBucket. Mutates only the meta.Acl field; generation is preserved so existing object references stay valid.

func (*S3Server) Run

func (s *S3Server) Run() error

func (*S3Server) Stop

func (s *S3Server) Stop()

type S3ServerOption

type S3ServerOption func(*S3Server)

func WithS3ActiveTimestampTracker

func WithS3ActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) S3ServerOption

func WithS3Region

func WithS3Region(region string) S3ServerOption

func WithS3StaticCredentials

func WithS3StaticCredentials(creds map[string]string) S3ServerOption

type SQSPartitionObserver

type SQSPartitionObserver interface {
	ObservePartitionMessage(queue string, partition uint32, action string)
}

SQSPartitionObserver is the metrics-package interface (monitoring.SQSPartitionObserver) re-declared here so the adapter does not import monitoring at the package boundary — matches the existing observer pattern for DynamoDB / Redis.

type SQSPartitionResolver

type SQSPartitionResolver struct {
	// contains filtered or unexported fields
}

SQSPartitionResolver maps a partitioned-SQS key to the operator-chosen Raft group for the (queue, partition) tuple. Implements kv.PartitionResolver via duck typing — see the integration in main.go where the resolver is installed on ShardedCoordinator.

The byte-range engine cannot route partitioned queues because adding per-partition routes would break its non-overlapping-cover invariant (a partition route for partition K of one queue would leave a gap for legacy keys that fall lexicographically between partitions K and K+1). The resolver-first dispatch path avoids this — it answers only for keys that match a partitioned family prefix and otherwise lets the engine handle dispatch.

func NewSQSPartitionResolver

func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver

NewSQSPartitionResolver builds a resolver from the operator-supplied partition map. routes[queue][k] is the Raft group ID that owns partition k of queue, with len(routes[queue]) equal to the queue's PartitionCount.

Returns nil when routes is empty so callers can keep the resolver out of the request path entirely on a non-partitioned cluster (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op).

The constructor takes a defensive copy so a later caller mutation to the input map does not leak into the resolver's view at runtime.
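
A sketch of building the operator map; the queue name and group IDs are illustrative:

routes := map[string][]uint64{
	// "orders.fifo" has PartitionCount = 3: partition 0 → group 2,
	// partition 1 → group 3, partition 2 → group 4.
	"orders.fifo": {2, 3, 4},
}
r := NewSQSPartitionResolver(routes) // nil when routes is empty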

func (*SQSPartitionResolver) RecognisesPartitionedKey

func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool

RecognisesPartitionedKey reports whether key has the structural shape of a partitioned-SQS key — i.e. starts with one of the partitioned family prefixes. The check is PREFIX-ONLY, not a full parse: a key with a partitioned prefix followed by a malformed queue / partition segment still answers true, so the router fails closed via kv.PartitionResolver semantics instead of falling through to the engine and silently routing to the SQS catalog default group via routeKey's !sqs|route|global collapse (round 5 review nit on PR #715).

A nil receiver returns false so kv.ShardRouter's typed-nil case (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't recognise anything" answer instead of falsely claiming a shape.

func (*SQSPartitionResolver) ResolveGroup

func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool)

ResolveGroup decodes the (queue, partition) embedded in a partitioned-SQS key and returns the operator-chosen Raft group.

Returns (0, false) for any key that does not match a partitioned family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records, …) so kv.ShardRouter falls through to its byte-range engine for default routing.

Returns (0, false) for a partitioned-shaped key whose queue is not in the routes map or whose partition index is beyond len(routes[queue]). The router pairs this with RecognisesPartitionedKey to fail closed instead of falling through — silently routing through the engine's !sqs|route|global default would mis-route HT-FIFO traffic during partition-map drift (codex P1 round 2 on PR #715).

func (*SQSPartitionResolver) RoutedPartitionCount

func (r *SQSPartitionResolver) RoutedPartitionCount(queueName string) int

RoutedPartitionCount returns the number of partition routes configured for queueName, or 0 if the queue is not in the routing map. Used by the CreateQueue capability gate (validateHTFIFOCapability) to verify that EVERY partition of a requested partitioned queue is routable BEFORE the create commits — without this, a queue could land with PartitionCount=N but only K<N routes, and SendMessage on the missing partitions would fail closed at the router with "no route for key" (Codex P1 review on PR #734).

A nil receiver returns 0 so the gate's "resolver==nil → skip the coverage check" branch kicks in cleanly: a single-shard deployment without --sqsFifoPartitionMap has no per-partition routing to verify, and partitioned keys fall through to the engine's default group.
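
A sketch of the coverage check under the semantics described above; the real gate lives in validateHTFIFOCapability and this helper is hypothetical:

func coversAllPartitions(r *SQSPartitionResolver, queue string, partitionCount int) bool {
	if r == nil {
		return true // no per-partition routing on this deployment; skip
	}
	// Every partition 0..partitionCount-1 must be routable before
	// the create is admitted.
	return r.RoutedPartitionCount(queue) == partitionCount
}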

type SQSQueueDepth

type SQSQueueDepth struct {
	Queue      string
	Visible    int64
	NotVisible int64
	Delayed    int64
}

SQSQueueDepth is one queue's depth-attribute snapshot, the unit the SQSServer hands to monitoring.SQSObserver on each tick. The fields mirror sqsApproxCounters byte-for-byte and the public AdminQueueCounters JSON shape — operators see consistent numbers in dashboards and the admin SPA.

type SQSServer

type SQSServer struct {
	// contains filtered or unexported fields
}

func NewSQSServer

func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...SQSServerOption) *SQSServer

func (*SQSServer) AdminDeleteQueue

func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error

AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue. Returns the same sentinel errors as AdminCreateTable on the Dynamo side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader on a follower, ErrAdminSQSNotFound when the queue is absent.

func (*SQSServer) AdminDescribeQueue

func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error)

AdminDescribeQueue returns a snapshot of name's metadata plus the approximate counters. The triple (result, present, error) lets admin callers distinguish a missing queue from a storage error without sniffing sentinels.

Like AdminDescribeTable on the Dynamo side, this entrypoint runs on either the leader or a follower (read-only); the counter scan uses a fresh nextTxnReadTS so the result is consistent with what SigV4 GetQueueAttributes would have returned at the same instant.

func (*SQSServer) AdminListQueues

func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error)

AdminListQueues returns every queue name this server knows about, in the lexicographic order the queue catalog index produces. Read path; runs on follower or leader and uses the same scanQueueNames helper the SigV4 ListQueues handler does.

func (*SQSServer) Run

func (s *SQSServer) Run() error

func (*SQSServer) SnapshotQueueDepths

func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) ([]SQSQueueDepth, bool)

SnapshotQueueDepths satisfies monitoring.SQSDepthSource. The observer Start loop calls this on every tick.

Returns:

  • (snaps, true) — leader, scrape OK. The observer writes snaps to the gauges and diffs against the previous tick (forgetting any queue that disappeared from this snapshot).
  • (nil, true) — this node is a follower (leader-only emission keeps gauges consistent with AdminListQueues / AdminDescribeQueue at the same instant — follower scans would race the leader's writes). Empty-but-OK so the observer calls ForgetQueue for any gauges this node was emitting before stepping down.
  • (nil, false) — leader, but scrape failed (transient catalog-read error or ctx cancel mid-scan). Tells the observer to skip this tick: leave existing gauges in place rather than wiping every depth series — a single failed scrape would otherwise render on dashboards as a false "all queues drained" event until the next successful tick.

Per-queue scan errors (loadQueueMetaAt / scanApproxCounters) remain handled in-line by snapshotOneQueueDepth: the offending queue is dropped from this tick's snapshot but ok stays true, so the observer calls ForgetQueue for just that one queue's gauges. Only a top-level scanQueueNames failure (which would otherwise silently read as "no queues anywhere") flips ok to false.
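
A sketch of the observer side of this contract; setGauges is a hypothetical stand-in for the monitoring package's gauge writes:

func observeTick(ctx context.Context, s *SQSServer) {
	snaps, ok := s.SnapshotQueueDepths(ctx)
	if !ok {
		return // scrape failed: keep last-known gauges for this tick
	}
	// ok with an empty slice covers both "follower" and "no queues";
	// a real observer also diffs against the previous tick and
	// forgets queues missing from snaps.
	for _, d := range snaps {
		setGauges(d.Queue, d.Visible, d.NotVisible, d.Delayed) // hypothetical gauge writer
	}
}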

func (*SQSServer) Stop

func (s *SQSServer) Stop()

type SQSServerOption

type SQSServerOption func(*SQSServer)

func WithSQSLeaderMap

func WithSQSLeaderMap(m map[string]string) SQSServerOption

WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to forward requests from followers to the current leader. Format mirrors WithDynamoDBLeaderMap / WithS3LeaderMap.

func WithSQSPartitionObserver

func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption

WithSQSPartitionObserver installs the elastickv_sqs_partition_messages_total counter observer on the SQS server. Pass nil (the default) on non-monitored test fixtures; the partitioned send / receive / delete paths then see a nil observer, skip the observation, and the metric stays at zero. The monitoring registry's SQSPartitionObserver() returns the concrete implementation in production.

func WithSQSPartitionResolver

func WithSQSPartitionResolver(r *SQSPartitionResolver) SQSServerOption

WithSQSPartitionResolver installs the cluster's partition resolver on the SQS server so the CreateQueue capability gate (validateHTFIFOCapability) can verify routing coverage before admitting a partitioned create. Pass nil (the default) on single-shard deployments without --sqsFifoPartitionMap — the gate then skips the coverage check.

Callers must ensure the resolver passed here matches the one installed on the kv coordinator via WithPartitionResolver, otherwise the gate would admit a queue that the coordinator then fails to route. main.go builds the resolver once and hands the same pointer to both consumers.

func WithSQSRegion

func WithSQSRegion(region string) SQSServerOption

WithSQSRegion configures the signing region the adapter expects inside the Credential scope. Empty values retain the previous setting.

func WithSQSStaticCredentials

func WithSQSStaticCredentials(creds map[string]string) SQSServerOption

WithSQSStaticCredentials supplies the access-key → secret map the adapter will accept. Passing an empty map disables authorization entirely (open endpoint), matching the S3 adapter's behavior for unit-test friendliness.
