threads

package
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: AGPL-3.0 Imports: 13 Imported by: 0

README

Threads Service

Overview

The Threads Service provides comprehensive thread synchronization and traversal capabilities for ActivityPub conversations. It enables Lesser to fetch, build, and maintain complete conversation threads across federated instances.

Features

  • Thread Traversal: Walk up and down reply chains to find thread roots and build complete conversation trees
  • Remote Synchronization: Fetch missing notes and replies from remote instances
  • Missing Reply Detection: Automatically detect and track gaps in conversation threads
  • Circular Reference Detection: Prevent infinite loops when traversing malformed thread structures
  • Depth Limiting: Configurable depth limits to prevent excessive traversal
  • Error Handling: Classify and retry federation errors with exponential backoff

Architecture

Components
  1. Service Layer (service.go)

    • Thread traversal logic
    • Remote note fetching
    • Missing reply synchronization
    • Federation integration
  2. Storage Models (pkg/storage/models/)

    • ThreadSync: Tracks synchronization status
    • ThreadNode: Represents individual nodes in thread trees
    • MissingReply: Tracks missing replies with retry logic
  3. Repository (pkg/storage/repositories/thread_repository.go)

    • Thread data persistence
    • Query operations
    • Batch operations

Usage

Service Initialization
import (
    "github.com/equaltoai/lesser/pkg/services/threads"
    "github.com/equaltoai/lesser/pkg/storage/repositories"
)

service := threads.NewService(
    threadRepo,
    statusRepo,
    objectRepo,
    actorRepo,
    federationClient,
    publisher,
    logger,
    "example.com",
)
Get Thread Context

Retrieve the complete context for a note, including ancestors and descendants:

query := threads.ThreadContextQuery{
    NoteID:      "https://example.com/note/123",
    ViewerID:    "user123",
    IncludeTree: true,
}

context, err := service.GetThreadContext(ctx, query)
if err != nil {
    // Handle error
}

fmt.Printf("Root: %s\n", context.RootNote.ID)
fmt.Printf("Ancestors: %d\n", len(context.Ancestors))
fmt.Printf("Participants: %d\n", context.ParticipantCount)
fmt.Printf("Missing: %d\n", context.MissingCount)
Sync Remote Thread

Fetch and synchronize a complete thread from a remote instance:

cmd := threads.SyncRemoteThreadCommand{
    NoteURL:      "https://remote.social/users/alice/status/123",
    Depth:        3, // Fetch up to 3 levels deep
    ViewerID:     "user123",
    ForceRefresh: false,
}

result, err := service.SyncRemoteThread(ctx, cmd)
if err != nil {
    // Handle error
}

fmt.Printf("Synced %d posts\n", result.SyncedPosts)
fmt.Printf("Errors: %d\n", result.ErrorCount)
fmt.Printf("Status: %s\n", result.SyncStatus)
Sync Missing Replies

Attempt to fetch replies that were previously marked as missing:

cmd := threads.SyncMissingRepliesCommand{
    NoteID:   "https://example.com/note/123",
    ViewerID: "user123",
}

result, err := service.SyncMissingReplies(ctx, cmd)
if err != nil {
    // Handle error
}

fmt.printf("Synced %d replies\n", result.SyncedReplies)

Storage Models

ThreadSync

Tracks the synchronization status of a thread:

  • StatusID: The status being synced
  • LastSyncAt: When the last sync occurred
  • SyncStatus: pending, syncing, completed, failed
  • MissingReplies: List of missing reply IDs
  • RemoteFetched: Whether remote fetch was attempted
  • ThreadDepth: Current known depth
ThreadNode

Represents a single node in the thread tree:

  • RootStatusID: The thread root
  • StatusID: This node's status ID
  • ParentID: Direct parent ID
  • Depth: Depth in the tree (0 for root)
  • Path: Full path from root
  • ChildIDs: Direct children
  • ReplyCount: Number of direct replies
  • DescendantCount: Total descendants
MissingReply

Tracks replies that couldn't be fetched:

  • ReplyID: The missing reply ID
  • ParentStatusID: Parent that referenced it
  • AttemptCount: Number of fetch attempts
  • Status: pending, fetching, failed, resolved
  • FailureReason: deleted, 404, 403, timeout, unreachable, invalid
  • NextRetryAt: When to retry (exponential backoff)

Error Handling

The service defines custom error types for different failure scenarios:

var (
    ErrThreadNotFound        error
    ErrThreadRootNotFound    error
    ErrCircularReference     error
    ErrMaxDepthExceeded      error
    ErrFetchRemoteNote       error
    ErrSyncInProgress        error
    // ... and more
)

