queue

package
v0.0.4
Published: Apr 15, 2025 License: BSD-3-Clause Imports: 15 Imported by: 1

README

Queues

There are two types of queues in the go-activitypub package. "Delivery" queues handle the details of delivering ActivityPub activities to one or more recipients (inboxes). "Processing" queues handle additional, or custom, processing of events related to ActivityPub messages received by the server application.

There are currently two "processing" queues:

  • A message processing queue which processes a message (which resolves to an ActivityPub "note") after it has been received and recorded in an account's inbox.
  • A follower processing queue which processes a follow event (as in a remote actor following an account) after it has been received.

It is an open question whether or not to support multiple processing queues, which brings with it the hassle and complexity of managing an equal number of queue endpoints. On the other hand, it's too soon to know what sort of information would need to be passed, and how, to a single user-defined processing endpoint. For now, the decision is to be explicit and configure each processing queue with its own dispatcher and receiver.

Delivery queues

Delivery queues implement the DeliveryQueue interface:

type DeliveryQueue interface {
	DeliverActivity(context.Context, *deliver.DeliverActivityOptions) error
	Close(context.Context) error
}

Implementations

The following implementations of the DeliveryQueue interface are available by default:

null://

This implementation will receive an activity but not do anything with it. It is akin to writing data to /dev/null.

pubsub://

This implementation will dispatch the activity's unique ActivityId property to an underlying implementation of the sfomuseum/go-pubsub/publisher.Publisher interface. That ID is expected to have been recorded in the ActivitiesDatabase table so that it can be retrieved by whatever code receives the message.

See also:

slog://

This implementation will log the activity using the default log/slog logger.

synchronous://

Message processing queues

Message processing queues implement the ProcessMessageQueue interface:

type ProcessMessageQueue interface {
	ProcessMessage(context.Context, int64) error
	Close(context.Context) error
}

Currently, "messages" are considered to be ActivityPub "Create" activities with type "Note". Remember that a "message" in the go-activitypub package is a pointer to a note associated with a specific account. Messages are dispatched to a ProcessMessageQueue as a final step in the www.InboxPostHandler in the server application.

There is no default endpoint, or code, for receiving or processing those messages after they have been dispatched. That is left up to individual users to implement, out of band, as their needs suit them. There is an example application for processing messages that you can use as "starter code" which can run from the command line or as a Lambda function. It does nothing more than validate the message, recipient account and associated note and log those details.

Implementations

null://

This implementation will receive a message (ID) but not do anything with it. It is akin to writing data to /dev/null.

pubsub://

This implementation will dispatch a message ID to an underlying implementation of the sfomuseum/go-pubsub/publisher.Publisher interface. That ID is expected to have been recorded in the MessagesDatabase table so that it can be retrieved by whatever code receives the message.

See also:

slog://

This implementation will log the message ID using the default log/slog logger.

Follower processing queues

Follower processing queues implement the ProcessFollowerQueue interface:

type ProcessFollowerQueue interface {
	ProcessFollower(context.Context, int64) error
	Close(context.Context) error
}

The unique 64-bit ID of the Follower record created in the FollowersDatabase is dispatched to this queue when a remote actor follows an account hosted by the server application. Follower IDs are dispatched to a ProcessFollowerQueue as a final step in processing "Follow" events in the www.InboxPostHandler in the server application.

There is no default endpoint, or code, for receiving or processing those messages after they have been dispatched. That is left up to individual users to implement, out of band, as their needs suit them. There is an example application for processing messages that you can use as "starter code" which can run from the command line or as a Lambda function. It does nothing more than validate the message, recipient account and associated note and log those details.

Implementations

null://

This implementation will receive a follower (ID) but not do anything with it. It is akin to writing data to /dev/null.

pubsub://

This implementation will dispatch a follower ID to an underlying implementation of the sfomuseum/go-pubsub/publisher.Publisher interface. That ID is expected to have been recorded in the FollowersDatabase table so that it can be retrieved by whatever code receives the event.

See also:

slog://

The implementation will log the follow(er) activity using the default log/slog logger.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeliverActivityToFollowers

func DeliverActivityToFollowers(ctx context.Context, opts *DeliverActivityToFollowersOptions) error

func DeliveryQueueSchemes

func DeliveryQueueSchemes() []string

DeliveryQueueSchemes returns the list of delivery queue schemes that have been registered.

func ProcessFollowerQueueSchemes added in v0.0.3

func ProcessFollowerQueueSchemes() []string

ProcessFollowerQueueSchemes returns the list of follower processing queue schemes that have been registered.

func ProcessMessageQueueSchemes

func ProcessMessageQueueSchemes() []string

ProcessMessageQueueSchemes returns the list of message processing queue schemes that have been registered.

func RegisterDeliveryQueue

func RegisterDeliveryQueue(ctx context.Context, scheme string, init_func DeliveryQueueInitializationFunc) error

RegisterDeliveryQueue registers 'scheme' as a key pointing to 'init_func' in an internal lookup table used to create new `DeliveryQueue` instances by the `NewDeliveryQueue` method.

func RegisterProcessFollowerQueue added in v0.0.3

func RegisterProcessFollowerQueue(ctx context.Context, scheme string, init_func ProcessFollowerQueueInitializationFunc) error

RegisterProcessFollowerQueue registers 'scheme' as a key pointing to 'init_func' in an internal lookup table used to create new `ProcessFollowerQueue` instances by the `NewProcessFollowerQueue` method.

func RegisterProcessMessageQueue

func RegisterProcessMessageQueue(ctx context.Context, scheme string, init_func ProcessMessageQueueInitializationFunc) error

RegisterProcessMessageQueue registers 'scheme' as a key pointing to 'init_func' in an internal lookup table used to create new `ProcessMessageQueue` instances by the `NewProcessMessageQueue` method.

func RegisterPubSubDeliverySchemes

func RegisterPubSubDeliverySchemes(ctx context.Context) error

func RegisterPubSubProcessFollowerSchemes added in v0.0.3

func RegisterPubSubProcessFollowerSchemes(ctx context.Context) error

func RegisterPubSubProcessMessageSchemes

func RegisterPubSubProcessMessageSchemes(ctx context.Context) error

Types

type DeliverActivityToFollowersOptions

type DeliverActivityToFollowersOptions struct {
	AccountsDatabase   database.AccountsDatabase
	FollowersDatabase  database.FollowersDatabase
	NotesDatabase      database.NotesDatabase
	DeliveriesDatabase database.DeliveriesDatabase
	DeliveryQueue      DeliveryQueue
	Activity           *activitypub.Activity
	Mentions           []*activitypub.PostTag `json:"mentions"`
	MaxAttempts        int                    `json:"max_attempts"`
	URIs               *uris.URIs
}

type DeliveryQueue

type DeliveryQueue interface {
	DeliverActivity(context.Context, *deliver.DeliverActivityOptions) error
	Close(context.Context) error
}

func NewDeliveryQueue

func NewDeliveryQueue(ctx context.Context, uri string) (DeliveryQueue, error)

NewDeliveryQueue returns a new `DeliveryQueue` instance configured by 'uri'. The value of 'uri' is parsed as a `url.URL` and its scheme is used as the key for a corresponding `DeliveryQueueInitializationFunc` function used to instantiate the new `DeliveryQueue`. It is assumed that the scheme (and initialization function) have been registered by the `RegisterDeliveryQueue` method.

func NewNullDeliveryQueue

func NewNullDeliveryQueue(ctx context.Context, uri string) (DeliveryQueue, error)

func NewPubSubDeliveryQueue

func NewPubSubDeliveryQueue(ctx context.Context, uri string) (DeliveryQueue, error)

func NewSlogDeliveryQueue

func NewSlogDeliveryQueue(ctx context.Context, uri string) (DeliveryQueue, error)

func NewSynchronousDeliveryQueue

func NewSynchronousDeliveryQueue(ctx context.Context, uri string) (DeliveryQueue, error)

type DeliveryQueueInitializationFunc

type DeliveryQueueInitializationFunc func(ctx context.Context, uri string) (DeliveryQueue, error)

DeliveryQueueInitializationFunc is a function defined by an individual DeliveryQueue implementation and used to create an instance of that implementation.

type NullDeliveryQueue

type NullDeliveryQueue struct {
	DeliveryQueue
}

func (*NullDeliveryQueue) Close

func (q *NullDeliveryQueue) Close(ctx context.Context) error

func (*NullDeliveryQueue) DeliverActivity

func (q *NullDeliveryQueue) DeliverActivity(ctx context.Context, opts *deliver.DeliverActivityOptions) error

type NullProcessFollowerQueue added in v0.0.3

type NullProcessFollowerQueue struct {
	ProcessFollowerQueue
}

func (*NullProcessFollowerQueue) Close added in v0.0.3

func (*NullProcessFollowerQueue) ProcessFollower added in v0.0.3

func (q *NullProcessFollowerQueue) ProcessFollower(ctx context.Context, follower_id int64) error

type NullProcessMessageQueue

type NullProcessMessageQueue struct {
	ProcessMessageQueue
}

func (*NullProcessMessageQueue) Close

func (*NullProcessMessageQueue) ProcessMessage

func (q *NullProcessMessageQueue) ProcessMessage(ctx context.Context, message_id int64) error

type ProcessFollowerQueue added in v0.0.3

type ProcessFollowerQueue interface {
	ProcessFollower(context.Context, int64) error
	Close(context.Context) error
}

func NewNullProcessFollowerQueue added in v0.0.3

func NewNullProcessFollowerQueue(ctx context.Context, uri string) (ProcessFollowerQueue, error)

func NewProcessFollowerQueue added in v0.0.3

func NewProcessFollowerQueue(ctx context.Context, uri string) (ProcessFollowerQueue, error)

NewProcessFollowerQueue returns a new `ProcessFollowerQueue` instance configured by 'uri'. The value of 'uri' is parsed as a `url.URL` and its scheme is used as the key for a corresponding `ProcessFollowerQueueInitializationFunc` function used to instantiate the new `ProcessFollowerQueue`. It is assumed that the scheme (and initialization function) have been registered by the `RegisterProcessFollowerQueue` method.

func NewPubSubProcessFollowerQueue added in v0.0.3

func NewPubSubProcessFollowerQueue(ctx context.Context, uri string) (ProcessFollowerQueue, error)

func NewSlogProcessFollowerQueue added in v0.0.3

func NewSlogProcessFollowerQueue(ctx context.Context, uri string) (ProcessFollowerQueue, error)

type ProcessFollowerQueueInitializationFunc added in v0.0.3

type ProcessFollowerQueueInitializationFunc func(ctx context.Context, uri string) (ProcessFollowerQueue, error)

ProcessFollowerQueueInitializationFunc is a function defined by an individual ProcessFollowerQueue implementation and used to create an instance of that implementation.

type ProcessMessageQueue

type ProcessMessageQueue interface {
	ProcessMessage(context.Context, int64) error
	Close(context.Context) error
}

func NewNullProcessMessageQueue

func NewNullProcessMessageQueue(ctx context.Context, uri string) (ProcessMessageQueue, error)

func NewProcessMessageQueue

func NewProcessMessageQueue(ctx context.Context, uri string) (ProcessMessageQueue, error)

NewProcessMessageQueue returns a new `ProcessMessageQueue` instance configured by 'uri'. The value of 'uri' is parsed as a `url.URL` and its scheme is used as the key for a corresponding `ProcessMessageQueueInitializationFunc` function used to instantiate the new `ProcessMessageQueue`. It is assumed that the scheme (and initialization function) have been registered by the `RegisterProcessMessageQueue` method.

func NewPubSubProcessMessageQueue

func NewPubSubProcessMessageQueue(ctx context.Context, uri string) (ProcessMessageQueue, error)

func NewSlogProcessMessageQueue

func NewSlogProcessMessageQueue(ctx context.Context, uri string) (ProcessMessageQueue, error)

type ProcessMessageQueueInitializationFunc

type ProcessMessageQueueInitializationFunc func(ctx context.Context, uri string) (ProcessMessageQueue, error)

ProcessMessageQueueInitializationFunc is a function defined by an individual ProcessMessageQueue implementation and used to create an instance of that implementation.

type PubSubDeliveryQueue

type PubSubDeliveryQueue struct {
	DeliveryQueue
	// contains filtered or unexported fields
}

func (*PubSubDeliveryQueue) Close

func (q *PubSubDeliveryQueue) Close(ctx context.Context) error

func (*PubSubDeliveryQueue) DeliverActivity

func (q *PubSubDeliveryQueue) DeliverActivity(ctx context.Context, opts *deliver.DeliverActivityOptions) error

type PubSubDeliveryQueueOptions

type PubSubDeliveryQueueOptions struct {
	// The unique ID associated with the pubsub delivery. This is mostly for debugging between the sender and the receiver.
	Id int64 `json:"id"`
	// The actor to whom the activity should be delivered.
	To string `json:"to"`
	// The unique Activity(Database) Id associated with the delivery.
	ActivityId int64 `json:"activity_id"`
}
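The JSON tags on this struct suggest it is serialized for transport, although this page does not say how or where. The sketch below copies the struct and round-trips it through encoding/json to show the resulting shape. The helpers `encodeOptions` and `decodeOptions` and the recipient URL are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PubSubDeliveryQueueOptions copies the struct documented above so the
// sketch compiles without external imports.
type PubSubDeliveryQueueOptions struct {
	Id         int64  `json:"id"`
	To         string `json:"to"`
	ActivityId int64  `json:"activity_id"`
}

// encodeOptions is a hypothetical helper that renders opts as JSON.
func encodeOptions(opts PubSubDeliveryQueueOptions) (string, error) {
	enc, err := json.Marshal(opts)
	if err != nil {
		return "", err
	}
	return string(enc), nil
}

// decodeOptions is a hypothetical helper for the receiving side.
func decodeOptions(body string) (*PubSubDeliveryQueueOptions, error) {
	var opts PubSubDeliveryQueueOptions
	if err := json.Unmarshal([]byte(body), &opts); err != nil {
		return nil, err
	}
	return &opts, nil
}

func main() {
	body, err := encodeOptions(PubSubDeliveryQueueOptions{
		Id:         1,
		To:         "https://example.com/ap/bob", // made-up recipient
		ActivityId: 42,
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
	// prints {"id":1,"to":"https://example.com/ap/bob","activity_id":42}

	opts, err := decodeOptions(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(opts.ActivityId)
	// prints 42
}
```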

type PubSubProcessFollowerQueue added in v0.0.3

type PubSubProcessFollowerQueue struct {
	ProcessFollowerQueue
	// contains filtered or unexported fields
}

func (*PubSubProcessFollowerQueue) Close added in v0.0.3

func (*PubSubProcessFollowerQueue) ProcessFollower added in v0.0.3

func (q *PubSubProcessFollowerQueue) ProcessFollower(ctx context.Context, follower_id int64) error

type PubSubProcessMessageQueue

type PubSubProcessMessageQueue struct {
	ProcessMessageQueue
	// contains filtered or unexported fields
}

func (*PubSubProcessMessageQueue) Close

func (*PubSubProcessMessageQueue) ProcessMessage

func (q *PubSubProcessMessageQueue) ProcessMessage(ctx context.Context, message_id int64) error

type SlogDeliveryQueue

type SlogDeliveryQueue struct {
	DeliveryQueue
}

func (*SlogDeliveryQueue) Close

func (q *SlogDeliveryQueue) Close(ctx context.Context) error

func (*SlogDeliveryQueue) DeliverActivity

func (q *SlogDeliveryQueue) DeliverActivity(ctx context.Context, opts *deliver.DeliverActivityOptions) error

type SlogProcessFollowerQueue added in v0.0.3

type SlogProcessFollowerQueue struct {
	ProcessFollowerQueue
}

func (*SlogProcessFollowerQueue) Close added in v0.0.3

func (*SlogProcessFollowerQueue) ProcessFollower added in v0.0.3

func (q *SlogProcessFollowerQueue) ProcessFollower(ctx context.Context, follower_id int64) error

type SlogProcessMessageQueue

type SlogProcessMessageQueue struct {
	ProcessMessageQueue
}

func (*SlogProcessMessageQueue) Close

func (*SlogProcessMessageQueue) ProcessMessage

func (q *SlogProcessMessageQueue) ProcessMessage(ctx context.Context, message_id int64) error

type SynchronousDeliveryQueue

type SynchronousDeliveryQueue struct {
	DeliveryQueue
}

func (*SynchronousDeliveryQueue) Close

func (*SynchronousDeliveryQueue) DeliverActivity
