Documentation
¶
Index ¶
- Constants
- Variables
- func Acquire(ctx context.Context) (*pgxpool.Conn, error)
- func AssertDatabaseVersion(pool *pgxpool.Pool, version int) error
- func Connect(opts *ConnectOpts) error
- func ConnectTest(ctx context.Context) error
- func ConnectionFromContext(ctx context.Context) *pgxpool.Conn
- func ContextWithConnection(ctx context.Context, conn *pgxpool.Conn) context.Context
- func CountCommandsError(ctx context.Context, tx pgx.Tx) (int, error)
- func CountCommandsRequested(ctx context.Context, tx pgx.Tx) (int, error)
- func CountCommandsSuccess(ctx context.Context, tx pgx.Tx) (int, error)
- func CountCommandsWithState(ctx context.Context, tx pgx.Tx, state string) (int, error)
- func DeleteMeetingStateByID(ctx context.Context, tx pgx.Tx, id string) error
- func DeleteMeetingStateByInternalID(ctx context.Context, tx pgx.Tx, id string) error
- func DeleteOrphanMeetings(ctx context.Context, tx pgx.Tx, backendID string, backendMeetings []string) (int64, error)
- func DeleteRecordingByID(ctx context.Context, tx pgx.Tx, recordID string) error
- func GetRecordingTextTracks(ctx context.Context, tx pgx.Tx, recordID string) ([]*bbb.TextTrack, error)
- func LookupFrontendIDByMeetingID(ctx context.Context, tx pgx.Tx, meetingID string) (string, bool, error)
- func NewDelete() sq.DeleteBuilder
- func NewQuery() sq.SelectBuilder
- func NextDeadline(dt time.Duration) time.Time
- func Q() sq.SelectBuilder
- func QueryRecordingsByFrontendKey(frontendKey string) sq.SelectBuilder
- func QueueCommand(ctx context.Context, tx pgx.Tx, cmd *Command) error
- func RemoveStaleFrontendMeetings(ctx context.Context, tx pgx.Tx, t time.Time) error
- func SQLSafeParam(p string) string
- func SetRecordingTextTracks(ctx context.Context, tx pgx.Tx, recordID string, tracks []*bbb.TextTrack) error
- type AgentHeartbeat
- type BackendSettings
- type BackendState
- func (s *BackendState) ClearMeetings(ctx context.Context, tx pgx.Tx) error
- func (s *BackendState) CreateMeetingState(ctx context.Context, tx pgx.Tx, frontend *bbb.Frontend, meeting *bbb.Meeting) (*MeetingState, error)
- func (s *BackendState) CreateOrUpdateMeetingState(ctx context.Context, tx pgx.Tx, meeting *bbb.Meeting) error
- func (s *BackendState) Delete(ctx context.Context, tx pgx.Tx) error
- func (s *BackendState) IsAgentAlive() bool
- func (s *BackendState) IsNodeReady() bool
- func (s *BackendState) Refresh(ctx context.Context, tx pgx.Tx) error
- func (s *BackendState) Save(ctx context.Context, tx pgx.Tx) error
- func (s *BackendState) UpdateAgentHeartbeat(ctx context.Context, tx pgx.Tx) (*AgentHeartbeat, error)
- func (s *BackendState) UpdateStatCounters(ctx context.Context, tx pgx.Tx) error
- func (s *BackendState) Validate() ValidationError
- type Command
- type CommandHandler
- type CommandQueue
- type ConnectOpts
- type DefaultPresentationSettings
- type FrontendSettings
- type FrontendState
- type MeetingState
- func AwaitMeetingState(ctx context.Context, conn *pgxpool.Conn, q sq.SelectBuilder) (*MeetingState, pgx.Tx, error)
- func GetMeetingState(ctx context.Context, tx pgx.Tx, q sq.SelectBuilder) (*MeetingState, error)
- func GetMeetingStateByID(ctx context.Context, tx pgx.Tx, id string) (*MeetingState, error)
- func GetMeetingStates(ctx context.Context, tx pgx.Tx, q sq.SelectBuilder) ([]*MeetingState, error)
- func InitMeetingState(init *MeetingState) *MeetingState
- func (s *MeetingState) BindFrontendID(ctx context.Context, tx pgx.Tx, id string) error
- func (s *MeetingState) GetBackendState(ctx context.Context, tx pgx.Tx) (*BackendState, error)
- func (s *MeetingState) GetFrontendState(ctx context.Context, tx pgx.Tx) (*FrontendState, error)
- func (s *MeetingState) IsStale(threshold time.Duration) bool
- func (s *MeetingState) MarkSynced()
- func (s *MeetingState) Refresh(ctx context.Context, tx pgx.Tx) error
- func (s *MeetingState) Save(ctx context.Context, tx pgx.Tx) error
- func (s *MeetingState) SetBackendID(ctx context.Context, tx pgx.Tx, id string) error
- func (s *MeetingState) Upsert(ctx context.Context, tx pgx.Tx) (string, error)
- type RecordingState
- func GetRecordingState(ctx context.Context, tx pgx.Tx, q sq.SelectBuilder) (*RecordingState, error)
- func GetRecordingStateByID(ctx context.Context, tx pgx.Tx, recordID string) (*RecordingState, error)
- func GetRecordingStates(ctx context.Context, tx pgx.Tx, q sq.SelectBuilder) ([]*RecordingState, error)
- func InitRecordingState(init *RecordingState) *RecordingState
- func NewStateFromRecording(recording *bbb.Recording) *RecordingState
- func (s *RecordingState) Delete(ctx context.Context, tx pgx.Tx) error
- func (s *RecordingState) DeleteFiles() error
- func (s *RecordingState) Exists(ctx context.Context, tx pgx.Tx) (bool, error)
- func (s *RecordingState) Merge(other *RecordingState) error
- func (s *RecordingState) PublishFiles() error
- func (s *RecordingState) Save(ctx context.Context, tx pgx.Tx) error
- func (s *RecordingState) SetFrontendID(ctx context.Context, tx pgx.Tx, frontendID string) error
- func (s *RecordingState) SetTextTracks(ctx context.Context, tx pgx.Tx, tracks []*bbb.TextTrack) error
- func (s *RecordingState) UnpublishFiles() error
- type RecordingsStorage
- func (s *RecordingsStorage) Check() error
- func (s *RecordingsStorage) ListThumbnailFiles(recordID string) []string
- func (s *RecordingsStorage) MakeRecordingPreview(recordID string) *bbb.Preview
- func (s *RecordingsStorage) PublishedRecordingPath(id string) string
- func (s *RecordingsStorage) UnpublishedRecordingPath(id string) string
- type Tags
- type ValidationError
Constants ¶
const (
// ErrFieldRequired is an error message if a value is missing
ErrFieldRequired = "this field is required"
)
Variables ¶
var ( // ErrNotInitialized will be returned if the // database pool is accessed before initializing using // Connect. ErrNotInitialized = errors.New("store not initialized, pool not ready") // ErrMaxConnsUnconfigured will be returned if the // maximum connections are zero. ErrMaxConnsUnconfigured = errors.New("MaxConns not configured") )
var ( ErrNoBackend = errors.New("no backend associated with meeting") ErrDeadlineRequired = errors.New("the operation requires a dealine") )
MeetingStateErrors
var (
ErrFrontendRequired = errors.New("meeting requires a frontend state")
)
Errors
var ( // ErrNoConnectionInConfig will occur when the connection // is not available in the context. ErrNoConnectionInConfig = errors.New("connection missing in context") )
Errors
var ( // ErrRecordingsStorageUnconfigured will be returned, when // the environment variables for the published and unpublished // recording path are missing. ErrRecordingsStorageUnconfigured = errors.New( "environment for " + config.EnvRecordingsPublishedPath + " or " + config.EnvRecordingsUnpublishedPath + " is not set") )
var ( // ReMatchParamSQLUnsafe will match anything // NOT a-Z, 0-9, '_' and '-'. ReMatchParamSQLUnsafe = regexp.MustCompile(`[^a-zA-Z0-9_-]`) )
Functions ¶
func AssertDatabaseVersion ¶
AssertDatabaseVersion tests if the current version of the database is equal to a required version
func Connect ¶
func Connect(opts *ConnectOpts) error
Connect initializes the connection pool and checks the schema version of the database.
func ConnectTest ¶
ConnectTest connects to the pgx db pool. It uses the b3scale defaults if the environment variable is not set.
func ConnectionFromContext ¶
ConnectionFromContext will retrieve the connection.
func ContextWithConnection ¶
ContextWithConnection will return a child context with a value for connection.
func CountCommandsError ¶
CountCommandsError returns the number of commands in the queue that failed with an error.
func CountCommandsRequested ¶
CountCommandsRequested returns the number of unprocessed commands in the queue.
func CountCommandsSuccess ¶
CountCommandsSuccess returns the number of successfully processed commands in the queue.
func CountCommandsWithState ¶
CountCommandsWithState retrieves the number of commands in the queue with a given state, e.g. requested, error, etc.
func DeleteMeetingStateByID ¶
DeleteMeetingStateByID will remove a meeting state. It will succeed, even if no such meeting was present. TODO: merge with DeleteMeetingStateByInternalID
func DeleteMeetingStateByInternalID ¶
DeleteMeetingStateByInternalID will remove a meeting state. It will succeed, even if no such meeting was present.
func DeleteOrphanMeetings ¶
func DeleteOrphanMeetings( ctx context.Context, tx pgx.Tx, backendID string, backendMeetings []string, ) (int64, error)
DeleteOrphanMeetings will remove all meetings not in a list of (internal) meeting IDs, but associated with a backend
func DeleteRecordingByID ¶
DeleteRecordingByID will delete a recording identified by its id.
func GetRecordingTextTracks ¶
func GetRecordingTextTracks( ctx context.Context, tx pgx.Tx, recordID string, ) ([]*bbb.TextTrack, error)
GetRecordingTextTracks retrieves the text tracks from a recording.
func LookupFrontendIDByMeetingID ¶
func LookupFrontendIDByMeetingID( ctx context.Context, tx pgx.Tx, meetingID string, ) (string, bool, error)
LookupFrontendIDByMeetingID queries the frontend_meetings mapping and returns the frontendID for a given meetingID. The function name might be a hint.
func NextDeadline ¶
NextDeadline calculates the deadline for a newly requested command
func QueryRecordingsByFrontendKey ¶
func QueryRecordingsByFrontendKey(frontendKey string) sq.SelectBuilder
QueryRecordingsByFrontendKey creates a query selecting recordings by frontend key. The query can be extended e.g. for filtering by recordingID.
func QueueCommand ¶
QueueCommand adds a new command to the queue
func RemoveStaleFrontendMeetings ¶
RemoveStaleFrontendMeetings removes all frontend meetings older than a threshold.
func SQLSafeParam ¶
SQLSafeParam will remove any unsafe chars from a param potentially used without parameter binding.
Types ¶
type AgentHeartbeat ¶
type AgentHeartbeat struct {
BackendID string `json:"backend_id"`
Heartbeat time.Time `json:"heartbeat"`
}
AgentHeartbeat is a short api response
type BackendSettings ¶
type BackendSettings struct {
Tags Tags `` /* 140-byte string literal not displayed */
}
BackendSettings hold per backend runtime configuration.
type BackendState ¶
type BackendState struct {
ID string `json:"id"`
NodeState string `json:"node_state" doc:"The current state of the node." example:"ready" enum:"init,ready,error,stopped,decommissioned"`
AdminState string `` /* 155-byte string literal not displayed */
AgentHeartbeat time.Time `json:"agent_heartbeat" doc:"The last time we heared from the node agent."`
AgentRef *string `` /* 131-byte string literal not displayed */
LastError *string `json:"last_error" doc:"The last error that happend. For example destination host not reachable."`
Latency time.Duration `json:"latency" doc:"The amount of milliseconds when polling the current node state."`
MeetingsCount uint `json:"meetings_count" doc:"Number of meetings on the backend."`
AttendeesCount uint `json:"attendees_count" doc:"Number of participants in meetings on the backend."`
LoadFactor float64 `` /* 247-byte string literal not displayed */
Backend *bbb.Backend `json:"bbb" api:"BackendConfig"`
Settings BackendSettings `json:"settings"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
SyncedAt time.Time `json:"synced_at"`
}
The BackendState is shared across b3scale instances and encapsulates the list of meetings and recordings. The backend.ID should be used as identifier.
func GetBackendState ¶
func GetBackendState( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) (*BackendState, error)
GetBackendState tries to retrieve a single backend state
func GetBackendStates ¶
func GetBackendStates( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) ([]*BackendState, error)
GetBackendStates retrieves all backends
func InitBackendState ¶
func InitBackendState(init *BackendState) *BackendState
InitBackendState initializes a new backend state with an initial state.
func (*BackendState) ClearMeetings ¶
ClearMeetings will remove all meetings in the current state
func (*BackendState) CreateMeetingState ¶
func (s *BackendState) CreateMeetingState( ctx context.Context, tx pgx.Tx, frontend *bbb.Frontend, meeting *bbb.Meeting, ) (*MeetingState, error)
CreateMeetingState will create a new state for the current backend state. A frontend is attached if present.
func (*BackendState) CreateOrUpdateMeetingState ¶
func (s *BackendState) CreateOrUpdateMeetingState( ctx context.Context, tx pgx.Tx, meeting *bbb.Meeting, ) error
CreateOrUpdateMeetingState will try to update the meeting or will create a new meeting state if the meeting does not exist. The new meeting will not be associated with a frontend state - however the meeting can later be claimed by a frontend.
func (*BackendState) IsAgentAlive ¶
func (s *BackendState) IsAgentAlive() bool
IsAgentAlive checks if the heartbeat is older than the threshold
func (*BackendState) IsNodeReady ¶
func (s *BackendState) IsNodeReady() bool
IsNodeReady checks if the agent is alive and the node state is ready
func (*BackendState) UpdateAgentHeartbeat ¶
func (s *BackendState) UpdateAgentHeartbeat( ctx context.Context, tx pgx.Tx, ) (*AgentHeartbeat, error)
UpdateAgentHeartbeat will set the attribute to the current timestamp
func (*BackendState) UpdateStatCounters ¶
UpdateStatCounters counts meetings and attendees and updates the properties
func (*BackendState) Validate ¶
func (s *BackendState) Validate() ValidationError
Validate the backend state
type Command ¶
type Command struct {
ID string `json:"id"`
Seq int `json:"seq"`
State string `json:"state" doc:"The current state of the command." enum:"requested,success,error"`
Action string `json:"action" doc:"The operation to perform." enum:"end_all_meetings"`
Params interface{} `json:"params" doc:"Key value options for the command. See example above."`
Result interface{} `json:"result" doc:"The result of the command. as key value object."`
Deadline time.Time `json:"deadline" doc:"The commands need to be processed before the deadline is reached. The deadline is optional."`
StartedAt *time.Time `json:"started_at"`
StoppedAt *time.Time `json:"stopped_at"`
CreatedAt time.Time `json:"created_at"`
// contains filtered or unexported fields
}
A Command is a representation of an operation
func GetCommand ¶
GetCommand retrieves a command query
func GetCommands ¶
GetCommands retrieves the current command queue. This includes locked.
type CommandHandler ¶
CommandHandler is a callback function for handling commands. The command was successful if no error was returned.
type CommandQueue ¶
type CommandQueue struct {
}
The CommandQueue is connected to the database and provides methods for queuing and dequeuing commands.
func NewCommandQueue ¶
func NewCommandQueue() *CommandQueue
NewCommandQueue initializes a new command queue
func (*CommandQueue) Receive ¶
func (q *CommandQueue) Receive(handler CommandHandler) error
Receive will await a command and will block until a command can be processed. If the handler responds with an error, the error will be returned.
type ConnectOpts ¶
ConnectOpts database connection options
type DefaultPresentationSettings ¶
type DefaultPresentationSettings struct {
URL string `` /* 134-byte string literal not displayed */
Force bool `json:"force" doc:"Override any default presentation provided by the frontend."`
}
DefaultPresentationSettings configure a per frontend default presentation.
type FrontendSettings ¶
type FrontendSettings struct {
RequiredTags Tags `` /* 131-byte string literal not displayed */
DefaultPresentation *DefaultPresentationSettings `json:"default_presentation"`
CreateDefaultParams bbb.Params `` /* 220-byte string literal not displayed */
CreateOverrideParams bbb.Params `` /* 137-byte string literal not displayed */
}
FrontendSettings hold all well known settings for a frontend.
type FrontendState ¶
type FrontendState struct {
ID string `json:"id"`
Active bool `json:"active" doc:"When false, the frontend can not longer use the API."`
Frontend *bbb.Frontend `json:"bbb" api:"FrontendConfig"`
Settings FrontendSettings `json:"settings"`
AccountRef *string `` /* 182-byte string literal not displayed */
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
The FrontendState holds shared information about a frontend.
func GetFrontendState ¶
func GetFrontendState( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) (*FrontendState, error)
GetFrontendState gets a single row from the store. This may return nil without an error.
func GetFrontendStates ¶
func GetFrontendStates( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) ([]*FrontendState, error)
GetFrontendStates retrieves all frontend states from the database.
func InitFrontendState ¶
func InitFrontendState(init *FrontendState) *FrontendState
InitFrontendState initializes the state with a database pool and default values where required.
func (*FrontendState) Validate ¶
func (s *FrontendState) Validate() ValidationError
Validate checks for presence of required fields.
type MeetingState ¶
type MeetingState struct {
ID string `json:"id"`
InternalID string `json:"internal_id"`
Meeting *bbb.Meeting `json:"meeting" api:"MeetingInfo"`
FrontendID *string `json:"frontend_id"`
BackendID *string `json:"backend_id"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
SyncedAt time.Time `json:"synced_at"`
// contains filtered or unexported fields
}
The MeetingState holds a meeting and its relations to a backend and frontend.
func AwaitMeetingState ¶
func AwaitMeetingState( ctx context.Context, conn *pgxpool.Conn, q sq.SelectBuilder, ) (*MeetingState, pgx.Tx, error)
AwaitMeetingState polls the database for a meeting state until the context expires.
func GetMeetingState ¶
func GetMeetingState( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) (*MeetingState, error)
GetMeetingState tries to retrieve a single meeting state
func GetMeetingStateByID ¶
GetMeetingStateByID is a convenience wrapper around GetMeetingState.
func GetMeetingStates ¶
func GetMeetingStates( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) ([]*MeetingState, error)
GetMeetingStates retrieves all meeting states
func InitMeetingState ¶
func InitMeetingState( init *MeetingState, ) *MeetingState
InitMeetingState initializes meeting state with defaults and a connection
func (*MeetingState) BindFrontendID ¶
BindFrontendID associates an unclaimed meeting with a frontend
func (*MeetingState) GetBackendState ¶
func (s *MeetingState) GetBackendState( ctx context.Context, tx pgx.Tx, ) (*BackendState, error)
GetBackendState loads the backend state
func (*MeetingState) GetFrontendState ¶
func (s *MeetingState) GetFrontendState( ctx context.Context, tx pgx.Tx, ) (*FrontendState, error)
GetFrontendState loads the frontend state for the meeting
func (*MeetingState) IsStale ¶
func (s *MeetingState) IsStale(threshold time.Duration) bool
IsStale checks if the last sync is longer ago than a given threshold.
func (*MeetingState) MarkSynced ¶
func (s *MeetingState) MarkSynced()
MarkSynced sets the synced at timestamp
func (*MeetingState) SetBackendID ¶
SetBackendID associates a meeting with a backend
type RecordingState ¶
type RecordingState struct {
RecordID string
Recording *bbb.Recording
MeetingID string
InternalMeetingID string
FrontendID string
CreatedAt time.Time
UpdatedAt time.Time
SyncedAt time.Time
}
RecordingState holds a recording and its relation to a meeting.
func GetRecordingState ¶
func GetRecordingState( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) (*RecordingState, error)
GetRecordingState retrieves a single state from the database.
func GetRecordingStateByID ¶
func GetRecordingStateByID( ctx context.Context, tx pgx.Tx, recordID string, ) (*RecordingState, error)
GetRecordingStateByID retrieves a single state identified by recordID.
func GetRecordingStates ¶
func GetRecordingStates( ctx context.Context, tx pgx.Tx, q sq.SelectBuilder, ) ([]*RecordingState, error)
GetRecordingStates retrieves a list of recordings filtered by criteria in the select builder
func InitRecordingState ¶
func InitRecordingState(init *RecordingState) *RecordingState
InitRecordingState initializes the state with default values where required
func NewStateFromRecording ¶ added in v1.1.0
func NewStateFromRecording( recording *bbb.Recording, ) *RecordingState
NewStateFromRecording will initialize a recording state with a recording.
func (*RecordingState) Delete ¶
Delete will remove a recording from the database. This cascades to associated text tracks.
func (*RecordingState) DeleteFiles ¶
func (s *RecordingState) DeleteFiles() error
DeleteFiles will remove the recording from the filesystem.
func (*RecordingState) Merge ¶ added in v1.1.0
func (s *RecordingState) Merge(other *RecordingState) error
Merge will combine the state and also fill in missing fields
func (*RecordingState) PublishFiles ¶
func (s *RecordingState) PublishFiles() error
PublishFiles will move the recording from the unpublished directory to the published directory.
func (*RecordingState) SetFrontendID ¶
SetFrontendID will set the frontend_id attribute of the recording state.
func (*RecordingState) SetTextTracks ¶
func (s *RecordingState) SetTextTracks( ctx context.Context, tx pgx.Tx, tracks []*bbb.TextTrack, ) error
SetTextTracks will persist associated text tracks without touching the rest. The recording has to be present in the database.
func (*RecordingState) UnpublishFiles ¶
func (s *RecordingState) UnpublishFiles() error
UnpublishFiles will move the recording from the published directory to the unpublished directory.
type RecordingsStorage ¶
RecordingsStorage is handling the filesystem access when dealing with recordings.
func NewRecordingsStorageFromEnv ¶
func NewRecordingsStorageFromEnv() (*RecordingsStorage, error)
NewRecordingsStorageFromEnv creates a new recordings storage instance and configures it through well known environment variables.
func (*RecordingsStorage) Check ¶
func (s *RecordingsStorage) Check() error
Check will test if we can access and manipulate the recordings storage.
func (*RecordingsStorage) ListThumbnailFiles ¶
func (s *RecordingsStorage) ListThumbnailFiles(recordID string) []string
ListThumbnailFiles retrieves all thumbnail files from presentations relative to the published path.
func (*RecordingsStorage) MakeRecordingPreview ¶
func (s *RecordingsStorage) MakeRecordingPreview( recordID string, ) *bbb.Preview
MakeRecordingPreview will use the thumbnails to create previews
func (*RecordingsStorage) PublishedRecordingPath ¶
func (s *RecordingsStorage) PublishedRecordingPath(id string) string
PublishedRecordingPath returns the joined filepath for an "id" (this will be the internal meeting id).
func (*RecordingsStorage) UnpublishedRecordingPath ¶
func (s *RecordingsStorage) UnpublishedRecordingPath(id string) string
UnpublishedRecordingPath returns the joined filepath for an "id" (this will be the internal meeting id).
type Tags ¶
type Tags []string
Tags are a list of strings with labels to declare for example backend capabilities
type ValidationError ¶
A ValidationError is a mapping between the field name (same as json field name), and a list of error strings.
func (ValidationError) Add ¶
func (e ValidationError) Add(field, err string)
Add a validation error to the collection
func (ValidationError) Error ¶
func (e ValidationError) Error() string
Error implements the error interface