control

package
v4.1.0 Latest
Published: Feb 26, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package control holds the implementation of the Slack Stream controller. It runs the API scraping in several goroutines and manages the data flow between them. It records the output of the API scraper into a chunk directory. It also manages the transformation of the data, if the caller is interested in it.

Variables

var ErrNoUsers = errors.New("no users returned")

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, s Streamer, erc EncodeReferenceCloser, opts ...Option) (*Controller, error)

New creates a new generic Controller that accepts an EncodeReferenceCloser. Once [Controller.Close] is called, it closes all processors, including the EncodeReferenceCloser.

func (*Controller) Close

func (c *Controller) Close() error

Close closes the controller and all its file processors.

func (*Controller) Run

func (c *Controller) Run(ctx context.Context, list *structures.EntityList) error

Run starts the scraping of the Slack API. The [EntityList] is used to determine which entities to scrape. The [EntityList] can be created with the structures.NewEntityList function.

func (*Controller) RunNoTransform

func (c *Controller) RunNoTransform(ctx context.Context, list *structures.EntityList) error

RunNoTransform is similar to [Run] but does not apply any transformation to the data. Call this if you don't need to track channel completion etc.

func (*Controller) Search

func (c *Controller) Search(ctx context.Context, query string, stype SearchType) error

Search starts the search for the query string. The search type is defined by the SearchType parameter. The search is done in parallel for messages and files.

type DirController deprecated

type DirController struct {
	// contains filtered or unexported fields
}

DirController is the main controller of the Slack Stream. It runs the API scraping in several goroutines and manages the data flow between them.

Deprecated: use [Controller] instead.

func NewDir deprecated

func NewDir(cd *chunk.Directory, s Streamer, opts ...Option) *DirController

NewDir creates a new DirController. Once [DirController.Close] is called, it closes all file processors.

Deprecated: use New instead.

func (*DirController) Close

func (c *DirController) Close() error

Close closes the controller and all its file processors.

func (*DirController) Run

type EncodeReferenceCloser

type EncodeReferenceCloser interface {
	chunk.Encoder
	ReferenceChecker
	io.Closer
}

EncodeReferenceCloser is an interface that combines the chunk.Encoder, ReferenceChecker and io.Closer interfaces.

type Error

type Error struct {
	// Subroutine is the name of the subroutine that failed.
	Subroutine string
	// Stage is the stage of the subroutine that failed.
	Stage Stage
	// Err is the error that caused the failure.
	Err error
}

Error is a controller error.

func (Error) Error

func (e Error) Error() string

func (Error) Unwrap

func (e Error) Unwrap() error

type ExportTransformer

type ExportTransformer interface {
	chunk.Transformer
	TransformStarter
}

ExportTransformer is a transformer that can be started with a list of users. The compound nature of this interface is caused by the asynchronous nature of execution and the fact that we need to start the transformer after the Users goroutine is done, which can happen any time after Run has started.

type Flags

type Flags struct {
	// MemberOnly is the flag to fetch only those channels where the user is a
	// member.
	MemberOnly bool
	// RecordFiles instructs the directory processor to record the files as chunks.
	RecordFiles bool
	// Refresh is to fetch additional channels from the API in addition to
	// those provided in the list.  It's useful when the list is
	// incomplete or outdated.
	Refresh bool
	// ChannelUsers is the flag to fetch only users involved in the channel,
	// and skip fetching of all users.
	ChannelUsers bool
	// ChannelTypes is the list of channel types to fetch.  If empty, all
	// channel types are fetched.
	ChannelTypes []string
	// IncludeLabels requests the API to include the labels for the custom
	// fields.  Works only with ChannelUsers.  The server may throttle
	// requests hard.
	IncludeLabels bool
}

Flags are the controller flags.

type Option

type Option func(*options)

Option is a functional option for the Controller.

func WithAvatarProcessor

func WithAvatarProcessor(avp processor.Avatars) Option

WithAvatarProcessor configures the controller with an avatar downloader.

func WithCoordinator

func WithCoordinator(tf ExportTransformer) Option

WithCoordinator configures the controller with a transformer.

func WithFiler

func WithFiler(f processor.Filer) Option

WithFiler configures the controller with a file subprocessor.

func WithFlags

func WithFlags(f Flags) Option

WithFlags configures the controller with flags.

func WithLogger

func WithLogger(lg *slog.Logger) Option

WithLogger configures the controller with a logger.

type ReferenceChecker

type ReferenceChecker interface {
	// IsComplete should return true, if all messages and threads for the
	// channel have been processed.
	IsComplete(ctx context.Context, channelID string) (bool, error)
	// IsCompleteThread should return true, if all messages in the thread
	// for thread-only list entry have been processed.  The behaviour of
	// this function is undefined for non-thread-only list entries.
	IsCompleteThread(ctx context.Context, channelID string, threadID string) (bool, error)
}

ReferenceChecker is an interface that contains functions to check if all messages for the channel were processed.

type SearchType

type SearchType int
const (
	SMessages SearchType = 1 << iota
	SFiles
)
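
Since the constants are declared with 1 << iota, SMessages is 1 and SFiles is 2, so the values can be combined and tested as bit flags. The sketch below redeclares the type locally to show this. The has helper is hypothetical, and whether Search interprets a combined value in exactly this way is an assumption.

```go
package main

import "fmt"

// SearchType is a local copy of the package type.
type SearchType int

const (
	SMessages SearchType = 1 << iota // 1
	SFiles                           // 2
)

// has reports whether every bit of flag is set in t.
func has(t, flag SearchType) bool { return t&flag == flag }

func main() {
	both := SMessages | SFiles
	fmt.Println(has(both, SMessages), has(both, SFiles)) // true true
	fmt.Println(has(SMessages, SFiles))                  // false
}
```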

type Stage

type Stage string

Stage is the stage of the controller that failed.

const (
	// StgGenerator is the generator stage.
	StgGenerator Stage = "generator"
	// StgWorker is the worker stage.
	StgWorker Stage = "worker"
)

type Streamer

type Streamer interface {
	Conversations(ctx context.Context, proc processor.Conversations, links <-chan structures.EntityItem) error
	ListChannels(ctx context.Context, proc processor.Channels, p *slack.GetConversationsParameters) error
	ListChannelsEx(ctx context.Context, proc processor.Channels, p *slack.GetConversationsParameters, onlyMyChannels bool) error
	Users(ctx context.Context, proc processor.Users, opt ...slack.GetUsersOption) error
	WorkspaceInfo(ctx context.Context, proc processor.WorkspaceInfo) error
	SearchMessages(ctx context.Context, proc processor.MessageSearcher, query string) error
	SearchFiles(ctx context.Context, proc processor.FileSearcher, query string) error
	UsersBulk(ctx context.Context, proc processor.Users, ids ...string) error
	UsersBulkWithCustom(ctx context.Context, proc processor.Users, includeLabels bool, ids ...string) error
}

Streamer is the interface for the API scraper.

type TransformStarter

type TransformStarter interface {
	StartWithUsers(ctx context.Context, users []slack.User) error
}

Directories

Path Synopsis
Package mock_control is a generated GoMock package.