chunk

package chunk
Version: v4.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: Feb 26, 2026 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Constants

const (
	UploadsDir = "__uploads" // for serving files
	AvatarsDir = "__avatars"
)
const (
	ChunkExt = ".json.gz"
)

file extensions

Variables

var (
	ErrNotFound       = errors.New("not found")
	ErrNoData         = errors.New("no data")
	ErrDataMisaligned = errors.New("internal error: index and file data misaligned")
)
var ErrNoChannelUsers = errors.New("no channel users")
var ErrUnsupChunkType = fmt.Errorf("unsupported chunk type")

Functions

This section is empty.

Types

type Addr

type Addr struct {
	Offset int64 // offset within the chunk file
	Index  int16 // index of the message within the messages slice in the chunk
}

Addr is the address of a particular message within a chunk file.

type Chunk

type Chunk struct {
	// header
	// Type is the type of the Chunk
	Type ChunkType `json:"t"`
	// Timestamp when the chunk was recorded.
	Timestamp int64 `json:"ts"`
	// ChannelID that this chunk relates to.
	ChannelID string `json:"id,omitempty"`
	// Count is the count of elements in the chunk, i.e. messages or files.
	Count int32 `json:"n,omitempty"`

	// NumThreads is the number of threads in the message chunk.
	NumThreads int32 `json:"nt,omitempty"`
	// ThreadTS is populated if the chunk contains thread related data.  It
	// is Slack's thread_ts.
	ThreadTS string `json:"r,omitempty"`

	IsLast bool `json:"l,omitempty"`
	// ThreadOnly is set to true if the chunk was generated as a result
	// of thread only scraping.
	ThreadOnly bool `json:"to,omitempty"`

	// Channel contains the channel information.  Within the chunk file, it
	// may not be immediately followed by messages from the channel due to
	// the concurrent nature of the calls.
	//
	// Populated by ChannelInfo and Files methods.
	Channel *slack.Channel `json:"ci,omitempty"`

	// ChannelUsers contains the user IDs of the users in the channel.
	ChannelUsers []string `json:"cu,omitempty"` // Populated by ChannelUsers

	// Parent is populated in case the chunk is a thread, or a file. Populated
	// by ThreadMessages and Files methods.
	Parent *slack.Message `json:"p,omitempty"`
	// Messages contains a chunk of messages as returned by the API. Populated
	// by Messages and ThreadMessages methods.
	Messages []slack.Message `json:"m,omitempty"`
	// Files contains a chunk of files as returned by the API. Populated by
	// Files method.
	Files []slack.File `json:"f,omitempty"`

	// Users contains a chunk of users as returned by the API. Populated by
	// Users method.
	Users []slack.User `json:"u,omitempty"`
	// Channels contains a chunk of channels as returned by the API. Populated
	// by Channels method.
	Channels []slack.Channel `json:"ch,omitempty"`
	// WorkspaceInfo contains the workspace information as returned by the
	// API.  Populated by WorkspaceInfo.
	WorkspaceInfo *slack.AuthTestResponse `json:"w,omitempty"`
	// StarredItems contains the starred items.
	StarredItems []slack.StarredItem `json:"st,omitempty"` // Populated by StarredItems
	// Bookmarks contains the bookmarks.
	Bookmarks []slack.Bookmark `json:"b,omitempty"` // Populated by Bookmarks
	// SearchQuery contains the search query.
	SearchQuery string `json:"sq,omitempty"` // Populated by SearchMessages and SearchFiles.
	// SearchMessages contains the search results.
	SearchMessages []slack.SearchMessage `json:"sm,omitempty"` // Populated by SearchMessages
	// SearchFiles contains the search results.
	SearchFiles []slack.File `json:"sf,omitempty"` // Populated by SearchFiles
}

Chunk is a representation of a single chunk of data retrieved from the API. A single API call always produces a single Chunk.

func (*Chunk) ID

func (c *Chunk) ID() GroupID

ID returns the GroupID for the chunk.

func (*Chunk) String

func (c *Chunk) String() string

func (*Chunk) Timestamps

func (c *Chunk) Timestamps() ([]int64, error)

Timestamps returns the timestamps of the messages in the chunk. For files and other types of chunks, it returns ErrUnsupChunkType.

type ChunkType

type ChunkType uint8

ChunkType is the type of chunk that was recorded.

const (
	CMessages ChunkType = iota
	CThreadMessages
	CFiles
	CUsers
	CChannels
	CChannelInfo
	CWorkspaceInfo
	CChannelUsers
	CStarredItems
	CBookmarks
	CSearchMessages
	CSearchFiles
)

func (ChunkType) String

func (i ChunkType) String() string
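ChunkType is a small integer enum built with iota, and its String method is undocumented. The sketch below mirrors the constant order above and assumes String returns the constant name with a "ChunkType(n)" fallback, which is what the Go stringer tool generates by default; the package's actual output is not confirmed here, and chunkTypeNames is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
)

// ChunkType mirrors the package's ChunkType: a small integer enum.
type ChunkType uint8

// The constants follow the same iota order as the documented const block.
const (
	CMessages ChunkType = iota
	CThreadMessages
	CFiles
	CUsers
	CChannels
	CChannelInfo
	CWorkspaceInfo
	CChannelUsers
	CStarredItems
	CBookmarks
	CSearchMessages
	CSearchFiles
)

// chunkTypeNames is a hypothetical lookup table indexed by ChunkType value.
var chunkTypeNames = [...]string{
	"CMessages", "CThreadMessages", "CFiles", "CUsers", "CChannels",
	"CChannelInfo", "CWorkspaceInfo", "CChannelUsers", "CStarredItems",
	"CBookmarks", "CSearchMessages", "CSearchFiles",
}

// String returns the constant name, or "ChunkType(n)" for unknown values,
// following the convention of stringer-generated code (an assumption).
func (i ChunkType) String() string {
	if int(i) < len(chunkTypeNames) {
		return chunkTypeNames[i]
	}
	return "ChunkType(" + strconv.Itoa(int(i)) + ")"
}

func main() {
	fmt.Println(CFiles, uint8(CFiles)) // CFiles 2
	fmt.Println(ChunkType(99))         // ChunkType(99)
}
```

Because the values come from iota, inserting a new constant in the middle of the block would renumber every later type, so already recorded chunk files would decode to the wrong ChunkType.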

type DirOption

type DirOption func(*Directory)

func WithCache

func WithCache(enabled bool) DirOption

func WithNumWorkers

func WithNumWorkers(n int) DirOption
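DirOption follows Go's functional-options pattern: each option is a closure that mutates the Directory before it is returned by OpenDir. A minimal sketch of that pattern, assuming hypothetical unexported fields (cache, numWorkers), hypothetical defaults, and a hypothetical openDir helper; the real Directory fields are not documented.

```go
package main

import "fmt"

// Directory stands in for chunk.Directory; the fields are hypothetical.
type Directory struct {
	dir        string
	cache      bool
	numWorkers int
}

// DirOption has the same shape as the documented chunk.DirOption.
type DirOption func(*Directory)

// WithCache toggles the (hypothetical) cache field.
func WithCache(enabled bool) DirOption {
	return func(d *Directory) { d.cache = enabled }
}

// WithNumWorkers sets the worker count, ignoring non-positive values
// (an assumed validation rule, not confirmed by the package docs).
func WithNumWorkers(n int) DirOption {
	return func(d *Directory) {
		if n > 0 {
			d.numWorkers = n
		}
	}
}

// openDir applies assumed defaults first, then each option in order.
func openDir(dir string, opts ...DirOption) *Directory {
	d := &Directory{dir: dir, cache: false, numWorkers: 1}
	for _, opt := range opts {
		opt(d)
	}
	return d
}

func main() {
	d := openDir("chunks", WithCache(true), WithNumWorkers(4))
	fmt.Printf("%+v\n", *d) // {dir:chunks cache:true numWorkers:4}
}
```

Applying defaults before the options means callers only pass what they want to change, and new options can be added without breaking the OpenDir signature.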

type Directory

type Directory struct {
	// contains filtered or unexported fields
}

Directory is an abstraction over a directory of chunk files. It provides a way to write chunk files and to read channels, users and messages across many chunk files. All functions that take a name, except those with the RAW suffix, automatically append the extension (".json.gz") to the name. *RAW functions expect the full file name, including the extension. All files created by this package are GZIP-compressed, unless stated otherwise.

func CreateDir

func CreateDir(dir string) (*Directory, error)

CreateDir creates and opens a directory. It will create all parent directories if they don't exist.

func OpenDir

func OpenDir(dir string, opt ...DirOption) (*Directory, error)

OpenDir "opens" an existing directory for read and write operations. It expects the directory to exist and to be a directory, otherwise it will return an error.

func (*Directory) AllMessages

func (d *Directory) AllMessages(ctx context.Context, channelID string) ([]slack.Message, error)

func (*Directory) AllThreadMessages

func (d *Directory) AllThreadMessages(_ context.Context, channelID, threadID string) ([]slack.Message, error)

func (*Directory) ChannelInfo

func (d *Directory) ChannelInfo(ctx context.Context, id string) (*slack.Channel, error)

func (*Directory) Channels

func (d *Directory) Channels(ctx context.Context) ([]slack.Channel, error)

Channels collects all channels from the chunk directory. It first looks for the channels.json.gz file; if that file is not present, it goes through all conversation files and tries to get the "ChannelInfo" chunk from each one.

func (*Directory) Close

func (d *Directory) Close() error

Close closes the directory and all open files.

func (*Directory) Create

func (d *Directory) Create(fileID FileID) (io.WriteCloser, error)

Create creates the chunk file with the given name. The extension is appended automatically. It will NOT overwrite an existing file and returns an error if the file exists.

Example:

cd, err := chunk.OpenDir("chunks")
// handle err
f, err := cd.Create(chunk.FChannels) // creates channels.json.gz
// handle err; write to f, then call f.Close()

func (*Directory) FastAllMessages

func (d *Directory) FastAllMessages(ctx context.Context, channelID string) ([]slack.Message, error)

func (*Directory) FastAllThreadMessages

func (d *Directory) FastAllThreadMessages(channelID, threadID string) ([]slack.Message, error)

func (*Directory) File

func (d *Directory) File(id string, name string) (fs.File, error)

File returns the file with the given id and name.

func (*Directory) Latest

func (d *Directory) Latest(ctx context.Context) (map[GroupID]time.Time, error)

Latest returns the latest timestamps for the channels and threads in the directory.

func (*Directory) Name

func (d *Directory) Name() string

Name returns the full directory path.

func (*Directory) Open

func (d *Directory) Open(id FileID) (*File, error)

Open opens a chunk file with the given name. Extension is appended automatically.

func (*Directory) OpenRAW

func (d *Directory) OpenRAW(filename string) (io.ReadSeekCloser, error)

OpenRAW opens the compressed chunk file filename within the directory and returns an io.ReadSeekCloser. filename must be the full name of the file, including the extension.

func (*Directory) RemoveAll

func (d *Directory) RemoveAll() error

RemoveAll deletes the directory and all its contents. Make sure all files are closed.

func (*Directory) Sorted

func (d *Directory) Sorted(ctx context.Context, channelID string, desc bool, cb func(ts time.Time, msg *slack.Message) error) error

func (*Directory) Stat

func (d *Directory) Stat(id FileID) (fs.FileInfo, error)

func (*Directory) ToChunk

func (d *Directory) ToChunk(ctx context.Context, enc Encoder, _ int64) error

ToChunk writes all chunks from the directory to the encoder.

func (*Directory) Users

func (d *Directory) Users() ([]slack.User, error)

Users returns the collected users from the directory.

func (*Directory) Walk

func (d *Directory) Walk(fn func(name string, f *File, err error) error) error

Walk iterates over all chunk files in the directory and calls the function for each file. If the function returns an error, the iteration stops. Walk does not close the files after the callback returns, so it is the caller's responsibility to close them.

func (*Directory) WalkSync

func (d *Directory) WalkSync(fn func(name string, f *File, err error) error) error

WalkSync is the same as Walk, but it closes the file after the callback is called.

func (*Directory) WorkspaceInfo

func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error)

WorkspaceInfo returns the workspace info from the directory.

type Encoder

type Encoder interface {
	Encode(ctx context.Context, chunk *Chunk) error
}

Encoder is the interface that wraps the Encode method.

type File

type File struct {
	// contains filtered or unexported fields
}

File is the catalog of chunks in a file.

func FromReader

func FromReader(rs io.ReadSeeker) (*File, error)

FromReader creates a new chunk File from the io.ReadSeeker.

func (*File) AllChannelIDs

func (p *File) AllChannelIDs() []string

AllChannelIDs returns the IDs of all channels in the chunk file.

func (*File) AllChannelInfos

func (f *File) AllChannelInfos() ([]slack.Channel, error)

AllChannelInfos returns all the channel information collected by the channel info API.

func (*File) AllChannels

func (p *File) AllChannels() ([]slack.Channel, error)

AllChannels returns all channels collected by listing channels in the dump file.

func (*File) AllMessages

func (f *File) AllMessages(_ context.Context, channelID string) ([]slack.Message, error)

AllMessages returns all messages posted to the given channel, excluding thread messages. The messages are returned in the order in which they appear in the file.

func (*File) AllThreadMessages

func (f *File) AllThreadMessages(channelID, threadTS string) ([]slack.Message, error)

AllThreadMessages returns all the messages for the given thread. It does not include the parent message in the result; use File.ThreadParent for that. The messages are returned in the order in which they appear in the file.

func (*File) AllUsers

func (p *File) AllUsers() ([]slack.User, error)

AllUsers returns all users in the dump file.

func (*File) ChannelInfo

func (f *File) ChannelInfo(channelID string) (*slack.Channel, error)

ChannelInfo returns the information for the given channel.

func (*File) ChannelUsers

func (f *File) ChannelUsers(channelID string) ([]string, error)

func (*File) Close

func (f *File) Close() error

Close closes the underlying reader if it implements io.Closer.

func (*File) ForEach

func (f *File) ForEach(fn func(ev *Chunk) error) error

ForEach iterates over the chunks in the reader and calls the function for each chunk. It will lock the file until it finishes.

func (*File) HasChannels

func (f *File) HasChannels() bool

HasChannels returns true if there is at least one channel chunk in the file.

func (*File) HasChunks

func (f *File) HasChunks(id GroupID) bool

HasChunks returns true if there is at least one chunk for the given id.

func (*File) HasUsers

func (f *File) HasUsers() bool

HasUsers returns true if there is at least one user chunk in the file.

func (*File) Latest

func (f *File) Latest(ctx context.Context) (map[GroupID]time.Time, error)

Latest returns the latest timestamps for the channels and threads in the file.

func (*File) Sorted

func (f *File) Sorted(ctx context.Context, chanID string, desc bool, fn func(ts time.Time, m *slack.Message) error) error

Sorted iterates, in chronological order, over all messages in the chunk file for the requested chanID. If desc is true, the messages are iterated in reverse order. It does not differentiate between channel and thread messages. The function fn is called for each message; if it returns an error, the iteration stops and that error is returned.

func (*File) ThreadParent

func (f *File) ThreadParent(channelID, threadTS string) (*slack.Message, error)

ThreadParent returns the thread parent message for the given thread. It returns ErrNotFound if the thread is not found.

func (*File) WorkspaceInfo

func (f *File) WorkspaceInfo() (*slack.AuthTestResponse, error)

WorkspaceInfo returns the workspace info from the chunkfile.

type FileID

type FileID string

FileID is the ID of the file within the Directory (it's basically the file name without an extension).

const (
	FChannels  FileID = "channels"
	FUsers     FileID = "users"
	FWorkspace FileID = "workspace"
	FSearch    FileID = "search"
)

common filenames

func LinkToFileID

func LinkToFileID(sl structures.SlackLink, includeThread bool) FileID

LinkToFileID converts the SlackLink to file ID. If includeThread is true and the thread timestamp is not empty, the thread timestamp will be appended to the channel ID. Otherwise, only the channel ID will be returned.

func ToFileID

func ToFileID(channelID, threadTS string, includeThread bool) FileID

ToFileID returns the file ID for the given channel and thread timestamp. If includeThread is true and threadTS is not empty, the thread timestamp will be appended to the channel ID. Otherwise, only the channel ID will be returned.

func (FileID) SlackLink

func (id FileID) SlackLink() structures.SlackLink

SlackLink returns the SlackLink for the file ID. If the file ID doesn't contain the thread timestamp, the thread timestamp will be empty.

func (FileID) Split

func (id FileID) Split() (channelID, threadTS string)

Split splits the file ID into channel ID and thread timestamp. If the file ID doesn't contain the thread timestamp, the thread timestamp will be empty.
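ToFileID and Split are inverses of each other. The documentation does not state how the channel ID and thread timestamp are joined, so the sketch below assumes a hypothetical "-" separator; only the include/omit rules come from the text above.

```go
package main

import (
	"fmt"
	"strings"
)

type FileID string

// sep is an ASSUMED separator between channel ID and thread timestamp.
const sep = "-"

// ToFileID appends the thread timestamp only when includeThread is set
// and threadTS is non-empty, as the documentation describes.
func ToFileID(channelID, threadTS string, includeThread bool) FileID {
	if includeThread && threadTS != "" {
		return FileID(channelID + sep + threadTS)
	}
	return FileID(channelID)
}

// Split reverses ToFileID; threadTS is empty when there is no separator.
func (id FileID) Split() (channelID, threadTS string) {
	channelID, threadTS, _ = strings.Cut(string(id), sep)
	return channelID, threadTS
}

func main() {
	id := ToFileID("C123", "1700000000.000100", true)
	fmt.Println(id) // C123-1700000000.000100
	ch, ts := id.Split()
	fmt.Println(ch, ts)                                       // C123 1700000000.000100
	fmt.Println(ToFileID("C123", "1700000000.000100", false)) // C123
}
```

The separator must be a character that never appears in a Slack channel ID or timestamp; otherwise Split could not recover the two parts unambiguously.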

func (FileID) String

func (id FileID) String() string

type GroupID

type GroupID string

GroupID is a unique ID for a chunk group. It is used to group chunks of the same type together for indexing purposes. It may or may not be equal to the Slack ID of the entity.

func (GroupID) AsChannelID

func (id GroupID) AsChannelID() (channelID string, ok bool)

AsChannelID returns the channel ID from the GroupID. If the GroupID is not a channel ID, it returns false.

func (GroupID) AsThreadID

func (id GroupID) AsThreadID() (channelID, threadTS string, ok bool)

AsThreadID returns the channelID and threadTS from the GroupID. If the GroupID is not a thread ID, it returns false.

func (GroupID) ExtractChannelID

func (id GroupID) ExtractChannelID() (channelID string, ok bool)

ExtractChannelID attempts to extract the channel ID from the GroupID if it is a channel or a thread ID. Otherwise, ok will be false.

func (GroupID) IsChannel

func (id GroupID) IsChannel() bool

func (GroupID) IsThread

func (id GroupID) IsThread() bool

type NopTransformer

type NopTransformer struct{}

NopTransformer is a transformer that does nothing.

func (*NopTransformer) Transform

type Option

type Option func(r *Recorder)

Option is a function that configures the Recorder.

func WithEncoder

func WithEncoder(enc Encoder) Option

WithEncoder allows you to specify a custom encoder to use for the chunks. By default json.Encoder is used.

type Player

type Player struct {
	// contains filtered or unexported fields
}

Player replays the chunks from a file. When used in conjunction with [proctest.Server], it can emulate the API responses. The zero value is not usable.

func NewPlayer

func NewPlayer(rs io.ReadSeeker) (*Player, error)

func NewPlayerFromFile

func NewPlayerFromFile(cf *File) *Player

func (*Player) ChannelInfo

func (p *Player) ChannelInfo(id string) (*slack.Channel, error)

ChannelInfo returns the channel information for the given channel. It returns an error if the channel is not found within the chunkfile.

func (*Player) ChannelUsers

func (p *Player) ChannelUsers(channelID string) ([]string, error)

func (*Player) Channels

func (p *Player) Channels() ([]slack.Channel, error)

Channels returns the next channels chunk.

func (*Player) Close

func (p *Player) Close() error

func (*Player) HasChannels

func (p *Player) HasChannels() bool

func (*Player) HasMoreChannelUsers

func (p *Player) HasMoreChannelUsers(channelID string) bool

func (*Player) HasMoreChannels

func (p *Player) HasMoreChannels() bool

func (*Player) HasMoreMessages

func (p *Player) HasMoreMessages(channelID string) bool

HasMoreMessages returns true if there are more messages to be read for the channel.

func (*Player) HasMoreThreads

func (p *Player) HasMoreThreads(channelID string, threadTS string) bool

func (*Player) HasUsers

func (p *Player) HasUsers() bool

HasUsers returns true if there is at least one user chunk in the file.

func (*Player) Messages

func (p *Player) Messages(channelID string) ([]slack.Message, error)

Messages returns the next message chunk for the given channel.

func (*Player) Offset

func (p *Player) Offset() int64

Offset returns the last read offset of the record in the ReadSeeker.

func (*Player) Reset

func (p *Player) Reset() error

Reset resets the state of the Player.

func (*Player) SetState

func (p *Player) SetState(ptrs map[GroupID]int)

func (*Player) State

func (p *Player) State() map[GroupID]int

func (*Player) Thread

func (p *Player) Thread(channelID string, threadTS string) ([]slack.Message, error)

Thread returns the messages for the given thread.

func (*Player) ThreadChannelInfo

func (p *Player) ThreadChannelInfo(id string) (*slack.Channel, error)

func (*Player) Users

func (p *Player) Users() ([]slack.User, error)

Users returns the next users chunk.

func (*Player) WorkspaceInfo

func (p *Player) WorkspaceInfo() (*slack.AuthTestResponse, error)

type Recorder

type Recorder struct {
	// contains filtered or unexported fields
}

Recorder records all the data it receives into a writer.

func NewCustomRecorder

func NewCustomRecorder(enc Encoder, options ...Option) *Recorder

NewCustomRecorder creates a new recorder with a custom encoder.

func NewRecorder

func NewRecorder(w io.Writer, options ...Option) *Recorder

NewRecorder creates a new recorder that writes to w.

func (*Recorder) ChannelInfo

func (rec *Recorder) ChannelInfo(ctx context.Context, channel *slack.Channel, threadTS string) error

ChannelInfo records the channel information. If ChannelInfo is called while streaming a thread (that is, the user requested a thread), threadTS should be set to the thread's timestamp.

func (*Recorder) ChannelUsers

func (rec *Recorder) ChannelUsers(ctx context.Context, channelID string, threadTS string, users []string) error

ChannelUsers records the channel users.

func (*Recorder) Channels

func (rec *Recorder) Channels(ctx context.Context, channels []slack.Channel) error

Channels records a slice of channels.

func (*Recorder) Close

func (rec *Recorder) Close() error

Close closes the recorder (it's a noop for now).

func (*Recorder) Files

func (rec *Recorder) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, f []slack.File) error

Files is called for each file chunk that is retrieved. The parent message is passed in as well.

func (*Recorder) Messages

func (rec *Recorder) Messages(ctx context.Context, channelID string, numThreads int, isLast bool, m []slack.Message) error

Messages is called for each message chunk that is retrieved.

func (*Recorder) SearchFiles

func (rec *Recorder) SearchFiles(ctx context.Context, query string, sf []slack.File) error

SearchFiles records the result of a file search.

func (*Recorder) SearchMessages

func (rec *Recorder) SearchMessages(ctx context.Context, query string, sm []slack.SearchMessage) error

SearchMessages records the result of a message search.

func (*Recorder) ThreadMessages

func (rec *Recorder) ThreadMessages(ctx context.Context, channelID string, parent slack.Message, threadOnly, isLast bool, tm []slack.Message) error

ThreadMessages is called for each of the thread messages that are retrieved. The parent message is passed in as well.

func (*Recorder) Users

func (rec *Recorder) Users(ctx context.Context, users []slack.User) error

Users records a slice of users.

func (*Recorder) WorkspaceInfo

func (rec *Recorder) WorkspaceInfo(ctx context.Context, atr *slack.AuthTestResponse) error

WorkspaceInfo is called when workspace info is retrieved.

type Result

type Result[T any] struct {
	Err error
	Val T
}

Result is the iterator result.

func (Result[T]) Error

func (r Result[T]) Error() string

func (Result[T]) Unwrap

func (r Result[T]) Unwrap() error

type Transformer

type Transformer interface {
	// Transform is the function that starts the transformation of the channel
	// or thread with the given id.  It is called when the reference count for
	// the channel id becomes zero (meaning that there are no more chunks to
	// process).  It should return [transform.ErrClosed] if the transformer is
	// closed.
	Transform(ctx context.Context, channelID, threadID string) error
}

Transformer is an interface that is called when the processor is finished processing a channel or thread.

Directories

Path: Synopsis
backend
dbase/repository/mock_repository: Package mock_repository is a generated GoMock package.
directory: Package directory is a processor that writes the data into gzipped files in a directory.
Package chunktest provides a test server for testing the chunk package.
Package control holds the implementation of the Slack Stream controller.
mock_control: Package mock_control is a generated GoMock package.
Package mock_chunk is a generated GoMock package.
Package obfuscate obfuscates a slackdump chunk recording.
