Documentation
¶
Index ¶
- Constants
- Variables
- func AdminErrorMessage(err error) string
- func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor)
- func AdvertisesHTFIFO() bool
- func IsAdminTableAlreadyExists(err error) bool
- func IsAdminTableNotFound(err error) bool
- func IsAdminValidation(err error) bool
- type AdminAttribute
- type AdminBucketSummary
- type AdminCreateGSI
- type AdminCreateTableInput
- type AdminGSISummary
- type AdminGroup
- type AdminPrincipal
- type AdminQueueCounters
- type AdminQueueSummary
- type AdminRole
- type AdminServer
- func (s *AdminServer) GetClusterOverview(ctx context.Context, _ *pb.GetClusterOverviewRequest) (*pb.GetClusterOverviewResponse, error)
- func (s *AdminServer) GetKeyVizMatrix(_ context.Context, req *pb.GetKeyVizMatrixRequest) (*pb.GetKeyVizMatrixResponse, error)
- func (s *AdminServer) GetRaftGroups(_ context.Context, _ *pb.GetRaftGroupsRequest) (*pb.GetRaftGroupsResponse, error)
- func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup)
- func (s *AdminServer) RegisterSampler(sampler KeyVizSampler)
- func (s *AdminServer) SetClock(now func() time.Time)
- type AdminTableSummary
- type DeltaCompactor
- type DeltaCompactorOption
- type DistributionServer
- func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error)
- func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error)
- func (s *DistributionServer) ListRoutes(ctx context.Context, req *pb.ListRoutesRequest) (*pb.ListRoutesResponse, error)
- func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeRequest) (*pb.SplitRangeResponse, error)
- func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64)
- type DistributionServerOption
- type DynamoDBServer
- func (d *DynamoDBServer) AdminCreateTable(ctx context.Context, principal AdminPrincipal, in AdminCreateTableInput) (*AdminTableSummary, error)
- func (d *DynamoDBServer) AdminDeleteTable(ctx context.Context, principal AdminPrincipal, name string) error
- func (d *DynamoDBServer) AdminDescribeTable(ctx context.Context, name string) (*AdminTableSummary, bool, error)
- func (d *DynamoDBServer) AdminListTables(ctx context.Context) ([]string, error)
- func (d *DynamoDBServer) Run() error
- func (d *DynamoDBServer) Stop()
- type DynamoDBServerOption
- type GRPCServer
- func (r *GRPCServer) Close() error
- func (r *GRPCServer) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitResponse, error)
- func (r *GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error)
- func (r *GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error)
- func (r *GRPCServer) PreWrite(ctx context.Context, req *pb.PreWriteRequest) (*pb.PreCommitResponse, error)
- func (r *GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error)
- func (r *GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error)
- func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error)
- func (r *GRPCServer) RawLatestCommitTS(ctx context.Context, req *pb.RawLatestCommitTSRequest) (*pb.RawLatestCommitTSResponse, error)
- func (r *GRPCServer) RawPut(ctx context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error)
- func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*pb.RawScanAtResponse, error)
- func (r *GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.RollbackResponse, error)
- func (r *GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error)
- type GRPCServerOption
- type HTFIFOCapabilityPeerStatus
- type HTFIFOCapabilityReport
- type Internal
- type KeyVizSampler
- type Node
- type NodeIdentity
- type PollerConfig
- type RedisPubSubRelay
- type RedisServer
- type RedisServerOption
- func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServerOption
- func WithLuaObserver(observer monitoring.LuaScriptObserver) RedisServerOption
- func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption
- func WithRedisCompactor(c *DeltaCompactor) RedisServerOption
- func WithRedisRequestObserver(observer monitoring.RedisRequestObserver) RedisServerOption
- type S3Server
- func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error)
- func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error
- func (s *S3Server) AdminDescribeBucket(ctx context.Context, name string) (*AdminBucketSummary, bool, error)
- func (s *S3Server) AdminListBuckets(ctx context.Context) ([]AdminBucketSummary, error)
- func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincipal, name, acl string) error
- func (s *S3Server) Run() error
- func (s *S3Server) Stop()
- type S3ServerOption
- type SQSPartitionObserver
- type SQSPartitionResolver
- type SQSQueueDepth
- type SQSServer
- func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error
- func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error)
- func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error)
- func (s *SQSServer) Run() error
- func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) ([]SQSQueueDepth, bool)
- func (s *SQSServer) Stop()
- type SQSServerOption
- func WithSQSLeaderMap(m map[string]string) SQSServerOption
- func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption
- func WithSQSPartitionResolver(r *SQSPartitionResolver) SQSServerOption
- func WithSQSRegion(region string) SQSServerOption
- func WithSQSStaticCredentials(creds map[string]string) SQSServerOption
Constants ¶
const ( SQSPartitionActionSend = "send" SQSPartitionActionReceive = "receive" SQSPartitionActionDelete = "delete" )
SQSPartitionAction* mirror the action label values from monitoring.SQSPartitionAction*. Re-declared so adapter call sites do not need a monitoring import; the observer interface validates the value at runtime so a drift between these constants and the monitoring side surfaces as a dropped observation rather than a wedge.
const ( // SqsQueueMetaPrefix prefixes queue-metadata records. SqsQueueMetaPrefix = "!sqs|queue|meta|" // SqsQueueGenPrefix prefixes the per-queue monotonic generation counter. // Bumped on DeleteQueue / PurgeQueue so keys from an older incarnation of // the same queue name cannot leak into a newly created queue. SqsQueueGenPrefix = "!sqs|queue|gen|" // SqsQueueSeqPrefix prefixes the per-queue FIFO sequence counter. Bumped // on every FIFO send and embedded in the message record so consumers can // reconstruct the producer's strict total order. SqsQueueSeqPrefix = "!sqs|queue|seq|" // SqsMsgDedupPrefix prefixes FIFO deduplication records. Each entry // stores the original message id and the dedup-window expiry; the // receive path is unaware of these — they only gate sends. SqsMsgDedupPrefix = "!sqs|msg|dedup|" // SqsMsgGroupPrefix prefixes the FIFO group-lock records. The lock is // held by at most one message per group, persists across visibility // expiries, and is only released on DeleteMessage / DLQ redrive / // retention expiry. SqsMsgGroupPrefix = "!sqs|msg|group|" // SqsMsgByAgePrefix prefixes the send-age index. Each entry is // keyed by (queue, gen, send_timestamp, message_id) so the reaper // can find every record whose retention deadline has elapsed with // one bounded scan, without having to load every message body. SqsMsgByAgePrefix = "!sqs|msg|byage|" // SqsQueueTombstonePrefix prefixes a generation-orphan marker. // DeleteQueue and PurgeQueue each write one (queue, gen) tombstone // in the same OCC transaction that supersedes that generation — // DeleteQueue tombstones the gen it removes the meta row at, and // PurgeQueue tombstones the pre-bump gen so the reaper can find // pre-purge orphans even if the queue is deleted before the next // reaper tick. The reaper enumerates these markers to clean up // orphan data / vis / byage / dedup / group keys for superseded // generations. // The tombstone is itself deleted once the reaper // confirms no message-keyspace state remains for that (queue, gen). SqsQueueTombstonePrefix = "!sqs|queue|tombstone|" )
SQS keyspace prefixes. Kept in sync with the naming in docs/design/2026_04_24_proposed_sqs_compatible_adapter.md.
const ( SqsPartitionedMsgDataPrefix = "!sqs|msg|data|" + sqsPartitionedDiscriminator SqsPartitionedMsgVisPrefix = "!sqs|msg|vis|" + sqsPartitionedDiscriminator SqsPartitionedMsgDedupPrefix = "!sqs|msg|dedup|" + sqsPartitionedDiscriminator SqsPartitionedMsgGroupPrefix = "!sqs|msg|group|" + sqsPartitionedDiscriminator SqsPartitionedMsgByAgePrefix = "!sqs|msg|byage|" + sqsPartitionedDiscriminator )
SqsPartitionedMsg*Prefix mirrors each legacy SqsMsg*Prefix with the partitioned-keyspace discriminator inserted. Defined as full string constants (rather than runtime concatenation in each constructor) so the byte-layout invariant is asserted by the type system: a future rename of the discriminator must touch the constants here, not 6+ scattered string concatenations.
const ( SqsMsgDataPrefix = "!sqs|msg|data|" SqsMsgVisPrefix = "!sqs|msg|vis|" )
Message-keyspace prefixes. The data record holds the message body and state; the visibility index is a separate, visible_at-sorted key family so ReceiveMessage can find the next visible message with a single bounded prefix scan.
Variables ¶
var ( // ErrAdminBucketAlreadyExists signals that AdminCreateBucket // targeted a name already in use. Maps to 409 Conflict. ErrAdminBucketAlreadyExists = errors.New("s3 admin: bucket already exists") // ErrAdminBucketNotFound signals that AdminDeleteBucket / // AdminPutBucketAcl targeted a missing bucket. Maps to 404. ErrAdminBucketNotFound = errors.New("s3 admin: bucket not found") // ErrAdminBucketNotEmpty signals that AdminDeleteBucket targeted // a bucket that still has objects. Maps to 409 Conflict to match // the SigV4 path's BucketNotEmpty response (the dashboard cannot // force a recursive delete; the operator must clean up first). ErrAdminBucketNotEmpty = errors.New("s3 admin: bucket is not empty") // ErrAdminInvalidBucketName signals that AdminCreateBucket got // a name that does not satisfy validateS3BucketName. Maps to 400. ErrAdminInvalidBucketName = errors.New("s3 admin: invalid bucket name") // ErrAdminInvalidACL signals that the ACL string did not pass // validateS3CannedAcl. Maps to 400 (the SigV4 path returns 501 // NotImplemented for unsupported canned ACLs, but the admin API // is documented as private/public-read only and rejecting other // values as invalid input is a more useful contract for the // dashboard). ErrAdminInvalidACL = errors.New("s3 admin: invalid ACL") )
Sentinel errors the admin write methods return so the bridge in main_admin.go can translate them into admin-package vocabulary without sniffing strings. Named separately from ErrAdminTableAlreadyExists / ErrAdminTableNotFound on the Dynamo side so a future per-resource role / status divergence does not require renaming both packages' callers.
var ErrAdminForbidden = errors.New("dynamodb admin: principal lacks required role")
ErrAdminForbidden is returned when the principal lacks the role required for the operation. Admin handlers translate this to 403 "forbidden" without leaking which field of the principal failed the check.
var ErrAdminNotLeader = errors.New("dynamodb admin: this node is not the raft leader")
ErrAdminNotLeader is returned by every write entrypoint when this node is not the verified Raft leader. The admin HTTP handler translates this to 503 + Retry-After: 1 today; the future AdminForward RPC catches it as the trigger to forward to the leader instead.
var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found")
ErrAdminSQSNotFound is returned by write entrypoints when the target queue does not exist. Maps to 404. The describe path uses the (nil, false, nil) tuple instead of this sentinel for the not-found signal, mirroring AdminDescribeTable.
var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name")
ErrAdminSQSValidation is returned when an admin entrypoint receives a request with a missing or syntactically-bad queue name. Maps to 400 in the admin HTTP handler.
var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without")
ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator failed to supply a token and also did not opt into insecure mode.
var ErrCollectionTooLarge = errors.New("collection too large")
ErrCollectionTooLarge is returned when a collection exceeds maxWideColumnItems.
var ErrDeltaScanTruncated = errors.New("delta scan truncated: compaction required")
ErrDeltaScanTruncated is returned when the delta scan result is truncated, indicating that synchronous compaction is required before the operation can proceed.
var ErrLeaderNotFound = errors.New("leader not found")
var ErrNotLeader = errors.New("not leader")
var ErrTxnTimestampOverflow = errors.New("txn timestamp overflow")
Functions ¶
func AdminErrorMessage ¶
AdminErrorMessage extracts the human-readable message from a dynamoAPIError for surfacing back to the SPA. Returns "" when err is not a structured adapter error so callers fall back to a generic message instead of leaking arbitrary err.Error() output.
func AdminTokenAuth ¶
func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor)
AdminTokenAuth builds a gRPC unary+stream interceptor pair enforcing "authorization: Bearer <token>" metadata against the supplied token. An empty token disables enforcement; callers should pair that mode with a --adminInsecureNoAuth flag so operators knowingly opt in.
func AdvertisesHTFIFO ¶
func AdvertisesHTFIFO() bool
AdvertisesHTFIFO reports whether this binary's /sqs_health endpoint lists the htfifo capability. Mirror of the package-internal htfifoCapabilityAdvertised constant, exposed for the SQS leadership-refusal hook in main.go that uses this signal to decide whether to refuse leadership of any Raft group hosting a partitioned FIFO queue.
Stays a function (not an exported constant) so a future runtime override (env var, --no-htfifo flag for graceful degradation) can be threaded through here without changing the call site.
func IsAdminTableAlreadyExists ¶
IsAdminTableAlreadyExists reports whether err is the adapter's "table already exists" failure (ResourceInUseException). The bridge in main_admin.go uses this to map the adapter's internal error vocabulary onto admin's HTTP-facing sentinels without importing the package-private dynamoAPIError type.
func IsAdminTableNotFound ¶
IsAdminTableNotFound is the ResourceNotFoundException counterpart for AdminDeleteTable / AdminDescribeTable mapped through the adapter's structured error chain.
func IsAdminValidation ¶
IsAdminValidation reports whether err is a validation failure the adapter signalled via ValidationException. Admin handlers map this to 400 + a sanitised message.
Types ¶
type AdminAttribute ¶
AdminAttribute names a single primary-key or GSI key column. Type must be one of "S", "N", "B" — DynamoDB does not allow boolean or list keys and the adapter's existing schema validation enforces the same restriction at the next layer.
type AdminBucketSummary ¶
type AdminBucketSummary struct {
Name string
ACL string
CreatedAtHLC uint64
Generation uint64
Region string
Owner string
}
AdminBucketSummary is the bucket-level information the admin dashboard surfaces. It deliberately projects only the fields the dashboard needs so the package's wire-format types (s3BucketMeta, s3ListBucketsResult) stay internal.
CreatedAtHLC is the same physical-time-bearing HLC the bucket metadata persists; the admin HTTP handler formats it for the SPA. ACL is the canned-ACL string ("private" / "public-read") — the admin layer does not expand it into the AWS ACL XML grant tree because the dashboard renders the canned form directly.
type AdminCreateGSI ¶
type AdminCreateGSI struct {
Name string
PartitionKey AdminAttribute
SortKey *AdminAttribute
ProjectionType string
NonKeyAttributes []string
}
AdminCreateGSI describes one global secondary index in an admin CreateTable request. SortKey is optional (hash-only GSI). When ProjectionType is "INCLUDE", NonKeyAttributes lists the projected attribute names; otherwise NonKeyAttributes is ignored.
type AdminCreateTableInput ¶
type AdminCreateTableInput struct {
TableName string
PartitionKey AdminAttribute
SortKey *AdminAttribute
GSI []AdminCreateGSI
}
AdminCreateTableInput is the admin-facing CreateTable shape. The HTTP handler maps the design 4.2 JSON body into this struct, then AdminCreateTable converts it to the adapter's internal createTableInput. We do not pass the SigV4-flavoured wire struct directly because that struct's field names track AWS exactly and would be awkward for the admin SPA to author.
type AdminGSISummary ¶
type AdminGSISummary struct {
Name string
PartitionKey string
SortKey string
ProjectionType string
}
AdminGSISummary mirrors AdminTableSummary for a single GSI.
type AdminGroup ¶
type AdminGroup interface {
Status() raftengine.Status
Configuration(ctx context.Context) (raftengine.Configuration, error)
}
AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow subset of raftengine.Engine so tests can supply an in-memory fake without standing up a real Raft cluster. Configuration is polled on each GetClusterOverview to pick up scale-out / scale-in events without the operator having to restart the admin binary.
type AdminPrincipal ¶
AdminPrincipal is the authentication context every admin write entrypoint takes. The adapter re-evaluates authorisation against this principal *itself* — it does not trust the caller to have already enforced the role. That is the design's "認可の真実は常に adapter 側" ("the source of truth for authorization is always the adapter side") invariant (Section 3.2): if a follower forwards a pre-authenticated request via the future AdminForward RPC, the leader must still verify before acting.
type AdminQueueCounters ¶
AdminQueueCounters matches sqsApproxCounters (int64) so the admin bridge does not have to convert between widths. Visible / NotVisible / Delayed are the AWS Approximate* triple.
type AdminQueueSummary ¶
type AdminQueueSummary struct {
Name string
IsFIFO bool
Generation uint64
CreatedAt time.Time
Attributes map[string]string
Counters AdminQueueCounters
}
AdminQueueSummary is the per-queue projection the admin dashboard surfaces. It deliberately covers only the fields the SPA renders so the package's wire-format types stay internal.
Counters mirror the AWS Approximate* attribute set produced by scanApproxCounters; they are best-effort by AWS contract and stop counting once the catalog's per-call cap is reached (the SPA polls continuously, so an unbounded scan would pin the leader).
type AdminRole ¶
type AdminRole string
AdminRole is the authorization tier the adapter checks against on every admin write entrypoint. The constants intentionally mirror internal/admin.Role string values so the wire / persisted role vocabulary stays aligned across packages, but we keep a separate type here so the adapter has zero dependency on internal/admin.
type AdminServer ¶
type AdminServer struct {
pb.UnimplementedAdminServer
// contains filtered or unexported fields
}
AdminServer implements the node-side Admin gRPC service described in docs/admin_ui_key_visualizer_design.md §4 (Layer A). Phase 0 only implements GetClusterOverview and GetRaftGroups; remaining RPCs return Unimplemented so the generated client can still compile against older nodes during rollout.
func NewAdminServer ¶
func NewAdminServer(self NodeIdentity, members []NodeIdentity) *AdminServer
NewAdminServer constructs an AdminServer. `self` identifies the local node for responses that return node identity. `members` is the static membership snapshot shipped to the admin binary; callers that already have a membership source may pass nil and let the admin binary's fan-out layer discover peers by other means.
func (*AdminServer) GetClusterOverview ¶
func (s *AdminServer) GetClusterOverview( ctx context.Context, _ *pb.GetClusterOverviewRequest, ) (*pb.GetClusterOverviewResponse, error)
GetClusterOverview returns the local node identity, the current member list, and per-group leader identity collected from the engines registered via RegisterGroup. The member list is the union of (a) the bootstrap seed supplied to NewAdminServer and (b) the live Configuration of every registered Raft group — the latter picks up scale-out nodes added after startup so the admin binary's fan-out discovery does not miss them.
func (*AdminServer) GetKeyVizMatrix ¶
func (s *AdminServer) GetKeyVizMatrix( _ context.Context, req *pb.GetKeyVizMatrixRequest, ) (*pb.GetKeyVizMatrixResponse, error)
GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to) range supplied by the request, returning one KeyVizRow per tracked route or virtual bucket and a parallel column-timestamp slice.
Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from the request's KeyVizSeries enum to the matching keyviz.MatrixRow counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads.
Returns codes.Unavailable when no sampler is registered (keyviz disabled) so callers can distinguish that from "no data yet" (which yields a successful empty response).
func (*AdminServer) GetRaftGroups ¶
func (s *AdminServer) GetRaftGroups( _ context.Context, _ *pb.GetRaftGroupsRequest, ) (*pb.GetRaftGroupsResponse, error)
GetRaftGroups returns per-group state snapshots. Phase 0 wires commit/applied indices only; per-follower contact and term history land in later phases.
func (*AdminServer) RegisterGroup ¶
func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup)
RegisterGroup binds a Raft group ID to its engine so the Admin service can report leader and log state for that group.
func (*AdminServer) RegisterSampler ¶
func (s *AdminServer) RegisterSampler(sampler KeyVizSampler)
RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix. Without this call (or with a nil sampler) the RPC returns codes.Unavailable so callers can distinguish "keyviz disabled" from "no data yet".
func (*AdminServer) SetClock ¶
func (s *AdminServer) SetClock(now func() time.Time)
SetClock overrides the clock used by GetRaftGroups, letting tests inject a fixed time without mutating any package-global state. Concurrent RPCs on other AdminServer instances are unaffected.
type AdminTableSummary ¶
type AdminTableSummary struct {
Name string
PartitionKey string
SortKey string
Generation uint64
GlobalSecondaryIndexes []AdminGSISummary
}
AdminTableSummary is the table-level information the admin dashboard surfaces for a single Dynamo-compatible table. It deliberately projects only the fields the dashboard needs so the package's wire-format types (dynamoTableSchema and friends) stay internal.
type DeltaCompactor ¶
type DeltaCompactor struct {
// contains filtered or unexported fields
}
DeltaCompactor folds accumulated delta keys into their corresponding base metadata keys for all wide-column collection types (List, Hash, Set, ZSet).
It runs as a background goroutine on the Raft leader. Non-leaders skip each tick silently. Compaction is performed as an OCC transaction so concurrent writers never conflict with the compactor.
func NewDeltaCompactor ¶
func NewDeltaCompactor(st store.MVCCStore, coord kv.Coordinator, opts ...DeltaCompactorOption) *DeltaCompactor
NewDeltaCompactor creates a DeltaCompactor that operates on st using coord.
func (*DeltaCompactor) Run ¶
func (c *DeltaCompactor) Run(ctx context.Context) error
Run starts the background compaction loop and blocks until ctx is cancelled.
func (*DeltaCompactor) SyncOnce ¶
func (c *DeltaCompactor) SyncOnce(ctx context.Context) error
SyncOnce runs one compaction pass. The IsLeader() guard avoids the full-prefix delta scan on followers, which would proxy cross-node on ShardStore backends. For sharded deployments where this node is the leader for a non-default shard group, the regular tick is skipped; those keys are still handled by the urgent compaction path (compactUrgentKey) which uses IsLeaderForKey for per-key routing. buildBatchElems adds an additional per-key IsLeaderForKey filter so a default-group leader never dispatches mutations for shards it does not own. Each collection-type handler runs in its own goroutine so that a slow handler (e.g. one with many list deltas) does not delay Hash/Set/ZSet compaction. All goroutines share the same per-tick timeout context.
func (*DeltaCompactor) TriggerUrgentCompaction ¶
func (c *DeltaCompactor) TriggerUrgentCompaction(typeName string, userKey []byte)
TriggerUrgentCompaction queues an immediate single-key compaction for a key whose delta count has exceeded MaxDeltaScanLimit. The request is dropped silently when the channel is full (the regular tick will catch it).
type DeltaCompactorOption ¶
type DeltaCompactorOption func(*DeltaCompactor)
DeltaCompactorOption configures a DeltaCompactor.
func WithDeltaCompactorLogger ¶
func WithDeltaCompactorLogger(l *slog.Logger) DeltaCompactorOption
WithDeltaCompactorLogger sets the logger.
func WithDeltaCompactorMaxDeltaCount ¶
func WithDeltaCompactorMaxDeltaCount(n int) DeltaCompactorOption
WithDeltaCompactorMaxDeltaCount sets the soft threshold at which a key's deltas are folded into its base metadata. Default: 64.
func WithDeltaCompactorScanInterval ¶
func WithDeltaCompactorScanInterval(d time.Duration) DeltaCompactorOption
WithDeltaCompactorScanInterval sets the period between compaction passes. Default: 30s.
func WithDeltaCompactorTimeout ¶
func WithDeltaCompactorTimeout(d time.Duration) DeltaCompactorOption
WithDeltaCompactorTimeout sets the per-tick timeout. Default: 5s.
type DistributionServer ¶
type DistributionServer struct {
pb.UnimplementedDistributionServer
// contains filtered or unexported fields
}
DistributionServer serves distribution related gRPC APIs.
func NewDistributionServer ¶
func NewDistributionServer(e *distribution.Engine, catalog *distribution.CatalogStore, opts ...DistributionServerOption) *DistributionServer
NewDistributionServer creates a new server.
func (*DistributionServer) GetRoute ¶
func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error)
GetRoute returns route for a key.
func (*DistributionServer) GetTimestamp ¶
func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error)
GetTimestamp returns monotonically increasing timestamp.
func (*DistributionServer) ListRoutes ¶
func (s *DistributionServer) ListRoutes(ctx context.Context, req *pb.ListRoutesRequest) (*pb.ListRoutesResponse, error)
ListRoutes returns all durable routes from catalog storage.
func (*DistributionServer) SplitRange ¶
func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeRequest) (*pb.SplitRangeResponse, error)
SplitRange splits a route into two child routes in the same raft group.
func (*DistributionServer) UpdateRoute ¶
func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64)
UpdateRoute allows updating route information.
type DistributionServerOption ¶
type DistributionServerOption func(*DistributionServer)
DistributionServerOption configures DistributionServer behavior.
func WithCatalogReloadRetryPolicy ¶
func WithCatalogReloadRetryPolicy(attempts int, interval time.Duration) DistributionServerOption
WithCatalogReloadRetryPolicy configures the retry policy used after split commit when waiting for the local catalog snapshot to become visible.
func WithDistributionActiveTimestampTracker ¶
func WithDistributionActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DistributionServerOption
func WithDistributionCoordinator ¶
func WithDistributionCoordinator(coordinator kv.Coordinator) DistributionServerOption
WithDistributionCoordinator configures the coordinator used for Raft-backed catalog mutations in SplitRange.
type DynamoDBServer ¶
type DynamoDBServer struct {
// contains filtered or unexported fields
}
func NewDynamoDBServer ¶
func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...DynamoDBServerOption) *DynamoDBServer
func (*DynamoDBServer) AdminCreateTable ¶
func (d *DynamoDBServer) AdminCreateTable(ctx context.Context, principal AdminPrincipal, in AdminCreateTableInput) (*AdminTableSummary, error)
AdminCreateTable creates a Dynamo-compatible table on the local node, after re-validating the principal's role and confirming this node is the verified Raft leader. The returned summary mirrors the shape of AdminDescribeTable on the same name so the SPA can show the freshly-created table without an extra describe round-trip.
Errors:
- ErrAdminForbidden when the principal cannot write.
- ErrAdminNotLeader when the node is a follower.
- The adapter's standard dynamoAPIError chain for validation / storage failures, preserved unmodified so the HTTP handler can map the inner code (ValidationException, ResourceInUseException, etc.) to the appropriate status without re-classifying.
func (*DynamoDBServer) AdminDeleteTable ¶
func (d *DynamoDBServer) AdminDeleteTable(ctx context.Context, principal AdminPrincipal, name string) error
AdminDeleteTable is the SigV4-bypass counterpart to deleteTable. Returns the same sentinel errors as AdminCreateTable plus the adapter's standard dynamoErrResourceNotFound when the table is absent — admin handlers should map that to 404 rather than 500.
func (*DynamoDBServer) AdminDescribeTable ¶
func (d *DynamoDBServer) AdminDescribeTable(ctx context.Context, name string) (*AdminTableSummary, bool, error)
AdminDescribeTable returns a schema snapshot for name. The triple (result, present, error) lets admin callers distinguish a genuine "not found" from a storage error without sniffing sentinels: when the table is missing the function returns (nil, false, nil).
Unlike the SigV4 describeTable handler, AdminDescribeTable does NOT invoke ensureLegacyTableMigration. The admin dashboard is a strictly read-only surface (Gemini medium review on PR #633), so triggering Raft-coordinated key-encoding migrations as a side effect of routine polling would (a) violate the read-only contract and (b) cause every dashboard refresh to write to the cluster. Migration still runs lazily on the next SigV4 read or write of the same table — the schema we return here is just a snapshot for display, not a guarantee that the table is up-to-date for serving.
func (*DynamoDBServer) AdminListTables ¶
func (d *DynamoDBServer) AdminListTables(ctx context.Context) ([]string, error)
AdminListTables returns every Dynamo-style table this server knows about, in the lexicographic order the metadata index produces. Intended for the in-process admin listener as the SigV4-free counterpart to the listTables HTTP handler; both share the same underlying lookup so the two views cannot drift.
func (*DynamoDBServer) Run ¶
func (d *DynamoDBServer) Run() error
func (*DynamoDBServer) Stop ¶
func (d *DynamoDBServer) Stop()
type DynamoDBServerOption ¶
type DynamoDBServerOption func(*DynamoDBServer)
func WithDynamoDBActiveTimestampTracker ¶
func WithDynamoDBActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DynamoDBServerOption
func WithDynamoDBLeaderMap ¶
func WithDynamoDBLeaderMap(m map[string]string) DynamoDBServerOption
WithDynamoDBLeaderMap configures the Raft-address-to-DynamoDB-address mapping used to forward requests from followers to the current leader. The format mirrors the raftRedisMap / raftS3Map convention.
func WithDynamoDBRequestObserver ¶
func WithDynamoDBRequestObserver(observer monitoring.DynamoDBRequestObserver) DynamoDBServerOption
WithDynamoDBRequestObserver enables Prometheus-compatible request metrics.
type GRPCServer ¶
type GRPCServer struct {
pb.UnimplementedRawKVServer
pb.UnimplementedTransactionalKVServer
// contains filtered or unexported fields
}
func NewGRPCServer ¶
func NewGRPCServer(store store.MVCCStore, coordinate kv.Coordinator, opts ...GRPCServerOption) *GRPCServer
func (*GRPCServer) Close ¶
func (r *GRPCServer) Close() error
func (*GRPCServer) Commit ¶
func (r *GRPCServer) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitResponse, error)
func (*GRPCServer) Delete ¶
func (r *GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error)
func (*GRPCServer) Get ¶
func (r *GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error)
func (*GRPCServer) PreWrite ¶
func (r *GRPCServer) PreWrite(ctx context.Context, req *pb.PreWriteRequest) (*pb.PreCommitResponse, error)
func (*GRPCServer) Put ¶
func (r *GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error)
func (*GRPCServer) RawDelete ¶
func (r *GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error)
func (*GRPCServer) RawGet ¶
func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error)
func (*GRPCServer) RawLatestCommitTS ¶
func (r *GRPCServer) RawLatestCommitTS(ctx context.Context, req *pb.RawLatestCommitTSRequest) (*pb.RawLatestCommitTSResponse, error)
func (*GRPCServer) RawPut ¶
func (r *GRPCServer) RawPut(ctx context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error)
func (*GRPCServer) RawScanAt ¶
func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*pb.RawScanAtResponse, error)
func (*GRPCServer) Rollback ¶
func (r *GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.RollbackResponse, error)
func (*GRPCServer) Scan ¶
func (r *GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error)
type GRPCServerOption ¶
type GRPCServerOption func(*GRPCServer)
func WithCloseStore ¶
func WithCloseStore() GRPCServerOption
type HTFIFOCapabilityPeerStatus ¶
type HTFIFOCapabilityPeerStatus struct {
// Address is the peer's host:port as supplied to the poller.
Address string
// HasHTFIFO is true iff the peer's /sqs_health JSON body's
// capabilities array contained the htfifo capability string.
HasHTFIFO bool
// Capabilities is the parsed capabilities array. Nil on any
// failure before JSON parsing, or non-nil but missing
// htfifo when the peer is on an older binary.
Capabilities []string
// Error is empty on a clean success (HTTP 200 + parseable
// JSON, regardless of whether HasHTFIFO is true) and non-empty
// on any failure (transport error, non-200 status, malformed
// JSON, or context cancellation).
Error string
}
HTFIFOCapabilityPeerStatus is one peer's polling result.
type HTFIFOCapabilityReport ¶
type HTFIFOCapabilityReport struct {
// AllAdvertise is true iff every peer in the input list
// returned a /sqs_health body whose `capabilities` array
// contains the htfifo capability string. False on any timeout,
// HTTP error, malformed body, or missing-capability — the
// gate fails closed.
//
// Vacuously true on an empty peer list. The caller (CreateQueue
// gate) is responsible for ensuring the peer list reflects the
// current cluster membership before consulting this report.
AllAdvertise bool
// Peers is the per-peer status, indexed in input order. Each
// entry has either HasHTFIFO=true (peer advertised the
// capability) or a non-empty Error explaining why the peer
// did not pass. Capabilities is the raw list returned by the
// peer when the body was parseable.
Peers []HTFIFOCapabilityPeerStatus
}
HTFIFOCapabilityReport summarises the result of polling each peer's /sqs_health endpoint for the htfifo capability. Used by the CreateQueue capability gate (Phase 3.D PR 5) and by operator tooling that needs to confirm a rolling upgrade has finished before enabling partitioned FIFO queues.
AllAdvertise is the binary go/no-go signal for the gate; Peers carries per-node detail for log lines and operator triage.
func PollSQSHTFIFOCapability ¶
func PollSQSHTFIFOCapability(ctx context.Context, peers []string, cfg PollerConfig) *HTFIFOCapabilityReport
PollSQSHTFIFOCapability polls each peer's /sqs_health endpoint concurrently and reports whether all advertise htfifo. The helper is stateless — every call dials its peers fresh, so a transient network blip on one call does not poison subsequent calls.
Per-peer behaviour:
- GET http://<peer>/sqs_health with Accept: application/json
- Expect HTTP 200 and a parseable JSON body matching {"status":"ok","capabilities":[...]}.
- HasHTFIFO is the membership of htfifo in capabilities.
- Any failure (transport error, non-200, malformed JSON, timeout, context cancellation) records the reason in Error and leaves HasHTFIFO=false. The poller never returns a fatal error from PollSQSHTFIFOCapability itself; the report carries every per-peer outcome instead.
Concurrency: peers are polled in goroutines; results land via an indexed channel so the slice writes are obviously race-free.
Timeouts: each peer poll is bounded by min(ctx.Deadline(), now+cfg.PerPeerTimeout). A long ctx deadline does not extend the per-peer cap, and an absent ctx deadline still triggers fail-closed at the per-peer cap.
type Internal ¶
type Internal struct {
pb.UnimplementedInternalServer
// contains filtered or unexported fields
}
func NewInternalWithEngine ¶
func NewInternalWithEngine(txm kv.Transactional, leader raftengine.LeaderView, clock *kv.HLC, relay *RedisPubSubRelay) *Internal
func (*Internal) Forward ¶
func (i *Internal) Forward(ctx context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error)
func (*Internal) RelayPublish ¶
func (i *Internal) RelayPublish(_ context.Context, req *pb.RelayPublishRequest) (*pb.RelayPublishResponse, error)
type KeyVizSampler ¶
type KeyVizSampler interface {
// Snapshot returns the matrix columns in [from, to). Either
// bound may be the zero time meaning unbounded on that side.
// Implementations must return rows the caller can mutate freely
// (a deep copy) — see keyviz.MemSampler.Snapshot.
Snapshot(from, to time.Time) []keyviz.MatrixColumn
}
KeyVizSampler is the read-side abstraction the Admin service needs from the keyviz package: a time-bounded matrix snapshot. Defined here (not in keyviz) so tests can pass an in-memory fake without constructing a full *keyviz.MemSampler. *keyviz.MemSampler satisfies this interface.
type NodeIdentity ¶
NodeIdentity is the value form of the protobuf NodeIdentity message used for AdminServer configuration. It avoids copying pb.NodeIdentity, which embeds a protoimpl.MessageState (and a mutex).
type PollerConfig ¶
type PollerConfig struct {
// HTTPClient is the client used for /sqs_health GETs. Nil
// falls back to http.DefaultClient. Callers wanting connection
// pooling, custom Transport, or shorter Client.Timeout pass
// their own.
HTTPClient *http.Client
// PerPeerTimeout caps how long any single peer's poll runs
// before being abandoned. Zero defaults to
// defaultSQSCapabilityPollTimeout (3s). Tests pass a small
// value (e.g. 100ms) so the per-peer cap path can be
// exercised quickly without a parent context deadline.
PerPeerTimeout time.Duration
}
PollerConfig tunes PollSQSHTFIFOCapability for a specific call site. All fields are optional — the zero value picks safe defaults. Tests use the explicit PerPeerTimeout to exercise the per-peer cap independently of any caller-supplied context deadline.
type RedisPubSubRelay ¶
type RedisPubSubRelay struct {
// contains filtered or unexported fields
}
RedisPubSubRelay lets the internal gRPC service publish into the local Redis pubsub bus without depending on RedisServer startup order.
func NewRedisPubSubRelay ¶
func NewRedisPubSubRelay() *RedisPubSubRelay
func (*RedisPubSubRelay) Bind ¶
func (r *RedisPubSubRelay) Bind(publish func(channel, message []byte) int64)
func (*RedisPubSubRelay) Publish ¶
func (r *RedisPubSubRelay) Publish(channel, message []byte) int64
type RedisServer ¶
type RedisServer struct {
// contains filtered or unexported fields
}
func NewRedisServer ¶
func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[string]string, relay *RedisPubSubRelay, opts ...RedisServerOption) *RedisServer
func (*RedisServer) Close ¶
func (r *RedisServer) Close() error
Close cancels the base context, signalling all in-flight handlers to abort. Idempotent. The underlying redcon listener is still owned by the caller; Close does NOT touch it so shutdown orchestration can remain with the server owner.
func (*RedisServer) Run ¶
func (r *RedisServer) Run() error
func (*RedisServer) Stop ¶
func (r *RedisServer) Stop()
type RedisServerOption ¶
type RedisServerOption func(*RedisServer)
func WithLuaFastPathObserver ¶
func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServerOption
WithLuaFastPathObserver enables per-redis.call() fast-path outcome metrics inside Lua scripts. Used to diagnose fast-path hit ratios for commands like ZRANGEBYSCORE / ZSCORE / HGET.
Resolves per-command counter handles up front so the hot path avoids CounterVec.WithLabelValues on every redis.call().
func WithLuaObserver ¶
func WithLuaObserver(observer monitoring.LuaScriptObserver) RedisServerOption
WithLuaObserver enables per-phase Lua script metrics (VM exec, Raft commit, retries).
func WithRedisActiveTimestampTracker ¶
func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption
func WithRedisCompactor ¶
func WithRedisCompactor(c *DeltaCompactor) RedisServerOption
WithRedisCompactor wires a DeltaCompactor to the RedisServer so that urgent single-key compaction can be triggered when ErrDeltaScanTruncated is hit.
func WithRedisRequestObserver ¶
func WithRedisRequestObserver(observer monitoring.RedisRequestObserver) RedisServerOption
WithRedisRequestObserver enables Prometheus-compatible request metrics.
type S3Server ¶
type S3Server struct {
// contains filtered or unexported fields
}
func NewS3Server ¶
func (*S3Server) AdminCreateBucket ¶
func (s *S3Server) AdminCreateBucket(ctx context.Context, principal AdminPrincipal, name, acl string) (*AdminBucketSummary, error)
AdminCreateBucket creates a bucket on behalf of the admin dashboard. The principal MUST be re-validated by the caller (the admin HTTP handler does this against the live RoleStore); this method enforces the authorisation invariant a second time so a follower-forwarded call cannot smuggle a read-only principal past the check on the leader side (Section 3.2 "認可の真実は常に adapter 側" — "the truth about authorization always lives on the adapter side").
The transaction is atomic: bucket meta + generation + ACL all land in a single OperationGroup, mirroring the SigV4 createBucket path. On success returns the freshly-stored summary; on conflict returns ErrAdminBucketAlreadyExists; on a non-leader / non-full-role / bad input returns the corresponding sentinel.
func (*S3Server) AdminDeleteBucket ¶
func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error
AdminDeleteBucket removes a bucket if it is empty. Same authorisation contract as the other admin write methods. The bucket-must-be-empty rule mirrors the SigV4 deleteBucket path — the dashboard cannot force a recursive delete, by design.
The dispatch happens in two phases because the production coordinator (kv/sharded_coordinator.go: dispatchDelPrefixBroadcast) rejects DEL_PREFIX inside a transaction and rejects DEL_PREFIX mixed with Del or Put in the same OperationGroup:
Phase 1: Del BucketMetaKey in a txn (OCC-protected against
a concurrent AdminCreateBucket landing between our
readTS and commitTS).
Phase 2: DEL_PREFIX over every per-bucket key family in a
non-txn broadcast — the safety net that sweeps
orphans left by any PutObject that committed
chunks/manifest between the empty-probe and the
Phase-1 commit. See design doc
2026_04_28_proposed_admin_delete_bucket_safety_net.md
§6.2 for the original single-OperationGroup design
and the dispatch-shape rejection that forced the
two-phase split.
Phase 2 is best-effort: a Phase-2 failure leaves the bucket meta already deleted (Phase 1 succeeded) but per-bucket prefixes possibly still containing orphans. That state is no worse than the pre-fix behaviour on main and recovers on operator-driven re-cleanup. We log a warning rather than propagate the error so the operator-visible delete reports success — the bucket really is gone from the API surface, and a retry would 404 because loadBucketMetaAt no longer finds the meta.
BucketGenerationKey is intentionally NOT deleted. Re-creating the bucket bumps the generation; orphan blobs that escaped this delete (e.g. on an older generation) stay isolated under the old generation prefix and never surface in the new bucket. Pinned by TestS3Server_AdminDeleteBucket_BucketGenerationKeySurvives.
The contract change for clients: a PutObject that returned 200 OK during the race window can have its data swept by the concurrent delete. Operators are advised to pause writes before AdminDeleteBucket; the alternative (orphan objects that no API can enumerate or remove) is strictly worse.
The same shape is mirrored on the SigV4 path (adapter/s3.go:deleteBucket) so both delete entrypoints share the same race-window guarantees.
func (*S3Server) AdminDescribeBucket ¶
func (s *S3Server) AdminDescribeBucket(ctx context.Context, name string) (*AdminBucketSummary, bool, error)
AdminDescribeBucket returns the bucket-level snapshot for name. The triple (result, present, error) lets admin callers distinguish a genuine "not found" from a storage error without sniffing sentinels — when the bucket is missing the function returns (nil, false, nil), mirroring AdminDescribeTable's contract on the Dynamo side.
Like AdminListBuckets this is a read-only path that bypasses SigV4. The HTTP admin handler enforces session + CSRF + role at the boundary; the adapter trusts the caller for authentication (Section 3.2's exception for read-only paths).
func (*S3Server) AdminListBuckets ¶
func (s *S3Server) AdminListBuckets(ctx context.Context) ([]AdminBucketSummary, error)
AdminListBuckets returns every S3-style bucket this server knows about, in lexicographic order (the metadata-prefix scan natural ordering). Intended for the in-process admin listener as the SigV4-free counterpart to the listBuckets HTTP handler.
Unlike the SigV4 path (which intentionally caps each call at s3MaxKeys = 1000 because the AWS API is page-based), the admin dashboard's pagination is implemented at the handler layer, which expects this method to return the full set. We loop the per-page ScanAt until the metadata prefix is exhausted — same pattern as scanAllByPrefixAt on the Dynamo side (Codex P1 + Claude Issue 1 on PR #658).
Returns an empty slice (not nil) when no buckets exist so JSON callers see `[]` instead of `null`.
func (*S3Server) AdminPutBucketAcl ¶
func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincipal, name, acl string) error
AdminPutBucketAcl swaps the canned ACL on an existing bucket. Same authorisation contract as AdminCreateBucket. Mutates only the meta.Acl field; generation is preserved so existing object references stay valid.
type S3ServerOption ¶
type S3ServerOption func(*S3Server)
func WithS3ActiveTimestampTracker ¶
func WithS3ActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) S3ServerOption
func WithS3Region ¶
func WithS3Region(region string) S3ServerOption
func WithS3StaticCredentials ¶
func WithS3StaticCredentials(creds map[string]string) S3ServerOption
type SQSPartitionObserver ¶
type SQSPartitionObserver interface {
ObservePartitionMessage(queue string, partition uint32, action string)
}
SQSPartitionObserver is the metrics-package interface (monitoring.SQSPartitionObserver) re-declared here so the adapter does not import monitoring at the package boundary — matches the existing observer pattern for DynamoDB / Redis.
type SQSPartitionResolver ¶
type SQSPartitionResolver struct {
// contains filtered or unexported fields
}
SQSPartitionResolver maps a partitioned-SQS key to the operator-chosen Raft group for the (queue, partition) tuple. Implements kv.PartitionResolver via duck typing — see the integration in main.go where the resolver is installed on ShardedCoordinator.
The byte-range engine cannot route partitioned queues because adding per-partition routes would break its non-overlapping-cover invariant (a partition route for partition K of one queue would leave a gap for legacy keys that fall lexicographically between partitions K and K+1). The resolver-first dispatch path avoids this — it answers only for keys that match a partitioned family prefix and otherwise lets the engine handle dispatch.
func NewSQSPartitionResolver ¶
func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver
NewSQSPartitionResolver builds a resolver from the operator-supplied partition map. routes[queue][k] is the Raft group ID that owns partition k of queue, with len(routes[queue]) equal to the queue's PartitionCount.
Returns nil when routes is empty so callers can keep the resolver out of the request path entirely on a non-partitioned cluster (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op).
The constructor takes a defensive copy so a later caller mutation to the input map does not leak into the resolver's view at runtime.
func (*SQSPartitionResolver) RecognisesPartitionedKey ¶
func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool
RecognisesPartitionedKey reports whether key has the structural shape of a partitioned-SQS key — i.e. starts with one of the partitioned family prefixes. The check is PREFIX-ONLY, not a full parse: a key with a partitioned prefix followed by a malformed queue / partition segment still answers true, so the router fails closed via kv.PartitionResolver semantics instead of falling through to the engine and silently routing to the SQS catalog default group via routeKey's !sqs|route|global collapse (round 5 review nit on PR #715).
A nil receiver returns false so kv.ShardRouter's typed-nil case (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't recognise anything" answer instead of falsely claiming a shape.
func (*SQSPartitionResolver) ResolveGroup ¶
func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool)
ResolveGroup decodes the (queue, partition) embedded in a partitioned-SQS key and returns the operator-chosen Raft group.
Returns (0, false) for any key that does not match a partitioned family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records, …) so kv.ShardRouter falls through to its byte-range engine for default routing.
Returns (0, false) for a partitioned-shaped key whose queue is not in the routes map or whose partition index is beyond len(routes[queue]). The router pairs this with RecognisesPartitionedKey to fail closed instead of falling through — silently routing through the engine's !sqs|route|global default would mis-route HT-FIFO traffic during partition-map drift (codex P1 round 2 on PR #715).
func (*SQSPartitionResolver) RoutedPartitionCount ¶
func (r *SQSPartitionResolver) RoutedPartitionCount(queueName string) int
RoutedPartitionCount returns the number of partition routes configured for queueName, or 0 if the queue is not in the routing map. Used by the CreateQueue capability gate (validateHTFIFOCapability) to verify that EVERY partition of a requested partitioned queue is routable BEFORE the create commits — without this, a queue could land with PartitionCount=N but only K<N routes, and SendMessage on the missing partitions would fail closed at the router with "no route for key" (Codex P1 review on PR #734).
A nil receiver returns 0 so the gate's "resolver==nil → skip the coverage check" branch kicks in cleanly: a single-shard deployment, or one started without the --sqsFifoPartitionMap flag, has no per-partition routing to verify, and partitioned keys fall through to the engine's default group.
type SQSQueueDepth ¶
SQSQueueDepth is one queue's depth-attribute snapshot, the unit the SQSServer hands to monitoring.SQSObserver on each tick. The fields mirror sqsApproxCounters byte-for-byte and the public AdminQueueCounters JSON shape — operators see consistent numbers in dashboards and the admin SPA.
type SQSServer ¶
type SQSServer struct {
// contains filtered or unexported fields
}
func NewSQSServer ¶
func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...SQSServerOption) *SQSServer
func (*SQSServer) AdminDeleteQueue ¶
func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error
AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue. Returns the same sentinel errors as AdminCreateTable on the Dynamo side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader on a follower, ErrAdminSQSNotFound when the queue is absent.
func (*SQSServer) AdminDescribeQueue ¶
func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error)
AdminDescribeQueue returns a snapshot of name's metadata plus the approximate counters. The triple (result, present, error) lets admin callers distinguish a missing queue from a storage error without sniffing sentinels.
Like AdminDescribeTable on the Dynamo side, this entrypoint runs on either the leader or a follower (read-only); the counter scan uses a fresh nextTxnReadTS so the result is consistent with what SigV4 GetQueueAttributes would have returned at the same instant.
func (*SQSServer) AdminListQueues ¶
AdminListQueues returns every queue name this server knows about, in the lexicographic order the queue catalog index produces. Read path; runs on follower or leader and uses the same scanQueueNames helper the SigV4 ListQueues handler does.
func (*SQSServer) SnapshotQueueDepths ¶
func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) ([]SQSQueueDepth, bool)
SnapshotQueueDepths satisfies monitoring.SQSDepthSource. The observer Start loop calls this on every tick.
Returns:
- (snaps, true) — leader, scrape OK. Observer writes snaps to the gauges and diffs against the previous tick (forgetting any queue that disappeared from this snapshot).
- (nil, true) — this node is a follower (leader-only emission keeps gauges consistent with AdminListQueues / AdminDescribeQueue at the same instant — follower scans would race the leader's writes). Empty-but-OK so the observer calls ForgetQueue for any gauges this node was emitting before stepping down.
- (nil, false) — leader, but scrape failed (transient catalog-read error or ctx cancel mid-scan). Tells the observer to skip this tick: leave existing gauges in place rather than wiping every depth series — a single failed scrape would otherwise dashboard-render as a false "all queues drained" event until the next successful tick.
Per-queue scan errors (loadQueueMetaAt / scanApproxCounters) remain handled in-line by snapshotOneQueueDepth: the offending queue is dropped from this tick's snapshot but ok stays true, so the observer calls ForgetQueue for just that one queue's gauges. Only a top-level scanQueueNames failure (which would silently turn into "no queues anywhere") flips ok to false.
type SQSServerOption ¶
type SQSServerOption func(*SQSServer)
func WithSQSLeaderMap ¶
func WithSQSLeaderMap(m map[string]string) SQSServerOption
WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to forward requests from followers to the current leader. Format mirrors WithDynamoDBLeaderMap / WithS3LeaderMap.
func WithSQSPartitionObserver ¶
func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption
WithSQSPartitionObserver installs the elastickv_sqs_partition_messages_total counter observer on the SQS server. Pass nil (the default) on non-monitored test fixtures; the partitioned send / receive / delete paths then observe via a nil interface and the metric stays at zero. The monitoring registry's SQSPartitionObserver() returns the concrete implementation in production.
func WithSQSPartitionResolver ¶
func WithSQSPartitionResolver(r *SQSPartitionResolver) SQSServerOption
WithSQSPartitionResolver installs the cluster's partition resolver on the SQS server so the CreateQueue capability gate (validateHTFIFOCapability) can verify routing coverage before admitting a partitioned create. Pass nil (the default) on single-shard deployments, or ones started without the --sqsFifoPartitionMap flag — the gate then skips the coverage check.
Callers must ensure the resolver passed here matches the one installed on the kv coordinator via WithPartitionResolver, otherwise the gate would admit a queue that the coordinator then fails to route. main.go builds the resolver once and hands the same pointer to both consumers.
func WithSQSRegion ¶
func WithSQSRegion(region string) SQSServerOption
WithSQSRegion configures the signing region the adapter expects inside the Credential scope. Empty values retain the previous setting.
func WithSQSStaticCredentials ¶
func WithSQSStaticCredentials(creds map[string]string) SQSServerOption
WithSQSStaticCredentials supplies the access-key → secret map the adapter will accept. Passing an empty map disables authorization entirely (open endpoint), matching the S3 adapter's behavior for unit-test friendliness.
Source Files
¶
- admin_grpc.go
- distribution_server.go
- dynamodb.go
- dynamodb_admin.go
- dynamodb_storage_codec.go
- dynamodb_types.go
- grpc.go
- grpc_transcoder.go
- internal.go
- leader_http_proxy.go
- prefix_scan.go
- pubsub_relay.go
- redis.go
- redis_command_info.go
- redis_command_specs.go
- redis_compat_commands.go
- redis_compat_helpers.go
- redis_compat_types.go
- redis_delta_compactor.go
- redis_key_waiters.go
- redis_lua.go
- redis_lua_context.go
- redis_lua_pool.go
- redis_proxy.go
- redis_pubsub.go
- redis_retry.go
- redis_storage_codec.go
- redis_transcord.go
- s3.go
- s3_admin.go
- s3_auth.go
- s3_chunked.go
- sigv4.go
- sqs.go
- sqs_admin.go
- sqs_auth.go
- sqs_capability_gate.go
- sqs_capability_poller.go
- sqs_catalog.go
- sqs_depth_source.go
- sqs_fifo.go
- sqs_keys.go
- sqs_keys_dispatch.go
- sqs_messages.go
- sqs_messages_batch.go
- sqs_partition_resolver.go
- sqs_partitioning.go
- sqs_purge.go
- sqs_query_protocol.go
- sqs_reaper.go
- sqs_redrive.go
- sqs_tags.go
- sqs_throttle.go
- test_util.go
- ts.go