lease

package
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 7, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

internal/lease/lease.go

Functions

func ApplyRuntimeToProto

func ApplyRuntimeToProto(rt LeaseRuntime, ar *messagepb.Message_Metadata_AttemptRuntime)

ApplyRuntimeToProto writes the LeaseRuntime back into the AttemptRuntime proto.

func GenerateAttemptID

func GenerateAttemptID(messageID, workerID string, now time.Time) string

GenerateAttemptID creates a compact, collision-resistant attempt ID. Format: att_<timestamp><hash> (20 chars total)

  • Timestamp: base32-encoded milliseconds (sortable)
  • Hash: 6-char base32 hash of (messageID + workerID + nanos + random)

Types

type HeartbeatResult

type HeartbeatResult struct {
	// True if this heartbeat extended the lease.
	Extended bool

	// Timeouts observed at the moment heartbeat was processed.
	LeaseTimedOut     bool
	HeartbeatTimedOut bool

	// If true, caller should treat this attempt as failed / timed out
	// (e.g., trigger retry/failure logic).
	ShouldFail bool
}

HeartbeatResult describes what happened when applying a heartbeat.

func HandleHeartbeat

HandleHeartbeat is a high-level helper you can call in your heartbeat RPC:

  • Merges queue + message lease policy.
  • Loads current runtime from metadata.current_attempt.
  • Applies heartbeat semantics.
  • Writes updated runtime back into metadata.current_attempt.
  • Returns HeartbeatResult so you can decide whether to fail/retry.
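As a rough illustration of the last step, a caller might map the returned HeartbeatResult to an action along these lines. The nextAction helper and its action strings are hypothetical and only show one plausible reading of the fields.

```go
package main

import "fmt"

// HeartbeatResult mirrors the struct documented in this package.
type HeartbeatResult struct {
	Extended          bool
	LeaseTimedOut     bool
	HeartbeatTimedOut bool
	ShouldFail        bool
}

// nextAction is a hypothetical helper that turns a HeartbeatResult into
// the action a heartbeat RPC handler might take.
func nextAction(res HeartbeatResult) string {
	switch {
	case res.ShouldFail && res.LeaseTimedOut:
		return "fail: lease expired"
	case res.ShouldFail && res.HeartbeatTimedOut:
		return "fail: heartbeat gap exceeded"
	case res.ShouldFail:
		return "fail"
	case res.Extended:
		return "ok: lease extended"
	default:
		return "ok: heartbeat recorded, lease not extended"
	}
}

func main() {
	fmt.Println(nextAction(HeartbeatResult{Extended: true}))
	fmt.Println(nextAction(HeartbeatResult{LeaseTimedOut: true, ShouldFail: true}))
}
```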

type LeasePolicy

type LeasePolicy struct {
	// BaseTimeout is the initial lease (L_base).
	BaseTimeout time.Duration

	// MaxExtension is the total extra lease time allowed (L_maxExt).
	// Effective hard cap per attempt is BaseTimeout + MaxExtension.
	MaxExtension time.Duration

	// HeartbeatTimeout is the max gap between heartbeats (H).
	// If 0, heartbeat timeout is disabled.
	HeartbeatTimeout time.Duration

	// ExtendStep is the amount to extend the lease by on each heartbeat (Δ),
	// until MaxExtension is exhausted.
	ExtendStep time.Duration
}

LeasePolicy is the internal, computed policy for a single attempt. It merges queue-level and message-level LeasePolicy protos.

func LeasePolicyFromCommon

func LeasePolicyFromCommon(lp *commonpb.LeasePolicy) LeasePolicy

func MergeLeasePolicy

func MergeLeasePolicy(
	q *queuepb.QueueMetadata,
	m *messagepb.Message_Metadata,
	workerBaseHint time.Duration,
) LeasePolicy

MergeLeasePolicy builds an effective LeasePolicy by merging a queue-level LeasePolicy and an optional per-message LeasePolicy override.

Semantics:

  • Start from queue.metadata.lease_policy.
  • For each field set in message.metadata.lease_policy, override only that field.
  • Apply internal defaults if still zero (e.g., BaseTimeout, ExtendStep).

func (LeasePolicy) ApplyHeartbeat

func (p LeasePolicy) ApplyHeartbeat(rt *LeaseRuntime, now time.Time) HeartbeatResult

ApplyHeartbeat updates runtime according to the policy when a heartbeat is received at time `now`.

It:

  • Checks if the attempt is already timed out.
  • Updates LastHeartbeat (+ HeartbeatExpiry if enabled).
  • Extends LeaseExpiry in increments of ExtendStep, bounded by MaxExtension.

func (LeasePolicy) CheckTimeouts

func (p LeasePolicy) CheckTimeouts(rt LeaseRuntime, now time.Time) TimeoutStatus

CheckTimeouts determines if the attempt has timed out as of `now`.

func (LeasePolicy) InitRuntime

func (p LeasePolicy) InitRuntime(now time.Time, attemptID, workerID string) LeaseRuntime

InitRuntime initializes LeaseRuntime for a *new attempt* using the effective LeasePolicy, attemptID, and workerID.

type LeaseRuntime

type LeaseRuntime struct {
	AttemptID string
	WorkerID  string

	// When this attempt acquired the lease.
	LeaseStart time.Time

	// Current lease deadline for this attempt.
	LeaseExpiry time.Time

	// Total extension used so far beyond BaseTimeout.
	ExtensionUsed time.Duration

	// How many times we successfully extended the lease.
	RenewalCount int32

	// Last successful heartbeat.
	LastHeartbeat time.Time

	// Deadline for heartbeat timeout (LastHeartbeat + HeartbeatTimeout).
	// Zero if heartbeat timeout disabled.
	HeartbeatExpiry time.Time
}

LeaseRuntime is the internal runtime state for the current attempt.

func RuntimeFromProto

RuntimeFromProto builds a LeaseRuntime from the nested AttemptRuntime proto. If ar is nil, returns a zero-valued runtime.

type TimeoutStatus

type TimeoutStatus struct {
	LeaseTimedOut     bool
	HeartbeatTimedOut bool
	ExpiredAt         time.Time
}

TimeoutStatus describes timeout checks at a given instant.

func HandleReclaim

HandleReclaim is a high-level helper you can use in a reclaimer/timer worker.

  • Merges lease policy.
  • Loads runtime from current_attempt.
  • Checks for timeouts.
  • Returns status so you can apply retry/failure logic on the message.

It does NOT mutate the message; you decide how to change state/attempts.
