task

package
v1.1.6
Published: Mar 10, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package task provides task queue domain types for async work processing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithActiveState added in v1.1.5

func WithActiveState() repository.Option

WithActiveState filters for statuses in started or in_progress state.

func WithPriorityOrder added in v1.1.5

func WithPriorityOrder() []repository.Option

WithPriorityOrder returns options that order by priority DESC, created_at ASC.

func WithTrackable added in v1.1.5

func WithTrackable(trackableType TrackableType, trackableID int64) []repository.Option

WithTrackable filters by trackable_type and trackable_id.

Types

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter specifies criteria for querying tasks. A Filter is immutable: each builder method returns a new Filter and leaves the receiver unchanged.

func NewFilter

func NewFilter() Filter

NewFilter creates an empty Filter with no constraints.

func (Filter) Limit

func (f Filter) Limit() int

Limit returns the result limit; zero means unlimited.

func (Filter) Operation

func (f Filter) Operation() *Operation

Operation returns the operation constraint, or nil if unconstrained.

func (Filter) WithLimit

func (f Filter) WithLimit(n int) Filter

WithLimit returns a new Filter with the given result limit.

func (Filter) WithOperation

func (f Filter) WithOperation(op Operation) Filter

WithOperation returns a new Filter constrained to the given operation.
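
The immutable builder behaviour described above can be illustrated with a small stand-in type. This is a minimal sketch, not the package's source: the struct fields `operation` and `limit` are assumptions, since the real Filter has only unexported fields. It relies on value receivers, so each builder method works on a copy.

```go
package main

import "fmt"

// Operation mirrors the package's string-based operation type.
type Operation string

// Filter is a local stand-in; the field names are assumed for illustration.
type Filter struct {
	operation *Operation
	limit     int
}

// NewFilter returns a Filter with no constraints.
func NewFilter() Filter { return Filter{} }

// WithOperation modifies the receiver's copy and returns it.
func (f Filter) WithOperation(op Operation) Filter {
	f.operation = &op
	return f
}

// WithLimit modifies the receiver's copy and returns it.
func (f Filter) WithLimit(n int) Filter {
	f.limit = n
	return f
}

// Operation returns nil when the filter is unconstrained.
func (f Filter) Operation() *Operation { return f.operation }

// Limit returns zero when no limit is set.
func (f Filter) Limit() int { return f.limit }

func main() {
	base := NewFilter()
	narrowed := base.WithOperation("kodit.commit.scan").WithLimit(10)

	// base is untouched because every builder call operated on a copy.
	fmt.Println(base.Limit(), base.Operation() == nil)
	fmt.Println(narrowed.Limit(), *narrowed.Operation())
}
```

Because the builders never mutate the receiver, a base Filter can be shared and specialised in several places without defensive copying.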

type Operation

type Operation string

Operation represents the type of task operation.

const (
	OperationRoot                                    Operation = "kodit.root"
	OperationCreateIndex                             Operation = "kodit.index.create"
	OperationRunIndex                                Operation = "kodit.index.run"
	OperationRefreshWorkingCopy                      Operation = "kodit.index.run.refresh_working_copy"
	OperationDeleteOldSnippets                       Operation = "kodit.index.run.delete_old_snippets"
	OperationExtractSnippets                         Operation = "kodit.index.run.extract_snippets"
	OperationCreateBM25Index                         Operation = "kodit.index.run.create_bm25_index"
	OperationCreateCodeEmbeddings                    Operation = "kodit.index.run.create_code_embeddings"
	OperationEnrichSnippets                          Operation = "kodit.index.run.enrich_snippets"
	OperationCreateTextEmbeddings                    Operation = "kodit.index.run.create_text_embeddings"
	OperationUpdateIndexTimestamp                    Operation = "kodit.index.run.update_index_timestamp"
	OperationClearFileProcessingStatuses             Operation = "kodit.index.run.clear_file_processing_statuses"
	OperationRepository                              Operation = "kodit.repository"
	OperationCreateRepository                        Operation = "kodit.repository.create"
	OperationDeleteRepository                        Operation = "kodit.repository.delete"
	OperationCloneRepository                         Operation = "kodit.repository.clone"
	OperationSyncRepository                          Operation = "kodit.repository.sync"
	OperationCommit                                  Operation = "kodit.commit"
	OperationExtractSnippetsForCommit                Operation = "kodit.commit.extract_snippets"
	OperationCreateBM25IndexForCommit                Operation = "kodit.commit.create_bm25_index"
	OperationCreateCodeEmbeddingsForCommit           Operation = "kodit.commit.create_code_embeddings"
	OperationCreateSummaryEnrichmentForCommit        Operation = "kodit.commit.create_summary_enrichment"
	OperationCreateSummaryEmbeddingsForCommit        Operation = "kodit.commit.create_summary_embeddings"
	OperationCreateArchitectureEnrichmentForCommit   Operation = "kodit.commit.create_architecture_enrichment"
	OperationCreatePublicAPIDocsForCommit            Operation = "kodit.commit.create_public_api_docs"
	OperationCreateCommitDescriptionForCommit        Operation = "kodit.commit.create_commit_description"
	OperationCreateDatabaseSchemaForCommit           Operation = "kodit.commit.create_database_schema"
	OperationCreateCookbookForCommit                 Operation = "kodit.commit.create_cookbook"
	OperationExtractExamplesForCommit                Operation = "kodit.commit.extract_examples"
	OperationCreateExampleSummaryForCommit           Operation = "kodit.commit.create_example_summary"
	OperationCreateExampleCodeEmbeddingsForCommit    Operation = "kodit.commit.create_example_code_embeddings"
	OperationCreateExampleSummaryEmbeddingsForCommit Operation = "kodit.commit.create_example_summary_embeddings"
	OperationGenerateWikiForCommit                   Operation = "kodit.commit.generate_wiki"
	OperationScanCommit                              Operation = "kodit.commit.scan"
	OperationRescanCommit                            Operation = "kodit.commit.rescan"
)

Operation values for the task queue system.

func (Operation) IsCommitOperation

func (o Operation) IsCommitOperation() bool

IsCommitOperation returns true if this is a commit-level operation.

func (Operation) IsRepositoryOperation

func (o Operation) IsRepositoryOperation() bool

IsRepositoryOperation returns true if this is a repository-level operation.

func (Operation) String

func (o Operation) String() string

String returns the string representation of the operation.
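
The operation values are dot-separated names, so one plausible way to classify them is by prefix. The sketch below is an assumption, not the documented behaviour of IsCommitOperation or IsRepositoryOperation: `inScope` is a hypothetical helper, and whether the bare scope value (for example "kodit.commit") counts as a member is a guess.

```go
package main

import (
	"fmt"
	"strings"
)

// Operation mirrors the package's string-based operation type.
type Operation string

// inScope is a hypothetical helper. It reports whether op equals scope or
// sits beneath it in the dotted hierarchy. The trailing dot in the prefix
// check prevents "kodit.commitments" from matching "kodit.commit".
func inScope(op, scope Operation) bool {
	return op == scope || strings.HasPrefix(string(op), string(scope)+".")
}

func main() {
	fmt.Println(inScope("kodit.commit.scan", "kodit.commit"))
	fmt.Println(inScope("kodit.repository.sync", "kodit.commit"))
}
```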

type PrescribedOperations

type PrescribedOperations struct {
	// contains filtered or unexported fields
}

PrescribedOperations provides predefined operation sequences for common workflows.

func NewPrescribedOperations

func NewPrescribedOperations(examples bool, enrichments bool) PrescribedOperations

NewPrescribedOperations creates a PrescribedOperations with the given settings. When enrichments is false, LLM-dependent operations (summaries, architecture docs, commit descriptions, cookbooks, wiki) are excluded from all workflows.

func (PrescribedOperations) All

func (p PrescribedOperations) All() []Operation

All returns every operation that appears in any prescribed workflow. Used at startup to validate that all required handlers are registered.

func (PrescribedOperations) CreateNewRepository

func (p PrescribedOperations) CreateNewRepository() []Operation

CreateNewRepository returns the operations needed to create a new repository.

func (PrescribedOperations) IndexCommit

func (p PrescribedOperations) IndexCommit() []Operation

IndexCommit returns the operation sequence for indexing an already-scanned commit.

func (PrescribedOperations) RescanCommit

func (p PrescribedOperations) RescanCommit() []Operation

RescanCommit returns the operation sequence for rescanning a commit (full reindex).

func (PrescribedOperations) ScanAndIndexCommit

func (p PrescribedOperations) ScanAndIndexCommit() []Operation

ScanAndIndexCommit returns the full operation sequence for scanning and indexing a commit.

func (PrescribedOperations) SyncRepository

func (p PrescribedOperations) SyncRepository() []Operation

SyncRepository returns the operations needed to sync a repository.
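
The startup check mentioned under All can be sketched as a comparison between the required operations and whatever handlers have been registered. This is illustrative only: `missingHandlers` and the `registered` map are hypothetical, and the required list here is written out by hand instead of coming from PrescribedOperations.

```go
package main

import "fmt"

// Operation mirrors the package's string-based operation type.
type Operation string

// missingHandlers returns the required operations that have no entry in
// registered, preserving the order of required.
func missingHandlers(required []Operation, registered map[Operation]bool) []Operation {
	var missing []Operation
	for _, op := range required {
		if !registered[op] {
			missing = append(missing, op)
		}
	}
	return missing
}

func main() {
	// In real code the required list would come from PrescribedOperations.All().
	required := []Operation{
		"kodit.repository.clone",
		"kodit.repository.sync",
		"kodit.commit.scan",
	}
	registered := map[Operation]bool{
		"kodit.repository.clone": true,
		"kodit.commit.scan":      true,
	}
	if missing := missingHandlers(required, registered); len(missing) > 0 {
		fmt.Println("missing handlers:", missing)
	}
}
```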

type Priority

type Priority int

Priority represents task queue priority levels. Values are spaced far apart to ensure batch offsets (up to ~150 for 15 tasks) never cause a lower priority level to exceed a higher one.

const (
	PriorityBackground    Priority = 1000
	PriorityNormal        Priority = 2000
	PriorityUserInitiated Priority = 5000
	PriorityCritical      Priority = 10000
)

Priority values.
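
The spacing argument can be checked with a little arithmetic. The sketch below assumes a per-position step of 10, inferred from the "~150 for 15 tasks" figure; `batchPriorities` is a hypothetical helper and the package may compute offsets differently.

```go
package main

import "fmt"

type Priority int

const (
	PriorityBackground    Priority = 1000
	PriorityNormal        Priority = 2000
	PriorityUserInitiated Priority = 5000
	PriorityCritical      Priority = 10000
)

// batchPriorities is hypothetical. It assigns descending priorities to n
// tasks in a batch so that earlier tasks dequeue first, using an assumed
// step of 10 per position.
func batchPriorities(base Priority, n int) []int {
	const step = 10
	out := make([]int, n)
	for i := range out {
		out[i] = int(base) + (n-i)*step
	}
	return out
}

func main() {
	ps := batchPriorities(PriorityBackground, 15)
	// The largest offset is 15*10 = 150, so the first task gets 1150.
	fmt.Println(ps[0], ps[len(ps)-1])
	// Even the highest background task stays below PriorityNormal.
	fmt.Println(ps[0] < int(PriorityNormal))
}
```

Under these assumptions the smallest gap between levels (1000, between background and normal) leaves ample headroom over the maximum offset of 150.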

type ReportingState

type ReportingState string

ReportingState represents the state of task reporting.

const (
	ReportingStateStarted    ReportingState = "started"
	ReportingStateInProgress ReportingState = "in_progress"
	ReportingStateCompleted  ReportingState = "completed"
	ReportingStateFailed     ReportingState = "failed"
	ReportingStateSkipped    ReportingState = "skipped"
)

ReportingState values.

func (ReportingState) IsTerminal

func (s ReportingState) IsTerminal() bool

IsTerminal returns true if the state represents a terminal (final) state.
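
A plausible reading of the state set is that completed, failed, and skipped are terminal, while started and in_progress are the active states that WithActiveState filters for. The sketch below encodes that reading; it is an assumption, and `isTerminal` is a local stand-in for the method.

```go
package main

import "fmt"

type ReportingState string

const (
	ReportingStateStarted    ReportingState = "started"
	ReportingStateInProgress ReportingState = "in_progress"
	ReportingStateCompleted  ReportingState = "completed"
	ReportingStateFailed     ReportingState = "failed"
	ReportingStateSkipped    ReportingState = "skipped"
)

// isTerminal assumes that completed, failed, and skipped are the final
// states; every other value is treated as still active.
func isTerminal(s ReportingState) bool {
	switch s {
	case ReportingStateCompleted, ReportingStateFailed, ReportingStateSkipped:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []ReportingState{ReportingStateStarted, ReportingStateFailed} {
		fmt.Println(s, isTerminal(s))
	}
}
```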

type Status

type Status struct {
	// contains filtered or unexported fields
}

Status represents the status of a task with progress tracking.

func NewStatus

func NewStatus(
	operation Operation,
	parent *Status,
	trackableType TrackableType,
	trackableID int64,
) Status

NewStatus creates a new Status for the given operation.

func NewStatusFull

func NewStatusFull(
	id string,
	state ReportingState,
	operation Operation,
	message string,
	createdAt, updatedAt time.Time,
	total, current int,
	errorMessage string,
	parent *Status,
	trackableID int64,
	trackableType TrackableType,
) Status

NewStatusFull creates a Status with all fields (used by repository).

func NewStatusWithDefaults

func NewStatusWithDefaults(operation Operation) Status

NewStatusWithDefaults creates a Status with default tracking.

func (Status) Complete

func (s Status) Complete() Status

Complete marks the task as completed. If already in a terminal state, no change is made.

func (Status) CompletionPercent

func (s Status) CompletionPercent() float64

CompletionPercent calculates the completion percentage.

func (Status) CreatedAt

func (s Status) CreatedAt() time.Time

CreatedAt returns when the status was created.

func (Status) Current

func (s Status) Current() int

Current returns the current count for progress tracking.

func (Status) Error

func (s Status) Error() string

Error returns the error message if failed.

func (Status) Fail

func (s Status) Fail(errorMsg string) Status

Fail marks the task as failed with the given error message.

func (Status) ID

func (s Status) ID() string

ID returns the status ID.

func (Status) Labels added in v1.1.2

func (s Status) Labels() map[string]string

Labels returns the status labels.

func (Status) Message

func (s Status) Message() string

Message returns the status message.

func (Status) Operation

func (s Status) Operation() Operation

Operation returns the task operation.

func (Status) Parent

func (s Status) Parent() *Status

Parent returns the parent status.

func (Status) SetCurrent

func (s Status) SetCurrent(current int, message string) Status

SetCurrent sets the current progress and optionally updates the message.

func (Status) SetTotal

func (s Status) SetTotal(total int) Status

SetTotal sets the total count for progress tracking.

func (Status) SetTrackingInfo

func (s Status) SetTrackingInfo(trackableID int64, trackableType TrackableType) Status

SetTrackingInfo sets the tracking information.

func (Status) Skip

func (s Status) Skip(message string) Status

Skip marks the task as skipped with the given message.

func (Status) State

func (s Status) State() ReportingState

State returns the current state.

func (Status) Total

func (s Status) Total() int

Total returns the total count for progress tracking.

func (Status) TrackableID

func (s Status) TrackableID() int64

TrackableID returns the trackable entity ID.

func (Status) TrackableType

func (s Status) TrackableType() TrackableType

TrackableType returns the trackable entity type.

func (Status) UpdatedAt

func (s Status) UpdatedAt() time.Time

UpdatedAt returns when the status was last updated.

func (Status) WithLabel added in v1.1.2

func (s Status) WithLabel(key, value string) Status

WithLabel returns a copy of the status with the given label set.
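
The progress-tracking methods follow the same copy-and-return style as Filter. The stand-in below sketches how they might fit together. Several details are assumptions: the field names, the formula `current / total * 100`, returning 0 when total is zero, and treating an empty message in SetCurrent as "keep the previous message".

```go
package main

import "fmt"

// status is a local stand-in for the package's Status type.
type status struct {
	state          string
	total, current int
	message        string
	errorMessage   string
}

func (s status) terminal() bool {
	return s.state == "completed" || s.state == "failed" || s.state == "skipped"
}

func (s status) SetTotal(total int) status { s.total = total; return s }

// SetCurrent updates progress; an empty message keeps the old one (assumed).
func (s status) SetCurrent(current int, message string) status {
	s.current = current
	if message != "" {
		s.message = message
	}
	return s
}

// CompletionPercent guards against division by zero (assumed behaviour).
func (s status) CompletionPercent() float64 {
	if s.total <= 0 {
		return 0
	}
	return float64(s.current) / float64(s.total) * 100
}

func (s status) Fail(msg string) status {
	s.state, s.errorMessage = "failed", msg
	return s
}

// Complete is a no-op once the status is already terminal.
func (s status) Complete() status {
	if s.terminal() {
		return s
	}
	s.state = "completed"
	return s
}

func main() {
	s := status{state: "started"}.SetTotal(8).SetCurrent(2, "embedding")
	fmt.Println(s.CompletionPercent())
	fmt.Println(s.Fail("boom").Complete().state)
}
```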

type StatusStore

type StatusStore interface {
	Find(ctx context.Context, options ...repository.Option) ([]Status, error)
	FindOne(ctx context.Context, options ...repository.Option) (Status, error)
	Count(ctx context.Context, options ...repository.Option) (int64, error)
	Exists(ctx context.Context, options ...repository.Option) (bool, error)
	Save(ctx context.Context, status Status) (Status, error)
	DeleteBy(ctx context.Context, options ...repository.Option) error

	// LoadWithHierarchy loads all task statuses for a trackable entity
	// with their parent-child relationships reconstructed.
	LoadWithHierarchy(
		ctx context.Context,
		trackableType TrackableType,
		trackableID int64,
	) ([]Status, error)
}

StatusStore defines the interface for Status persistence operations.
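
Reconstructing parent-child relationships from flat rows, as LoadWithHierarchy is described as doing, usually takes two passes: index every row by ID, then link each row to its parent. The sketch below shows that general technique with hypothetical `row` and `node` types; the actual storage schema and implementation are not documented here.

```go
package main

import "fmt"

// row is a hypothetical flat record as it might come back from storage.
type row struct {
	id, parentID string // parentID is empty for a root
}

// node is a hypothetical in-memory status with a parent pointer.
type node struct {
	id     string
	parent *node
}

// linkParents builds one node per row, then wires up parent pointers.
// Rows whose parentID is unknown are left as roots.
func linkParents(rows []row) map[string]*node {
	nodes := make(map[string]*node, len(rows))
	for _, r := range rows {
		nodes[r.id] = &node{id: r.id}
	}
	for _, r := range rows {
		if p, ok := nodes[r.parentID]; ok {
			nodes[r.id].parent = p
		}
	}
	return nodes
}

// depth counts the ancestors of n. It assumes the rows contain no cycles.
func depth(n *node) int {
	d := 0
	for n.parent != nil {
		d++
		n = n.parent
	}
	return d
}

func main() {
	nodes := linkParents([]row{
		{id: "leaf", parentID: "child"},
		{id: "child", parentID: "root"},
		{id: "root"},
	})
	fmt.Println(depth(nodes["leaf"]), nodes["leaf"].parent.id)
}
```

The two-pass approach means rows can arrive in any order, including children before their parents.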

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents an item in the queue waiting to be processed. A Task has no status of its own: if it exists, it is pending.

func NewTask

func NewTask(operation Operation, priority int, payload map[string]any) Task

NewTask creates a new Task with the given operation, priority, and payload. The dedup key is generated automatically from the operation and payload.

func NewTaskWithID

func NewTaskWithID(
	id int64,
	dedupKey string,
	operation Operation,
	priority int,
	payload map[string]any,
	createdAt, updatedAt time.Time,
) Task

NewTaskWithID creates a Task with all fields (used by repository).

func (Task) CreatedAt

func (t Task) CreatedAt() time.Time

CreatedAt returns when the task was created.

func (Task) DedupKey

func (t Task) DedupKey() string

DedupKey returns the deduplication key.

func (Task) ID

func (t Task) ID() int64

ID returns the task ID.

func (Task) Operation

func (t Task) Operation() Operation

Operation returns the task operation.

func (Task) Payload

func (t Task) Payload() map[string]any

Payload returns a copy of the task payload.

func (Task) PayloadJSON

func (t Task) PayloadJSON() ([]byte, error)

PayloadJSON returns the payload as JSON bytes.

func (Task) Priority

func (t Task) Priority() int

Priority returns the task priority.

func (Task) UpdatedAt

func (t Task) UpdatedAt() time.Time

UpdatedAt returns when the task was last updated.

func (Task) WithID

func (t Task) WithID(id int64) Task

WithID returns a copy of the task with the given ID.

func (Task) WithPriority added in v1.1.5

func (t Task) WithPriority(priority int) Task

WithPriority returns a copy of the task with the given priority.

func (Task) WithTimestamps

func (t Task) WithTimestamps(createdAt, updatedAt time.Time) Task

WithTimestamps returns a copy of the task with the given timestamps.
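
NewTask is documented as deriving the dedup key from the operation and payload, but the exact scheme is not specified. One common approach, sketched below as an assumption, hashes the operation together with the payload's JSON encoding. It works because `encoding/json` writes map keys in sorted order, so equal payloads produce equal bytes. `dedupKey` is a hypothetical helper.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// dedupKey is hypothetical. It returns a hex SHA-256 digest of the
// operation name, a NUL separator, and the JSON-encoded payload.
func dedupKey(operation string, payload map[string]any) (string, error) {
	body, err := json.Marshal(payload) // map keys are emitted in sorted order
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(append([]byte(operation+"\x00"), body...))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a, _ := dedupKey("kodit.commit.scan", map[string]any{"repo": 7, "sha": "abc"})
	b, _ := dedupKey("kodit.commit.scan", map[string]any{"sha": "abc", "repo": 7})
	c, _ := dedupKey("kodit.commit.rescan", map[string]any{"repo": 7, "sha": "abc"})
	fmt.Println(a == b, a == c)
}
```

With a key like this, enqueueing the same operation for the same payload twice can be detected and collapsed into a single pending task.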

type TaskStore

type TaskStore interface {
	repository.Store[Task]

	// Exists checks if any task matches the given options.
	Exists(ctx context.Context, options ...repository.Option) (bool, error)

	// DeleteBy removes all tasks matching the given options.
	DeleteBy(ctx context.Context, options ...repository.Option) error

	// Dequeue retrieves and removes the highest priority task.
	// Returns the task and true if one was found, or the zero value and false if the queue is empty.
	Dequeue(ctx context.Context) (Task, bool, error)

	// DequeueByOperation retrieves and removes the highest priority task
	// of a specific operation type.
	DequeueByOperation(ctx context.Context, operation Operation) (Task, bool, error)
}

TaskStore defines the interface for Task persistence operations.
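
The Dequeue contract (highest priority first, with a found flag) can be sketched with an in-memory queue. This is illustrative only: `memoryQueue`, `sampleQueue`, and `drain` are hypothetical, the real store is backed by a repository, and the tie-break on creation time is borrowed from the ordering that WithPriorityOrder describes (priority DESC, created_at ASC).

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// task is a local stand-in carrying only the fields needed for ordering.
type task struct {
	id        int64
	priority  int
	createdAt time.Time
}

type memoryQueue struct{ tasks []task }

func (q *memoryQueue) Enqueue(t task) { q.tasks = append(q.tasks, t) }

// Dequeue removes and returns the highest-priority task, oldest first on
// ties. It returns the zero value and false when the queue is empty.
func (q *memoryQueue) Dequeue() (task, bool) {
	if len(q.tasks) == 0 {
		return task{}, false
	}
	sort.SliceStable(q.tasks, func(i, j int) bool {
		a, b := q.tasks[i], q.tasks[j]
		if a.priority != b.priority {
			return a.priority > b.priority
		}
		return a.createdAt.Before(b.createdAt)
	})
	head := q.tasks[0]
	q.tasks = q.tasks[1:]
	return head, true
}

// sampleQueue builds a small queue with mixed priorities and timestamps.
func sampleQueue() *memoryQueue {
	t0 := time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC)
	q := &memoryQueue{}
	q.Enqueue(task{id: 1, priority: 1000, createdAt: t0})
	q.Enqueue(task{id: 2, priority: 5000, createdAt: t0.Add(time.Second)})
	q.Enqueue(task{id: 3, priority: 5000, createdAt: t0})
	return q
}

// drain dequeues until the queue is empty and returns the IDs in order.
func drain(q *memoryQueue) []int64 {
	var ids []int64
	for {
		t, ok := q.Dequeue()
		if !ok {
			return ids
		}
		ids = append(ids, t.id)
	}
}

func main() {
	fmt.Println(drain(sampleQueue()))
}
```

Sorting on every call keeps the sketch short; a heap, or an ordered database query as the real store presumably uses, would avoid the repeated sort.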

type TrackableType

type TrackableType string

TrackableType represents types of trackable entities.

const (
	TrackableTypeIndex      TrackableType = "indexes"
	TrackableTypeRepository TrackableType = "kodit.repository"
	TrackableTypeCommit     TrackableType = "kodit.commit"
)

TrackableType values.
