migration

package
v0.17.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 23, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package migration implements the one-shot data migration jobs required by the chat-session-redesign spec (Phase 7). Jobs are invoked via the `warren migrate --job <name>` CLI; this package holds the business-logic implementation while pkg/cli/migrate.go only wires each job into the existing migrationJobs slice.

All jobs are idempotent: safe to re-run, stop, and resume. Each implementation documents the key used to skip already-migrated rows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommentResolverClient

type CommentResolverClient interface {
	ResolveSlackSession(ctx context.Context, ticketID *types.TicketID, thread slackModel.Thread, userID types.UserID) (*sessModel.Session, bool, error)
	LookupSlackSession(ctx context.Context, ticketID *types.TicketID, thread slackModel.Thread) (*sessModel.Session, bool, error)
}

CommentResolverClient is the minimal surface the job needs from the chat-session-redesign session.Resolver.

In dry-run mode the job calls LookupSlackSession rather than ResolveSlackSession, so the scan does not create brand-new Session documents just to produce a preview count; this keeps `--dry-run` true to its name.

type CommentSource

type CommentSource interface {
	ListTicketsWithComments(ctx context.Context) ([]*ticket.Ticket, error)
	GetTicketComments(ctx context.Context, ticketID types.TicketID) ([]ticket.Comment, error)
}

CommentSource abstracts the pre-redesign Comment subcollection so the migration job can operate against either a raw Firestore client (production, see pkg/cli/migrate_chat_session.go) or an in-memory fake (tests) without pulling Comment CRUD back onto the main Repository interface. The interface is deliberately read-only: the job migrates legacy Comment rows into Session Messages but never deletes the originals, so operators can decide separately when the pre-redesign data is safe to discard.

type CommentToMessageJob

type CommentToMessageJob struct {
	// contains filtered or unexported fields
}

CommentToMessageJob rewrites every existing ticket.Comment as a type=user session.Message attached to the corresponding Slack Session. The legacy Comment row is NOT deleted — operators retain the original data indefinitely and decide separately when it is safe to discard.

Idempotence: each generated Message carries a SessionID derived deterministically from (ticket_id, slack_thread), so re-running the job over the same (session, content, author, created_at) tuple yields the same logical Message. To keep exact re-runs side-effect-free, the job skips writing a Message when the target Session already holds a Message with the same content, author, and timestamp.

func NewCommentToMessageJob

func NewCommentToMessageJob(source CommentSource, resolver CommentResolverClient, writer MessageWriter, reader SessionMessageReader) *CommentToMessageJob

NewCommentToMessageJob constructs the job. All four dependencies are required; in production, callers pass the same Repository value behind each interface.

func (*CommentToMessageJob) Description

func (j *CommentToMessageJob) Description() string

func (*CommentToMessageJob) Name

func (j *CommentToMessageJob) Name() string

func (*CommentToMessageJob) Run

func (j *CommentToMessageJob) Run(ctx context.Context, opts Options) (*Result, error)

Run scans every Ticket with Slack comments, resolves its Slack Session, and emits one Message per Comment. Errors on an individual Comment are counted but do not abort the whole job.

type HistoryScopeJob

type HistoryScopeJob struct {
	// contains filtered or unexported fields
}

HistoryScopeJob copies legacy Ticket-scoped gollem.History files at `{prefix}/{schema}/ticket/{tid}/latest.json` into Session-scoped destinations at `{prefix}/{schema}/sessions/{sid}/history.json`.
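The two object names can be built with the standard library's `path.Join`. This sketch assumes `prefix`, `schema`, and the IDs are plain strings (the example values are hypothetical); note that `path.Join` drops an empty `prefix` instead of emitting a leading slash, which may differ from the real implementation.

```go
package main

import (
	"fmt"
	"path"
)

// legacyHistoryPath returns the Ticket-scoped source object name:
// {prefix}/{schema}/ticket/{tid}/latest.json
func legacyHistoryPath(prefix, schema, ticketID string) string {
	return path.Join(prefix, schema, "ticket", ticketID, "latest.json")
}

// sessionHistoryPath returns the Session-scoped destination object name:
// {prefix}/{schema}/sessions/{sid}/history.json
func sessionHistoryPath(prefix, schema, sessionID string) string {
	return path.Join(prefix, schema, "sessions", sessionID, "history.json")
}

func main() {
	fmt.Println(legacyHistoryPath("warren", "v1", "T1"))         // warren/v1/ticket/T1/latest.json
	fmt.Println(sessionHistoryPath("warren", "v1", "slack_abc")) // warren/v1/sessions/slack_abc/history.json
}
```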

The copy uses GCS's server-side Copy API (see storage.Service.CopyLatestHistoryToSession) so the payload never traverses this process — a full migration of tens of thousands of Sessions costs no egress bandwidth from the migrate binary.

Source mapping: for each Session with Source=slack and a resolved Ticket, the ticket's latest.json is copied into the Session's history slot. Web/CLI Sessions did not exist pre-redesign and are skipped.

The job is idempotent: when the Session already has a history slot populated, it is left untouched. Legacy files are NEVER deleted here: the pre-redesign latest.json objects are intentionally left in place so operators can roll back without a GCS version restore.

func NewHistoryScopeJob

func NewHistoryScopeJob(svc *storage.Service, forEach SessionForEach) *HistoryScopeJob

NewHistoryScopeJob constructs the job. `forEach` streams every Session through the handle callback.

func (*HistoryScopeJob) Description

func (j *HistoryScopeJob) Description() string

func (*HistoryScopeJob) Name

func (j *HistoryScopeJob) Name() string

func (*HistoryScopeJob) Run

func (j *HistoryScopeJob) Run(ctx context.Context, opts Options) (*Result, error)

type Job

type Job interface {
	Name() string
	Description() string
	Run(ctx context.Context, opts Options) (*Result, error)
}

Job describes a single migration unit. Name is the CLI flag (`--job <name>`); Description is the short blurb printed by `warren migrate --list`.

type MessageWriter

type MessageWriter interface {
	PutSessionMessage(ctx context.Context, msg *sessModel.Message) error
}

MessageWriter abstracts the Message write path.

type Options

type Options struct {
	// DryRun instructs the job to compute and report what it would do
	// without mutating any data. Every Job MUST honor this.
	DryRun bool
}

Options carries the shared configuration passed down to every Job. Concrete job implementations embed the clients they need when constructed (see NewCommentToMessageJob etc.), so Options only needs the small set of cross-cutting flags.

type Result

type Result struct {
	JobName  string         `json:"job_name"`
	Scanned  int            `json:"scanned"`
	Migrated int            `json:"migrated"`
	Skipped  int            `json:"skipped"`
	Errors   int            `json:"errors"`
	Details  map[string]any `json:"details,omitempty"`
}

Result is the structured report returned by Job.Run. Scanned / Migrated / Skipped / Errors are counters; Details is for job-specific fields (e.g. a list of Session IDs that were merged, file paths copied, etc.).

func RunBundle

func RunBundle(ctx context.Context, opts Options, jobs ...Job) ([]*Result, error)

RunBundle invokes every Job in `jobs` sequentially against `opts`, collecting Results. The first error short-circuits the run; preceding Results are still returned so callers can see how far the bundle got.

This is the orchestration primitive shared by the v0.16.0 CLI bundle and the bundle's tests; keeping it in this package means the same ordering logic is exercised by both code paths.

func (*Result) MergeDetails

func (r *Result) MergeDetails(kv map[string]any)

MergeDetails is a small helper used by individual jobs to stash extra reporting fields without worrying about nil maps.
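A nil-safe merge along these lines would match that description. This is an assumed implementation: whether the real helper ignores empty input or lets later keys overwrite earlier ones is not stated here.

```go
package main

import "fmt"

type Result struct {
	JobName  string         `json:"job_name"`
	Scanned  int            `json:"scanned"`
	Migrated int            `json:"migrated"`
	Skipped  int            `json:"skipped"`
	Errors   int            `json:"errors"`
	Details  map[string]any `json:"details,omitempty"`
}

// MergeDetails copies kv into r.Details, allocating the map on first use.
// Later calls overwrite earlier values for the same key (assumed behavior).
func (r *Result) MergeDetails(kv map[string]any) {
	if len(kv) == 0 {
		return
	}
	if r.Details == nil {
		r.Details = make(map[string]any, len(kv))
	}
	for k, v := range kv {
		r.Details[k] = v
	}
}

func main() {
	r := &Result{JobName: "example"}
	r.MergeDetails(map[string]any{"copied": 3})
	r.MergeDetails(map[string]any{"merged_sessions": []string{"s1"}})
	fmt.Println(len(r.Details)) // 2
}
```

Leaving `Details` nil when nothing is merged lets the `omitempty` tag drop the field from the JSON report entirely.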

type SessionConsolidateJob

type SessionConsolidateJob struct {
	// contains filtered or unexported fields
}

SessionConsolidateJob guarantees that every Slack-origin Ticket has its canonical `slack_<hash>` Session persisted. Pre-redesign Warren created one UUID Session per @warren mention; those legacy rows are left in Firestore untouched — the Conversation UI filters them out at read time — and a separate cleanup job is expected to purge them later.

The job is idempotent: calling ResolveSlackSession on a thread whose canonical Session already exists returns the existing row without a second write.
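The idempotent get-or-create pattern can be sketched in memory. Everything here is hypothetical: the real `slack_<hash>` derivation is not documented on this page, so `canonicalID` hashes (ticket ID, thread) with SHA-256 purely for illustration, and counting non-Slack Tickets as Skipped is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// Ticket is a hypothetical stand-in; SlackThread is empty for non-Slack Tickets.
type Ticket struct {
	ID          string
	SlackThread string
}

// canonicalID derives a stable, hypothetical `slack_<hash>` Session ID.
// SHA-256 over (ticketID, thread) is an assumption, not the real scheme.
func canonicalID(ticketID, thread string) string {
	sum := sha256.Sum256([]byte(ticketID + "\x00" + thread))
	return "slack_" + hex.EncodeToString(sum[:8])
}

// store is an in-memory Session table: sessionID -> ticketID.
type store map[string]string

// ensure persists the canonical Session if missing; it reports whether it wrote.
func (s store) ensure(t Ticket) bool {
	id := canonicalID(t.ID, t.SlackThread)
	if _, ok := s[id]; ok {
		return false // already exists: no second write
	}
	s[id] = t.ID
	return true
}

// consolidate walks the Tickets and tallies the report counters.
func consolidate(s store, tickets []Ticket) (scanned, migrated, skipped int) {
	for _, t := range tickets {
		scanned++
		if t.SlackThread == "" {
			skipped++ // not Slack-origin
			continue
		}
		if s.ensure(t) {
			migrated++
		} else {
			skipped++ // canonical Session already persisted
		}
	}
	return scanned, migrated, skipped
}

func main() {
	s := store{}
	tickets := []Ticket{{ID: "T1", SlackThread: "C1/111.1"}, {ID: "T2"}}
	fmt.Println(consolidate(s, tickets)) // 2 1 1
	fmt.Println(consolidate(s, tickets)) // 2 0 2 (re-run writes nothing)
}
```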

func NewSessionConsolidateJob

func NewSessionConsolidateJob(tickets TicketList, resolver CommentResolverClient) *SessionConsolidateJob

NewSessionConsolidateJob wires the job's dependencies.

func (*SessionConsolidateJob) Description

func (j *SessionConsolidateJob) Description() string

func (*SessionConsolidateJob) Name

func (j *SessionConsolidateJob) Name() string

func (*SessionConsolidateJob) Run

func (j *SessionConsolidateJob) Run(ctx context.Context, opts Options) (*Result, error)

type SessionForEach

type SessionForEach func(ctx context.Context, handle func(*sessModel.Session) error) error

SessionForEach streams every Session in the target dataset through the supplied handle callback. Migration jobs consume it instead of a function returning ([]*Session, error), so they process one Session at a time and memory use stays bounded regardless of collection size.

If handle returns a non-nil error, iteration stops and the error bubbles up from forEach.

type SessionMessageReader

type SessionMessageReader interface {
	GetSessionMessages(ctx context.Context, sessionID types.SessionID) ([]*sessModel.Message, error)
}

SessionMessageReader lets the job check for existing Messages to enforce idempotence.

type SessionSourceBackfillJob

type SessionSourceBackfillJob struct {
	// contains filtered or unexported fields
}

SessionSourceBackfillJob walks every Session currently persisted and sets Source=slack / TicketIDPtr on rows that lack them.

Pre-redesign Sessions were all Slack: there was no Web or CLI Session code path before chat-session-redesign, so every row whose Source is empty is, by construction, a Slack Session. No inference from SlackURL or ChannelRef is required, and attempting it would misclassify rows where the legacy constructor forgot to populate SlackURL.

The job is idempotent: rows that already carry a valid Source are counted as Skipped.
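The backfill rule fits in a few lines. This sketch assumes a hypothetical `Session` with a string `Source`, treats any non-empty Source as valid, omits the TicketIDPtr update, and counts would-be updates as Migrated under dry-run; the real job may report these differently.

```go
package main

import "fmt"

// Session is a hypothetical stand-in; Source "" marks a pre-redesign row.
type Session struct {
	ID     string
	Source string
}

type counts struct{ Scanned, Migrated, Skipped int }

// backfill sets Source=slack on rows that lack it. With dryRun it only counts.
func backfill(sessions []*Session, dryRun bool) counts {
	var c counts
	for _, s := range sessions {
		c.Scanned++
		if s.Source != "" {
			c.Skipped++ // already carries a Source
			continue
		}
		if !dryRun {
			s.Source = "slack" // every pre-redesign Session was Slack
		}
		c.Migrated++
	}
	return c
}

func main() {
	rows := []*Session{{ID: "a"}, {ID: "b", Source: "web"}}
	fmt.Printf("%+v\n", backfill(rows, true)) // {Scanned:2 Migrated:1 Skipped:1}
	fmt.Println(rows[0].Source == "")         // true: dry run wrote nothing
	fmt.Printf("%+v\n", backfill(rows, false)) // {Scanned:2 Migrated:1 Skipped:1}
	fmt.Printf("%+v\n", backfill(rows, false)) // {Scanned:2 Migrated:0 Skipped:2}
}
```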

func NewSessionSourceBackfillJob

func NewSessionSourceBackfillJob(repo interfaces.Repository, forEach SessionForEach) *SessionSourceBackfillJob

NewSessionSourceBackfillJob constructs the job. `forEach` streams every Session through the handle callback; tests pass an in-memory closure and the CLI wires a Firestore iterator. Streaming avoids loading the entire session collection into memory.

func (*SessionSourceBackfillJob) Description

func (j *SessionSourceBackfillJob) Description() string

func (*SessionSourceBackfillJob) Name

func (j *SessionSourceBackfillJob) Name() string

func (*SessionSourceBackfillJob) Run

func (j *SessionSourceBackfillJob) Run(ctx context.Context, opts Options) (*Result, error)

type TicketList

type TicketList interface {
	GetAllTickets(ctx context.Context) ([]*ticket.Ticket, error)
}

TicketList is the read-only subset of the repository used to enumerate Tickets. Declared as a separate interface so tests can pass a trivial fake without wiring the whole Repository.

type TurnSynthesisJob

type TurnSynthesisJob struct {
	// contains filtered or unexported fields
}

TurnSynthesisJob converts each legacy Session (pre-redesign: one request/response cycle) into a Turn attached to the same Session, and stamps every Message on that Session with the synthesized TurnID so downstream queries can bucket by Turn without losing per-invocation granularity.

Idempotence: the job skips Sessions that already have at least one Turn persisted, so re-runs are safe.
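The per-Session step can be sketched as follows, using hypothetical stand-in types and a caller-supplied `newTurnID` function; how the real job generates Turn IDs or persists Turns is not described here.

```go
package main

import "fmt"

// Hypothetical, pared-down stand-ins for the session model.
type Message struct {
	ID     string
	TurnID string
}

type Session struct {
	ID       string
	Turns    []string // IDs of persisted Turns
	Messages []*Message
}

// synthesizeTurn gives a Turn-less Session exactly one Turn and stamps every
// Message with its ID. Sessions that already have a Turn are skipped.
func synthesizeTurn(s *Session, newTurnID func(sessionID string) string) bool {
	if len(s.Turns) > 0 {
		return false // idempotence: already migrated
	}
	turnID := newTurnID(s.ID)
	s.Turns = append(s.Turns, turnID)
	for _, m := range s.Messages {
		m.TurnID = turnID
	}
	return true
}

func main() {
	s := &Session{ID: "s1", Messages: []*Message{{ID: "m1"}, {ID: "m2"}}}
	idFor := func(sid string) string { return "turn-" + sid } // hypothetical ID scheme
	migrated := synthesizeTurn(s, idFor)
	fmt.Println(migrated, s.Messages[1].TurnID) // true turn-s1
	fmt.Println(synthesizeTurn(s, idFor))       // false: already has a Turn
}
```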

func NewTurnSynthesisJob

func NewTurnSynthesisJob(repo interfaces.Repository, forEach SessionForEach) *TurnSynthesisJob

NewTurnSynthesisJob constructs the job. `forEach` streams every Session through the handle callback; the CLI wrapper wires a Firestore iterator (see pkg/cli/migrate_chat_session.go: forEachSession).

func (*TurnSynthesisJob) Description

func (j *TurnSynthesisJob) Description() string

func (*TurnSynthesisJob) Name

func (j *TurnSynthesisJob) Name() string

func (*TurnSynthesisJob) Run

func (j *TurnSynthesisJob) Run(ctx context.Context, opts Options) (*Result, error)
