Documentation
¶
Index ¶
- Constants
- Variables
- func MigrateDeviceIDs(destHomeserver, postgresURI, secret string, commit bool) error
- type Client
- type Device
- type DeviceDataTicker
- type DevicesTable
- type EventsResponse
- type HTTPClient
- type IPollerMap
- type PollerID
- type PollerMap
- func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, ...)
- func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage)
- func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, ...)
- func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage)
- func (h *PollerMap) NumPollers() (count int)
- func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage)
- func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, ...)
- func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
- func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage)
- func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string)
- func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ...)
- func (h *PollerMap) OnTerminated(ctx context.Context, userID, deviceID string)
- func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
- func (h *PollerMap) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
- func (h *PollerMap) Terminate()
- func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
- func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
- type Storage
- type SyncResponse
- type SyncRoomsResponse
- type SyncV2InviteResponse
- type SyncV2JoinResponse
- type SyncV2LeaveResponse
- type TimelineResponse
- type Token
- type TokenForPoller
- type TokensTable
- func (t *TokensTable) Delete(accessTokenHash string) error
- func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)
- func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)
- func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error
- func (t *TokensTable) Token(plaintextToken string) (*Token, error)
- func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)
- type TransactionIDCache
- type UnreadNotifications
- type V2DataReceiver
Constants ¶
const AccountDataGlobalRoom = ""
Variables ¶
var HTTP401 error = fmt.Errorf("HTTP 401")
var ProxyVersion = ""
Functions ¶
func MigrateDeviceIDs ¶ added in v0.99.3
MigrateDeviceIDs performs a one-off DB migration from the old device ids (hash of access token) to the new device ids (actual device ids from the homeserver). This is not backwards compatible. If the migration has already taken place, this function is a no-op.
This code will be removed in a future version of the proxy.
Types ¶
type Client ¶
// Client abstracts the homeserver calls declared below: resolving an access
// token to a user/device ID pair, and performing sync v2 requests.
type Client interface {
// WhoAmI asks the homeserver to look up the access token using the CSAPI /whoami
// endpoint. The response must contain a device ID (meaning that we assume the
// homeserver supports Matrix >= 1.1).
WhoAmI(accessToken string) (userID, deviceID string, err error)
// DoSyncV2 performs a sync v2 request for accessToken starting from since.
// NOTE(review): HTTPClient.DoSyncV2 documents the int result as the response
// status code and isFirst=true as forcing a timeout=0 sync; confirm that other
// implementations honour the same contract. The meaning of toDeviceOnly is not
// documented here — presumably it restricts the response to to-device data.
DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool, toDeviceOnly bool) (*SyncResponse, int, error)
}
type DeviceDataTicker ¶ added in v0.99.4
type DeviceDataTicker struct {
// contains filtered or unexported fields
}
DeviceDataTicker remembers the user+device IDs that need to be notified, then periodically emits them all to the caller. Use it to rate-limit the frequency of device list updates.
func NewDeviceDataTicker ¶ added in v0.99.4
func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker
Create a new device data ticker, which batches calls to Remember and invokes a callback every d duration. If d is 0, no batching is performed and the callback is invoked synchronously, which is useful for testing.
func (*DeviceDataTicker) Remember ¶ added in v0.99.4
func (t *DeviceDataTicker) Remember(pid PollerID)
Remember this user/device ID, and emit it later on.
func (*DeviceDataTicker) Run ¶ added in v0.99.4
func (t *DeviceDataTicker) Run()
Run blocks, ticking until Stop() is called.
func (*DeviceDataTicker) SetCallback ¶ added in v0.99.4
func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData))
Set the function which should be called when the tick happens.
type DevicesTable ¶ added in v0.99.3
type DevicesTable struct {
// contains filtered or unexported fields
}
DevicesTable remembers syncv2 since positions per-device
func NewDevicesTable ¶ added in v0.99.3
func NewDevicesTable(db *sqlx.DB) *DevicesTable
func (*DevicesTable) InsertDevice ¶ added in v0.99.3
func (t *DevicesTable) InsertDevice(txn *sqlx.Tx, userID, deviceID string) error
InsertDevice creates a new devices row with a blank since token if no such row exists. Otherwise, it does nothing.
func (*DevicesTable) UpdateDeviceSince ¶ added in v0.99.3
func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error
type EventsResponse ¶
// EventsResponse is a JSON object carrying a list of events under an "events"
// key. It is the shape reused by several sections of SyncResponse and
// SyncV2JoinResponse.
type EventsResponse struct {
// Events holds each event undecoded, as raw JSON.
Events []json.RawMessage `json:"events"`
}
type HTTPClient ¶
HTTPClient represents a Sync v2 Client. One client can be shared among many users.
func (*HTTPClient) DoSyncV2 ¶
func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error)
DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code, or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snappiness.
type IPollerMap ¶ added in v0.99.3
type PollerMap ¶
type PollerMap struct {
Pollers map[PollerID]*poller
// contains filtered or unexported fields
}
PollerMap is a map of PollerID to poller, with at most one poller per device.
func NewPollerMap ¶
NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race which can result in out of order latest NIDs which, if we assert NIDs only increment, will result in missed events.
Consider these events in the same room, with 3 different pollers getting the data:
1 2 3 4 5 6 7  eventual DB event NID
A B C D E F G
-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7
The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.
This only affects resources which are shared across multiple DEVICES such as:
- room resources: events, EDUs
- user resources: notif counts, account data
NOT to-device messages, or since tokens.
func (*PollerMap) Accumulate ¶
func (*PollerMap) AddToDeviceMessages ¶
func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage)
Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.
func (*PollerMap) EnsurePolling ¶
func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)
EnsurePolling makes sure there is a poller for this device, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow for logins on clients to be snappy fast, even though they won't yet have the to-device msgs to decrypt E2EE rooms.
func (*PollerMap) Initialise ¶
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage)
func (*PollerMap) NumPollers ¶
func (*PollerMap) OnAccountData ¶
func (*PollerMap) OnE2EEData ¶
func (*PollerMap) OnExpiredToken ¶ added in v0.99.2
func (*PollerMap) OnLeftRoom ¶
func (*PollerMap) OnTerminated ¶
func (*PollerMap) SetCallbacks ¶
func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
func (*PollerMap) Terminate ¶
func (h *PollerMap) Terminate()
Terminate all pollers. Useful in tests.
func (*PollerMap) UpdateDeviceSince ¶
type Storage ¶
// Storage bundles the sync2 tables together with a database handle.
// NOTE(review): presumably DB is the same handle the two tables were
// constructed with — confirm against the constructor.
type Storage struct {
// DevicesTable remembers syncv2 since positions per device.
DevicesTable *DevicesTable
// TokensTable remembers sync v2 tokens.
TokensTable *TokensTable
// DB is the sqlx database handle.
DB *sqlx.DB
}
type SyncResponse ¶
// SyncResponse is the decoded JSON body of a sync v2 response, as returned by
// Client.DoSyncV2. Each field maps to the top-level key named in its tag.
type SyncResponse struct {
// NextBatch is the value of the "next_batch" key.
// NOTE(review): presumably the since token to pass to the next sync — confirm.
NextBatch string `json:"next_batch"`
// AccountData holds the events under the top-level "account_data" key.
AccountData EventsResponse `json:"account_data"`
// Presence holds the events under the "presence" key. The inner list is
// dropped from the encoded JSON when empty.
Presence struct {
Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
} `json:"presence"`
// Rooms groups per-room data by membership section (join/invite/leave).
Rooms SyncRoomsResponse `json:"rooms"`
// ToDevice holds the events under the "to_device" key.
ToDevice EventsResponse `json:"to_device"`
// DeviceLists carries the "changed" and "left" string lists under
// "device_lists". NOTE(review): presumably these are user IDs — confirm.
DeviceLists struct {
Changed []string `json:"changed,omitempty"`
Left []string `json:"left,omitempty"`
} `json:"device_lists"`
// DeviceListsOTKCount is decoded from "device_one_time_keys_count".
// NOTE(review): presumably maps key algorithm to remaining key count — confirm.
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
// DeviceUnusedFallbackKeyTypes is decoded from "device_unused_fallback_key_types".
DeviceUnusedFallbackKeyTypes []string `json:"device_unused_fallback_key_types,omitempty"`
}
type SyncRoomsResponse ¶
// SyncRoomsResponse is the "rooms" section of a sync v2 response, split by the
// "join", "invite" and "leave" keys.
// NOTE(review): the map keys are presumably room IDs — confirm.
type SyncRoomsResponse struct {
// Join holds the rooms listed under the "join" key.
Join map[string]SyncV2JoinResponse `json:"join"`
// Invite holds the rooms listed under the "invite" key.
Invite map[string]SyncV2InviteResponse `json:"invite"`
// Leave holds the rooms listed under the "leave" key.
Leave map[string]SyncV2LeaveResponse `json:"leave"`
}
type SyncV2InviteResponse ¶
// SyncV2InviteResponse is the per-room payload for a room under the "invite"
// key of a sync v2 response.
type SyncV2InviteResponse struct {
// InviteState holds the events under the "invite_state" key.
InviteState EventsResponse `json:"invite_state"`
}
SyncV2InviteResponse represents a /sync response for a room which is under the 'invite' key.
type SyncV2JoinResponse ¶
// SyncV2JoinResponse is the per-room payload for a room under the "join" key
// of a sync v2 response.
type SyncV2JoinResponse struct {
// State holds the events under the "state" key.
State EventsResponse `json:"state"`
// Timeline holds the "timeline" section: events plus the limited flag and
// prev_batch token.
Timeline TimelineResponse `json:"timeline"`
// Ephemeral holds the events under the "ephemeral" key.
Ephemeral EventsResponse `json:"ephemeral"`
// AccountData holds the events under this room's "account_data" key.
AccountData EventsResponse `json:"account_data"`
// UnreadNotifications is decoded from the "unread_notifications" key.
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}
SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2LeaveResponse ¶
// SyncV2LeaveResponse is the per-room payload for a room under the "leave" key
// of a sync v2 response. Its State and Timeline sections are declared inline
// but have the same fields and tags as EventsResponse and TimelineResponse.
type SyncV2LeaveResponse struct {
// State holds the raw events under the "state" key.
State struct {
Events []json.RawMessage `json:"events"`
} `json:"state"`
// Timeline holds the raw events under the "timeline" key, together with the
// "limited" flag and the optional "prev_batch" token.
Timeline struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
} `json:"timeline"`
}
SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.
type TimelineResponse ¶
// TimelineResponse is the "timeline" section of a joined room in a sync v2
// response.
type TimelineResponse struct {
// Events holds the timeline events as raw JSON.
Events []json.RawMessage `json:"events"`
// Limited is the value of the "limited" key.
// NOTE(review): presumably true when the homeserver truncated the timeline — confirm.
Limited bool `json:"limited"`
// PrevBatch is the value of the "prev_batch" key; it is dropped from the
// encoded JSON when empty.
PrevBatch string `json:"prev_batch,omitempty"`
}
type TokenForPoller ¶ added in v0.99.3
TokenForPoller represents a row of the tokens table, together with any data maintained by pollers for that token's device.
type TokensTable ¶ added in v0.99.3
type TokensTable struct {
// contains filtered or unexported fields
}
TokensTable remembers sync v2 tokens
func NewTokensTable ¶ added in v0.99.3
func NewTokensTable(db *sqlx.DB, secret string) *TokensTable
NewTokensTable creates the syncv3_sync2_tokens table if it does not already exist.
func (*TokensTable) Delete ¶ added in v0.99.3
func (t *TokensTable) Delete(accessTokenHash string) error
Delete looks up a token by its hash and deletes the row. If no token exists with the given hash, a warning is logged but no error is returned.
func (*TokensTable) GetTokenAndSince ¶ added in v0.99.3
func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)
func (*TokensTable) Insert ¶ added in v0.99.3
func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)
Insert a new token into the table.
func (*TokensTable) MaybeUpdateLastSeen ¶ added in v0.99.3
func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error
MaybeUpdateLastSeen actions a request to update a Token struct with its last_seen value in the DB. To avoid spamming the DB with a write every time a sync3 request arrives, we only update the last seen timestamp if it is at least 24 hours old. The timestamp is updated on the Token struct if and only if it is updated in the DB.
func (*TokensTable) Token ¶ added in v0.99.3
func (t *TokensTable) Token(plaintextToken string) (*Token, error)
Token retrieves a tokens row from the database if it exists. Errors with sql.ErrNoRows if the token does not exist. Errors with an unspecified error otherwise.
func (*TokensTable) TokenForEachDevice ¶ added in v0.99.3
func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)
TokenForEachDevice loads the most recently used token for each device. If given a transaction, it will SELECT inside that transaction.
type TransactionIDCache ¶
type TransactionIDCache struct {
// contains filtered or unexported fields
}
func NewTransactionIDCache ¶
func NewTransactionIDCache() *TransactionIDCache
func (*TransactionIDCache) Get ¶
func (c *TransactionIDCache) Get(userID, eventID string) string
Get a transaction ID previously stored.
func (*TransactionIDCache) Store ¶
func (c *TransactionIDCache) Store(userID, eventID, txnID string)
Store a new transaction ID received via v2 /sync
type UnreadNotifications ¶
type V2DataReceiver ¶
// V2DataReceiver is the receiver for all the v2 sync data the poller gets.
// Each method corresponds to one section (or lifecycle event) of a sync v2
// response.
type V2DataReceiver interface {
// UpdateDeviceSince updates the since token for this device. Called AFTER all other data in this sync response has been processed.
UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
// Accumulate data for this room. This means the timeline section of the v2 response.
Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
// SetTyping indicates which users are typing.
SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
// OnReceipt is sent when there is a new receipt.
OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
// NOTE(review): the counts are pointers; presumably nil means "not present in the response" — confirm.
UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
// OnAccountData sets the latest account data for this user.
OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
// OnInvite is sent when there is a room in the `invite` section of the v2 response.
OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) // invitestate in db
// OnLeftRoom is sent when there is a room in the `leave` section of the v2 response.
OnLeftRoom(ctx context.Context, userID, roomID string)
// OnE2EEData is sent when there is a _change_ in E2EE data, not all the time.
OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
// OnTerminated is sent when the poll loop terminates.
OnTerminated(ctx context.Context, userID, deviceID string)
// OnExpiredToken is sent when the token gets a 401 response.
OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}
V2DataReceiver is the receiver for all the v2 sync data the poller gets