task

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package task provides task queue domain types for async work processing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter specifies criteria for querying tasks. Immutable — builder methods return a new Filter.

func NewFilter

func NewFilter() Filter

NewFilter creates an empty Filter with no constraints.

func (Filter) Limit

func (f Filter) Limit() int

Limit returns the result limit, zero means unlimited.

func (Filter) Operation

func (f Filter) Operation() *Operation

Operation returns the operation constraint, or nil if unconstrained.

func (Filter) WithLimit

func (f Filter) WithLimit(n int) Filter

WithLimit returns a new Filter with the given result limit.

func (Filter) WithOperation

func (f Filter) WithOperation(op Operation) Filter

WithOperation returns a new Filter constrained to the given operation.

type Operation

type Operation string

Operation represents the type of task operation.

const (
	OperationRoot                                    Operation = "kodit.root"
	OperationCreateIndex                             Operation = "kodit.index.create"
	OperationRunIndex                                Operation = "kodit.index.run"
	OperationRefreshWorkingCopy                      Operation = "kodit.index.run.refresh_working_copy"
	OperationDeleteOldSnippets                       Operation = "kodit.index.run.delete_old_snippets"
	OperationExtractSnippets                         Operation = "kodit.index.run.extract_snippets"
	OperationCreateBM25Index                         Operation = "kodit.index.run.create_bm25_index"
	OperationCreateCodeEmbeddings                    Operation = "kodit.index.run.create_code_embeddings"
	OperationEnrichSnippets                          Operation = "kodit.index.run.enrich_snippets"
	OperationCreateTextEmbeddings                    Operation = "kodit.index.run.create_text_embeddings"
	OperationUpdateIndexTimestamp                    Operation = "kodit.index.run.update_index_timestamp"
	OperationClearFileProcessingStatuses             Operation = "kodit.index.run.clear_file_processing_statuses"
	OperationRepository                              Operation = "kodit.repository"
	OperationCreateRepository                        Operation = "kodit.repository.create"
	OperationDeleteRepository                        Operation = "kodit.repository.delete"
	OperationCloneRepository                         Operation = "kodit.repository.clone"
	OperationSyncRepository                          Operation = "kodit.repository.sync"
	OperationCommit                                  Operation = "kodit.commit"
	OperationExtractSnippetsForCommit                Operation = "kodit.commit.extract_snippets"
	OperationCreateBM25IndexForCommit                Operation = "kodit.commit.create_bm25_index"
	OperationCreateCodeEmbeddingsForCommit           Operation = "kodit.commit.create_code_embeddings"
	OperationCreateSummaryEnrichmentForCommit        Operation = "kodit.commit.create_summary_enrichment"
	OperationCreateSummaryEmbeddingsForCommit        Operation = "kodit.commit.create_summary_embeddings"
	OperationCreateArchitectureEnrichmentForCommit   Operation = "kodit.commit.create_architecture_enrichment"
	OperationCreatePublicAPIDocsForCommit            Operation = "kodit.commit.create_public_api_docs"
	OperationCreateCommitDescriptionForCommit        Operation = "kodit.commit.create_commit_description"
	OperationCreateDatabaseSchemaForCommit           Operation = "kodit.commit.create_database_schema"
	OperationCreateCookbookForCommit                 Operation = "kodit.commit.create_cookbook"
	OperationExtractExamplesForCommit                Operation = "kodit.commit.extract_examples"
	OperationCreateExampleSummaryForCommit           Operation = "kodit.commit.create_example_summary"
	OperationCreateExampleCodeEmbeddingsForCommit    Operation = "kodit.commit.create_example_code_embeddings"
	OperationCreateExampleSummaryEmbeddingsForCommit Operation = "kodit.commit.create_example_summary_embeddings"
	OperationGenerateWikiForCommit                   Operation = "kodit.commit.generate_wiki"
	OperationScanCommit                              Operation = "kodit.commit.scan"
	OperationRescanCommit                            Operation = "kodit.commit.rescan"
)

Operation values for the task queue system.

func (Operation) IsCommitOperation

func (o Operation) IsCommitOperation() bool

IsCommitOperation returns true if this is a commit-level operation.

func (Operation) IsRepositoryOperation

func (o Operation) IsRepositoryOperation() bool

IsRepositoryOperation returns true if this is a repository-level operation.

func (Operation) String

func (o Operation) String() string

String returns the string representation of the operation.

type PrescribedOperations

type PrescribedOperations struct {
	// contains filtered or unexported fields
}

PrescribedOperations provides predefined operation sequences for common workflows.

func NewPrescribedOperations

func NewPrescribedOperations(examples bool, enrichments bool) PrescribedOperations

NewPrescribedOperations creates a PrescribedOperations with the given settings. When enrichments is false, LLM-dependent operations (summaries, architecture docs, commit descriptions, cookbooks, wiki) are excluded from all workflows.

func (PrescribedOperations) All