Fetch errors are automatically classified:

  • 404/not found: FailureReasonNotFound
  • 410/gone: FailureReasonDeleted
  • 403/forbidden: FailureReasonForbidden
  • Timeout: FailureReasonTimeout
  • Connection errors: FailureReasonUnreachable

Configuration

Constants
const (
    MaxThreadDepth = 10 // Maximum depth for traversal
    DefaultDepth   = 3  // Default sync depth
)

Federation Integration

The service integrates with the federation layer to fetch remote content using HTTP Signatures (authorized fetch):

type FederationClient interface {
    FetchObject(ctx, objectURL, signingActor) (any, error)
    FetchActor(ctx, actorURL, signingActor) (*Actor, error)
}

Testing

Run the test suite:

go test ./pkg/services/threads/...

Run with coverage:

go test -cover ./pkg/services/threads/...

Best Practices

  1. Depth Limiting: Always set reasonable depth limits to prevent excessive federation requests
  2. Error Classification: Use the built-in error classification for retry logic
  3. Missing Reply Tracking: Leverage the automatic missing reply detection and retry system
  4. Circular Reference Detection: The service automatically detects and prevents circular references
  5. Local First: Always check local storage before fetching from remote instances

Performance Considerations

  • Thread traversal is bounded by MaxThreadDepth to prevent excessive recursion
  • Missing replies use exponential backoff (5min, 15min, 1hr, 6hr, 24hr)
  • Permanent failures (deleted, forbidden) are not retried
  • Batch operations are used where possible

Future Enhancements

  • Parallel fetching of thread branches
  • Caching of thread contexts
  • Background sync jobs for popular threads
  • Thread prefetching based on user activity
  • Real-time thread updates via WebSocket

Dependencies

  • pkg/activitypub: ActivityPub types and utilities
  • pkg/federation: Federation client for remote fetching
  • pkg/storage/models: Data models
  • pkg/storage/repositories: Data persistence
  • pkg/streaming: Event publishing

See Also

Documentation

Overview

Package threads provides thread synchronization and traversal services

Index

Constants

View Source
const (
	MaxThreadDepth = 10 // Maximum depth for thread traversal
	DefaultDepth   = 3  // Default depth for sync operations

	// Sync status constants
	SyncStatusNone     = "NONE"
	SyncStatusComplete = "COMPLETE"
	SyncStatusPartial  = "PARTIAL"
	SyncStatusFailed   = "FAILED"
	SyncStatusSyncing  = "SYNCING"
)

Constants for thread operations

Variables

View Source
var (
	// Thread traversal errors
	ErrThreadNotFound         = errors.New("thread not found")
	ErrThreadRootNotFound     = errors.New("thread root not found")
	ErrCircularReference      = errors.New("circular reference detected in thread")
	ErrMaxDepthExceeded       = errors.New("maximum thread depth exceeded")
	ErrInvalidThreadStructure = errors.New("invalid thread structure")

	// Federation errors
	ErrFetchRemoteNote           = errors.New("failed to fetch remote note")
	ErrFetchRemoteReplies        = errors.New("failed to fetch remote replies")
	ErrRemoteNoteNotFound        = errors.New("remote note not found")
	ErrRemoteInstanceUnreachable = errors.New("remote instance unreachable")
	ErrRemoteAuthFailed          = errors.New("remote authentication failed")
	ErrRemoteTimeout             = errors.New("remote request timeout")

	// Sync errors
	ErrSyncInProgress     = errors.New("sync already in progress for this thread")
	ErrSyncFailed         = errors.New("thread synchronization failed")
	ErrPartialSync        = errors.New("thread synchronization partially completed")
	ErrSyncMissingReplies = errors.New("failed to sync missing replies")

	// Storage errors
	ErrSaveThreadNode   = errors.New("failed to save thread node")
	ErrSaveThreadSync   = errors.New("failed to save thread sync record")
	ErrGetThreadContext = errors.New("failed to get thread context")
	ErrMarkMissingReply = errors.New("failed to mark missing reply")

	// Validation errors
	ErrInvalidNoteID        = errors.New("invalid note ID")
	ErrInvalidNoteURL       = errors.New("invalid note URL")
	ErrInvalidDepth         = errors.New("invalid depth parameter")
	ErrMissingRequiredParam = errors.New("missing required parameter")

	// Note type errors
	ErrNotANote             = errors.New("object is not a note")
	ErrInvalidNoteStructure = errors.New("note has invalid structure")
)

Service-level errors for thread operations

Functions

func ValidateNoteURL

func ValidateNoteURL(noteURL string) error

ValidateNoteURL validates and normalizes a note URL

Types

type ActorRepository

type ActorRepository interface {
	GetActorByUsername(ctx context.Context, username string) (*activitypub.Actor, error)
}

ActorRepository defines the interface for actor operations

type FederationClient

