Documentation
¶
Index ¶
- Constants
- func JobLockSortKey() string
- func JobMetaSortKey() string
- func JobPartitionKey(jobID string) string
- func JobRecordSortKey(recordID string) string
- func JobRequestSortKey(idempotencyKey string) string
- func SanitizeFields(fields map[string]any) map[string]any
- func SanitizeLogString(value string) string
- func SemaphorePartitionKey(scope, subject string) string
- func SemaphoreSlotSortKey(slot int) string
- type AcquireLeaseInput
- type AcquireSemaphoreSlotInput
- type Clock
- type CompleteIdempotencyRecordInput
- type Config
- type CreateIdempotencyRecordInput
- type CreateJobInput
- type DynamoJobLedger
- func (l *DynamoJobLedger) AcquireLease(ctx context.Context, in AcquireLeaseInput) (*JobLock, error)
- func (l *DynamoJobLedger) AcquireSemaphoreSlot(ctx context.Context, in AcquireSemaphoreSlotInput) (*SemaphoreLease, error)
- func (l *DynamoJobLedger) CompleteIdempotencyRecord(ctx context.Context, in CompleteIdempotencyRecordInput) (*JobRequest, error)
- func (l *DynamoJobLedger) CreateIdempotencyRecord(ctx context.Context, in CreateIdempotencyRecordInput) (*JobRequest, IdempotencyCreateOutcome, error)
- func (l *DynamoJobLedger) CreateJob(ctx context.Context, in CreateJobInput) (*JobMeta, error)
- func (l *DynamoJobLedger) InspectSemaphore(ctx context.Context, in InspectSemaphoreInput) (*SemaphoreInspection, error)
- func (l *DynamoJobLedger) RefreshLease(ctx context.Context, in RefreshLeaseInput) (*JobLock, error)
- func (l *DynamoJobLedger) RefreshSemaphoreSlot(ctx context.Context, in RefreshSemaphoreSlotInput) (*SemaphoreLease, error)
- func (l *DynamoJobLedger) ReleaseLease(ctx context.Context, in ReleaseLeaseInput) error
- func (l *DynamoJobLedger) ReleaseSemaphoreSlot(ctx context.Context, in ReleaseSemaphoreSlotInput) error
- func (l *DynamoJobLedger) SetClock(clock Clock)
- func (l *DynamoJobLedger) TransitionJobStatus(ctx context.Context, in TransitionJobStatusInput) (*JobMeta, error)
- func (l *DynamoJobLedger) UpsertRecordStatus(ctx context.Context, in UpsertRecordStatusInput) (*JobRecord, error)
- type Error
- type ErrorEnvelope
- type ErrorType
- type IdempotencyCreateOutcome
- type IdempotencyStatus
- type InspectSemaphoreInput
- type JobLedger
- type JobLock
- type JobMeta
- type JobRecord
- type JobRequest
- type JobStatus
- type RealClock
- type RecordStatus
- type RefreshLeaseInput
- type RefreshSemaphoreSlotInput
- type ReleaseLeaseInput
- type ReleaseSemaphoreSlotInput
- type SemaphoreInspection
- type SemaphoreLease
- type TransitionJobStatusInput
- type UpsertRecordStatusInput
Constants ¶
View Source
const (
EnvJobsTableName = "APPTHEORY_JOBS_TABLE_NAME"
)
Variables ¶
This section is empty.
Functions ¶
func JobLockSortKey ¶
func JobLockSortKey() string
func JobMetaSortKey ¶
func JobMetaSortKey() string
func JobPartitionKey ¶
func JobRecordSortKey ¶
func JobRequestSortKey ¶
func SanitizeFields ¶
SanitizeFields returns a sanitized copy of the provided map suitable for logging or durable storage.
It is safe by default for known sensitive keys (PAN, SSN, password, token, etc.).
func SanitizeLogString ¶
SanitizeLogString removes control characters that could enable log forging.
func SemaphorePartitionKey ¶ added in v0.21.0
func SemaphoreSlotSortKey ¶ added in v0.21.0
Types ¶
type AcquireLeaseInput ¶
type AcquireSemaphoreSlotInput ¶ added in v0.21.0
type Config ¶
type Config struct {
DefaultLeaseDuration time.Duration
DefaultIdempotencyTTL time.Duration
DefaultJobTTL time.Duration
DefaultRecordTTL time.Duration
DefaultRequestResultTTL time.Duration
}
func DefaultConfig ¶
func DefaultConfig() *Config
type CreateJobInput ¶
type DynamoJobLedger ¶
type DynamoJobLedger struct {
// contains filtered or unexported fields
}
func NewDynamoJobLedger ¶
func NewDynamoJobLedger(db tablecore.DB, config *Config) *DynamoJobLedger
func (*DynamoJobLedger) AcquireLease ¶
func (l *DynamoJobLedger) AcquireLease(ctx context.Context, in AcquireLeaseInput) (*JobLock, error)
func (*DynamoJobLedger) AcquireSemaphoreSlot ¶ added in v0.21.0
func (l *DynamoJobLedger) AcquireSemaphoreSlot(ctx context.Context, in AcquireSemaphoreSlotInput) (*SemaphoreLease, error)
func (*DynamoJobLedger) CompleteIdempotencyRecord ¶
func (l *DynamoJobLedger) CompleteIdempotencyRecord(ctx context.Context, in CompleteIdempotencyRecordInput) (*JobRequest, error)
func (*DynamoJobLedger) CreateIdempotencyRecord ¶
func (l *DynamoJobLedger) CreateIdempotencyRecord(ctx context.Context, in CreateIdempotencyRecordInput) (*JobRequest, IdempotencyCreateOutcome, error)
func (*DynamoJobLedger) CreateJob ¶
func (l *DynamoJobLedger) CreateJob(ctx context.Context, in CreateJobInput) (*JobMeta, error)
func (*DynamoJobLedger) InspectSemaphore ¶ added in v0.21.0
func (l *DynamoJobLedger) InspectSemaphore(ctx context.Context, in InspectSemaphoreInput) (*SemaphoreInspection, error)
func (*DynamoJobLedger) RefreshLease ¶
func (l *DynamoJobLedger) RefreshLease(ctx context.Context, in RefreshLeaseInput) (*JobLock, error)
func (*DynamoJobLedger) RefreshSemaphoreSlot ¶ added in v0.21.0
func (l *DynamoJobLedger) RefreshSemaphoreSlot(ctx context.Context, in RefreshSemaphoreSlotInput) (*SemaphoreLease, error)
func (*DynamoJobLedger) ReleaseLease ¶
func (l *DynamoJobLedger) ReleaseLease(ctx context.Context, in ReleaseLeaseInput) error
func (*DynamoJobLedger) ReleaseSemaphoreSlot ¶ added in v0.21.0
func (l *DynamoJobLedger) ReleaseSemaphoreSlot(ctx context.Context, in ReleaseSemaphoreSlotInput) error
func (*DynamoJobLedger) SetClock ¶
func (l *DynamoJobLedger) SetClock(clock Clock)
func (*DynamoJobLedger) TransitionJobStatus ¶
func (l *DynamoJobLedger) TransitionJobStatus(ctx context.Context, in TransitionJobStatusInput) (*JobMeta, error)
func (*DynamoJobLedger) UpsertRecordStatus ¶
func (l *DynamoJobLedger) UpsertRecordStatus(ctx context.Context, in UpsertRecordStatusInput) (*JobRecord, error)
type ErrorEnvelope ¶
type ErrorEnvelope struct {
Type string `json:"type,omitempty"`
Code string `json:"code,omitempty"`
Message string `json:"message"`
Retryable bool `json:"retryable,omitempty"`
Fields map[string]any `json:"fields,omitempty"`
}
func ErrorEnvelopeFromError ¶
func ErrorEnvelopeFromError(err error, fields map[string]any) *ErrorEnvelope
func NewErrorEnvelope ¶
func NewErrorEnvelope(message string, fields map[string]any) *ErrorEnvelope
type IdempotencyCreateOutcome ¶
type IdempotencyCreateOutcome string
const (
IdempotencyOutcomeCreated IdempotencyCreateOutcome = "created"
IdempotencyOutcomeAlreadyInProgress IdempotencyCreateOutcome = "already_in_progress"
IdempotencyOutcomeAlreadyCompleted IdempotencyCreateOutcome = "already_completed"
)
type IdempotencyStatus ¶
type IdempotencyStatus string
const (
IdempotencyStatusInProgress IdempotencyStatus = "IN_PROGRESS"
IdempotencyStatusCompleted IdempotencyStatus = "COMPLETED"
)
type InspectSemaphoreInput ¶ added in v0.21.0
type JobLedger ¶
type JobLedger interface {
CreateJob(ctx context.Context, in CreateJobInput) (*JobMeta, error)
TransitionJobStatus(ctx context.Context, in TransitionJobStatusInput) (*JobMeta, error)
UpsertRecordStatus(ctx context.Context, in UpsertRecordStatusInput) (*JobRecord, error)
AcquireLease(ctx context.Context, in AcquireLeaseInput) (*JobLock, error)
RefreshLease(ctx context.Context, in RefreshLeaseInput) (*JobLock, error)
ReleaseLease(ctx context.Context, in ReleaseLeaseInput) error
AcquireSemaphoreSlot(ctx context.Context, in AcquireSemaphoreSlotInput) (*SemaphoreLease, error)
RefreshSemaphoreSlot(ctx context.Context, in RefreshSemaphoreSlotInput) (*SemaphoreLease, error)
ReleaseSemaphoreSlot(ctx context.Context, in ReleaseSemaphoreSlotInput) error
InspectSemaphore(ctx context.Context, in InspectSemaphoreInput) (*SemaphoreInspection, error)
CreateIdempotencyRecord(ctx context.Context, in CreateIdempotencyRecordInput) (*JobRequest, IdempotencyCreateOutcome, error)
CompleteIdempotencyRecord(ctx context.Context, in CompleteIdempotencyRecordInput) (*JobRequest, error)
}
type JobLock ¶
type JobLock struct {
PK string `theorydb:"pk" json:"pk"`
SK string `theorydb:"sk" json:"sk"`
JobID string `json:"job_id"`
LeaseOwner string `json:"lease_owner"`
LeaseExpiresAt int64 `json:"lease_expires_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
// contains filtered or unexported fields
}
func NewJobLock ¶
type JobMeta ¶
type JobMeta struct {
PK string `theorydb:"pk" json:"pk"`
SK string `theorydb:"sk" json:"sk"`
JobID string `json:"job_id"`
TenantID string `json:"tenant_id" theorydb:"index:tenant-created-index,pk"`
Status JobStatus `json:"status" theorydb:"index:status-created-index,pk"`
CreatedAt time.Time `json:"created_at" theorydb:"index:tenant-created-index,sk,index:status-created-index,sk"`
UpdatedAt time.Time `json:"updated_at"`
Version int64 `json:"version" theorydb:"version"`
TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
// contains filtered or unexported fields
}
func NewJobMeta ¶
type JobRecord ¶
type JobRecord struct {
PK string `theorydb:"pk" json:"pk"`
SK string `theorydb:"sk" json:"sk"`
JobID string `json:"job_id"`
RecordID string `json:"record_id"`
Status RecordStatus `json:"status"`
Error *ErrorEnvelope `json:"error,omitempty" theorydb:"omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
// contains filtered or unexported fields
}
func NewJobRecord ¶
type JobRequest ¶
type JobRequest struct {
PK string `theorydb:"pk" json:"pk"`
SK string `theorydb:"sk" json:"sk"`
JobID string `json:"job_id"`
IdempotencyKey string `json:"idempotency_key"`
Status IdempotencyStatus `json:"status"`
Result map[string]any `json:"result,omitempty" theorydb:"omitempty"`
Error *ErrorEnvelope `json:"error,omitempty" theorydb:"omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt time.Time `json:"completed_at,omitempty" theorydb:"omitempty"`
TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
// contains filtered or unexported fields
}
func NewJobRequest ¶
func NewJobRequest(jobID, idempotencyKey string) JobRequest
func (*JobRequest) SetKeys ¶
func (r *JobRequest) SetKeys()
func (*JobRequest) String ¶
func (r *JobRequest) String() string
type RecordStatus ¶
type RecordStatus string
const (
RecordStatusPending RecordStatus = "PENDING"
RecordStatusProcessing RecordStatus = "PROCESSING"
RecordStatusSucceeded RecordStatus = "SUCCEEDED"
RecordStatusFailed RecordStatus = "FAILED"
RecordStatusSkipped RecordStatus = "SKIPPED"
)
type RefreshLeaseInput ¶
type RefreshSemaphoreSlotInput ¶ added in v0.21.0
type ReleaseLeaseInput ¶
type ReleaseSemaphoreSlotInput ¶ added in v0.21.0
type SemaphoreInspection ¶ added in v0.21.0
type SemaphoreInspection struct {
Scope string `json:"scope"`
Subject string `json:"subject"`
Occupancy int `json:"occupancy"`
ActiveLeases []SemaphoreLease `json:"active_leases"`
}
type SemaphoreLease ¶ added in v0.21.0
type SemaphoreLease struct {
PK string `theorydb:"pk" json:"pk"`
SK string `theorydb:"sk" json:"sk"`
Scope string `json:"scope"`
Subject string `json:"subject"`
Slot int `json:"slot"`
LeaseOwner string `json:"lease_owner"`
LeaseExpiresAt int64 `json:"lease_expires_at"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TTL int64 `json:"ttl,omitempty" theorydb:"ttl,omitempty"`
// contains filtered or unexported fields
}
func NewSemaphoreLease ¶ added in v0.21.0
func NewSemaphoreLease(scope, subject string, slot int) SemaphoreLease
func (*SemaphoreLease) SetKeys ¶ added in v0.21.0
func (l *SemaphoreLease) SetKeys()
type UpsertRecordStatusInput ¶
type UpsertRecordStatusInput struct {
JobID string
RecordID string
Status RecordStatus
Error *ErrorEnvelope
TTL time.Duration
}
Click to show internal directories.
Click to hide internal directories.