func (p PrescribedOperations) All() []Operation

All returns every operation that appears in any prescribed workflow. Used at startup to validate that all required handlers are registered.

func (PrescribedOperations) CreateNewRepository

func (p PrescribedOperations) CreateNewRepository() []Operation

CreateNewRepository returns the operations needed to create a new repository.

func (PrescribedOperations) IndexCommit

func (p PrescribedOperations) IndexCommit() []Operation

IndexCommit returns the operation sequence for indexing an already-scanned commit.

func (PrescribedOperations) RescanCommit

func (p PrescribedOperations) RescanCommit() []Operation

RescanCommit returns the operation sequence for rescanning a commit (full reindex).

func (PrescribedOperations) ScanAndIndexCommit

func (p PrescribedOperations) ScanAndIndexCommit() []Operation

ScanAndIndexCommit returns the full operation sequence for scanning and indexing a commit.

func (PrescribedOperations) SyncRepository

func (p PrescribedOperations) SyncRepository() []Operation

SyncRepository returns the operations needed to sync a repository.

type Priority

type Priority int

Priority represents task queue priority levels. Values are spaced far apart to ensure batch offsets (up to ~150 for 15 tasks) never cause a lower priority level to exceed a higher one.

const (
	PriorityBackground    Priority = 1000
	PriorityNormal        Priority = 2000
	PriorityUserInitiated Priority = 5000
	PriorityCritical      Priority = 10000
)

Priority values.

type ReportingState

type ReportingState string

ReportingState represents the state of task reporting.

const (
	ReportingStateStarted    ReportingState = "started"
	ReportingStateInProgress ReportingState = "in_progress"
	ReportingStateCompleted  ReportingState = "completed"
	ReportingStateFailed     ReportingState = "failed"
	ReportingStateSkipped    ReportingState = "skipped"
)

ReportingState values.

func (ReportingState) IsTerminal

func (s ReportingState) IsTerminal() bool

IsTerminal returns true if the state represents a terminal (final) state.

type Status

type Status struct {
	// contains filtered or unexported fields
}

Status represents the status of a task with progress tracking.

func NewStatus

func NewStatus(
	operation Operation,
	parent *Status,
	trackableType TrackableType,
	trackableID int64,
) Status

NewStatus creates a new Status for the given operation.

func NewStatusFull

func NewStatusFull(
	id string,
	state ReportingState,
	operation Operation,
	message string,
	createdAt, updatedAt time.Time,
	total, current int,
	errorMessage string,
	parent *Status,
	trackableID int64,
	trackableType TrackableType,
) Status

NewStatusFull creates a Status with all fields (used by repository).

func NewStatusWithDefaults

func NewStatusWithDefaults(operation Operation) Status

NewStatusWithDefaults creates a Status with default tracking.

func (Status) Complete

func (s Status) Complete() Status

Complete marks the task as completed. If already in a terminal state, no change is made.

func (Status) CompletionPercent

func (s Status) CompletionPercent() float64

CompletionPercent calculates the completion percentage.

func (Status) CreatedAt

func (s Status) CreatedAt() time.Time

CreatedAt returns when the status was created.

func (Status) Current

func (s Status) Current() int

Current returns the current count for progress tracking.

func (Status) Error

func (s Status) Error() string

Error returns the error message if failed.

func (Status) Fail

func (s Status) Fail(errorMsg string) Status

Fail marks the task as failed with the given error message.

func (Status) ID

func (s Status) ID() string

ID returns the status ID.

func (Status) Labels added in v1.1.2

func (s Status) Labels() map[string]string

Labels returns the status labels.

func (Status) Message

func (s Status) Message() string

Message returns the status message.

func (Status) Operation

func (s Status) Operation() Operation

Operation returns the task operation.

func (Status) Parent

func (s Status) Parent() *Status

Parent returns the parent status.

func (Status) SetCurrent

func (s Status) SetCurrent(current int, message string) Status

SetCurrent sets the current progress and optionally updates the message.

func (Status) SetTotal

func (s Status) SetTotal(total int) Status

SetTotal sets the total count for progress tracking.

func (Status) SetTrackingInfo

func (s Status) SetTrackingInfo(trackableID int64, trackableType TrackableType) Status

SetTrackingInfo sets the tracking information.

func (Status) Skip

func (s Status) Skip(message string) Status

Skip marks the task as skipped with the given message.

func (Status) State

func (s Status) State() ReportingState

State returns the current state.

func (Status) Total

func (s Status) Total() int

Total returns the total count for progress tracking.

func (Status) TrackableID

func (s Status) TrackableID() int64

TrackableID returns the trackable entity ID.

func (Status) TrackableType

func (s Status) TrackableType() TrackableType

TrackableType returns the trackable entity type.

func (Status) UpdatedAt

func (s Status) UpdatedAt() time.Time

UpdatedAt returns when the status was last updated.

func (Status) WithLabel added in v1.1.2

