Documentation
¶
Overview ¶
internal/lease/lease.go
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyRuntimeToProto ¶
func ApplyRuntimeToProto(rt LeaseRuntime, ar *messagepb.Message_Metadata_AttemptRuntime)
ApplyRuntimeToProto writes the LeaseRuntime back into the AttemptRuntime proto.
func GenerateAttemptID ¶
GenerateAttemptID creates a compact, collision-resistant attempt ID. Format: att_<timestamp><hash> (20 chars total) - Timestamp: base32-encoded milliseconds (sortable) - Hash: 6-char base32 hash of (messageID + workerID + nanos + random)
Types ¶
type HeartbeatResult ¶
type HeartbeatResult struct {
// True if this heartbeat extended the lease.
Extended bool
// Timeouts observed at the moment heartbeat was processed.
LeaseTimedOut bool
HeartbeatTimedOut bool
// If true, caller should treat this attempt as failed / timed out
// (e.g., trigger retry/failure logic).
ShouldFail bool
}
HeartbeatResult describes what happened when applying a heartbeat.
func HandleHeartbeat ¶
func HandleHeartbeat( q *queuepb.QueueMetadata, m *messagepb.Message_Metadata, now time.Time, ) HeartbeatResult
HandleHeartbeat is a high-level helper you can call in your heartbeat RPC:
- Merges queue + message lease policy.
- Loads current runtime from metadata.current_attempt.
- Applies heartbeat semantics.
- Writes updated runtime back into metadata.current_attempt.
- Returns HeartbeatResult so you can decide whether to fail/retry.
type LeasePolicy ¶
type LeasePolicy struct {
// BaseTimeout is the initial lease (L_base).
BaseTimeout time.Duration
// MaxExtension is the total extra lease time allowed (L_maxExt).
// Effective hard cap per attempt is BaseTimeout + MaxExtension.
MaxExtension time.Duration
// HeartbeatTimeout is the max gap between heartbeats (H).
// If 0, heartbeat timeout is disabled.
HeartbeatTimeout time.Duration
// ExtendStep is the amount to extend the lease by on each heartbeat (Δ),
// until MaxExtension is exhausted.
ExtendStep time.Duration
}
LeasePolicy is the internal, computed policy for a single attempt. It merges queue-level and message-level LeasePolicy protos.
func LeasePolicyFromCommon ¶
func LeasePolicyFromCommon(lp *commonpb.LeasePolicy) LeasePolicy
func MergeLeasePolicy ¶
func MergeLeasePolicy( q *queuepb.QueueMetadata, m *messagepb.Message_Metadata, workerBaseHint time.Duration, ) LeasePolicy
MergeLeasePolicy builds an effective LeasePolicy by merging a queue-level LeasePolicy and an optional per-message LeasePolicy override.
Semantics:
- Start from queue.metadata.lease_policy.
- For each field set in message.metadata.lease_policy, override only that field.
- Apply internal defaults if still zero (e.g., BaseTimeout, ExtendStep).
func (LeasePolicy) ApplyHeartbeat ¶
func (p LeasePolicy) ApplyHeartbeat(rt *LeaseRuntime, now time.Time) HeartbeatResult
ApplyHeartbeat updates runtime according to the policy when a heartbeat is received at time `now`.
It:
- Checks if the attempt is already timed out.
- Updates LastHeartbeat (+ HeartbeatExpiry if enabled).
- Extends LeaseExpiry in increments of ExtendStep, bounded by MaxExtension.
func (LeasePolicy) CheckTimeouts ¶
func (p LeasePolicy) CheckTimeouts(rt LeaseRuntime, now time.Time) TimeoutStatus
CheckTimeouts determines if the attempt has timed out as of `now`.
func (LeasePolicy) InitRuntime ¶
func (p LeasePolicy) InitRuntime(now time.Time, attemptID, workerID string) LeaseRuntime
InitRuntime initializes LeaseRuntime for a *new attempt* using the effective LeasePolicy, attemptID, and workerID.
type LeaseRuntime ¶
type LeaseRuntime struct {
AttemptID string
WorkerID string
// When this attempt acquired the lease.
LeaseStart time.Time
// Current lease deadline for this attempt.
LeaseExpiry time.Time
// Total extension used so far beyond BaseTimeout.
ExtensionUsed time.Duration
// How many times we successfully extended the lease.
RenewalCount int32
// Last successful heartbeat.
LastHeartbeat time.Time
// Deadline for heartbeat timeout (LastHeartbeat + HeartbeatTimeout).
// Zero if heartbeat timeout disabled.
HeartbeatExpiry time.Time
}
LeaseRuntime is the internal runtime state for the current attempt.
func RuntimeFromProto ¶
func RuntimeFromProto(ar *messagepb.Message_Metadata_AttemptRuntime) LeaseRuntime
RuntimeFromProto builds a LeaseRuntime from the nested AttemptRuntime proto. If ar is nil, returns a zero-valued runtime.
type TimeoutStatus ¶
TimeoutStatus describes timeout checks at a given instant.
func HandleReclaim ¶
func HandleReclaim( q *queuepb.QueueMetadata, m *messagepb.Message_Metadata, now time.Time, ) TimeoutStatus
HandleReclaim is a high-level helper you can use in a reclaimer/timer worker.
- Merges lease policy.
- Loads runtime from current_attempt.
- Checks for timeouts.
- Returns status so you can apply retry/failure logic on the message.
It does NOT mutate the message; you decide how to change state/attempts.