jobs

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EnvJobsTableName = "APPTHEORY_JOBS_TABLE_NAME"
)

Variables

This section is empty.

Functions

func JobLockSortKey

func JobLockSortKey() string

func JobMetaSortKey

func JobMetaSortKey() string

func JobPartitionKey

func JobPartitionKey(jobID string) string

func JobRecordSortKey

func JobRecordSortKey(recordID string) string

func JobRequestSortKey

func JobRequestSortKey(idempotencyKey string) string

func SanitizeFields

func SanitizeFields(fields map[string]any) map[string]any

SanitizeFields returns a sanitized copy of the provided map suitable for logging or durable storage.

This is safe-by-default for known sensitive keys (PAN/SSN/password/token, etc).

func SanitizeLogString

func SanitizeLogString(value string) string

SanitizeLogString removes control characters that could enable log forging.

func SemaphorePartitionKey added in v0.21.0

func SemaphorePartitionKey(scope, subject string) string

func SemaphoreSlotSortKey added in v0.21.0

func SemaphoreSlotSortKey(slot int) string

Types

type AcquireLeaseInput

type AcquireLeaseInput struct {
	JobID         string
	Owner         string
	LeaseDuration time.Duration
	TTL           time.Duration
}

type AcquireSemaphoreSlotInput added in v0.21.0

type AcquireSemaphoreSlotInput struct {
	Scope         string
	Subject       string
	Limit         int
	Owner         string
	LeaseDuration time.Duration
	TTL           time.Duration
}

type Clock

type Clock interface {
	Now() time.Time
}

type CompleteIdempotencyRecordInput

type CompleteIdempotencyRecordInput struct {
	JobID          string
	IdempotencyKey string

	Result map[string]any
	Error  *ErrorEnvelope

	TTL time.Duration
}

type Config

type Config struct {
	DefaultLeaseDuration    time.Duration
	DefaultIdempotencyTTL   time.Duration
	DefaultJobTTL           time.Duration
	DefaultRecordTTL        time.Duration
	DefaultRequestResultTTL time.Duration
}

func DefaultConfig

func DefaultConfig() *Config

type CreateIdempotencyRecordInput

type CreateIdempotencyRecordInput struct {
	JobID          string
	IdempotencyKey string
	TTL            time.Duration
}

type CreateJobInput

type CreateJobInput struct {
	JobID    string
	TenantID string
	Status   JobStatus

	TTL time.Duration
}

type DynamoJobLedger

type DynamoJobLedger struct {
	// contains filtered or unexported fields
}

func NewDynamoJobLedger

func NewDynamoJobLedger(db tablecore.DB, config *Config) *DynamoJobLedger

func (*DynamoJobLedger) AcquireLease

func (l *DynamoJobLedger) AcquireLease(ctx context.Context, in AcquireLeaseInput) (*JobLock, error)

func (*DynamoJobLedger) AcquireSemaphoreSlot added in v0.21.0

func (l *DynamoJobLedger) AcquireSemaphoreSlot(ctx context.Context, in AcquireSemaphoreSlotInput) (*SemaphoreLease, error)

func (*DynamoJobLedger) CompleteIdempotencyRecord

func (l *DynamoJobLedger) CompleteIdempotencyRecord(ctx context.Context, in CompleteIdempotencyRecordInput) (*JobRequest, error)

func (*DynamoJobLedger) CreateIdempotencyRecord

func (*DynamoJobLedger) CreateJob

func (l *DynamoJobLedger) CreateJob(ctx context.Context, in CreateJobInput) (*JobMeta, error)

func (*DynamoJobLedger) InspectSemaphore added in v0.21.0

func (*DynamoJobLedger) RefreshLease

func (l *DynamoJobLedger) RefreshLease(ctx context.Context, in RefreshLeaseInput) (*JobLock, error)

func (*DynamoJobLedger) RefreshSemaphoreSlot added in v0.21.0

func (l *DynamoJobLedger) RefreshSemaphoreSlot(ctx context.Context, in RefreshSemaphoreSlotInput) (*SemaphoreLease, error)

func (*DynamoJobLedger) ReleaseLease

func (l *DynamoJobLedger) ReleaseLease(ctx context.Context, in ReleaseLeaseInput) error

func (*DynamoJobLedger) ReleaseSemaphoreSlot added in v0.21.0

func (l *DynamoJobLedger) ReleaseSemaphoreSlot(ctx context.Context, in ReleaseSemaphoreSlotInput) error

func (*DynamoJobLedger) SetClock

func (l *DynamoJobLedger) SetClock(clock Clock)

func (*DynamoJobLedger) TransitionJobStatus

func (l *DynamoJobLedger) TransitionJobStatus(ctx context.Context, in TransitionJobStatusInput) (*JobMeta, error)

func (*DynamoJobLedger) UpsertRecordStatus

func (l *DynamoJobLedger) UpsertRecordStatus(ctx context.Context, in UpsertRecordStatusInput) (*JobRecord, error)

type Error

type Error struct {
	Type    ErrorType
	Message string
	Cause   error
}

func NewError

func NewError(errorType ErrorType, message string) *Error

func WrapError

func WrapError(cause error, errorType ErrorType, message string) *Error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ErrorEnvelope

type ErrorEnvelope struct {
	Type      string         `json:"type,omitempty"`
	Code      string         `json:"code,omitempty"`
	Message   string         `json:"message"`
	Retryable bool           `json:"retryable,omitempty"`
	Fields    map[string]any `json:"fields,omitempty"`
}

func ErrorEnvelopeFromError

func ErrorEnvelopeFromError(err error, fields map[string]any) *ErrorEnvelope

func NewErrorEnvelope

func NewErrorEnvelope(message string, fields map[string]any) *ErrorEnvelope

type ErrorType

type ErrorType string
const (
	ErrorTypeInternal     ErrorType = "internal_error"
	ErrorTypeInvalidInput ErrorType = "invalid_input"
	ErrorTypeConflict     ErrorType = "conflict"
	ErrorTypeNotFound     ErrorType = "not_found"
)

type IdempotencyCreateOutcome

type IdempotencyCreateOutcome string
const (
	IdempotencyOutcomeCreated           IdempotencyCreateOutcome = "created"
	IdempotencyOutcomeAlreadyInProgress IdempotencyCreateOutcome = "already_in_progress"
	IdempotencyOutcomeAlreadyCompleted  IdempotencyCreateOutcome = "already_completed"
)

type IdempotencyStatus

type IdempotencyStatus string
const (
	IdempotencyStatusInProgress IdempotencyStatus = "IN_PROGRESS"
	IdempotencyStatusCompleted  IdempotencyStatus = "COMPLETED"
)

type InspectSemaphoreInput added in v0.21.0

type InspectSemaphoreInput struct {
	Scope   string
	Subject string
}

type JobLedger

type JobLedger interface {
	CreateJob(ctx context.Context, in CreateJobInput) (*JobMeta, error)
	TransitionJobStatus(ctx context.Context, in TransitionJobStatusInput) (*JobMeta, error)

	UpsertRecordStatus(ctx context.Context, in UpsertRecordStatusInput) (*JobRecord, error)

	AcquireLease(ctx context.Context, in AcquireLeaseInput) (*JobLock, error)
	RefreshLease(ctx context.Context, in RefreshLeaseInput) (*JobLock, error)
	ReleaseLease(ctx context.Context, in ReleaseLeaseInput) error

	AcquireSemaphoreSlot(ctx context.Context, in AcquireSemaphoreSlotInput) (*SemaphoreLease, error)
	RefreshSemaphoreSlot(ctx context.Context, in RefreshSemaphoreSlotInput) (*SemaphoreLease, error)
	ReleaseSemaphoreSlot(ctx context.Context, in ReleaseSemaphoreSlotInput) error
	InspectSemaphore(ctx context.Context, in InspectSemaphoreInput) (*SemaphoreInspection, error)

	CreateIdempotencyRecord(ctx context.Context, in CreateIdempotencyRecordInput) (*JobRequest, IdempotencyCreateOutcome, error)
	CompleteIdempotencyRecord(ctx context.Context, in CompleteIdempotencyRecordInput) (*JobRequest, error)
}

type JobLock

type JobLock struct {
	PK string `theorydb:"pk" json:"pk"`
	SK string `theorydb:"sk" json:"sk"`

	JobID string `json:"job_id"`

	LeaseOwner     string `json:"lease_owner"`
	LeaseExpiresAt int64  `json:"lease_expires_at"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`

	TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
	// contains filtered or unexported fields
}

func NewJobLock

func NewJobLock(jobID string) JobLock

func (*JobLock) SetKeys

func (l *JobLock) SetKeys()

func (JobLock) TableName

func (JobLock) TableName() string

type JobMeta

type JobMeta struct {
	PK string `theorydb:"pk" json:"pk"`
	SK string `theorydb:"sk" json:"sk"`

	JobID    string    `json:"job_id"`
	TenantID string    `json:"tenant_id" theorydb:"index:tenant-created-index,pk"`
	Status   JobStatus `json:"status" theorydb:"index:status-created-index,pk"`

	CreatedAt time.Time `json:"created_at" theorydb:"index:tenant-created-index,sk,index:status-created-index,sk"`
	UpdatedAt time.Time `json:"updated_at"`

	Version int64 `json:"version" theorydb:"version"`

	TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
	// contains filtered or unexported fields
}

func NewJobMeta

func NewJobMeta(jobID string) JobMeta

func (*JobMeta) SetKeys

func (j *JobMeta) SetKeys()

func (JobMeta) TableName