func (s Status) WithLabel(key, value string) Status

WithLabel returns a copy of the status with the given label set.

type StatusStore

type StatusStore interface {
	// Get retrieves a task status by ID.
	Get(ctx context.Context, id string) (Status, error)

	// FindByTrackable retrieves task statuses for a trackable entity.
	FindByTrackable(ctx context.Context, trackableType TrackableType, trackableID int64) ([]Status, error)

	// Save creates a new task status or updates an existing one.
	// If the status has a parent, the parent chain is saved first.
	Save(ctx context.Context, status Status) (Status, error)

	// SaveBulk creates or updates multiple task statuses.
	SaveBulk(ctx context.Context, statuses []Status) ([]Status, error)

	// Delete removes a task status.
	Delete(ctx context.Context, status Status) error

	// DeleteByTrackable removes task statuses for a trackable entity.
	DeleteByTrackable(ctx context.Context, trackableType TrackableType, trackableID int64) error

	// Count returns the total number of task statuses.
	Count(ctx context.Context) (int64, error)

	// LoadWithHierarchy loads all task statuses for a trackable entity
	// with their parent-child relationships reconstructed.
	LoadWithHierarchy(
		ctx context.Context,
		trackableType TrackableType,
		trackableID int64,
	) ([]Status, error)
}

StatusStore defines the interface for Status persistence operations.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents an item in the queue waiting to be processed. If the item exists, it is in the queue and waiting to be processed. There is no status associated - existence implies pending.

func NewTask

func NewTask(operation Operation, priority int, payload map[string]any) Task

NewTask creates a new Task with the given operation, priority, and payload. The dedup key is generated automatically from the operation and payload.

func NewTaskWithID

func NewTaskWithID(
	id int64,
	dedupKey string,
	operation Operation,
	priority int,
	payload map[string]any,
	createdAt, updatedAt time.Time,
) Task

NewTaskWithID creates a Task with all fields (used by repository).

func (Task) CreatedAt

func (t Task) CreatedAt() time.Time

CreatedAt returns when the task was created.

func (Task) DedupKey

func (t Task) DedupKey() string

DedupKey returns the deduplication key.

func (Task) ID

func (t Task) ID() int64

ID returns the task ID.

func (Task) Operation

func (t Task) Operation() Operation

Operation returns the task operation.

func (Task) Payload

func (t Task) Payload() map[string]any

Payload returns a copy of the task payload.

func (Task) PayloadJSON

func (t Task) PayloadJSON() ([]byte, error)

PayloadJSON returns the payload as JSON bytes.

func (Task) Priority

func (t Task) Priority() int

Priority returns the task priority.

func (Task) UpdatedAt

func (t Task) UpdatedAt() time.Time

UpdatedAt returns when the task was last updated.

func (Task) WithID

func (t Task) WithID(id int64) Task

WithID returns a copy of the task with the given ID.

func (Task) WithTimestamps

func (t Task) WithTimestamps(createdAt, updatedAt time.Time) Task

WithTimestamps returns a copy of the task with the given timestamps.

type TaskStore

type TaskStore interface {
	// Get retrieves a task by ID.
	Get(ctx context.Context, id int64) (Task, error)

	// FindAll retrieves all tasks.
	FindAll(ctx context.Context) ([]Task, error)

	// FindPending retrieves pending tasks ordered by priority.
	FindPending(ctx context.Context, options ...repository.Option) ([]Task, error)

	// Save creates a new task or updates an existing one.
	// Uses dedup_key for conflict resolution - if a task with the same
	// dedup_key exists, it will be returned instead of creating a duplicate.
	Save(ctx context.Context, task Task) (Task, error)

	// SaveBulk creates or updates multiple tasks.
	SaveBulk(ctx context.Context, tasks []Task) ([]Task, error)

	// Delete removes a task.
	Delete(ctx context.Context, task Task) error

	// DeleteAll removes all tasks.
	DeleteAll(ctx context.Context) error

	// CountPending returns the number of pending tasks.
	CountPending(ctx context.Context, options ...repository.Option) (int64, error)

	// Exists checks if a task with the given ID exists.
	Exists(ctx context.Context, id int64) (bool, error)

	// Dequeue retrieves and removes the highest priority task.
	// Returns the task and true if one was found, or zero-value and false if queue is empty.
	Dequeue(ctx context.Context) (Task, bool, error)

	// DequeueByOperation retrieves and removes the highest priority task
	// of a specific operation type.
	DequeueByOperation(ctx context.Context, operation Operation) (Task, bool, error)
}

TaskStore defines the interface for Task persistence operations.

type TrackableType

type TrackableType string

TrackableType represents types of trackable entities.

const (
	TrackableTypeIndex      TrackableType = "indexes"
	TrackableTypeRepository TrackableType = "kodit.repository"
	TrackableTypeCommit     TrackableType = "kodit.commit"
)

TrackableType values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL