Documentation
¶
Overview ¶
Package federation provides ActivityPub federation services including authorized fetch and object retrieval.
Index ¶
- Constants
- Variables
- func DetectKeyType(key interface{}) string
- func DetermineSigningAlgorithm(privateKey crypto.PrivateKey, preferLegacy bool) string
- func EncodePrivateKeyPEM(privateKey *rsa.PrivateKey) ([]byte, error)
- func EncodePublicKeyPEM(publicKey *rsa.PublicKey) ([]byte, error)
- func GenerateRSAKeyPair(bits int) (*rsa.PrivateKey, error)
- func NegotiateSignatureAlgorithm(acceptedAlgorithms []string, keyType string) string
- func ParsePrivateKeyPEM(pemData []byte) (crypto.PrivateKey, error)
- func ParsePublicKeyPEM(pemData []byte) (crypto.PublicKey, error)
- func SignHTTPRequest(req *http.Request, privateKey crypto.PrivateKey, keyID string) error
- func SignHTTPRequestWithAlgorithm(req *http.Request, privateKey crypto.PrivateKey, keyID, algorithm string) error
- func VerifyDigest(req *http.Request, body []byte) error
- func VerifyHTTPSignature(req *http.Request, publicKey crypto.PublicKey) error
- func VerifyHTTPSignatureEnhanced(req *http.Request, publicKey crypto.PublicKey, sig *HTTPSignature) error
- func VerifyHTTPSignatureV2(req *http.Request, publicKey crypto.PublicKey) error
- type ActivityPattern
- type AnalyticsAggregator
- func (a *AnalyticsAggregator) GetDomainHealthStatus(ctx context.Context, domain string) (*DomainHealthStatus, error)
- func (a *AnalyticsAggregator) GetUnhealthyDomains(ctx context.Context, threshold float64) ([]*DomainHealthStatus, error)
- func (a *AnalyticsAggregator) RecordMetric(ctx context.Context, domain string, metric *Metric) error
- type ArchiveData
- type ArchiveMetadata
- type AuthorizedFetchService
- func (f *AuthorizedFetchService) FetchActor(ctx context.Context, actorURL string, signingActor *activitypub.Actor) (*activitypub.Actor, error)
- func (f *AuthorizedFetchService) FetchObject(ctx context.Context, objectURL string, signingActor *activitypub.Actor) (any, error)
- func (f *AuthorizedFetchService) IsAuthorizedFetchEnabled(ctx context.Context) bool
- func (f *AuthorizedFetchService) VerifyAuthorizedFetch(ctx context.Context, req *http.Request) (*activitypub.Actor, error)
- type CompressionPipeline
- type CompressionStats
- type ConnectionError
- type CostBreakdown
- type CostCalculationParams
- type CostCalculator
- func (c *CostCalculator) CalculateFederationCosts(params *CostCalculationParams) *models.FederationCostTracking
- func (c *CostCalculator) EstimateInboundActivityCost(activityType string, payloadSize int64, requiresSignatureVerification bool) int64
- func (c *CostCalculator) EstimateOutboundActivityCost(activityType string, payloadSize int64, targetCount int) int64
- func (c *CostCalculator) GetCostEstimate(params *CostCalculationParams) *CostEstimate
- type CostEstimate
- type DeliveryAttempt
- type DeliveryService
- func (d *DeliveryService) DeliverActivity(ctx context.Context, activity *activitypub.Activity, targetInbox string, ...) error
- func (d *DeliveryService) DeliverActivityWithPrivacy(ctx context.Context, activity *activitypub.Activity, targetInbox string, ...) error
- func (d *DeliveryService) DeliverDirectMessage(ctx context.Context, activity *activitypub.Activity, ...) error
- func (d *DeliveryService) DeliverToFollowers(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
- func (d *DeliveryService) DeliverToRecipients(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
- func (d *DeliveryService) DeliverToRecipientsWithPrivacy(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
- func (d *DeliveryService) QueueDelivery(ctx context.Context, activity *activitypub.Activity, targetInbox string, ...) error
- type DomainHealthStatus
- type DomainStats
- type DynamORMFederationStorage
- func (s *DynamORMFederationStorage) CacheRemoteActor(ctx context.Context, handle string, actor *activitypub.Actor, ...) error
- func (s *DynamORMFederationStorage) GetActor(ctx context.Context, username string) (*activitypub.Actor, error)
- func (s *DynamORMFederationStorage) GetActorPrivateKey(ctx context.Context, username string) (string, error)
- func (s *DynamORMFederationStorage) GetCachedRemoteActor(ctx context.Context, actorID string) (*activitypub.Actor, error)
- func (s *DynamORMFederationStorage) GetFollowers(ctx context.Context, username string, limit int, cursor string) ([]string, string, error)
- func (s *DynamORMFederationStorage) RecordFederationActivity(ctx context.Context, activity *storage.FederationActivity) error
- type EnhancedHTTPSignature
- type EnhancedRetryMessage
- type EnhancedRetryProcessor
- type ErrorRateDataPoint
- type ErrorRateTrend
- type FederationHooks
- func (fh *FederationHooks) GetFederationRecommendations(ctx context.Context, domain string) ([]*FederationRecommendation, error)
- func (fh *FederationHooks) GetRelationshipAnalysis(ctx context.Context, sourceDomain, targetDomain string) (*RelationshipAnalysis, error)
- func (fh *FederationHooks) OnConnectionError(ctx context.Context, connError *ConnectionError) error
- func (fh *FederationHooks) OnInboxReceive(ctx context.Context, activity *InboxActivity) error
- func (fh *FederationHooks) OnInstanceDiscovery(ctx context.Context, instance *InstanceDiscovery) error
- func (fh *FederationHooks) OnOutboxDelivery(ctx context.Context, delivery *OutboxDelivery) error
- type FederationRecommendation
- type FederationStorage
- type HTTPSignature
- type InboundActivity
- type InboxActivity
- type InstanceDiscovery
- type Metric
- type OutboxDelivery
- type RelationshipAnalysis
- type RelationshipTracker
- func (rt *RelationshipTracker) AnalyzeRelationshipStrength(ctx context.Context, sourceDomain, targetDomain string) (*RelationshipAnalysis, error)
- func (rt *RelationshipTracker) ArchiveInstanceGroup(ctx context.Context, targetInstance string, ...) error
- func (rt *RelationshipTracker) BatchArchiveToS3(ctx context.Context, relationships []models.FederationRelationship) error
- func (rt *RelationshipTracker) BatchRestoreRelationships(ctx context.Context, archiveLocations []string) error
- func (rt *RelationshipTracker) ForceStateTransition(ctx context.Context, userID, targetInstance, relType string, ...) error
- func (rt *RelationshipTracker) GenerateRecommendations(ctx context.Context, domain string) ([]*FederationRecommendation, error)
- func (rt *RelationshipTracker) GetHealthScore(ctx context.Context, targetInstance string) (float64, error)
- func (rt *RelationshipTracker) GetInstanceAggregate(ctx context.Context, instanceDomain string, period string) (*models.FederationRelationshipAggregate, error)
- func (rt *RelationshipTracker) GetRelationshipByID(ctx context.Context, userID, targetInstance, relType string) (*models.FederationRelationship, error)
- func (rt *RelationshipTracker) GetRelationshipsByState(ctx context.Context, state models.RelationshipState, limit int) ([]*models.FederationRelationship, error)
- func (rt *RelationshipTracker) GetSuccessRate(ctx context.Context, targetDomain string) (float64, error)
- func (rt *RelationshipTracker) GetUnhealthyRelationships(ctx context.Context, threshold float64, limit int) ([]*models.FederationRelationship, error)
- func (rt *RelationshipTracker) GetUserRelationships(ctx context.Context, userID string, limit int) ([]*models.FederationRelationship, error)
- func (rt *RelationshipTracker) ReactivateRelationship(ctx context.Context, userID, targetInstance, relType string) error
- func (rt *RelationshipTracker) TrackDeliveryAttempt(ctx context.Context, attempt *DeliveryAttempt) error
- func (rt *RelationshipTracker) TrackInboundActivity(ctx context.Context, activity *InboundActivity) error
- func (rt *RelationshipTracker) UpdateInstanceMetadata(ctx context.Context, metadata *storage.InstanceMetadata) error
- type RelayBudgetRecommendations
- type RelayBudgetService
- func (rbs *RelayBudgetService) AggregateRelayCosts(ctx context.Context, relayURL string) error
- func (rbs *RelayBudgetService) CheckRelayBudget(ctx context.Context, relayURL string, estimatedCostMicroCents int64) error
- func (rbs *RelayBudgetService) CreateRelayBudget(ctx context.Context, relayURL, period string, limitMicroCents int64, ...) error
- func (rbs *RelayBudgetService) GetRelayBudgetRecommendations(ctx context.Context, relayURL string) (*RelayBudgetRecommendations, error)
- func (rbs *RelayBudgetService) GetRelayBudgetStatus(ctx context.Context, relayURL, period string) (*RelayBudgetStatus, error)
- func (rbs *RelayBudgetService) UpdateRelayBudgetUsage(ctx context.Context, relayURL, period string, additionalCostMicroCents int64) error
- type RelayBudgetStatus
- type RelayInfo
- type RelayService
- func (r *RelayService) ForwardToRelays(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
- func (r *RelayService) HandleRelayActivity(ctx context.Context, activity *activitypub.Activity, relayURL string) error
- func (r *RelayService) SubscribeToRelay(ctx context.Context, relayURL string, actorUsername string) error
- func (r *RelayService) UnsubscribeFromRelay(ctx context.Context, relayURL string, actorUsername string) error
- type RemoteSearchService
- func (s *RemoteSearchService) ResolveActor(ctx context.Context, handle string) (*SearchResult, error)
- func (s *RemoteSearchService) ResolveActorURL(ctx context.Context, actorURL string) (*SearchResult, error)
- func (s *RemoteSearchService) SearchRemoteActors(ctx context.Context, query string, limit int) ([]*SearchResult, error)
- type RepositoryStorageAdapter
- func (a *RepositoryStorageAdapter) CacheRemoteActor(ctx context.Context, handle string, actor *activitypub.Actor, ...) error
- func (a *RepositoryStorageAdapter) GetActor(ctx context.Context, username string) (*activitypub.Actor, error)
- func (a *RepositoryStorageAdapter) GetActorPrivateKey(ctx context.Context, username string) (string, error)
- func (a *RepositoryStorageAdapter) GetCachedRemoteActor(ctx context.Context, actorID string) (*activitypub.Actor, error)
- func (a *RepositoryStorageAdapter) GetFollowers(ctx context.Context, username string, limit int, cursor string) ([]string, string, error)
- func (a *RepositoryStorageAdapter) RecordFederationActivity(ctx context.Context, activity *storage.FederationActivity) error
- type ResponseTimeDataPoint
- type ResponseTimeTrend
- type SearchResult
- type SignatureService
- type StatisticalSummary
- type TrendAnalysis
- type TrendAnalyzer
- type TrendingInstance
- type VolumeDataPoint
- type VolumeTrend
Constants ¶
const ( // MIME types for ActivityPub ActivityPubAcceptType = "application/activity+json, application/ld+json" // User Agent for outbound requests UserAgent = "Lesser/1.0" )
ActivityPub-related constants
const ( // SignatureHeader is the HTTP header containing the signature SignatureHeader = "Signature" // SignatureInputHeader is the new draft header for signature input SignatureInputHeader = "Signature-Input" // DigestHeader is the HTTP header containing the body digest DigestHeader = "Digest" // DateHeader is the HTTP header containing the request date DateHeader = "Date" // DefaultAlgorithm is the default signature algorithm DefaultAlgorithm = "rsa-sha256" // MaxClockSkew is the maximum clock skew allowed (5 minutes) MaxClockSkew = 5 * time.Minute // RequestTargetHeader is the pseudo-header for request target RequestTargetHeader = "(request-target)" )
const ( AlgorithmHS2019 = "hs2019" // Recommended in newer drafts AlgorithmRSASHA256 = "rsa-sha256" // Legacy RSA AlgorithmRSASHA512 = "rsa-sha512" // RSA with SHA-512 AlgorithmECDSASHA256 = "ecdsa-sha256" // ECDSA support AlgorithmEd25519 = "ed25519" // EdDSA support )
Supported algorithms
Variables ¶
var ( // ErrDeliveryFailed is returned when federation delivery fails ErrDeliveryFailed = apperrors.DeliveryFailed("delivery", errors.New("delivery failed")) // ErrMessageParseFailed is returned when federation message parsing fails ErrMessageParseFailed = apperrors.ActivityParsingFailed("message", errors.New("message parsing failed")) // ErrSigningActorNotFound is returned when signing actor cannot be retrieved ErrSigningActorNotFound = apperrors.ActorNotFound("") // ErrMessageMarshalFailed is returned when message marshaling fails ErrMessageMarshalFailed = apperrors.ActivityParsingFailed("message", errors.New("message marshaling failed")) // ErrMessageRequeueFailed is returned when message requeuing fails ErrMessageRequeueFailed = apperrors.DeliveryFailed("requeue", errors.New("message requeue failed")) // ErrDeliveryPermanentFailure is returned when delivery permanently failed ErrDeliveryPermanentFailure = apperrors.DeliveryPermanentFailure("", "") // ErrInstanceStatsNotFound is returned when instance stats cannot be found ErrInstanceStatsNotFound = apperrors.InstanceNotFound("") // ErrDeliveryHTTPStatusFailed is returned when delivery fails with non-2xx status ErrDeliveryHTTPStatusFailed = apperrors.DeliveryRejected("", 0) // ErrDeliveryToInboxesFailed is returned when delivery to multiple inboxes fails ErrDeliveryToInboxesFailed = apperrors.DeliveryToInboxesFailed(0, errors.New("delivery to inboxes failed")) // ErrDeliveryToDomainsFailed is returned when delivery to multiple domains fails ErrDeliveryToDomainsFailed = apperrors.DeliveryToDomainsFailed(0, errors.New("delivery to domains failed")) // ErrDeliveryToRecipientsFailed is returned when delivery to multiple recipients fails ErrDeliveryToRecipientsFailed = apperrors.DeliveryToInboxesFailed(0, errors.New("delivery to recipients failed")) ErrNoSharedInboxFound = apperrors.NoSharedInboxFound("") // ErrFetchActorHTTPStatusFailed is returned when actor fetch fails with non-2xx status ErrFetchActorHTTPStatusFailed = apperrors.ActorFetchFailed("", errors.New("actor fetch failed")) // ErrDeliveryDirectMessageToInboxesFailed is returned when direct message delivery to multiple inboxes fails ErrDeliveryDirectMessageToInboxesFailed = apperrors.DeliveryToInboxesFailed(0, errors.New("direct message delivery failed")) // ErrActivityMarshalFailed is returned when activity marshaling fails ErrActivityMarshalFailed = apperrors.ActivityParsingFailed("activity", errors.New("activity marshaling failed")) // ErrRequestCreationFailed is returned when HTTP request creation fails ErrRequestCreationFailed = apperrors.RemoteFetchFailed("", errors.New("request creation failed")) // ErrPrivateKeyRetrievalFailed is returned when private key retrieval fails ErrPrivateKeyRetrievalFailed = apperrors.SigningKeyNotFound("") // ErrPrivateKeyParseFailed is returned when private key parsing fails ErrPrivateKeyParseFailed = apperrors.SigningKeyInvalid("") // ErrRequestSigningFailed is returned when HTTP request signing fails ErrRequestSigningFailed = apperrors.HTTPSignatureVerificationFailed("") // ErrHTTPRequestFailed is returned when HTTP request execution fails ErrHTTPRequestFailed = apperrors.RemoteFetchFailed("", errors.New("http request failed")) // ErrGetFollowersFailed is returned when retrieving followers fails ErrGetFollowersFailed = apperrors.CollectionFetchFailed("", errors.New("get followers failed")) // ErrActorDecodeFailed is returned when actor decoding fails ErrActorDecodeFailed = apperrors.ActorFetchFailed("", errors.New("actor decode failed")) // ErrFetchRemoteActorFailed is returned when remote actor fetching fails ErrFetchRemoteActorFailed = apperrors.ActorFetchFailed("", errors.New("fetch remote actor failed")) // Enhanced retry specific errors // ErrRetryMessageMarshalFailed is returned when retry message marshaling fails ErrRetryMessageMarshalFailed = apperrors.ActivityParsingFailed("retry message", errors.New("retry message marshaling failed")) // ErrRetryQueueFailed is returned when queuing for enhanced retry fails ErrRetryQueueFailed = apperrors.DeliveryFailed("retry queue", errors.New("retry queue failed")) // ErrEnhancedDeliveryIDGenFailed is returned when enhanced delivery ID generation fails ErrEnhancedDeliveryIDGenFailed = apperrors.DeliveryFailed("id gen", errors.New("delivery ID generation failed")) // ErrRetryLimitExceeded is returned when maximum retry attempts are exceeded ErrRetryLimitExceeded = apperrors.DeliveryPermanentFailure("", "retry limit exceeded") // ErrRetryDeliveryFailed is returned when retry delivery fails ErrRetryDeliveryFailed = apperrors.DeliveryFailed("retry", errors.New("retry delivery failed")) // ErrBackoffCalculationFailed is returned when backoff calculation fails ErrBackoffCalculationFailed = apperrors.DeliveryFailed("backoff", errors.New("backoff calculation failed")) )
Federation delivery errors - now using centralized error system
var ( // ErrBuildSignatureString is returned when signature string building fails ErrBuildSignatureString = apperrors.SignatureInvalid("failed to build signature string") // ErrECDSAVerificationFailed is returned when ECDSA signature verification fails ErrECDSAVerificationFailed = apperrors.HTTPSignatureVerificationFailed("ECDSA signature verification failed") // ErrEd25519VerificationFailed is returned when Ed25519 signature verification fails ErrEd25519VerificationFailed = apperrors.HTTPSignatureVerificationFailed("Ed25519 signature verification failed") // ErrUnsupportedPublicKeyType is returned when public key type is not supported for hs2019 ErrUnsupportedPublicKeyType = apperrors.SigningKeyInvalid("unsupported public key type for hs2019") // ErrAlgorithmRequiresRSA is returned when algorithm requires RSA key but different key type provided ErrAlgorithmRequiresRSA = apperrors.SignatureInvalid("algorithm requires RSA key") // ErrAlgorithmRequiresECDSA is returned when algorithm requires ECDSA key but different key type provided ErrAlgorithmRequiresECDSA = apperrors.SignatureInvalid("algorithm requires ECDSA key") // ErrAlgorithmRequiresEd25519 is returned when algorithm requires Ed25519 key but different key type provided ErrAlgorithmRequiresEd25519 = apperrors.SignatureInvalid("algorithm requires Ed25519 key") // ErrUnsupportedAlgorithm is returned when signature algorithm is not supported ErrUnsupportedAlgorithm = apperrors.SignatureInvalid("unsupported algorithm") // ErrSignatureFailed is returned when signing operation fails ErrSignatureFailed = apperrors.HTTPSignatureVerificationFailed("failed to sign") // ErrUnsupportedPrivateKeyType is returned when private key type is not supported for hs2019 ErrUnsupportedPrivateKeyType = apperrors.SigningKeyInvalid("unsupported private key type for hs2019") // ErrInvalidSignatureInputFormat is returned when signature-input format is invalid ErrInvalidSignatureInputFormat = apperrors.SignatureInvalid("invalid signature-input format: missing parentheses") // ErrDecodeSignature is returned when signature decoding fails ErrDecodeSignature = apperrors.SignatureInvalid("failed to decode signature") // ErrInvalidSignatureHeaderFormat is returned when signature header format is invalid ErrInvalidSignatureHeaderFormat = apperrors.SignatureInvalid("invalid signature header format") // ErrMissingKeyID is returned when keyId is missing in signature ErrMissingKeyID = apperrors.SignatureMissing() // ErrMissingSignatureValue is returned when signature value is missing ErrMissingSignatureValue = apperrors.SignatureMissing() // ErrRequiredHeaderNotFound is returned when a required header is not found ErrRequiredHeaderNotFound = apperrors.SignatureInvalid("required header not found") // ErrFailedToParsePEMBlock is returned when PEM block parsing fails ErrFailedToParsePEMBlock = apperrors.SigningKeyInvalid("failed to parse PEM block") // ErrUnsupportedKeyType is returned when key type is not supported ErrUnsupportedKeyType = apperrors.SigningKeyInvalid("unsupported key type") // ErrKeySizeTooSmall is returned when key size is insufficient ErrKeySizeTooSmall = apperrors.SigningKeyInvalid("key size must be at least 2048 bits") // ErrInvalidSignatureHeaderFormatWrapper is returned when signature header format validation fails ErrInvalidSignatureHeaderFormatWrapper = apperrors.SignatureInvalid("invalid signature header format") // ErrDecodeSignatureFailed is returned when base64 signature decoding fails ErrDecodeSignatureFailed = apperrors.SignatureInvalid("failed to decode signature") // ErrReadRequestBodyFailed is returned when reading HTTP request body fails ErrReadRequestBodyFailed = apperrors.RemoteFetchFailed("", errors.New("read request body failed")) // ErrRSAKeyGenFailed is returned when RSA key generation fails ErrRSAKeyGenFailed = apperrors.SigningKeyInvalid("failed to generate RSA key pair") // ErrMarshalPublicKeyFailed is returned when marshaling public key fails ErrMarshalPublicKeyFailed = apperrors.SigningKeyInvalid("failed to marshal public key") // ErrMarshalPrivateKeyFailed is returned when marshaling private key fails ErrMarshalPrivateKeyFailed = apperrors.SigningKeyInvalid("failed to marshal private key") )
HTTP Signature errors - now using centralized error system
var ( // ErrMissingRequestIDInConfirmation is returned when request ID is missing in recovery confirmation ErrMissingRequestIDInConfirmation = apperrors.InboxMessageInvalid("missing request ID in recovery confirmation") // ErrMissingActorInConfirmation is returned when actor is missing in recovery confirmation ErrMissingActorInConfirmation = apperrors.InboxMessageInvalid("missing actor in recovery confirmation") // ErrMissingInviterUsername is returned when inviter username is missing in trustee acceptance ErrMissingInviterUsername = apperrors.InboxMessageInvalid("missing inviter username in trustee acceptance") // ErrMissingActorInAcceptance is returned when actor is missing in trustee acceptance ErrMissingActorInAcceptance = apperrors.InboxMessageInvalid("missing actor in trustee acceptance") )
Inbox recovery errors - now using centralized error system
var ( // ErrFetchObjectHTTPFailed is returned when HTTP request fails during object fetch ErrFetchObjectHTTPFailed = apperrors.RemoteFetchFailed("", errors.New("fetch object http failed")) // ErrInvalidActorObjectType is returned when fetched object is not a valid actor type ErrInvalidActorObjectType = apperrors.ObjectInvalidField("type", "invalid actor object type") // ErrNotActorObject is returned when object type is not a valid actor type ErrNotActorObject = apperrors.ObjectInvalidField("type", "not an actor object") // ErrMissingSignatureHeader is returned when signature header is missing in authorized fetch request ErrMissingSignatureHeader = apperrors.SignatureMissing() // ErrExtractActorIDFailed is returned when actor ID cannot be extracted from keyId ErrExtractActorIDFailed = apperrors.ActorURIExtractionFailed("keyId extraction", errors.New("extract actor ID failed")) // ErrObjectIDMismatch is returned when fetched object ID doesn't match requested ID ErrObjectIDMismatch = apperrors.ObjectInvalidField("id", "object ID mismatch") // ErrObjectMissingType is returned when object is missing type field ErrObjectMissingType = apperrors.ObjectMissingField("type") // ErrFetchActorHTTPFailed is returned when HTTP request fails during actor fetch ErrFetchActorHTTPFailed = apperrors.ActorFetchFailed("", errors.New("fetch actor http failed")) // ErrRepositoryAccessValidationFailed is returned when repository access validation fails ErrRepositoryAccessValidationFailed = apperrors.RemoteFetchUnauthorized("") // ErrActorDataMarshalFailed is returned when actor data marshaling fails ErrActorDataMarshalFailed = apperrors.ActivityParsingFailed("actor", errors.New("actor data marshal failed")) // ErrActorUnmarshalFailed is returned when actor unmarshaling fails ErrActorUnmarshalFailed = apperrors.ActivityParsingFailed("actor", errors.New("actor unmarshal failed")) // ErrSignatureParseFailed is returned when signature parsing fails ErrSignatureParseFailed = apperrors.SignatureInvalid("failed to parse signature") // ErrPublicKeyParseFailed is returned when public key parsing fails ErrPublicKeyParseFailed = apperrors.SigningKeyInvalid("failed to parse public key") // ErrSignatureVerificationFailed is returned when signature verification fails ErrSignatureVerificationFailed = apperrors.HTTPSignatureVerificationFailed("signature verification failed") // ErrResponseDecodeFailed is returned when response decoding fails ErrResponseDecodeFailed = apperrors.RemoteFetchFailed("", errors.New("response decode failed")) // ErrObjectValidationFailed is returned when object validation fails ErrObjectValidationFailed = apperrors.ObjectParsingFailed("", errors.New("object validation failed")) // ErrInvalidCachedPublicKey is returned when cached public key is invalid ErrInvalidCachedPublicKey = apperrors.SigningKeyInvalid("invalid cached public key") // ErrPublicKeyFetchFailed is returned when public key fetch fails after retries ErrPublicKeyFetchFailed = apperrors.SigningKeyNotFound("failed to fetch public key after retries") // ErrActorHasNoPublicKey is returned when actor has no public key ErrActorHasNoPublicKey = apperrors.SigningKeyNotFound("actor has no public key") // ErrPublicKeyExtractionFailed is returned when public key extraction fails ErrPublicKeyExtractionFailed = apperrors.SigningKeyInvalid("failed to extract public key") )
Authorized fetch errors - now using centralized error system
var ( // ErrS3ClientNotConfigured is returned when S3 client is not configured for restore operation ErrS3ClientNotConfigured = apperrors.NewFederationError(apperrors.CodeDependencyNotMet, "S3 client not configured for restore operation") // ErrArchiveContainsNoRelationships is returned when archive contains no relationships ErrArchiveContainsNoRelationships = apperrors.CollectionInvalid("", "archive contains no relationships") // Database operation errors // ErrGetFederationEdgesFailed is returned when getting federation edges fails ErrGetFederationEdgesFailed = apperrors.CollectionFetchFailed("federation edges", errors.New("get federation edges failed")) // ErrGetCreateRelationshipFailed is returned when getting or creating relationship fails ErrGetCreateRelationshipFailed = apperrors.FollowNotFound("", "") // ErrGetCreateAggregateFailed is returned when getting or creating aggregate fails ErrGetCreateAggregateFailed = apperrors.CollectionFetchFailed("aggregate", errors.New("get create aggregate failed")) // ErrQueryRelationshipFailed is returned when querying relationship fails ErrQueryRelationshipFailed = apperrors.FollowNotFound("", "") // ErrQueryAggregateFailed is returned when querying aggregate fails ErrQueryAggregateFailed = apperrors.CollectionFetchFailed("aggregate", errors.New("query aggregate failed")) // ErrSaveRelationshipFailed is returned when saving relationship fails ErrSaveRelationshipFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to save relationship", errors.New("save relationship failed")) // ErrSaveAggregateFailed is returned when saving aggregate fails ErrSaveAggregateFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to save aggregate", errors.New("save aggregate failed")) // ErrQueryDormantRelationshipsFailed is returned when querying dormant relationships fails ErrQueryDormantRelationshipsFailed = apperrors.CollectionFetchFailed("dormant relationships", errors.New("query dormant relationships failed")) // ErrQueryUserRelationshipsFailed is returned when querying user relationships fails ErrQueryUserRelationshipsFailed = apperrors.CollectionFetchFailed("user relationships", errors.New("query user relationships failed")) // ErrQueryRelationshipsByStateFailed is returned when querying relationships by state fails ErrQueryRelationshipsByStateFailed = apperrors.CollectionFetchFailed("relationships by state", errors.New("query relationships by state failed")) // ErrGetRelationshipFailed is returned when getting relationship fails ErrGetRelationshipFailed = apperrors.FollowNotFound("", "") // ErrSaveStateTransitionFailed is returned when saving state transition fails ErrSaveStateTransitionFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to save state transition", errors.New("save state transition failed")) // ErrGetAggregateFailed is returned when getting aggregate fails ErrGetAggregateFailed = apperrors.CollectionFetchFailed("aggregate", errors.New("get aggregate failed")) // ErrGetActiveRelationshipsFailed is returned when getting active relationships fails ErrGetActiveRelationshipsFailed = apperrors.CollectionFetchFailed("active relationships", errors.New("get active relationships failed")) // ErrCheckArchivedRelationshipFailed is returned when checking for archived relationship fails ErrCheckArchivedRelationshipFailed = apperrors.FollowNotFound("", "") // ErrSaveRestoredRelationshipFailed is returned when saving restored relationship fails ErrSaveRestoredRelationshipFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to save restored relationship", errors.New("save restored relationship failed")) // ErrSaveReactivatedRelationshipFailed is returned when saving reactivated relationship fails ErrSaveReactivatedRelationshipFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to save reactivated relationship", errors.New("save reactivated relationship failed")) // Archive operation errors // ErrMarshalArchiveDataFailed is returned when marshaling archive data fails ErrMarshalArchiveDataFailed = apperrors.ActivityParsingFailed("archive data", errors.New("marshal archive data failed")) // ErrCompressDataFailed is returned when compressing data fails ErrCompressDataFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to compress data", errors.New("compress data failed")) // ErrCloseGzipWriterFailed is returned when closing gzip writer fails ErrCloseGzipWriterFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to close gzip writer", errors.New("close gzip writer failed")) // ErrArchiveToS3Failed is returned when archiving to S3 fails after retries ErrArchiveToS3Failed = apperrors.NewFederationInternalError(apperrors.CodeExternalServiceUnavailable, "failed to archive to S3 after retries", errors.New("archive to s3 failed")) // ErrMarshalBatchArchiveDataFailed is returned when marshaling batch archive data fails ErrMarshalBatchArchiveDataFailed = apperrors.ActivityParsingFailed("batch archive data", errors.New("marshal batch archive data failed")) // ErrCompressBatchDataFailed is returned when compressing batch data fails ErrCompressBatchDataFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to compress batch data", errors.New("compress batch data failed")) // ErrUploadBatchArchiveToS3Failed is returned when uploading batch archive to S3 fails ErrUploadBatchArchiveToS3Failed = apperrors.NewFederationInternalError(apperrors.CodeExternalServiceUnavailable, "failed to upload batch archive to S3", errors.New("upload batch archive to s3 failed")) // ErrCreateGzipReaderFailed is returned when creating gzip reader fails ErrCreateGzipReaderFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to create gzip reader", errors.New("create gzip reader failed")) // ErrReadCompressedDataFailed is returned when reading compressed data fails ErrReadCompressedDataFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to read compressed data", errors.New("read compressed data failed")) // ErrUnmarshalArchiveDataFailed is returned when unmarshaling archive data fails ErrUnmarshalArchiveDataFailed = apperrors.ActivityParsingFailed("archive data", errors.New("unmarshal archive data failed")) // ErrRestoreFromS3Failed is returned when restoring from S3 fails after retries ErrRestoreFromS3Failed = apperrors.NewFederationInternalError(apperrors.CodeExternalServiceUnavailable, "failed to restore from S3 after retries", errors.New("restore from s3 failed")) // ErrDownloadArchiveFromS3Failed is returned when downloading archive from S3 fails ErrDownloadArchiveFromS3Failed = apperrors.NewFederationInternalError(apperrors.CodeExternalServiceUnavailable, "failed to download archive from S3", errors.New("download archive from s3 failed")) // ErrDeleteS3ArchiveFailed is returned when deleting S3 archive fails ErrDeleteS3ArchiveFailed = apperrors.NewFederationInternalError(apperrors.CodeExternalServiceUnavailable, "failed to delete S3 archive", errors.New("delete s3 archive failed")) // ErrBatchWriteRestoredRelationshipsFailed is returned when batch writing restored relationships fails ErrBatchWriteRestoredRelationshipsFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to batch write restored relationships", errors.New("batch write restored relationships failed")) )
Relationship tracker errors - now using centralized error system
var ( // ErrInvalidRelayURL is returned when relay URL is invalid ErrInvalidRelayURL = apperrors.ActorURIInvalid("") // ErrFetchRelayActorFailed is returned when relay actor fetching fails ErrFetchRelayActorFailed = apperrors.ActorFetchFailed("", errors.New("fetch relay actor failed")) // ErrGetActorFailed is returned when actor retrieval fails ErrGetActorFailed = apperrors.ActorNotFound("") // ErrStoreRelayInfoFailed is returned when storing relay info fails ErrStoreRelayInfoFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to store relay info", errors.New("store relay info failed")) // ErrDeliverFollowActivityFailed is returned when delivering follow activity fails ErrDeliverFollowActivityFailed = apperrors.DeliveryFailed("follow", errors.New("deliver follow activity failed")) // ErrRelayNotFound is returned when relay is not found ErrRelayNotFound = apperrors.ActorNotFound("") // ErrRemoveRelayInfoFailed is returned when removing relay info fails ErrRemoveRelayInfoFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to remove relay info", errors.New("remove relay info failed")) // ErrUnknownInactiveRelay is returned when activity is from unknown or inactive relay ErrUnknownInactiveRelay = apperrors.ActorNotFound("unknown or inactive relay") // ErrRelayForwardingFailed is returned when forwarding to multiple relays fails ErrRelayForwardingFailed = apperrors.DeliveryToInboxesFailed(0, errors.New("relay forwarding failed")) // ErrFetchRelayActorHTTPFailed is returned when relay actor fetch fails with non-OK status ErrFetchRelayActorHTTPFailed = apperrors.ActorFetchFailed("", errors.New("fetch relay actor http failed")) // ErrNotRelayActor is returned when fetched actor is not a relay type ErrNotRelayActor = apperrors.ObjectInvalidField("type", "not a relay actor") // ErrMarshalAnnouncedObjectFailed is returned when marshaling announced object fails ErrMarshalAnnouncedObjectFailed = apperrors.ActivityParsingFailed("announced object", errors.New("marshal announced object failed")) // ErrUnmarshalAnnouncedActivityFailed is returned when unmarshaling announced activity fails ErrUnmarshalAnnouncedActivityFailed = apperrors.ActivityParsingFailed("announced activity", errors.New("unmarshal announced activity failed")) // ErrInvalidAnnouncedObjectType is returned when announced object type is invalid ErrInvalidAnnouncedObjectType = apperrors.ObjectInvalidField("type", "invalid announced object type") // ErrRelayBudgetExceeded is returned when relay operation would exceed budget ErrRelayBudgetExceeded = apperrors.NewFederationError(apperrors.CodeRateLimited, "relay operation would exceed daily budget") // ErrRelayBudgetCreationFailed is returned when relay budget creation fails ErrRelayBudgetCreationFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to create relay budget", errors.New("relay budget creation failed")) // ErrRelayBudgetAlreadyExceeded is returned when relay budget is already exceeded ErrRelayBudgetAlreadyExceeded = apperrors.NewFederationError(apperrors.CodeRateLimited, "relay budget already exceeded") // ErrRelayOperationsPaused is returned when relay operations are paused due to budget limits ErrRelayOperationsPaused = apperrors.NewFederationError(apperrors.CodeRateLimited, "relay operations paused due to budget limit") // ErrRelayCostSummaryFailed is returned when getting relay cost summary fails ErrRelayCostSummaryFailed = apperrors.MetricsCollectionFailed("relay cost summary", errors.New("relay cost summary failed")) )
Relay errors - now using centralized error system
var ( // ErrWebFingerLookupFailed is returned when webfinger lookup fails ErrWebFingerLookupFailed = apperrors.WebFingerFailed("", errors.New("webfinger lookup failed")) // ErrWebFingerRequestFailed is returned when webfinger request fails ErrWebFingerRequestFailed = apperrors.WebFingerFailed("", errors.New("webfinger request failed")) // ErrWebFingerNon2xxStatus is returned when webfinger returns non-2xx status ErrWebFingerNon2xxStatus = apperrors.WebFingerFailed("", errors.New("webfinger non 2xx status")) // ErrWebFingerResponseParseFailed is returned when webfinger response parsing fails ErrWebFingerResponseParseFailed = apperrors.WebFingerFailed("", errors.New("webfinger response parse failed")) // ErrNoActivityPubLinkFound is returned when no ActivityPub link found in webfinger response ErrNoActivityPubLinkFound = apperrors.WebFingerNotFound("") // ErrRemoteActorNon2xxStatus is returned when remote actor fetch returns non-2xx status ErrRemoteActorNon2xxStatus = apperrors.ActorFetchFailed("", errors.New("remote actor non 2xx status")) // ErrRemoteActorDecodeFailed is returned when remote actor decoding fails ErrRemoteActorDecodeFailed = apperrors.ActorFetchFailed("", errors.New("remote actor decode failed")) // ErrInvalidActorMissingFields is returned when actor is missing required fields ErrInvalidActorMissingFields = apperrors.ObjectMissingField("required fields") // ErrInvalidUsernameFormat is returned when username format is invalid ErrInvalidUsernameFormat = apperrors.UsernameExtractionFailed("format validation", errors.New("invalid username format")) // ErrInvalidDomainFormat is returned when domain format is invalid ErrInvalidDomainFormat = apperrors.ActorURIInvalid("invalid domain format") // ErrInvalidHandleFormat is returned when handle format is invalid ErrInvalidHandleFormat = apperrors.ActorURIInvalid("invalid handle format") // ErrGetKnownInstancesFailed is returned when getting known instances fails ErrGetKnownInstancesFailed = apperrors.CollectionFetchFailed("known instances", errors.New("get known instances failed")) // ErrCreateSearchRequestFailed is returned when creating search request fails ErrCreateSearchRequestFailed = apperrors.RemoteFetchFailed("", errors.New("create search request failed")) // ErrSearchRequestFailed is returned when search request fails ErrSearchRequestFailed = apperrors.RemoteFetchFailed("", errors.New("search request failed")) // ErrSearchNon2xxStatus is returned when search returns non-2xx status ErrSearchNon2xxStatus = apperrors.RemoteFetchFailed("", errors.New("search non 2xx status")) // ErrSearchResponseDecodeFailed is returned when search response decoding fails ErrSearchResponseDecodeFailed = apperrors.RemoteFetchFailed("", errors.New("search response decode failed")) )
Remote search errors - now using centralized error system
var ( // ErrCompressionFailed is returned when data compression fails ErrCompressionFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "compression failed", errors.New("compression failed")) // ErrDecompressionFailed is returned when data decompression fails ErrDecompressionFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "decompression failed", errors.New("decompression failed")) // ErrCompressionAlgorithmUnsupported is returned when compression algorithm is not supported ErrCompressionAlgorithmUnsupported = apperrors.NewFederationError(apperrors.CodeValidationFailed, "compression algorithm unsupported") // ErrCompressionRatioInvalid is returned when compression ratio is invalid ErrCompressionRatioInvalid = apperrors.NewFederationError(apperrors.CodeValidationFailed, "compression ratio invalid") // ErrPayloadCompressionFailed is returned when payload compression fails ErrPayloadCompressionFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "payload compression failed", errors.New("payload compression failed")) // ErrMetricMarshalFailed is returned when metric marshaling for compression fails ErrMetricMarshalFailed = apperrors.ActivityParsingFailed("metric", errors.New("metric marshal failed")) // ErrGzipWriteFailed is returned when writing to gzip writer fails ErrGzipWriteFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to write gzip data", errors.New("gzip write failed")) // ErrGzipCloseFailed is returned when closing gzip writer fails ErrGzipCloseFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to close gzip writer", errors.New("gzip close failed")) // ErrOldMetricsRetrievalFailed is returned when retrieving old metrics fails ErrOldMetricsRetrievalFailed = apperrors.MetricsCollectionFailed("old metrics retrieval", errors.New("old metrics retrieval failed")) // ErrDataArchivalFailed is returned when data archival fails ErrDataArchivalFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "data archival failed", errors.New("data archival failed")) )
Compression pipeline errors - now using centralized error system
var ( // ErrFederationMetricStoreFailed is returned when storing federation metric fails ErrFederationMetricStoreFailed = apperrors.MetricsCollectionFailed("federation metric store", errors.New("federation metric store failed")) // ErrHealthScoreRetrieveFailed is returned when retrieving health score fails ErrHealthScoreRetrieveFailed = apperrors.HealthCheckFailed("", errors.New("health score retrieve failed")) // ErrRecentMetricsRetrieveFailed is returned when retrieving recent metrics fails ErrRecentMetricsRetrieveFailed = apperrors.MetricsCollectionFailed("recent metrics", errors.New("recent metrics retrieval failed")) // ErrUnhealthyDomainsRetrieveFailed is returned when retrieving unhealthy domains fails ErrUnhealthyDomainsRetrieveFailed = apperrors.CollectionFetchFailed("unhealthy domains", errors.New("unhealthy domains retrieval failed")) )
Analytics aggregator errors - now using centralized error system
var ( // ErrRemoteActorCacheRetrieveFailed is returned when retrieving cached remote actor fails ErrRemoteActorCacheRetrieveFailed = apperrors.ActorNotFound("cached remote actor") // ErrRemoteActorCacheUpdateFailed is returned when updating cached remote actor fails ErrRemoteActorCacheUpdateFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to update cached remote actor", errors.New("remote actor cache update failed")) // ErrRemoteActorCacheStoreFailed is returned when storing cached remote actor fails ErrRemoteActorCacheStoreFailed = apperrors.NewFederationInternalError(apperrors.CodeInternal, "failed to cache remote actor", errors.New("remote actor cache store failed")) )
Remote actor caching errors - now using centralized error system
var ( // ErrGetConnectionsFailed is returned when getting instance connections fails during trend analysis ErrGetConnectionsFailed = apperrors.RemoteFetchFailed("", errors.New("get connections failed")) )
Trend analysis errors - now using centralized error system
Functions ¶
func DetectKeyType ¶
func DetectKeyType(key interface{}) string
DetectKeyType detects the type of a crypto key (public or private)
func DetermineSigningAlgorithm ¶
func DetermineSigningAlgorithm(privateKey crypto.PrivateKey, preferLegacy bool) string
DetermineSigningAlgorithm determines the best signing algorithm based on key type and compatibility
func EncodePrivateKeyPEM ¶
func EncodePrivateKeyPEM(privateKey *rsa.PrivateKey) ([]byte, error)
EncodePrivateKeyPEM encodes an RSA private key to PEM format
func EncodePublicKeyPEM ¶
EncodePublicKeyPEM encodes an RSA public key to PEM format
func GenerateRSAKeyPair ¶
func GenerateRSAKeyPair(bits int) (*rsa.PrivateKey, error)
GenerateRSAKeyPair generates a new RSA key pair
func NegotiateSignatureAlgorithm ¶
NegotiateSignatureAlgorithm selects the best algorithm based on preferences and key type
func ParsePrivateKeyPEM ¶
func ParsePrivateKeyPEM(pemData []byte) (crypto.PrivateKey, error)
ParsePrivateKeyPEM parses a PEM-encoded private key
func ParsePublicKeyPEM ¶
ParsePublicKeyPEM parses a PEM-encoded public key
func SignHTTPRequest ¶
SignHTTPRequest signs an outgoing HTTP request
func SignHTTPRequestWithAlgorithm ¶
func SignHTTPRequestWithAlgorithm(req *http.Request, privateKey crypto.PrivateKey, keyID, algorithm string) error
SignHTTPRequestWithAlgorithm signs a request with a specific algorithm
func VerifyDigest ¶
VerifyDigest verifies the digest header against the request body
func VerifyHTTPSignature ¶
VerifyHTTPSignature verifies an incoming HTTP request's signature
func VerifyHTTPSignatureEnhanced ¶
func VerifyHTTPSignatureEnhanced(req *http.Request, publicKey crypto.PublicKey, sig *HTTPSignature) error
VerifyHTTPSignatureEnhanced verifies signatures with support for multiple algorithms
Types ¶
type ActivityPattern ¶
type ActivityPattern struct {
Type string `json:"type"`
Description string `json:"description"`
Confidence float64 `json:"confidence"`
Metadata map[string]any `json:"metadata"`
}
ActivityPattern represents a detected activity pattern
type AnalyticsAggregator ¶
type AnalyticsAggregator struct {
// contains filtered or unexported fields
}
AnalyticsAggregator implements the 5-minute primary aggregation pattern following the federation-analytics-guidance.md specifications
func NewAnalyticsAggregator ¶
func NewAnalyticsAggregator(federationRepo *repositories.FederationRepository, logger *zap.Logger) *AnalyticsAggregator
NewAnalyticsAggregator creates a new analytics aggregator
func (*AnalyticsAggregator) GetDomainHealthStatus ¶
func (a *AnalyticsAggregator) GetDomainHealthStatus(ctx context.Context, domain string) (*DomainHealthStatus, error)
GetDomainHealthStatus returns the current health status for a domain
func (*AnalyticsAggregator) GetUnhealthyDomains ¶
func (a *AnalyticsAggregator) GetUnhealthyDomains(ctx context.Context, threshold float64) ([]*DomainHealthStatus, error)
GetUnhealthyDomains returns a list of domains that need attention
func (*AnalyticsAggregator) RecordMetric ¶
func (a *AnalyticsAggregator) RecordMetric(ctx context.Context, domain string, metric *Metric) error
RecordMetric records a raw federation metric that will be aggregated
type ArchiveData ¶
type ArchiveData struct {
Relationships []models.FederationRelationship `json:"relationships"`
Metadata ArchiveMetadata `json:"metadata"`
}
ArchiveData represents the data structure for S3 archived relationships
type ArchiveMetadata ¶
type ArchiveMetadata struct {
ArchivedAt time.Time `json:"archived_at"`
Reason string `json:"reason"`
LastActivity time.Time `json:"last_activity"`
TotalItems int `json:"total_items"`
CompressionType string `json:"compression_type"`
Version string `json:"version"`
}
ArchiveMetadata contains metadata about the archived data
type AuthorizedFetchService ¶
type AuthorizedFetchService struct {
// contains filtered or unexported fields
}
AuthorizedFetchService handles authorized fetch for ActivityPub objects
func NewAuthorizedFetchService ¶
func NewAuthorizedFetchService(store core.RepositoryStorage, domain string, logger *zap.Logger) *AuthorizedFetchService
NewAuthorizedFetchService creates a new authorized fetch service
func (*AuthorizedFetchService) FetchActor ¶
func (f *AuthorizedFetchService) FetchActor(ctx context.Context, actorURL string, signingActor *activitypub.Actor) (*activitypub.Actor, error)
FetchActor fetches an ActivityPub actor with authorization
func (*AuthorizedFetchService) FetchObject ¶
func (f *AuthorizedFetchService) FetchObject(ctx context.Context, objectURL string, signingActor *activitypub.Actor) (any, error)
FetchObject fetches an ActivityPub object with authorization
func (*AuthorizedFetchService) IsAuthorizedFetchEnabled ¶
func (f *AuthorizedFetchService) IsAuthorizedFetchEnabled(ctx context.Context) bool
IsAuthorizedFetchEnabled checks if authorized fetch is enabled
func (*AuthorizedFetchService) VerifyAuthorizedFetch ¶
func (f *AuthorizedFetchService) VerifyAuthorizedFetch(ctx context.Context, req *http.Request) (*activitypub.Actor, error)
VerifyAuthorizedFetch verifies an incoming authorized fetch request
type CompressionPipeline ¶
type CompressionPipeline struct {
// contains filtered or unexported fields
}
CompressionPipeline implements the progressive compression strategy from federation-analytics-guidance.md for time series data
func NewCompressionPipeline ¶
func NewCompressionPipeline(federationRepo *repositories.FederationRepository, logger *zap.Logger) *CompressionPipeline
NewCompressionPipeline creates a new compression pipeline
func (*CompressionPipeline) CompressOldData ¶
func (c *CompressionPipeline) CompressOldData(ctx context.Context) error
CompressOldData implements the multi-level compression strategy
type CompressionStats ¶
type CompressionStats struct {
TotalRecords int `json:"total_records"`
CompressedRecords int `json:"compressed_records"`
OriginalSizeBytes int64 `json:"original_size_bytes"`
CompressedSizeBytes int64 `json:"compressed_size_bytes"`
CompressionRatio float64 `json:"compression_ratio"`
ProcessingTimeMs int64 `json:"processing_time_ms"`
}
CompressionStats tracks compression effectiveness
func CalculateCompressionStats ¶
func CalculateCompressionStats(originalSize, compressedSize int64, recordCount int) *CompressionStats
CalculateCompressionStats calculates compression effectiveness metrics
type ConnectionError ¶
type ConnectionError struct {
SourceDomain string
TargetDomain string
ActivityType string
ErrorType string // timeout/dns/refused/invalid_cert/etc
ErrorMessage string
TimeoutMs float64
OccurredAt time.Time
}
ConnectionError represents a federation connection error
type CostBreakdown ¶
type CostBreakdown struct {
LambdaCost int64 `json:"lambda_cost"`
SignatureVerificationCost int64 `json:"signature_verification_cost"`
HTTPRequestCost int64 `json:"http_request_cost"`
DataTransferCost int64 `json:"data_transfer_cost"`
DynamoDBCost int64 `json:"dynamodb_cost"`
NetworkingCost int64 `json:"networking_cost"`
RetryCost int64 `json:"retry_cost"`
}
CostBreakdown shows the breakdown of estimated costs
type CostCalculationParams ¶
type CostCalculationParams struct {
// Activity identification
ActivityID string
Domain string
ActivityType string
Direction string // inbound, outbound
OperationType string // inbox_processing, outbox_delivery, signature_verification
// Success/failure tracking
Success bool
ErrorMessage string
Timestamp time.Time
// Resource usage metrics
LambdaDurationMs int64 // Lambda execution time in milliseconds
LambdaMemoryMB int64 // Lambda memory allocation in MB
SignatureVerificationMs int64 // Time spent verifying signatures
HTTPRequestCount int64 // Number of outbound HTTP requests
DataTransferBytes int64 // Bytes transferred
DynamoDBWriteCount int64 // Number of DynamoDB write operations
DynamoDBReadCount int64 // Number of DynamoDB read operations
DNSLookupCount int64 // Number of DNS lookups
WebFingerCount int64 // Number of WebFinger lookups
SQSMessageCount int64 // Number of SQS messages sent
RetryCount int // Number of retry attempts
// Performance metrics
ResponseTimeMs int64 // Total response time
ProcessingTimeMs int64 // Time spent processing
QueueWaitTimeMs int64 // Time spent waiting in queue
// Data volume metrics
PayloadSize int64 // Size of activity payload
CompressedSize int64 // Size after compression
}
CostCalculationParams holds parameters for cost calculation
type CostCalculator ¶
type CostCalculator struct {
// Cost rates in microdollars (1/1,000,000 of a dollar)
LambdaMemoryGBSecondRate int64 // $0.0000166667 per GB-second -> 16.6667 microdollars
HTTPRequestRate int64 // $0.0001 per request -> 100 microdollars
DataTransferOutboundGBRate int64 // $0.09 per GB -> 90,000 microdollars
DataTransferInboundGBRate int64 // Free for inbound
DynamoDBWriteRequestRate int64 // $1.25 per million requests -> 1.25 microdollars per request
DynamoDBReadRequestRate int64 // $0.25 per million requests -> 0.25 microdollars per request
DNSQueryRate int64 // $0.0004 per query -> 400 microdollars per 1000 queries -> 0.4 microdollars per query
SQSMessageRate int64 // $0.0000004 per message -> 0.4 microdollars per 1000 messages -> 0.0004 microdollars per message
SignatureVerificationCPURate int64 // Estimated CPU cost for signature verification
}
CostCalculator provides standardized cost calculations for federation activities
func NewCostCalculator ¶
func NewCostCalculator() *CostCalculator
NewCostCalculator creates a new cost calculator with standard AWS pricing
func (*CostCalculator) CalculateFederationCosts ¶
func (c *CostCalculator) CalculateFederationCosts(params *CostCalculationParams) *models.FederationCostTracking
CalculateFederationCosts calculates comprehensive costs for a federation activity
func (*CostCalculator) EstimateInboundActivityCost ¶
func (c *CostCalculator) EstimateInboundActivityCost(activityType string, payloadSize int64, requiresSignatureVerification bool) int64
EstimateInboundActivityCost estimates the cost of processing an inbound activity
func (*CostCalculator) EstimateOutboundActivityCost ¶
func (c *CostCalculator) EstimateOutboundActivityCost(activityType string, payloadSize int64, targetCount int) int64
EstimateOutboundActivityCost estimates the cost of delivering an outbound activity
func (*CostCalculator) GetCostEstimate ¶
func (c *CostCalculator) GetCostEstimate(params *CostCalculationParams) *CostEstimate
GetCostEstimate returns a cost estimate with breakdown
type CostEstimate ¶
type CostEstimate struct {
EstimatedCostMicroCents int64 `json:"estimated_cost_micro_cents"`
EstimatedCostDollars float64 `json:"estimated_cost_dollars"`
Breakdown *CostBreakdown `json:"breakdown"`
Confidence string `json:"confidence"` // high, medium, low
Notes []string `json:"notes,omitempty"`
}
CostEstimate represents a cost estimate for a federation operation
type DeliveryAttempt ¶
type DeliveryAttempt struct {
SourceDomain string
TargetDomain string
ActivityType string
Success bool
ResponseTimeMs float64
Timestamp time.Time
UserID string // Local user ID for user-level tracking
}
DeliveryAttempt represents a federation delivery attempt
type DeliveryService ¶
type DeliveryService struct {
// contains filtered or unexported fields
}
DeliveryService handles sending activities to remote instances
func NewDeliveryService ¶
func NewDeliveryService(store FederationStorage, cfg *appConfig.Config) *DeliveryService
NewDeliveryService creates a new delivery service
func (*DeliveryService) DeliverActivity ¶
func (d *DeliveryService) DeliverActivity(ctx context.Context, activity *activitypub.Activity, targetInbox string, signingActor *activitypub.Actor) error
DeliverActivity delivers an activity to a remote inbox
func (*DeliveryService) DeliverActivityWithPrivacy ¶
func (d *DeliveryService) DeliverActivityWithPrivacy(ctx context.Context, activity *activitypub.Activity, targetInbox string, signingActor *activitypub.Actor, targetActorID string) error
DeliverActivityWithPrivacy delivers an activity to a remote inbox with privacy controls
func (*DeliveryService) DeliverDirectMessage ¶
func (d *DeliveryService) DeliverDirectMessage(ctx context.Context, activity *activitypub.Activity, signingActor *activitypub.Actor) error
DeliverDirectMessage delivers a direct message to specific recipients
func (*DeliveryService) DeliverToFollowers ¶
func (d *DeliveryService) DeliverToFollowers(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
DeliverToFollowers delivers an activity to all followers of an actor
func (*DeliveryService) DeliverToRecipients ¶
func (d *DeliveryService) DeliverToRecipients(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
DeliverToRecipients delivers an activity to specific recipients (to, cc, bto, bcc)
func (*DeliveryService) DeliverToRecipientsWithPrivacy ¶
func (d *DeliveryService) DeliverToRecipientsWithPrivacy(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
DeliverToRecipientsWithPrivacy delivers an activity to specific recipients with privacy controls
func (*DeliveryService) QueueDelivery ¶
func (d *DeliveryService) QueueDelivery(ctx context.Context, activity *activitypub.Activity, targetInbox string, signingActor *activitypub.Actor) error
QueueDelivery queues an activity for async delivery with proper retry handling
type DomainHealthStatus ¶
type DomainHealthStatus struct {
Domain string `json:"domain"`
Status string `json:"status"` // HEALTHY, DEGRADED, UNHEALTHY, CRITICAL
HealthScore float64 `json:"health_score"` // 0-100
CheckedAt time.Time `json:"checked_at"`
RecentActivities int64 `json:"recent_activities"`
RecentErrors int64 `json:"recent_errors"`
RecentErrorRate float64 `json:"recent_error_rate"`
AvgLatencyMs int64 `json:"avg_latency_ms"`
AvailabilityRate float64 `json:"availability_rate"`
ShouldAlert bool `json:"should_alert"`
AlertMessage string `json:"alert_message,omitempty"`
}
DomainHealthStatus represents the current health status of a federated domain
type DomainStats ¶
type DomainStats struct {
Domain string
TotalVolume int64
ConnectionCount int64
ErrorCount int64
TotalResponseTime float64
LastActivity time.Time
}
DomainStats represents statistics for a domain
type DynamORMFederationStorage ¶
type DynamORMFederationStorage struct {
// contains filtered or unexported fields
}
DynamORMFederationStorage implements FederationStorage using DynamORM repositories. This provides a clean, minimal interface for federation operations without the overhead of the full storage.Storage interface.
func NewDynamORMFederationStorage ¶
func NewDynamORMFederationStorage( db core.DB, tableName string, logger *zap.Logger, ) *DynamORMFederationStorage
NewDynamORMFederationStorage creates a new DynamORM-based federation storage implementation.
func (*DynamORMFederationStorage) CacheRemoteActor ¶
func (s *DynamORMFederationStorage) CacheRemoteActor(ctx context.Context, handle string, actor *activitypub.Actor, ttl time.Duration) error
CacheRemoteActor caches a remote actor for future lookups.
func (*DynamORMFederationStorage) GetActor ¶
func (s *DynamORMFederationStorage) GetActor(ctx context.Context, username string) (*activitypub.Actor, error)
GetActor retrieves an actor by username.
func (*DynamORMFederationStorage) GetActorPrivateKey ¶
func (s *DynamORMFederationStorage) GetActorPrivateKey(ctx context.Context, username string) (string, error)
GetActorPrivateKey retrieves the private key for an actor by username.
func (*DynamORMFederationStorage) GetCachedRemoteActor ¶
func (s *DynamORMFederationStorage) GetCachedRemoteActor(ctx context.Context, actorID string) (*activitypub.Actor, error)
GetCachedRemoteActor retrieves a cached remote actor by actor ID.
func (*DynamORMFederationStorage) GetFollowers ¶
func (s *DynamORMFederationStorage) GetFollowers(ctx context.Context, username string, limit int, cursor string) ([]string, string, error)
GetFollowers retrieves the list of follower usernames for an actor.
func (*DynamORMFederationStorage) RecordFederationActivity ¶
func (s *DynamORMFederationStorage) RecordFederationActivity(ctx context.Context, activity *storage.FederationActivity) error
RecordFederationActivity records federation activity for cost tracking and metrics.
type EnhancedHTTPSignature ¶
type EnhancedHTTPSignature struct {
HTTPSignature
Created int64 // Unix timestamp (for hs2019)
Expires int64 // Unix timestamp (for hs2019)
}
EnhancedHTTPSignature provides enhanced HTTP signature with support for hs2019
type EnhancedRetryMessage ¶
type EnhancedRetryMessage struct {
DeliveryID string `json:"delivery_id"`
Activity *activitypub.Activity `json:"activity"`
SigningActorID string `json:"signing_actor_id"`
ActivityType string `json:"activity_type"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
RetryPolicy string `json:"retry_policy"`
MaxRetryDuration time.Duration `json:"max_retry_duration"`
CreatedAt time.Time `json:"created_at"`
NextRetryAt time.Time `json:"next_retry_at"`
TargetInboxes []string `json:"target_inboxes,omitempty"`
Recipients []string `json:"recipients,omitempty"`
FailedInboxes map[string]string `json:"failed_inboxes,omitempty"` // inbox -> error message
SuccessfulInboxes []string `json:"successful_inboxes,omitempty"`
}
EnhancedRetryMessage represents a message for polynomial retry delivery
type EnhancedRetryProcessor ¶
type EnhancedRetryProcessor struct {
// contains filtered or unexported fields
}
EnhancedRetryProcessor handles polynomial retry delivery for critical activities
func NewEnhancedRetryProcessor ¶
func NewEnhancedRetryProcessor(deliveryService *DeliveryService, sqsClient enhancedRetrySQSClient, queueURL string) *EnhancedRetryProcessor
NewEnhancedRetryProcessor creates a new enhanced retry processor
func (*EnhancedRetryProcessor) ProcessEnhancedRetry ¶
func (p *EnhancedRetryProcessor) ProcessEnhancedRetry(ctx context.Context, message *EnhancedRetryMessage) error
ProcessEnhancedRetry processes a retry message with polynomial backoff
func (*EnhancedRetryProcessor) QueueForEnhancedRetry ¶
func (p *EnhancedRetryProcessor) QueueForEnhancedRetry(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor, recipients []string, activityType string) error
QueueForEnhancedRetry queues an activity for polynomial retry delivery
type ErrorRateDataPoint ¶
type ErrorRateDataPoint struct {
Timestamp time.Time `json:"timestamp"`
ErrorRate float64 `json:"error_rate"`
}
ErrorRateDataPoint represents a single error rate data point
type ErrorRateTrend ¶
type ErrorRateTrend struct {
DataPoints []ErrorRateDataPoint `json:"data_points"`
Direction string `json:"direction"` // increasing/decreasing/stable
AverageErrorRate float64 `json:"average_error_rate"`
MinErrorRate float64 `json:"min_error_rate"`
MaxErrorRate float64 `json:"max_error_rate"`
}
ErrorRateTrend represents error rate trend analysis
type FederationHooks ¶
type FederationHooks struct {
// contains filtered or unexported fields
}
FederationHooks provides hooks into federation activities for tracking
func NewFederationHooks ¶
func NewFederationHooks(store core.RepositoryStorage, monitor *monitoring.PerformanceMonitor, db dynamormcore.DB, logger *zap.Logger) *FederationHooks
NewFederationHooks creates a new federation hooks instance
func (*FederationHooks) GetFederationRecommendations ¶
func (fh *FederationHooks) GetFederationRecommendations(ctx context.Context, domain string) ([]*FederationRecommendation, error)
GetFederationRecommendations provides recommendations for improving federation
func (*FederationHooks) GetRelationshipAnalysis ¶
func (fh *FederationHooks) GetRelationshipAnalysis(ctx context.Context, sourceDomain, targetDomain string) (*RelationshipAnalysis, error)
GetRelationshipAnalysis provides analysis for a specific relationship
func (*FederationHooks) OnConnectionError ¶
func (fh *FederationHooks) OnConnectionError(ctx context.Context, connError *ConnectionError) error
OnConnectionError is called when there's an error connecting to an instance
func (*FederationHooks) OnInboxReceive ¶
func (fh *FederationHooks) OnInboxReceive(ctx context.Context, activity *InboxActivity) error
OnInboxReceive is called when an activity is received from another instance
func (*FederationHooks) OnInstanceDiscovery ¶
func (fh *FederationHooks) OnInstanceDiscovery(ctx context.Context, instance *InstanceDiscovery) error
OnInstanceDiscovery is called when a new instance is discovered
func (*FederationHooks) OnOutboxDelivery ¶
func (fh *FederationHooks) OnOutboxDelivery(ctx context.Context, delivery *OutboxDelivery) error
OnOutboxDelivery is called when an activity is delivered to another instance
type FederationRecommendation ¶
type FederationRecommendation struct {
Type string `json:"type"` // performance/opportunity/cost/security
Priority string `json:"priority"` // high/medium/low
TargetDomain string `json:"target_domain,omitempty"`
Description string `json:"description"`
Action string `json:"action"`
Metrics map[string]any `json:"metrics,omitempty"`
}
FederationRecommendation represents a recommendation for improving federation
type FederationStorage ¶
type FederationStorage interface {
// Actor operations needed for federation
GetActorPrivateKey(ctx context.Context, username string) (string, error)
GetActor(ctx context.Context, username string) (*activitypub.Actor, error)
GetFollowers(ctx context.Context, username string, limit int, cursor string) ([]string, string, error)
// Remote actor caching for federation efficiency
GetCachedRemoteActor(ctx context.Context, actorID string) (*activitypub.Actor, error)
CacheRemoteActor(ctx context.Context, handle string, actor *activitypub.Actor, ttl time.Duration) error
// Federation activity tracking and cost monitoring
RecordFederationActivity(ctx context.Context, activity *storage.FederationActivity) error
}
FederationStorage defines the minimal storage interface needed for federation delivery. This is a much smaller interface than storage.Storage, focused only on federation needs.
func NewRepositoryStorageAdapter ¶
func NewRepositoryStorageAdapter(repos core.RepositoryStorage) FederationStorage
NewRepositoryStorageAdapter creates a new adapter
type HTTPSignature ¶
HTTPSignature represents a parsed HTTP signature
func ParseSignatureHeader ¶
func ParseSignatureHeader(header string) (*HTTPSignature, error)
ParseSignatureHeader parses the Signature header according to draft-cavage-http-signatures-12
type InboundActivity ¶
type InboundActivity struct {
SourceDomain string
TargetDomain string
ActivityType string
Timestamp time.Time
UserID string // Local user ID for user-level tracking
}
InboundActivity represents an inbound federation activity
type InboxActivity ¶
type InboxActivity struct {
SourceDomain string
TargetDomain string
ActivityType string
ActivityID string
Valid bool
ProcessedAt time.Time
}
InboxActivity represents an inbound federation activity event
type InstanceDiscovery ¶
type InstanceDiscovery struct {
Domain string
DisplayName string
Description string
Software string
Version string
UserCount int64
StatusCount int64
DiscoveredAt time.Time
}
InstanceDiscovery represents a discovered federated instance
type Metric ¶
type Metric struct {
InboundBytes int64 `json:"inbound_bytes"`
OutboundBytes int64 `json:"outbound_bytes"`
ResponseTimeMs int64 `json:"response_time_ms"`
SignatureTimeMs int64 `json:"signature_time_ms"`
Success bool `json:"success"`
ErrorType string `json:"error_type,omitempty"` // signature_failure, timeout, rate_limit, etc.
ActivityType string `json:"activity_type"` // follow, like, announce, etc.
}
Metric represents a single federation event to be recorded
type OutboxDelivery ¶
type OutboxDelivery struct {
SourceDomain string
TargetDomain string
ActivityType string
ActivityID string
Success bool
ResponseTimeMs float64
HTTPStatus int
ErrorMessage string
}
OutboxDelivery represents an outbound federation delivery event
type RelationshipAnalysis ¶
type RelationshipAnalysis struct {
SourceDomain string `json:"source_domain"`
TargetDomain string `json:"target_domain"`
InboundVolume int64 `json:"inbound_volume"`
OutboundVolume int64 `json:"outbound_volume"`
TotalVolume int64 `json:"total_volume"`
InboundStrength float64 `json:"inbound_strength"`
OutboundStrength float64 `json:"outbound_strength"`
OverallStrength float64 `json:"overall_strength"`
Reciprocity float64 `json:"reciprocity"`
RelationshipType string `json:"relationship_type"`
LastInboundActivity time.Time `json:"last_inbound_activity"`
LastOutboundActivity time.Time `json:"last_outbound_activity"`
Timestamp time.Time `json:"timestamp"`
}
RelationshipAnalysis represents analysis of federation relationship strength
type RelationshipTracker ¶
type RelationshipTracker struct {
// contains filtered or unexported fields
}
RelationshipTracker tracks and analyzes federation relationships with persistence and lifecycle management
func NewRelationshipTracker ¶
func NewRelationshipTracker(store core.RepositoryStorage, db dynamormcore.DB, logger *zap.Logger) *RelationshipTracker
NewRelationshipTracker creates a new relationship tracker with DynamORM persistence
func NewRelationshipTrackerWithS3 ¶
func NewRelationshipTrackerWithS3(store core.RepositoryStorage, db dynamormcore.DB, logger *zap.Logger, s3Client *s3.Client, archiveBucket string) *RelationshipTracker
NewRelationshipTrackerWithS3 creates a new relationship tracker with S3 archival support
func (*RelationshipTracker) AnalyzeRelationshipStrength ¶
func (rt *RelationshipTracker) AnalyzeRelationshipStrength(ctx context.Context, sourceDomain, targetDomain string) (*RelationshipAnalysis, error)
AnalyzeRelationshipStrength calculates the strength of relationships between instances
func (*RelationshipTracker) ArchiveInstanceGroup ¶
func (rt *RelationshipTracker) ArchiveInstanceGroup(ctx context.Context, targetInstance string, relationships []models.FederationRelationship) error
ArchiveInstanceGroup archives a group of relationships for a single instance (public interface)
func (*RelationshipTracker) BatchArchiveToS3 ¶
func (rt *RelationshipTracker) BatchArchiveToS3(ctx context.Context, relationships []models.FederationRelationship) error
BatchArchiveToS3 archives multiple relationships to S3 in a single compressed file for efficiency
func (*RelationshipTracker) BatchRestoreRelationships ¶
func (rt *RelationshipTracker) BatchRestoreRelationships(ctx context.Context, archiveLocations []string) error
BatchRestoreRelationships restores multiple relationships from archives using batch operations
func (*RelationshipTracker) ForceStateTransition ¶
func (rt *RelationshipTracker) ForceStateTransition(ctx context.Context, userID, targetInstance, relType string, newState models.RelationshipState) error
ForceStateTransition manually transitions a relationship to a new state
func (*RelationshipTracker) GenerateRecommendations ¶
func (rt *RelationshipTracker) GenerateRecommendations(ctx context.Context, domain string) ([]*FederationRecommendation, error)
GenerateRecommendations generates relationship-based recommendations
func (*RelationshipTracker) GetHealthScore ¶
func (rt *RelationshipTracker) GetHealthScore(ctx context.Context, targetInstance string) (float64, error)
GetHealthScore calculates a health score for relationships with a target instance
func (*RelationshipTracker) GetInstanceAggregate ¶
func (rt *RelationshipTracker) GetInstanceAggregate(ctx context.Context, instanceDomain string, period string) (*models.FederationRelationshipAggregate, error)
GetInstanceAggregate retrieves the current aggregate metrics for an instance
func (*RelationshipTracker) GetRelationshipByID ¶
func (rt *RelationshipTracker) GetRelationshipByID(ctx context.Context, userID, targetInstance, relType string) (*models.FederationRelationship, error)
GetRelationshipByID retrieves a specific federation relationship
func (*RelationshipTracker) GetRelationshipsByState ¶
func (rt *RelationshipTracker) GetRelationshipsByState(ctx context.Context, state models.RelationshipState, limit int) ([]*models.FederationRelationship, error)
GetRelationshipsByState retrieves relationships in a specific state
func (*RelationshipTracker) GetSuccessRate ¶
func (rt *RelationshipTracker) GetSuccessRate(ctx context.Context, targetDomain string) (float64, error)
GetSuccessRate returns the current 15-minute success rate for a target domain
func (*RelationshipTracker) GetUnhealthyRelationships ¶
func (rt *RelationshipTracker) GetUnhealthyRelationships(ctx context.Context, threshold float64, limit int) ([]*models.FederationRelationship, error)
GetUnhealthyRelationships returns relationships that are performing poorly
func (*RelationshipTracker) GetUserRelationships ¶
func (rt *RelationshipTracker) GetUserRelationships(ctx context.Context, userID string, limit int) ([]*models.FederationRelationship, error)
GetUserRelationships retrieves all federation relationships for a user
func (*RelationshipTracker) ReactivateRelationship ¶
func (rt *RelationshipTracker) ReactivateRelationship(ctx context.Context, userID, targetInstance, relType string) error
ReactivateRelationship manually reactivates an archived relationship
func (*RelationshipTracker) TrackDeliveryAttempt ¶
func (rt *RelationshipTracker) TrackDeliveryAttempt(ctx context.Context, attempt *DeliveryAttempt) error
TrackDeliveryAttempt records a federation delivery attempt with comprehensive relationship tracking
func (*RelationshipTracker) TrackInboundActivity ¶
func (rt *RelationshipTracker) TrackInboundActivity(ctx context.Context, activity *InboundActivity) error
TrackInboundActivity records inbound federation activity with comprehensive relationship tracking
func (*RelationshipTracker) UpdateInstanceMetadata ¶
func (rt *RelationshipTracker) UpdateInstanceMetadata(ctx context.Context, metadata *storage.InstanceMetadata) error
UpdateInstanceMetadata updates stored metadata for a federated instance.
type RelayBudgetRecommendations ¶
type RelayBudgetRecommendations struct {
RelayURL string `json:"relay_url"`
AnalysisPeriod string `json:"analysis_period"`
Recommendations []string `json:"recommendations"`
EstimatedSavings int64 `json:"estimated_savings_micro_cents"`
}
RelayBudgetRecommendations provides budget optimization recommendations
type RelayBudgetService ¶
type RelayBudgetService struct {
// contains filtered or unexported fields
}
RelayBudgetService manages relay budgets and cost limits
func NewRelayBudgetService ¶
func NewRelayBudgetService(costRepo relayBudgetCostRepository, logger *zap.Logger) *RelayBudgetService
NewRelayBudgetService creates a new relay budget service
func (*RelayBudgetService) AggregateRelayCosts ¶
func (rbs *RelayBudgetService) AggregateRelayCosts(ctx context.Context, relayURL string) error
AggregateRelayCosts aggregates relay costs into metrics for budget tracking
func (*RelayBudgetService) CheckRelayBudget ¶
func (rbs *RelayBudgetService) CheckRelayBudget(ctx context.Context, relayURL string, estimatedCostMicroCents int64) error
CheckRelayBudget checks if a relay operation would exceed budget
func (*RelayBudgetService) CreateRelayBudget ¶
func (rbs *RelayBudgetService) CreateRelayBudget(ctx context.Context, relayURL, period string, limitMicroCents int64, warningThreshold, criticalThreshold float64) error
CreateRelayBudget creates a new budget configuration for a relay
func (*RelayBudgetService) GetRelayBudgetRecommendations ¶
func (rbs *RelayBudgetService) GetRelayBudgetRecommendations(ctx context.Context, relayURL string) (*RelayBudgetRecommendations, error)
GetRelayBudgetRecommendations provides budget optimization recommendations
func (*RelayBudgetService) GetRelayBudgetStatus ¶
func (rbs *RelayBudgetService) GetRelayBudgetStatus(ctx context.Context, relayURL, period string) (*RelayBudgetStatus, error)
GetRelayBudgetStatus returns the current budget status for a relay
func (*RelayBudgetService) UpdateRelayBudgetUsage ¶
func (rbs *RelayBudgetService) UpdateRelayBudgetUsage(ctx context.Context, relayURL, period string, additionalCostMicroCents int64) error
UpdateRelayBudgetUsage updates the current usage for a relay budget
type RelayBudgetStatus ¶
type RelayBudgetStatus struct {
RelayURL string `json:"relay_url"`
Period string `json:"period"`
HasBudget bool `json:"has_budget"`
LimitMicroCents int64 `json:"limit_micro_cents"`
CurrentUsageMicroCents int64 `json:"current_usage_micro_cents"`
UsagePercent float64 `json:"usage_percent"`
BudgetExceeded bool `json:"budget_exceeded"`
WarningThresholdReached bool `json:"warning_threshold_reached"`
CriticalThresholdReached bool `json:"critical_threshold_reached"`
PauseRelay bool `json:"pause_relay"`
ReduceFrequency bool `json:"reduce_frequency"`
LastResetAt time.Time `json:"last_reset_at"`
}
RelayBudgetStatus represents the current budget status for a relay
type RelayInfo ¶
type RelayInfo struct {
URL string `json:"url"`
InboxURL string `json:"inbox_url"`
Active bool `json:"active"`
CreatedAt time.Time `json:"created_at"`
LastSeenAt time.Time `json:"last_seen_at"`
}
RelayInfo represents information about a relay
type RelayService ¶
type RelayService struct {
// contains filtered or unexported fields
}
RelayService handles ActivityPub relay functionality
func NewRelayService ¶
func NewRelayService(store core.RepositoryStorage, domain string, logger *zap.Logger) *RelayService
NewRelayService creates a new relay service
func (*RelayService) ForwardToRelays ¶
func (r *RelayService) ForwardToRelays(ctx context.Context, activity *activitypub.Activity, actor *activitypub.Actor) error
ForwardToRelays forwards an activity to all active relays
func (*RelayService) HandleRelayActivity ¶
func (r *RelayService) HandleRelayActivity(ctx context.Context, activity *activitypub.Activity, relayURL string) error
HandleRelayActivity handles an activity received from a relay
func (*RelayService) SubscribeToRelay ¶
func (r *RelayService) SubscribeToRelay(ctx context.Context, relayURL string, actorUsername string) error
SubscribeToRelay subscribes to a relay
func (*RelayService) UnsubscribeFromRelay ¶
func (r *RelayService) UnsubscribeFromRelay(ctx context.Context, relayURL string, actorUsername string) error
UnsubscribeFromRelay unsubscribes from a relay
type RemoteSearchService ¶
type RemoteSearchService struct {
// contains filtered or unexported fields
}
RemoteSearchService handles remote actor discovery via WebFinger and ActivityPub
func NewRemoteSearchService ¶
func NewRemoteSearchService(store core.RepositoryStorage) *RemoteSearchService
NewRemoteSearchService creates a new remote search service
func (*RemoteSearchService) ResolveActor ¶
func (s *RemoteSearchService) ResolveActor(ctx context.Context, handle string) (*SearchResult, error)
ResolveActor resolves an actor handle (user@domain) to an Actor object It first checks local cache, then performs WebFinger lookup if needed
func (*RemoteSearchService) ResolveActorURL ¶
func (s *RemoteSearchService) ResolveActorURL(ctx context.Context, actorURL string) (*SearchResult, error)
ResolveActorURL resolves an ActivityPub actor URL to an Actor object. It checks cache using an inferred handle when possible, then fetches the actor document and caches it.
func (*RemoteSearchService) SearchRemoteActors ¶
func (s *RemoteSearchService) SearchRemoteActors(ctx context.Context, query string, limit int) ([]*SearchResult, error)
SearchRemoteActors searches for actors on remote instances Supports both exact @user@domain matches and fuzzy search on known instances
type RepositoryStorageAdapter ¶
type RepositoryStorageAdapter struct {
// contains filtered or unexported fields
}
RepositoryStorageAdapter adapts RepositoryStorage to implement FederationStorage This is a temporary adapter to ease the migration from custom interfaces
func (*RepositoryStorageAdapter) CacheRemoteActor ¶
func (a *RepositoryStorageAdapter) CacheRemoteActor(ctx context.Context, handle string, actor *activitypub.Actor, ttl time.Duration) error
CacheRemoteActor caches a remote actor with a TTL
func (*RepositoryStorageAdapter) GetActor ¶
func (a *RepositoryStorageAdapter) GetActor(ctx context.Context, username string) (*activitypub.Actor, error)
GetActor retrieves an actor by username
func (*RepositoryStorageAdapter) GetActorPrivateKey ¶
func (a *RepositoryStorageAdapter) GetActorPrivateKey(ctx context.Context, username string) (string, error)
GetActorPrivateKey retrieves the private key for an actor by username
func (*RepositoryStorageAdapter) GetCachedRemoteActor ¶
func (a *RepositoryStorageAdapter) GetCachedRemoteActor(ctx context.Context, actorID string) (*activitypub.Actor, error)
GetCachedRemoteActor retrieves a cached remote actor by handle
func (*RepositoryStorageAdapter) GetFollowers ¶
func (a *RepositoryStorageAdapter) GetFollowers(ctx context.Context, username string, limit int, cursor string) ([]string, string, error)
GetFollowers retrieves the list of follower usernames for an actor
func (*RepositoryStorageAdapter) RecordFederationActivity ¶
func (a *RepositoryStorageAdapter) RecordFederationActivity(ctx context.Context, activity *storage.FederationActivity) error
RecordFederationActivity records a federation activity
type ResponseTimeDataPoint ¶
type ResponseTimeDataPoint struct {
Timestamp time.Time `json:"timestamp"`
ResponseTime float64 `json:"response_time"`
}
ResponseTimeDataPoint represents a single response time data point
type ResponseTimeTrend ¶
type ResponseTimeTrend struct {
DataPoints []ResponseTimeDataPoint `json:"data_points"`
Direction string `json:"direction"` // improving/degrading/stable
AverageResponseTime float64 `json:"average_response_time"`
MinResponseTime float64 `json:"min_response_time"`
MaxResponseTime float64 `json:"max_response_time"`
}
ResponseTimeTrend represents response time trend analysis
type SearchResult ¶
type SearchResult struct {
Actor *activitypub.Actor
IsRemote bool
RemoteDomain string
}
SearchResult represents a remote search result
type SignatureService ¶
type SignatureService struct {
// contains filtered or unexported fields
}
SignatureService provides enhanced HTTP signature verification with caching and retry logic
func NewSignatureService ¶
func NewSignatureService(publicKeyCacheRepo *repositories.PublicKeyCacheRepository, logger *zap.Logger) *SignatureService
NewSignatureService creates a new signature service
func (*SignatureService) VerifyDigestWithCompatibility ¶
func (s *SignatureService) VerifyDigestWithCompatibility(req *http.Request, body []byte) error
VerifyDigestWithCompatibility verifies digest with both SHA-256 and sha-256 prefixes for compatibility
func (*SignatureService) VerifySignature ¶
func (s *SignatureService) VerifySignature(ctx context.Context, req *http.Request, actorURL string) error
VerifySignature verifies an HTTP request signature with caching and retry logic
type StatisticalSummary ¶
type StatisticalSummary struct {
Domain string `json:"domain"`
Period string `json:"period"`
Timestamp time.Time `json:"timestamp"`
ActivityCount int64 `json:"activity_count"`
SuccessfulActivities int64 `json:"successful_activities"`
FailedActivities int64 `json:"failed_activities"`
HealthScore float64 `json:"health_score"`
Percentiles map[string]int64 `json:"percentiles"`
ErrorRate float64 `json:"error_rate"`
InstanceReachability float64 `json:"instance_reachability"`
TotalBytes int64 `json:"total_bytes"`
}
StatisticalSummary represents a compressed statistical summary of time series data
type TrendAnalysis ¶
type TrendAnalysis struct {
Domain string `json:"domain"`
Period time.Duration `json:"period"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
VolumeTrend *VolumeTrend `json:"volume_trend"`
ResponseTimeTrend *ResponseTimeTrend `json:"response_time_trend"`
ErrorRateTrend *ErrorRateTrend `json:"error_rate_trend"`
TrendingInstances []*TrendingInstance `json:"trending_instances"`
Patterns []*ActivityPattern `json:"patterns"`
OverallTrendScore float64 `json:"overall_trend_score"`
}
TrendAnalysis represents comprehensive trend analysis for a federation domain
type TrendAnalyzer ¶
type TrendAnalyzer struct {
// contains filtered or unexported fields
}
TrendAnalyzer analyzes federation flow trends and patterns
func NewTrendAnalyzer ¶
func NewTrendAnalyzer(store core.RepositoryStorage) *TrendAnalyzer
NewTrendAnalyzer creates a new trend analyzer
func (*TrendAnalyzer) AnalyzeTrends ¶
func (ta *TrendAnalyzer) AnalyzeTrends(ctx context.Context, domain string, period time.Duration) (*TrendAnalysis, error)
AnalyzeTrends analyzes federation trends for a specific domain
type TrendingInstance ¶
type TrendingInstance struct {
Domain string `json:"domain"`
TrendScore float64 `json:"trend_score"`
VolumeChange int64 `json:"volume_change"`
ResponseTime float64 `json:"response_time"`
ErrorRate float64 `json:"error_rate"`
LastActivity time.Time `json:"last_activity"`
TrendReason string `json:"trend_reason"`
}
TrendingInstance represents a trending federated instance
type VolumeDataPoint ¶
VolumeDataPoint represents a single volume data point
type VolumeTrend ¶
type VolumeTrend struct {
DataPoints []VolumeDataPoint `json:"data_points"`
Direction string `json:"direction"` // increasing/decreasing/stable
Slope float64 `json:"slope"`
R2 float64 `json:"r2"`
TotalVolume int64 `json:"total_volume"`
PeakVolume int64 `json:"peak_volume"`
}
VolumeTrend represents volume trend analysis
Source Files
¶
- analytics_aggregator.go
- authorized_fetch.go
- compression_pipeline.go
- constants.go
- cost_calculator.go
- delivery.go
- dynamorm_storage.go
- enhanced_retry.go
- errors.go
- hooks.go
- httpsig.go
- httpsig_enhanced.go
- relationship_tracker.go
- relay.go
- relay_budget.go
- remote_search.go
- signature_service.go
- storage.go
- storage_adapter.go
- trend_analyzer.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package circuit provides error constants for circuit breaker operations.
|
Package circuit provides error constants for circuit breaker operations. |
|
Package cost provides AWS cost calculation utilities for federation operations.
|
Package cost provides AWS cost calculation utilities for federation operations. |
|
Package health defines event types for federation health monitoring and EventBridge integration.
|
Package health defines event types for federation health monitoring and EventBridge integration. |
|
Package routing provides distributed circuit breaker implementation for federation request routing.
|
Package routing provides distributed circuit breaker implementation for federation request routing. |
|
Package sync provides thread synchronization utilities for ActivityPub conversation management.
|
Package sync provides thread synchronization utilities for ActivityPub conversation management. |
|
Package types defines shared data structures for ActivityPub federation.
|
Package types defines shared data structures for ActivityPub federation. |