type FederationClient interface {
	FetchObject(ctx context.Context, objectURL string, signingActor *activitypub.Actor) (any, error)
}

FederationClient defines the interface for federation operations

type ObjectRepository

type ObjectRepository interface {
	GetObject(ctx context.Context, objectID string) (any, error)
	CreateObject(ctx context.Context, object any) error
}

ObjectRepository defines the interface for object storage operations

type Publisher

type Publisher interface {
	PublishToStream(ctx context.Context, stream string, event *streaming.Event) error
}

Publisher defines the interface for event publishing

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service provides thread synchronization and traversal operations

func NewService

func NewService(
	threadRepo ThreadRepository,
	statusRepo StatusRepository,
	objectRepo ObjectRepository,
	actorRepo ActorRepository,
	federation FederationClient,
	publisher Publisher,
	logger *zap.Logger,
	domainName string,
) *Service

NewService creates a new threads service

func (*Service) FindThreadRoot

func (s *Service) FindThreadRoot(ctx context.Context, note *activitypub.Note) (*activitypub.Note, []*activitypub.Note, error)

FindThreadRoot finds the root of a thread by walking up the inReplyTo chain

func (*Service) GetThreadContext

func (s *Service) GetThreadContext(ctx context.Context, query ThreadContextQuery) (*ThreadContextResult, error)

GetThreadContext retrieves the complete thread context for a note

func (*Service) SyncMissingReplies

func (s *Service) SyncMissingReplies(ctx context.Context, cmd SyncMissingRepliesCommand) (*SyncMissingRepliesResult, error)

SyncMissingReplies syncs replies that were detected as missing

func (*Service) SyncRemoteThread

func (s *Service) SyncRemoteThread(ctx context.Context, cmd SyncRemoteThreadCommand) (*SyncRemoteThreadResult, error)

SyncRemoteThread synchronizes a remote thread by fetching it and building the tree

type StatusRepository

type StatusRepository interface {
	GetReplies(ctx context.Context, parentStatusID string, opts interfaces.PaginationOptions) (*interfaces.PaginatedResult[*models.Status], error)
}

StatusRepository defines minimal interface for status operations

type SyncMissingRepliesCommand

type SyncMissingRepliesCommand struct {
	NoteID   string
	ViewerID string
}

SyncMissingRepliesCommand represents a command to sync missing replies

type SyncMissingRepliesResult

type SyncMissingRepliesResult struct {
	Success       bool
	SyncedReplies int
	Errors        []string
}

SyncMissingRepliesResult represents the result of syncing missing replies

type SyncRemoteThreadCommand

type SyncRemoteThreadCommand struct {
	NoteURL      string
	Depth        int
	ViewerID     string
	ForceRefresh bool
}

SyncRemoteThreadCommand represents a command to sync a remote thread

type SyncRemoteThreadResult

type SyncRemoteThreadResult struct {
	Success     bool
	ThreadRoot  *activitypub.Note
	SyncedPosts int
	ErrorCount  int
	Errors      []string
	SyncStatus  string
}

SyncRemoteThreadResult represents the result of a thread sync operation

type ThreadContextQuery

type ThreadContextQuery struct {
	NoteID      string
	ViewerID    string
	IncludeTree bool
}

ThreadContextQuery represents a query for thread context

type ThreadContextResult

type ThreadContextResult struct {
	RootNote         *activitypub.Note
	RequestedNote    *activitypub.Note
	Ancestors        []*activitypub.Note
	Descendants      []*models.ThreadNode
	ParticipantCount int
	ReplyCount       int
	MissingCount     int
	LastActivity     time.Time
	SyncStatus       string
}

ThreadContextResult represents the complete context of a thread

type ThreadRepository

type ThreadRepository interface {
	SaveThreadSync(ctx context.Context, sync *models.ThreadSync) error
	GetThreadSync(ctx context.Context, statusID string) (*models.ThreadSync, error)
	SaveThreadNode(ctx context.Context, node *models.ThreadNode) error
	GetThreadNodes(ctx context.Context, rootStatusID string) ([]*models.ThreadNode, error)
	GetThreadNode(ctx context.Context, rootStatusID, statusID string) (*models.ThreadNode, error)
	GetThreadNodeByStatusID(ctx context.Context, statusID string) (*models.ThreadNode, error)
	MarkMissingReplies(ctx context.Context, rootStatusID, parentStatusID string, replyIDs []string) error
	GetMissingReplies(ctx context.Context, rootStatusID string) ([]*models.MissingReply, error)
	GetThreadContext(ctx context.Context, statusID string) (*repositories.ThreadContextResult, error)
	SaveMissingReply(ctx context.Context, missing *models.MissingReply) error
	DeleteMissingReply(ctx context.Context, rootStatusID, replyID string) error
}

ThreadRepository defines the interface for thread storage operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL