sync2

package
v0.99.4
Published: Jul 12, 2023 | License: Apache-2.0 | Imports: 29 | Imported by: 0

Documentation

Constants

const AccountDataGlobalRoom = ""

Variables

var HTTP401 error = fmt.Errorf("HTTP 401")
var ProxyVersion = ""

Functions

func MigrateDeviceIDs added in v0.99.3

func MigrateDeviceIDs(destHomeserver, postgresURI, secret string, commit bool) error

MigrateDeviceIDs performs a one-off DB migration from the old device ids (hash of access token) to the new device ids (actual device ids from the homeserver). This is not backwards compatible. If the migration has already taken place, this function is a no-op.

This code will be removed in a future version of the proxy.

Types

type Client

type Client interface {
	// WhoAmI asks the homeserver to lookup the access token using the CSAPI /whoami
	// endpoint. The response must contain a device ID (meaning that we assume the
	// homeserver supports Matrix >= 1.1.)
	WhoAmI(accessToken string) (userID, deviceID string, err error)
	DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool, toDeviceOnly bool) (*SyncResponse, int, error)
}

type Device

type Device struct {
	UserID   string `db:"user_id"`
	DeviceID string `db:"device_id"`
	Since    string `db:"since"`
}

type DeviceDataTicker added in v0.99.4

type DeviceDataTicker struct {
	// contains filtered or unexported fields
}

DeviceDataTicker remembers the user+device IDs that need notifying, then periodically emits them all to the caller. Use it to rate-limit the frequency of device list updates.

func NewDeviceDataTicker added in v0.99.4

func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker

Create a new device data ticker, which batches calls to Remember and invokes a callback every d duration. If d is 0, no batching is performed and the callback is invoked synchronously, which is useful for testing.

func (*DeviceDataTicker) Remember added in v0.99.4

func (t *DeviceDataTicker) Remember(pid PollerID)

Remember this user/device ID, and emit it later on.

func (*DeviceDataTicker) Run added in v0.99.4

func (t *DeviceDataTicker) Run()

Blocks forever, ticking until Stop() is called.

func (*DeviceDataTicker) SetCallback added in v0.99.4

func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData))

Set the function which should be called when the tick happens.

func (*DeviceDataTicker) Stop added in v0.99.4

func (t *DeviceDataTicker) Stop()

Stop ticking.
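
The batching behaviour described for DeviceDataTicker can be sketched without the package itself. The example below is a minimal illustration, not the proxy's implementation: batchTicker, pollerID and all method names are hypothetical stand-ins, the callback receives a plain slice rather than a *pubsub.V2DeviceData payload, and deduplicating repeated IDs within a batch is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pollerID mirrors the two fields of sync2.PollerID.
type pollerID struct {
	UserID   string
	DeviceID string
}

// batchTicker is a hypothetical stand-in for DeviceDataTicker.
type batchTicker struct {
	mu      sync.Mutex
	pending map[pollerID]struct{} // a set, so a repeated ID is emitted once (assumption)
	every   time.Duration
	fn      func(batch []pollerID)
	done    chan struct{}
}

func newBatchTicker(every time.Duration) *batchTicker {
	return &batchTicker{
		pending: make(map[pollerID]struct{}),
		every:   every,
		done:    make(chan struct{}),
	}
}

func (t *batchTicker) setCallback(fn func(batch []pollerID)) { t.fn = fn }

// remember records the ID; with a zero interval it flushes synchronously.
func (t *batchTicker) remember(pid pollerID) {
	t.mu.Lock()
	t.pending[pid] = struct{}{}
	t.mu.Unlock()
	if t.every == 0 {
		t.flush()
	}
}

// flush hands everything remembered so far to the callback, then forgets it.
func (t *batchTicker) flush() {
	t.mu.Lock()
	batch := make([]pollerID, 0, len(t.pending))
	for pid := range t.pending {
		batch = append(batch, pid)
	}
	t.pending = make(map[pollerID]struct{})
	t.mu.Unlock()
	if len(batch) > 0 && t.fn != nil {
		t.fn(batch)
	}
}

// run blocks, flushing on every tick, until stop is called.
func (t *batchTicker) run() {
	if t.every == 0 {
		<-t.done // nothing to tick: remember already flushes synchronously
		return
	}
	ticker := time.NewTicker(t.every)
	defer ticker.Stop()
	for {
		select {
		case <-t.done:
			return
		case <-ticker.C:
			t.flush()
		}
	}
}

func (t *batchTicker) stop() { close(t.done) }

func main() {
	t := newBatchTicker(0) // zero interval: no batching, handy in tests
	t.setCallback(func(batch []pollerID) { fmt.Println("emitting", len(batch), "device(s)") })
	t.remember(pollerID{UserID: "@alice:example.org", DeviceID: "DEV1"})
}
```

With a non-zero interval, run would normally be started on its own goroutine and stop called during shutdown.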

type DevicesTable added in v0.99.3

type DevicesTable struct {
	// contains filtered or unexported fields
}

DevicesTable remembers sync v2 since positions per device.

func NewDevicesTable added in v0.99.3

func NewDevicesTable(db *sqlx.DB) *DevicesTable

func (*DevicesTable) InsertDevice added in v0.99.3

func (t *DevicesTable) InsertDevice(txn *sqlx.Tx, userID, deviceID string) error

InsertDevice creates a new devices row with a blank since token if no such row exists. Otherwise, it does nothing.

func (*DevicesTable) UpdateDeviceSince added in v0.99.3

func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error

type EventsResponse

type EventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type HTTPClient

type HTTPClient struct {
	Client            *http.Client
	DestinationServer string
}

HTTPClient represents a Sync v2 Client. One client can be shared among many users.

func (*HTTPClient) DoSyncV2

func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error)

DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code, or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snappiness.
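
To illustrate the effect of isFirst, here is a minimal sketch of how the request URL might be assembled. buildSyncURL is a hypothetical helper, not part of this package; the 30000 ms long-poll timeout for later syncs is an assumption, and toDeviceOnly (which would presumably select a filter) is not modelled.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildSyncURL is a hypothetical helper showing the timeout choice only.
func buildSyncURL(destinationServer, since string, isFirst bool) string {
	qps := url.Values{}
	if isFirst {
		// Return immediately so the first response arrives quickly.
		qps.Set("timeout", "0")
	} else {
		// Assumed long-poll timeout for subsequent syncs.
		qps.Set("timeout", "30000")
	}
	if since != "" {
		qps.Set("since", since)
	}
	// Encode sorts parameters by key, so the output is deterministic.
	return destinationServer + "/_matrix/client/v3/sync?" + qps.Encode()
}

func main() {
	fmt.Println(buildSyncURL("https://hs.example.org", "", true))
	fmt.Println(buildSyncURL("https://hs.example.org", "s72594", false))
}
```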

func (*HTTPClient) WhoAmI

func (v *HTTPClient) WhoAmI(accessToken string) (string, string, error)

WhoAmI returns sync2.HTTP401 if the request returns a 401 status.
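
The sentinel-error pattern can be sketched as follows. This is an illustration under stated assumptions, not the package's code: whoAmI, errHTTP401 and fakeClient are hypothetical names, and the fake transport stands in for a homeserver so the example runs without a network.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// errHTTP401 plays the role of sync2.HTTP401 in this sketch.
var errHTTP401 = errors.New("HTTP 401")

type whoAmIResponse struct {
	UserID   string `json:"user_id"`
	DeviceID string `json:"device_id"`
}

// whoAmI asks the server who owns accessToken and maps a 401 to errHTTP401.
func whoAmI(client *http.Client, baseURL, accessToken string) (userID, deviceID string, err error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/_matrix/client/v3/account/whoami", nil)
	if err != nil {
		return "", "", err
	}
	req.Header.Set("Authorization", "Bearer "+accessToken)
	res, err := client.Do(req)
	if err != nil {
		return "", "", err
	}
	defer res.Body.Close()
	if res.StatusCode == http.StatusUnauthorized {
		return "", "", errHTTP401
	}
	if res.StatusCode != http.StatusOK {
		return "", "", fmt.Errorf("unexpected status %d", res.StatusCode)
	}
	var body whoAmIResponse
	if err := json.NewDecoder(res.Body).Decode(&body); err != nil {
		return "", "", err
	}
	if body.DeviceID == "" {
		return "", "", errors.New("whoami response has no device_id")
	}
	return body.UserID, body.DeviceID, nil
}

// roundTripFunc lets a plain function act as an http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// fakeClient returns a client whose transport accepts only validToken.
func fakeClient(validToken string) *http.Client {
	return &http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
		rec := httptest.NewRecorder()
		if r.Header.Get("Authorization") != "Bearer "+validToken {
			rec.WriteHeader(http.StatusUnauthorized)
		} else {
			rec.WriteString(`{"user_id":"@alice:example.org","device_id":"DEV1"}`)
		}
		return rec.Result(), nil
	})}
}

func main() {
	client := fakeClient("good-token")
	userID, deviceID, err := whoAmI(client, "https://hs.example.org", "good-token")
	fmt.Println(userID, deviceID, err)
	_, _, err = whoAmI(client, "https://hs.example.org", "expired-token")
	fmt.Println(errors.Is(err, errHTTP401))
}
```

Callers can then compare against the sentinel to decide whether to expire the token rather than retry.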

type IPollerMap added in v0.99.3

type IPollerMap interface {
	EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)
	NumPollers() int
	Terminate()
}

type PollerID added in v0.99.3

type PollerID struct {
	UserID   string
	DeviceID string
}

type PollerMap

type PollerMap struct {
	Pollers map[PollerID]*poller
	// contains filtered or unexported fields
}

PollerMap is a map of PollerID (user ID plus device ID) to poller.

func NewPollerMap

func NewPollerMap(v2Client Client, enablePrometheus bool) *PollerMap

NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race, producing out-of-order latest NIDs which, if we assert that NIDs only increment, will result in missed events.

Consider these events in the same room, with 3 different pollers getting the data:

1 2 3 4 5 6 7 eventual DB event NID
A B C D E F G
-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7

The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.

This only affects resources which are shared across multiple DEVICES such as:

  • room resources: events, EDUs
  • user resources: notif counts, account data

NOT to-device messages, or since tokens.
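
The scenario above can be replayed deterministically. The sketch below is illustrative only: latestCache is a hypothetical cache that asserts NIDs only increment, and the three batches are the "new events" sets from the diagram. Injecting them in poll-loop order keeps every event; injecting poll loop 3 first drops A to E.

```go
package main

import "fmt"

// batch is the set of new events one poll loop produced, with its highest NID.
type batch struct {
	events []string
	latest int
}

// latestCache is a hypothetical cache that asserts NIDs only increment.
type latestCache struct {
	latestNID int
	events    []string
}

// inject reports false when the batch is older than what the cache has seen.
func (c *latestCache) inject(b batch) bool {
	if b.latest <= c.latestNID {
		return false // never walks back to earlier NIDs, so the batch is dropped
	}
	c.latestNID = b.latest
	c.events = append(c.events, b.events...)
	return true
}

// replay injects the batches in the given order and returns what the cache kept.
func replay(batches ...batch) []string {
	c := &latestCache{}
	for _, b := range batches {
		c.inject(b)
	}
	return c.events
}

var (
	loop1 = batch{events: []string{"A", "B", "C"}, latest: 3}
	loop2 = batch{events: []string{"D", "E"}, latest: 5}
	loop3 = batch{events: []string{"F", "G"}, latest: 7}
)

func main() {
	fmt.Println(replay(loop1, loop2, loop3)) // in order: all seven events kept
	fmt.Println(replay(loop3, loop1, loop2)) // loop 3 wins the race: only F, G kept
}
```

Funnelling every receiver callback through one goroutine removes the second ordering, which is the guarantee NewPollerMap describes.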

func (*PollerMap) Accumulate

func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage)

func (*PollerMap) AddToDeviceMessages

func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage)

Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.

func (*PollerMap) EnsurePolling

func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)

EnsurePolling makes sure there is a poller for this device, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow for logins on clients to be snappy, even though they won't yet have the to-device msgs to decrypt E2EE rooms.
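
A minimal sketch of the "create once, block only the creator" behaviour follows. All names here (sketchPollerMap, sketchPoller, ensurePolling) are hypothetical, the first sync is reduced to a counter increment, and the rule about an existing poller for the same user on a different device is not modelled.

```go
package main

import (
	"fmt"
	"sync"
)

type pollerID struct{ UserID, DeviceID string }

// sketchPoller signals completion of its first sync by closing the channel.
type sketchPoller struct {
	firstSyncDone chan struct{}
}

type sketchPollerMap struct {
	mu         sync.Mutex
	pollers    map[pollerID]*sketchPoller
	firstSyncs int // how many initial syncs have run
}

func newSketchPollerMap() *sketchPollerMap {
	return &sketchPollerMap{pollers: make(map[pollerID]*sketchPoller)}
}

// firstSync stands in for the initial sync v2 request.
func (m *sketchPollerMap) firstSync(pid pollerID) {
	m.mu.Lock()
	m.firstSyncs++
	m.mu.Unlock()
}

// ensurePolling starts a poller for pid if none exists. Only the call that
// created the poller waits for the first sync; it reports whether it did so.
func (m *sketchPollerMap) ensurePolling(pid pollerID) (created bool) {
	m.mu.Lock()
	p, exists := m.pollers[pid]
	if !exists {
		p = &sketchPoller{firstSyncDone: make(chan struct{})}
		m.pollers[pid] = p
		go func() {
			m.firstSync(pid)
			close(p.firstSyncDone)
			// A real poller would keep looping here.
		}()
	}
	m.mu.Unlock()
	if !exists {
		<-p.firstSyncDone
	}
	return !exists
}

func (m *sketchPollerMap) numPollers() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.pollers)
}

func (m *sketchPollerMap) syncCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.firstSyncs
}

func main() {
	m := newSketchPollerMap()
	pid := pollerID{UserID: "@alice:example.org", DeviceID: "DEV1"}
	fmt.Println(m.ensurePolling(pid)) // creates the poller and waits for its first sync
	fmt.Println(m.ensurePolling(pid)) // finds the existing poller and returns at once
	fmt.Println(m.numPollers(), m.syncCount())
}
```

Holding the mutex across the lookup and the insert is what keeps two concurrent callers from starting two pollers for the same ID.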

func (*PollerMap) Initialise

func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage)

func (*PollerMap) NumPollers

func (h *PollerMap) NumPollers() (count int)

func (*PollerMap) OnAccountData

func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage)

func (*PollerMap) OnE2EEData

func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)

func (*PollerMap) OnExpiredToken added in v0.99.2

func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)

func (*PollerMap) OnInvite

func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage)

func (*PollerMap) OnLeftRoom

func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string)

func (*PollerMap) OnReceipt

func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)

func (*PollerMap) OnTerminated

func (h *PollerMap) OnTerminated(ctx context.Context, userID, deviceID string)

func (*PollerMap) SetCallbacks

func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)

func (*PollerMap) SetTyping

func (h *PollerMap) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)

func (*PollerMap) Terminate

func (h *PollerMap) Terminate()

Terminate all pollers. Useful in tests.

func (*PollerMap) UpdateDeviceSince

func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)

func (*PollerMap) UpdateUnreadCounts

func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)

type Storage

type Storage struct {
	DevicesTable *DevicesTable
	TokensTable  *TokensTable
	DB           *sqlx.DB
}

func NewStore

func NewStore(postgresURI, secret string) *Storage

func NewStoreWithDB added in v0.99.4

func NewStoreWithDB(db *sqlx.DB, secret string) *Storage

func (*Storage) Teardown

func (s *Storage) Teardown()

type SyncResponse

type SyncResponse struct {
	NextBatch   string         `json:"next_batch"`
	AccountData EventsResponse `json:"account_data"`
	Presence    struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"presence"`
	Rooms       SyncRoomsResponse `json:"rooms"`
	ToDevice    EventsResponse    `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount          map[string]int `json:"device_one_time_keys_count,omitempty"`
	DeviceUnusedFallbackKeyTypes []string       `json:"device_unused_fallback_key_types,omitempty"`
}
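
As a rough illustration of how the JSON tags map onto a /sync body, the sketch below decodes a small sample into a local mirror of a few SyncResponse fields. The syncResponseSubset type and the sample payload are made up for this example; the field names and tags are copied from the declaration above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventsResponse mirrors sync2.EventsResponse.
type eventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

// syncResponseSubset mirrors a few fields of sync2.SyncResponse.
type syncResponseSubset struct {
	NextBatch   string         `json:"next_batch"`
	ToDevice    eventsResponse `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
}

// sampleSync is an invented payload for illustration.
const sampleSync = `{
  "next_batch": "s72595_4483_1934",
  "to_device": {"events": [{"type": "m.room_key_request", "sender": "@bob:example.org", "content": {}}]},
  "device_lists": {"changed": ["@bob:example.org"]},
  "device_one_time_keys_count": {"signed_curve25519": 50}
}`

func decodeSync(raw string) (*syncResponseSubset, error) {
	var res syncResponseSubset
	if err := json.Unmarshal([]byte(raw), &res); err != nil {
		return nil, err
	}
	return &res, nil
}

func main() {
	res, err := decodeSync(sampleSync)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(res.NextBatch, len(res.ToDevice.Events), res.DeviceLists.Changed, res.DeviceListsOTKCount)
}
```

Events stay as json.RawMessage, so they can be stored or forwarded without being re-encoded.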

type SyncRoomsResponse

type SyncRoomsResponse struct {
	Join   map[string]SyncV2JoinResponse   `json:"join"`
	Invite map[string]SyncV2InviteResponse `json:"invite"`
	Leave  map[string]SyncV2LeaveResponse  `json:"leave"`
}

type SyncV2InviteResponse

type SyncV2InviteResponse struct {
	InviteState EventsResponse `json:"invite_state"`
}

SyncV2InviteResponse represents a /sync response for a room which is under the 'invite' key.

type SyncV2JoinResponse

type SyncV2JoinResponse struct {
	State               EventsResponse      `json:"state"`
	Timeline            TimelineResponse    `json:"timeline"`
	Ephemeral           EventsResponse      `json:"ephemeral"`
	AccountData         EventsResponse      `json:"account_data"`
	UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}

SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.

type SyncV2LeaveResponse

type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}

SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.

type TimelineResponse

type TimelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type Token added in v0.99.3

type Token struct {
	AccessToken          string
	AccessTokenHash      string
	AccessTokenEncrypted string    `db:"token_encrypted"`
	UserID               string    `db:"user_id"`
	DeviceID             string    `db:"device_id"`
	LastSeen             time.Time `db:"last_seen"`
}

type TokenForPoller added in v0.99.3

type TokenForPoller struct {
	*Token
	Since string `db:"since"`
}

TokenForPoller represents a row of the tokens table, together with any data maintained by pollers for that token's device.

type TokensTable added in v0.99.3

type TokensTable struct {
	// contains filtered or unexported fields
}

TokensTable remembers sync v2 tokens.

func NewTokensTable added in v0.99.3

func NewTokensTable(db *sqlx.DB, secret string) *TokensTable

NewTokensTable creates the syncv3_sync2_tokens table if it does not already exist.

func (*TokensTable) Delete added in v0.99.3

func (t *TokensTable) Delete(accessTokenHash string) error

Delete looks up a token by its hash and deletes the row. If no token exists with the given hash, a warning is logged but no error is returned.

func (*TokensTable) GetTokenAndSince added in v0.99.3

func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)

func (*TokensTable) Insert added in v0.99.3

func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)

Insert a new token into the table.

func (*TokensTable) MaybeUpdateLastSeen added in v0.99.3

func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error

MaybeUpdateLastSeen actions a request to update a Token struct with its last_seen value in the DB. To avoid spamming the DB with a write every time a sync3 request arrives, we only update the last seen timestamp if it is at least 24 hours old. The timestamp is updated on the Token struct if and only if it is updated in the DB.
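
The write-throttling rule can be sketched as below. This is a minimal illustration, assuming "at least 24 hours old" means the stored timestamp is at least 24 hours behind the new one; maybeUpdateLastSeen, token and the write callback (standing in for the DB update) are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

type token struct {
	LastSeen time.Time
}

// maybeUpdateLastSeen calls write only when the stored timestamp is at least
// 24 hours behind newLastSeen. The struct changes only if the write succeeds.
func maybeUpdateLastSeen(tok *token, newLastSeen time.Time, write func(time.Time) error) (updated bool, err error) {
	if newLastSeen.Sub(tok.LastSeen) < 24*time.Hour {
		return false, nil // too recent: skip the DB write entirely
	}
	if err := write(newLastSeen); err != nil {
		return false, err
	}
	tok.LastSeen = newLastSeen
	return true, nil
}

// demo runs one update attempt hoursLater hours after the stored timestamp
// and reports whether it was applied and how many writes were issued.
func demo(hoursLater int) (updated bool, writes int) {
	base := time.Date(2023, time.July, 12, 0, 0, 0, 0, time.UTC)
	tok := &token{LastSeen: base}
	updated, _ = maybeUpdateLastSeen(tok, base.Add(time.Duration(hoursLater)*time.Hour), func(time.Time) error {
		writes++
		return nil
	})
	return updated, writes
}

func main() {
	fmt.Println(demo(1))
	fmt.Println(demo(25))
}
```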

func (*TokensTable) Token added in v0.99.3

func (t *TokensTable) Token(plaintextToken string) (*Token, error)

Token retrieves a tokens row from the database if it exists. Errors with sql.ErrNoRows if the token does not exist. Errors with an unspecified error otherwise.
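
Go's database/sql reports a missing row with the sentinel sql.ErrNoRows, so a caller can tell "unknown token" apart from a genuine failure using errors.Is. The sketch below shows that distinction only; classifyTokenLookup and the two sample errors are hypothetical and no database is involved.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Sample errors a lookup might return, for illustration.
var (
	errWrappedNoRows = fmt.Errorf("looking up token: %w", sql.ErrNoRows)
	errOther         = errors.New("connection refused")
)

// classifyTokenLookup turns the error from a token lookup into an outcome.
func classifyTokenLookup(err error) string {
	switch {
	case err == nil:
		return "known token"
	case errors.Is(err, sql.ErrNoRows):
		// errors.Is also matches when the sentinel has been wrapped.
		return "unknown token"
	default:
		return "lookup failed"
	}
}

func main() {
	fmt.Println(classifyTokenLookup(nil))
	fmt.Println(classifyTokenLookup(errWrappedNoRows))
	fmt.Println(classifyTokenLookup(errOther))
}
```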

func (*TokensTable) TokenForEachDevice added in v0.99.3

func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)

TokenForEachDevice loads the most recently used token for each device. If given a transaction, it will SELECT inside that transaction.

type TransactionIDCache

type TransactionIDCache struct {
	// contains filtered or unexported fields
}

func NewTransactionIDCache

func NewTransactionIDCache() *TransactionIDCache

func (*TransactionIDCache) Get

func (c *TransactionIDCache) Get(userID, eventID string) string

Get a transaction ID previously stored.

func (*TransactionIDCache) Store

func (c *TransactionIDCache) Store(userID, eventID, txnID string)

Store a new transaction ID received via v2 /sync.
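
The Store/Get pairing amounts to a lookup keyed on user ID and event ID. A minimal sketch follows, with the caveat that everything here is assumed: txnIDCache is a hypothetical unbounded map (the real cache may evict entries), and returning an empty string on a miss is a guess at the contract.

```go
package main

import (
	"fmt"
	"sync"
)

// txnKey scopes a transaction ID to the user who sent the event.
type txnKey struct {
	userID  string
	eventID string
}

// txnIDCache is a hypothetical, unbounded stand-in for TransactionIDCache.
type txnIDCache struct {
	mu  sync.RWMutex
	ids map[txnKey]string
}

func newTxnIDCache() *txnIDCache {
	return &txnIDCache{ids: make(map[txnKey]string)}
}

// store remembers the transaction ID for this user's event.
func (c *txnIDCache) store(userID, eventID, txnID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ids[txnKey{userID: userID, eventID: eventID}] = txnID
}

// get returns the stored transaction ID, or "" if none is known (assumption).
func (c *txnIDCache) get(userID, eventID string) string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.ids[txnKey{userID: userID, eventID: eventID}]
}

func main() {
	c := newTxnIDCache()
	c.store("@alice:example.org", "$event1", "txn-42")
	fmt.Printf("%q\n", c.get("@alice:example.org", "$event1"))
	fmt.Printf("%q\n", c.get("@bob:example.org", "$event1"))
}
```

Keying on the user as well as the event keeps one user's transaction ID from being handed to another user who sees the same event.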

type UnreadNotifications

type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}

type V2DataReceiver

type V2DataReceiver interface {
	// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
	UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
	// Accumulate data for this room. This means the timeline section of the v2 response.
	Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
	// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
	// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
	Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
	// SetTyping indicates which users are typing.
	SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
	// Sent when there is a new receipt
	OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
	// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
	AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
	// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
	UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
	// Set the latest account data for this user.
	OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
	// Sent when there is a room in the `invite` section of the v2 response.
	OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) // invitestate in db
	// Sent when there is a room in the `leave` section of the v2 response.
	OnLeftRoom(ctx context.Context, userID, roomID string)
	// Sent when there is a _change_ in E2EE data, not all the time
	OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
	// Sent when the poll loop terminates
	OnTerminated(ctx context.Context, userID, deviceID string)
	// Sent when the token gets a 401 response
	OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}

V2DataReceiver is the receiver for all the v2 sync data the poller gets.