func (JobMeta) TableName() string

type JobRecord

type JobRecord struct {
	PK string `theorydb:"pk" json:"pk"`
	SK string `theorydb:"sk" json:"sk"`

	JobID    string       `json:"job_id"`
	RecordID string       `json:"record_id"`
	Status   RecordStatus `json:"status"`

	Error *ErrorEnvelope `json:"error,omitempty" theorydb:"omitempty"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`

	TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
	// contains filtered or unexported fields
}

func NewJobRecord

func NewJobRecord(jobID, recordID string) JobRecord

func (*JobRecord) SetKeys

func (r *JobRecord) SetKeys()

func (JobRecord) TableName

func (JobRecord) TableName() string

type JobRequest

type JobRequest struct {
	PK string `theorydb:"pk" json:"pk"`
	SK string `theorydb:"sk" json:"sk"`

	JobID          string            `json:"job_id"`
	IdempotencyKey string            `json:"idempotency_key"`
	Status         IdempotencyStatus `json:"status"`

	Result map[string]any `json:"result,omitempty" theorydb:"omitempty"`
	Error  *ErrorEnvelope `json:"error,omitempty" theorydb:"omitempty"`

	CreatedAt   time.Time `json:"created_at"`
	UpdatedAt   time.Time `json:"updated_at"`
	CompletedAt time.Time `json:"completed_at,omitempty" theorydb:"omitempty"`

	TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
	// contains filtered or unexported fields
}

func NewJobRequest

func NewJobRequest(jobID, idempotencyKey string) JobRequest

func (*JobRequest) SetKeys

func (r *JobRequest) SetKeys()

func (*JobRequest) String

func (r *JobRequest) String() string

func (JobRequest) TableName

func (JobRequest) TableName() string

type JobStatus

type JobStatus string
const (
	JobStatusPending   JobStatus = "PENDING"
	JobStatusRunning   JobStatus = "RUNNING"
	JobStatusSucceeded JobStatus = "SUCCEEDED"
	JobStatusFailed    JobStatus = "FAILED"
	JobStatusCanceled  JobStatus = "CANCELED"
)

type RealClock

type RealClock struct{}

func (RealClock) Now

func (RealClock) Now() time.Time

type RecordStatus

type RecordStatus string
const (
	RecordStatusPending    RecordStatus = "PENDING"
	RecordStatusProcessing RecordStatus = "PROCESSING"
	RecordStatusSucceeded  RecordStatus = "SUCCEEDED"
	RecordStatusFailed     RecordStatus = "FAILED"
	RecordStatusSkipped    RecordStatus = "SKIPPED"
)

type RefreshLeaseInput

type RefreshLeaseInput struct {
	JobID         string
	Owner         string
	LeaseDuration time.Duration
	TTL           time.Duration
}

type RefreshSemaphoreSlotInput added in v0.21.0

type RefreshSemaphoreSlotInput struct {
	Scope         string
	Subject       string
	Slot          int
	Owner         string
	LeaseDuration time.Duration
	TTL           time.Duration
}

type ReleaseLeaseInput

type ReleaseLeaseInput struct {
	JobID string
	Owner string
}

type ReleaseSemaphoreSlotInput added in v0.21.0

type ReleaseSemaphoreSlotInput struct {
	Scope   string
	Subject string
	Slot    int
	Owner   string
}

type SemaphoreInspection added in v0.21.0

type SemaphoreInspection struct {
	Scope        string           `json:"scope"`
	Subject      string           `json:"subject"`
	Occupancy    int              `json:"occupancy"`
	ActiveLeases []SemaphoreLease `json:"active_leases"`
}

type SemaphoreLease added in v0.21.0

type SemaphoreLease struct {
	PK string `theorydb:"pk" json:"pk"`
	SK string `theorydb:"sk" json:"sk"`

	Scope   string `json:"scope"`
	Subject string `json:"subject"`
	Slot    int    `json:"slot"`

	LeaseOwner     string `json:"lease_owner"`
	LeaseExpiresAt int64  `json:"lease_expires_at"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`

	TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
	// contains filtered or unexported fields
}

func NewSemaphoreLease added in v0.21.0

func NewSemaphoreLease(scope, subject string, slot int) SemaphoreLease

func (*SemaphoreLease) SetKeys added in v0.21.0

func (l *SemaphoreLease) SetKeys()

func (SemaphoreLease) TableName added in v0.21.0

func (SemaphoreLease) TableName() string

type TransitionJobStatusInput

type TransitionJobStatusInput struct {
	JobID           string
	ExpectedVersion int64
	ToStatus        JobStatus

	FromStatus JobStatus
}

type UpsertRecordStatusInput

type UpsertRecordStatusInput struct {
	JobID    string
	RecordID string
	Status   RecordStatus
	Error    *ErrorEnvelope

	TTL time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL