Documentation
¶
Overview ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func IsInflightTransaction(transaction *model.Transaction) bool
- func NewBalanceTracker() *model.BalanceTracker
- type Allocation
- type BalanceLineage
- type BatchJobResult
- type Blnk
- func (b *Blnk) Close() error
- func (l *Blnk) CommitInflightTransaction(ctx context.Context, transactionID string, amount *big.Int) (*model.Transaction, error)
- func (l *Blnk) CommitInflightTransactionWithQueue(ctx context.Context, transactionID string, amount *big.Int) (*model.Transaction, error)
- func (l *Blnk) CommitWorker(ctx context.Context, jobs <-chan *model.Transaction, ...)
- func (b *Blnk) Config() *config.Configuration
- func (l *Blnk) CreateAPIKey(ctx context.Context, name, ownerID string, scopes []string, ...) (*model.APIKey, error)
- func (l *Blnk) CreateAccount(account model.Account) (model.Account, error)
- func (l *Blnk) CreateBalance(ctx context.Context, balance model.Balance) (model.Balance, error)
- func (l *Blnk) CreateBulkTransactions(ctx context.Context, req *model.BulkTransactionRequest) (*model.BulkTransactionResult, error)
- func (l *Blnk) CreateIdentity(identity model.Identity) (model.Identity, error)
- func (l *Blnk) CreateLedger(ledger model.Ledger) (model.Ledger, error)
- func (s *Blnk) CreateMatchingRule(ctx context.Context, rule model.MatchingRule) (*model.MatchingRule, error)
- func (l *Blnk) CreateMonitor(ctx context.Context, monitor model.BalanceMonitor) (model.BalanceMonitor, error)
- func (l *Blnk) DeleteIdentity(id string) error
- func (s *Blnk) DeleteMatchingRule(ctx context.Context, id string) error
- func (l *Blnk) DeleteMonitor(ctx context.Context, id string) error
- func (l *Blnk) DetokenizeIdentity(identityID string) (map[string]string, error)
- func (l *Blnk) DetokenizeIdentityField(identityID, fieldName string) (string, error)
- func (l *Blnk) GetAPIKeyByKey(ctx context.Context, key string) (*model.APIKey, error)
- func (l *Blnk) GetAccount(id string, include []string) (*model.Account, error)
- func (l *Blnk) GetAccountByNumber(id string) (*model.Account, error)
- func (l *Blnk) GetAllAccounts() ([]model.Account, error)
- func (l *Blnk) GetAllAccountsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Account, error)
- func (l *Blnk) GetAllAccountsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Account, *int64, error)
- func (l *Blnk) GetAllBalances(ctx context.Context, limit, offset int) ([]model.Balance, error)
- func (l *Blnk) GetAllBalancesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Balance, error)
- func (l *Blnk) GetAllBalancesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Balance, *int64, error)
- func (l *Blnk) GetAllIdentities() ([]model.Identity, error)
- func (l *Blnk) GetAllIdentitiesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Identity, error)
- func (l *Blnk) GetAllIdentitiesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Identity, *int64, error)
- func (l *Blnk) GetAllLedgers(limit, offset int) ([]model.Ledger, error)
- func (l *Blnk) GetAllLedgersWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Ledger, error)
- func (l *Blnk) GetAllLedgersWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Ledger, *int64, error)
- func (l *Blnk) GetAllMonitors(ctx context.Context) ([]model.BalanceMonitor, error)
- func (l *Blnk) GetAllTransactions(limit, offset int) ([]model.Transaction, error)
- func (l *Blnk) GetAllTransactionsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Transaction, error)
- func (l *Blnk) GetAllTransactionsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Transaction, *int64, error)
- func (l *Blnk) GetBalanceAtTime(ctx context.Context, balanceID string, targetTime time.Time, fromSource bool) (*model.Balance, error)
- func (l *Blnk) GetBalanceByID(ctx context.Context, id string, include []string, withQueued bool) (*model.Balance, error)
- func (l *Blnk) GetBalanceByIndicator(ctx context.Context, indicator, currency string) (*model.Balance, error)
- func (l *Blnk) GetBalanceLineage(ctx context.Context, balanceID string) (*BalanceLineage, error)
- func (l *Blnk) GetBalanceMonitors(ctx context.Context, balanceID string) ([]model.BalanceMonitor, error)
- func (b *Blnk) GetDataSource() database.IDataSource
- func (l *Blnk) GetDetokenizedIdentity(identityID string) (*model.Identity, error)
- func (l *Blnk) GetIdentity(id string) (*model.Identity, error)
- func (l *Blnk) GetInflightTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
- func (l *Blnk) GetLedgerByID(id string) (*model.Ledger, error)
- func (s *Blnk) GetMatchingRule(ctx context.Context, id string) (*model.MatchingRule, error)
- func (l *Blnk) GetMonitorByID(ctx context.Context, id string) (*model.BalanceMonitor, error)
- func (s *Blnk) GetReconciliation(ctx context.Context, reconciliationID string) (*model.Reconciliation, error)
- func (l *Blnk) GetRefundableTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
- func (b *Blnk) GetSearchClient() *search.TypesenseClient
- func (l *Blnk) GetTransaction(ctx context.Context, TransactionID string) (*model.Transaction, error)
- func (l *Blnk) GetTransactionByRef(ctx context.Context, reference string) (model.Transaction, error)
- func (l *Blnk) GetTransactionLineage(ctx context.Context, transactionID string) (*TransactionLineage, error)
- func (l *Blnk) ListAPIKeys(ctx context.Context, ownerID string) ([]*model.APIKey, error)
- func (s *Blnk) ListMatchingRules(ctx context.Context) ([]*model.MatchingRule, error)
- func (l *Blnk) MultiSearch(searchParams *api.MultiSearchSearchesParameter) (*api.MultiSearchResult, error)
- func (l *Blnk) PrepareLineageOutbox(ctx context.Context, txn *model.Transaction, ...) *model.LineageOutbox
- func (l *Blnk) ProcessLineageFromOutbox(ctx context.Context, entry model.LineageOutbox) error
- func (l *Blnk) ProcessTransactionInBatches(ctx context.Context, parentTransactionID string, amount *big.Int, ...) ([]*model.Transaction, error)
- func (b *Blnk) ProcessWebhook(_ context.Context, task *asynq.Task) error
- func (l *Blnk) QueueTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error)
- func (l *Blnk) RecordTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error)
- func (b *Blnk) RecoverQueuedTransactions(ctx context.Context, threshold time.Duration) (int, error)
- func (l *Blnk) RefundTransaction(ctx context.Context, transactionID string, skipQueue bool) (*model.Transaction, error)
- func (l *Blnk) RefundWorker(ctx context.Context, jobs <-chan *model.Transaction, ...)
- func (l *Blnk) RejectTransaction(ctx context.Context, transaction *model.Transaction, reason string) (*model.Transaction, error)
- func (l *Blnk) RevokeAPIKey(ctx context.Context, id, ownerID string) error
- func (l *Blnk) Search(collection string, query *api.SearchCollectionParams) (interface{}, error)
- func (b *Blnk) SendWebhook(newWebhook NewWebhook) error
- func (s *Blnk) StartInstantReconciliation(ctx context.Context, externalTransactions []model.ExternalTransaction, ...) (string, error)
- func (s *Blnk) StartReconciliation(ctx context.Context, uploadID string, strategy string, groupCriteria string, ...) (string, error)
- func (l *Blnk) TakeBalanceSnapshots(ctx context.Context, batchSize int)
- func (l *Blnk) TokenizeAllPII(identityID string) error
- func (l *Blnk) TokenizeIdentity(identityID string, fields []string) error
- func (l *Blnk) TokenizeIdentityField(identityID, fieldName string) error
- func (l *Blnk) UpdateBalanceIdentity(balanceID, identityID string) error
- func (l *Blnk) UpdateIdentity(identity *model.Identity) error
- func (l *Blnk) UpdateLastUsed(ctx context.Context, id string) error
- func (l *Blnk) UpdateLedger(id, name string) (*model.Ledger, error)
- func (s *Blnk) UpdateMatchingRule(ctx context.Context, rule model.MatchingRule) (*model.MatchingRule, error)
- func (l *Blnk) UpdateMetadata(ctx context.Context, entityID string, newMetadata map[string]interface{}) (map[string]interface{}, error)
- func (l *Blnk) UpdateMonitor(ctx context.Context, monitor *model.BalanceMonitor) error
- func (l *Blnk) UpdateTransactionStatus(ctx context.Context, id string, status string) error
- func (s *Blnk) UploadExternalData(ctx context.Context, source string, reader io.Reader, filename string) (string, int, error)
- func (l *Blnk) VoidInflightTransaction(ctx context.Context, transactionID string) (*model.Transaction, error)
- func (l *Blnk) VoidWorker(ctx context.Context, jobs <-chan *model.Transaction, ...)
- type LineageOutboxPayload
- type LineageOutboxProcessor
- func (p *LineageOutboxProcessor) IsRunning() bool
- func (p *LineageOutboxProcessor) Start(ctx context.Context)
- func (p *LineageOutboxProcessor) Stop()
- func (p *LineageOutboxProcessor) WithBatchSize(size int) *LineageOutboxProcessor
- func (p *LineageOutboxProcessor) WithLockDuration(duration time.Duration) *LineageOutboxProcessor
- func (p *LineageOutboxProcessor) WithPollInterval(interval time.Duration) *LineageOutboxProcessor
- type LineageSource
- type NewWebhook
- type ProviderBreakdown
- type Queue
- type QueuedTransactionRecoveryProcessor
- type TransactionLineage
- type TransactionTypePayload
Constants ¶
const ( AllocationFIFO = "FIFO" AllocationLIFO = "LIFO" AllocationProp = "PROPORTIONAL" )
Allocation strategies for fund lineage debit processing.
const ( StatusStarted = "started" // Indicates the process has started. StatusInProgress = "in_progress" // Indicates the process is ongoing. StatusCompleted = "completed" // Indicates the process is finished successfully. StatusFailed = "failed" // Indicates the process has failed. )
Status constants representing the various states a process can be in.
const ( StatusQueued = "QUEUED" StatusApplied = "APPLIED" StatusScheduled = "SCHEDULED" StatusInflight = "INFLIGHT" StatusVoid = "VOID" StatusCommit = "COMMIT" StatusRejected = "REJECTED" )
const (
GeneralLedgerID = "general_ledger_id"
)
const LineageFundAllocation = "BLNK_FUND_ALLOCATION"
LineageFundAllocation is the metadata key used to store fund allocation details in a transaction.
const LineageProviderKey = "BLNK_LINEAGE_PROVIDER"
LineageProviderKey is the metadata key used to identify the provider of funds in a transaction.
Variables ¶
var SQLFiles embed.FS
Functions ¶
func IsInflightTransaction ¶
func IsInflightTransaction(transaction *model.Transaction) bool
IsInflightTransaction checks whether a transaction is considered "inflight" based on its status and metadata. A transaction is considered inflight if: 1. It has a status of StatusInflight, OR 2. It has a status of StatusQueued AND has the inflight flag set to true in its metadata
Parameters: - transaction *model.Transaction: The transaction to check
Returns: - bool: true if the transaction is considered inflight, false otherwise
func NewBalanceTracker ¶
func NewBalanceTracker() *model.BalanceTracker
NewBalanceTracker creates a new BalanceTracker instance. It initializes the Balances and Frequencies maps.
Returns: - *model.BalanceTracker: A pointer to the newly created BalanceTracker instance.
Types ¶
type Allocation ¶ added in v0.13.0
Allocation represents the amount allocated from a specific shadow balance during debit processing.
Fields: - BalanceID string: The ID of the shadow balance from which funds are allocated. - Amount *big.Int: The amount allocated from this balance.
type BalanceLineage ¶ added in v0.13.0
type BalanceLineage struct {
BalanceID string `json:"balance_id"`
TotalWithLineage *big.Int `json:"total_with_lineage"`
AggregateBalanceID string `json:"aggregate_balance_id"`
Providers []ProviderBreakdown `json:"providers"`
}
BalanceLineage represents the complete fund lineage for a balance.
Fields: - BalanceID string: The ID of the balance being queried. - TotalWithLineage *big.Int: The total funds tracked across all providers. - AggregateBalanceID string: The ID of the aggregate shadow balance. - Providers []ProviderBreakdown: The breakdown of funds by provider.
type BatchJobResult ¶
type BatchJobResult struct {
Txn *model.Transaction
Error error
}
BatchJobResult represents the result of processing a transaction in a batch job.
Fields: - Txn *model.Transaction: A pointer to the processed Transaction model. - Error error: An error if the transaction could not be processed.
type Blnk ¶
type Blnk struct {
Hooks hooks.HookManager
// contains filtered or unexported fields
}
Blnk represents the main struct for the Blnk application.
func NewBlnk ¶
func NewBlnk(db database.IDataSource) (*Blnk, error)
NewBlnk initializes a new instance of Blnk with the provided database datasource. It fetches the configuration, initializes Redis client, balance tracker, queue, and search client.
Parameters: - db database.IDataSource: The datasource for database operations.
Returns: - *Blnk: A pointer to the newly created Blnk instance. - error: An error if any of the initialization steps fail.
func (*Blnk) CommitInflightTransaction ¶
func (l *Blnk) CommitInflightTransaction(ctx context.Context, transactionID string, amount *big.Int) (*model.Transaction, error)
CommitInflightTransaction commits an inflight transaction by validating and updating its amount, and finalizing the commitment. It starts a tracing span, fetches and validates the inflight transaction, updates the amount, and finalizes the commitment.
Parameters: - ctx context.Context: The context for the operation. - transactionID string: The ID of the inflight transaction to be committed. - amount float64: The amount to be validated and updated in the transaction.
Returns: - *model.Transaction: A pointer to the committed Transaction model. - error: An error if the transaction could not be committed.
func (*Blnk) CommitInflightTransactionWithQueue ¶
func (*Blnk) CommitWorker ¶
func (l *Blnk) CommitWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount *big.Int)
CommitWorker processes commit transactions from the jobs channel and sends the results to the results channel. It starts a tracing span, processes each transaction, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - jobs <-chan *model.Transaction: A channel from which transactions are received for processing. - results chan<- BatchJobResult: A channel to which the results of the processing are sent. - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker. - amount *big.Int: The amount to be processed in the transaction.
func (*Blnk) Config ¶ added in v0.13.0
func (b *Blnk) Config() *config.Configuration
Config returns the cached configuration for the Blnk instance. Falls back to config.Fetch() if not initialized (for backward compatibility with tests).
func (*Blnk) CreateAPIKey ¶
func (l *Blnk) CreateAPIKey(ctx context.Context, name, ownerID string, scopes []string, expiresAt time.Time) (*model.APIKey, error)
CreateAPIKey creates a new API key for the specified owner
Parameters: - ctx: The context for the operation - name: Name of the API key - ownerID: ID of the key owner - scopes: List of permission scopes - expiresAt: Expiration time for the key
Returns: - *model.APIKey: The created API key - error: An error if the operation fails
func (*Blnk) CreateAccount ¶
CreateAccount creates a new account in the database. It overrides the ledger and identity details, applies the account name, and fetches external account details.
Parameters: - account model.Account: The Account model to be created.
Returns: - model.Account: The created Account model. - error: An error if the account could not be created.
func (*Blnk) CreateBalance ¶
CreateBalance creates a new balance. It starts a tracing span, creates the balance, and performs post-creation actions.
Parameters: - ctx context.Context: The context for the operation. - balance model.Balance: The Balance model to be created.
Returns: - model.Balance: The created Balance model. - error: An error if the balance could not be created.
func (*Blnk) CreateBulkTransactions ¶
func (l *Blnk) CreateBulkTransactions(ctx context.Context, req *model.BulkTransactionRequest) (*model.BulkTransactionResult, error)
CreateBulkTransactions handles the creation of multiple transactions in a batch. If atomic is true: Any failure will cause all transactions to be rolled back (or voided if inflight). If atomic is false: Failures will stop processing but previous transactions remain unaffected. If run_async is true: Processing happens in background with webhook notifications.
func (*Blnk) CreateIdentity ¶
CreateIdentity creates a new identity in the database.
Parameters: - identity model.Identity: The Identity model to be created.
Returns: - model.Identity: The created Identity model. - error: An error if the identity could not be created.
func (*Blnk) CreateLedger ¶
CreateLedger creates a new ledger. It calls postLedgerActions after a successful creation.
Parameters: - ledger: A Ledger model representing the ledger to be created.
Returns: - model.Ledger: The created Ledger model. - error: An error if the ledger could not be created.
func (*Blnk) CreateMatchingRule ¶
func (s *Blnk) CreateMatchingRule(ctx context.Context, rule model.MatchingRule) (*model.MatchingRule, error)
CreateMatchingRule creates a new matching rule after validating it. Parameters: - ctx: The context for managing the request. - rule: The matching rule to be created. Returns the created rule, or an error if validation or storage fails.
func (*Blnk) CreateMonitor ¶
func (l *Blnk) CreateMonitor(ctx context.Context, monitor model.BalanceMonitor) (model.BalanceMonitor, error)
CreateMonitor creates a new balance monitor. It starts a tracing span, applies precision to the monitor's condition value, and creates the monitor. It records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - monitor model.BalanceMonitor: The BalanceMonitor model to be created.
Returns: - model.BalanceMonitor: The created BalanceMonitor model. - error: An error if the monitor could not be created.
func (*Blnk) DeleteIdentity ¶
DeleteIdentity deletes an identity by its ID.
Parameters: - id string: The ID of the identity to delete.
Returns: - error: An error if the identity could not be deleted.
func (*Blnk) DeleteMatchingRule ¶
DeleteMatchingRule deletes a matching rule by its ID. Parameters: - ctx: The context for managing the request. - id: The ID of the rule to delete. Returns an error if deletion fails.
func (*Blnk) DeleteMonitor ¶
DeleteMonitor deletes a balance monitor by its ID. It starts a tracing span, deletes the monitor, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - id string: The ID of the monitor to delete.
Returns: - error: An error if the monitor could not be deleted.
func (*Blnk) DetokenizeIdentity ¶
DetokenizeIdentity detokenizes and returns all tokenized fields in an identity.
Parameters: - identityID string: The ID of the identity.
Returns: - map[string]string: A map of field names to their detokenized values. - error: An error if any field could not be detokenized.
func (*Blnk) DetokenizeIdentityField ¶
DetokenizeIdentityField detokenizes a specific field in an identity.
Parameters: - identityID string: The ID of the identity. - fieldName string: The name of the field to detokenize.
Returns: - string: The detokenized field value. - error: An error if the field could not be detokenized.
func (*Blnk) GetAPIKeyByKey ¶
GetAPIKeyByKey retrieves an API key by its key string
Parameters: - ctx: The context for the operation - key: The API key string
Returns: - *model.APIKey: The API key if found - error: An error if the operation fails
func (*Blnk) GetAccount ¶
GetAccount retrieves an account by its ID. It fetches the account from the datasource and includes additional data as specified.
Parameters: - id string: The ID of the account to retrieve. - include []string: A slice of strings specifying additional data to include.
Returns: - *model.Account: A pointer to the Account model if found. - error: An error if the account could not be retrieved.
func (*Blnk) GetAccountByNumber ¶
GetAccountByNumber retrieves an account from the database by its account number.
Parameters: - id string: The account number of the account to retrieve.
Returns: - *model.Account: A pointer to the Account model if found. - error: An error if the account could not be retrieved.
func (*Blnk) GetAllAccounts ¶
GetAllAccounts retrieves all accounts from the database.
Returns: - []model.Account: A slice of Account models. - error: An error if the accounts could not be retrieved.
func (*Blnk) GetAllAccountsWithFilter ¶ added in v0.13.2
func (l *Blnk) GetAllAccountsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Account, error)
GetAllAccountsWithFilter retrieves accounts using advanced filters.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - limit int: Maximum number of accounts to return. - offset int: Offset for pagination.
Returns: - []model.Account: A slice of Account models matching the filter criteria. - error: An error if the accounts could not be retrieved.
func (*Blnk) GetAllAccountsWithFilterAndOptions ¶ added in v0.13.2
func (l *Blnk) GetAllAccountsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Account, *int64, error)
GetAllAccountsWithFilterAndOptions retrieves accounts with filters, sorting, and optional count.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - opts *filter.QueryOptions: Query options including sorting and count settings. - limit int: Maximum number of accounts to return. - offset int: Offset for pagination.
Returns: - []model.Account: A slice of Account models matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the accounts could not be retrieved.
func (*Blnk) GetAllBalances ¶
GetAllBalances retrieves all balances. It starts a tracing span, fetches all balances, and records relevant events.
Parameters: - ctx context.Context: The context for the operation.
Returns: - []model.Balance: A slice of Balance models. - error: An error if the balances could not be retrieved.
func (*Blnk) GetAllBalancesWithFilter ¶ added in v0.13.2
func (l *Blnk) GetAllBalancesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Balance, error)
GetAllBalancesWithFilter retrieves balances using advanced filters. It starts a tracing span, fetches balances matching the filter criteria, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - limit int: Maximum number of balances to return. - offset int: Offset for pagination.
Returns: - []model.Balance: A slice of Balance models matching the filter criteria. - error: An error if the balances could not be retrieved.
func (*Blnk) GetAllBalancesWithFilterAndOptions ¶ added in v0.13.2
func (l *Blnk) GetAllBalancesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Balance, *int64, error)
GetAllBalancesWithFilterAndOptions retrieves balances with advanced filters, sorting, and optional count.
func (*Blnk) GetAllIdentities ¶
GetAllIdentities retrieves all identities from the database.
Returns: - []model.Identity: A slice of Identity models. - error: An error if the identities could not be retrieved.
func (*Blnk) GetAllIdentitiesWithFilter ¶ added in v0.13.2
func (l *Blnk) GetAllIdentitiesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Identity, error)
GetAllIdentitiesWithFilter retrieves identities using advanced filters.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - limit int: Maximum number of identities to return. - offset int: Offset for pagination.
Returns: - []model.Identity: A slice of Identity models matching the filter criteria. - error: An error if the identities could not be retrieved.
func (*Blnk) GetAllIdentitiesWithFilterAndOptions ¶ added in v0.13.2
func (l *Blnk) GetAllIdentitiesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Identity, *int64, error)
GetAllIdentitiesWithFilterAndOptions retrieves identities with filters, sorting, and optional count.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - opts *filter.QueryOptions: Query options including sorting and count settings. - limit int: Maximum number of identities to return. - offset int: Offset for pagination.
Returns: - []model.Identity: A slice of Identity models matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the identities could not be retrieved.
func (*Blnk) GetAllLedgers ¶
GetAllLedgers retrieves all ledgers from the datasource. It returns a slice of Ledger models and an error if the operation fails.
Returns: - []model.Ledger: A slice of Ledger models. - error: An error if the ledgers could not be retrieved.
func (*Blnk) GetAllLedgersWithFilter ¶ added in v0.13.2
func (l *Blnk) GetAllLedgersWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Ledger, error)
GetAllLedgersWithFilter retrieves ledgers from the datasource using advanced filters. It returns a slice of Ledger models and an error if the operation fails.
Parameters: - ctx: Context for the operation. - filters: A QueryFilterSet containing filter conditions. - limit: Maximum number of ledgers to return. - offset: Offset for pagination.
Returns: - []model.Ledger: A slice of Ledger models matching the filter criteria. - error: An error if the ledgers could not be retrieved.
func (*Blnk) GetAllLedgersWithFilterAndOptions ¶ added in v0.13.2
func (l *Blnk) GetAllLedgersWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Ledger, *int64, error)
GetAllLedgersWithFilterAndOptions retrieves ledgers with filters, sorting, and optional count.
Parameters: - ctx: Context for the operation. - filters: A QueryFilterSet containing filter conditions. - opts: Query options including sorting and count settings. - limit: Maximum number of ledgers to return. - offset: Offset for pagination.
Returns: - []model.Ledger: A slice of Ledger models matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the ledgers could not be retrieved.
func (*Blnk) GetAllMonitors ¶
GetAllMonitors retrieves all balance monitors. It starts a tracing span, fetches all monitors, and records relevant events.
Parameters: - ctx context.Context: The context for the operation.
Returns: - []model.BalanceMonitor: A slice of BalanceMonitor models. - error: An error if the monitors could not be retrieved.
func (*Blnk) GetAllTransactions ¶
func (l *Blnk) GetAllTransactions(limit, offset int) ([]model.Transaction, error)
GetAllTransactions retrieves all transactions from the datasource. It starts a tracing span, fetches all transactions, and records relevant events and errors.
Returns: - []model.Transaction: A slice of all retrieved Transaction models. - error: An error if the transactions could not be retrieved.
func (*Blnk) GetAllTransactionsWithFilter ¶ added in v0.13.2
func (l *Blnk) GetAllTransactionsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Transaction, error)
GetAllTransactionsWithFilter retrieves transactions using advanced filters. It starts a tracing span, fetches transactions matching the filter criteria, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - filters *filter.QueryFilterSet: Filter conditions to apply. - limit int: Maximum number of transactions to return. - offset int: Offset for pagination.
Returns: - []model.Transaction: A slice of Transaction models matching the filter criteria. - error: An error if the transactions could not be retrieved.
func (*Blnk) GetAllTransactionsWithFilterAndOptions ¶ added in v0.13.2
func (l *Blnk) GetAllTransactionsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Transaction, *int64, error)
GetAllTransactionsWithFilterAndOptions retrieves transactions with advanced filters, sorting, and optional count.
func (*Blnk) GetBalanceAtTime ¶
func (l *Blnk) GetBalanceAtTime(ctx context.Context, balanceID string, targetTime time.Time, fromSource bool) (*model.Balance, error)
GetBalanceAtTime retrieves a balance's state at a specific point in time. It can either use balance snapshots for efficiency or calculate from all source transactions based on the fromSource parameter.
Parameters: - ctx context.Context: The context for the operation. - balanceID string: The ID of the balance to retrieve. - targetTime time.Time: The point in time for which to retrieve the balance state. - fromSource bool: If true, calculates balance from all transactions instead of using snapshots.
Returns: - *model.Balance: A pointer to the Balance model representing the state at the given time. - error: An error if the historical balance state could not be retrieved.
func (*Blnk) GetBalanceByID ¶
func (l *Blnk) GetBalanceByID(ctx context.Context, id string, include []string, withQueued bool) (*model.Balance, error)
GetBalanceByID retrieves a balance by its ID. It starts a tracing span, fetches the balance, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - id string: The ID of the balance to retrieve. - include []string: A slice of strings specifying additional data to include.
Returns: - *model.Balance: A pointer to the Balance model if found. - error: An error if the balance could not be retrieved.
func (*Blnk) GetBalanceByIndicator ¶
func (l *Blnk) GetBalanceByIndicator(ctx context.Context, indicator, currency string) (*model.Balance, error)
GetBalanceByIndicator retrieves a balance by its indicator and currency. It starts a tracing span, fetches the balance, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - indicator string: The indicator of the balance to retrieve. - currency string: The currency of the balance to retrieve.
Returns: - *model.Balance: A pointer to the Balance model if found. - error: An error if the balance could not be retrieved.
func (*Blnk) GetBalanceLineage ¶ added in v0.13.0
GetBalanceLineage retrieves the fund lineage for a balance. It returns a breakdown of funds by provider, showing how much was received and spent from each source.
Parameters: - ctx context.Context: The context for the operation. - balanceID string: The ID of the balance to get lineage for.
Returns: - *BalanceLineage: The lineage information for the balance. - error: An error if the lineage could not be retrieved.
func (*Blnk) GetBalanceMonitors ¶
func (l *Blnk) GetBalanceMonitors(ctx context.Context, balanceID string) ([]model.BalanceMonitor, error)
GetBalanceMonitors retrieves all monitors for a given balance ID. It starts a tracing span, fetches the monitors, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - balanceID string: The ID of the balance for which to retrieve monitors.
Returns: - []model.BalanceMonitor: A slice of BalanceMonitor models. - error: An error if the monitors could not be retrieved.
func (*Blnk) GetDataSource ¶ added in v0.13.1
func (b *Blnk) GetDataSource() database.IDataSource
func (*Blnk) GetDetokenizedIdentity ¶
GetDetokenizedIdentity returns a copy of the identity with all fields detokenized. Note: This does not modify the stored identity.
Parameters: - identityID string: The ID of the identity.
Returns: - *model.Identity: A pointer to the detokenized Identity model. - error: An error if the identity could not be detokenized.
func (*Blnk) GetIdentity ¶
GetIdentity retrieves an identity by its ID.
Parameters: - id string: The ID of the identity to retrieve.
Returns: - *model.Identity: A pointer to the Identity model if found. - error: An error if the identity could not be retrieved.
func (*Blnk) GetInflightTransactionsByParentID ¶
func (l *Blnk) GetInflightTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
GetInflightTransactionsByParentID retrieves inflight transactions by their parent transaction ID. It starts a tracing span, fetches the transactions from the datasource, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - parentTransactionID string: The ID of the parent transaction. - batchSize int: The number of transactions to retrieve in a batch. - offset int64: The offset for pagination.
Returns: - []*model.Transaction: A slice of pointers to the retrieved Transaction models. - error: An error if the transactions could not be retrieved.
func (*Blnk) GetLedgerByID ¶
GetLedgerByID retrieves a ledger by its ID from the datasource. It returns a pointer to the Ledger model and an error if the operation fails.
Parameters: - id: A string representing the ID of the ledger to retrieve.
Returns: - *model.Ledger: A pointer to the Ledger model if found. - error: An error if the ledger could not be retrieved.
func (*Blnk) GetMatchingRule ¶
GetMatchingRule retrieves a matching rule by its ID. Parameters: - ctx: The context for managing the request. - id: The ID of the matching rule to retrieve. Returns the matching rule, or an error if retrieval fails.
func (*Blnk) GetMonitorByID ¶
GetMonitorByID retrieves a balance monitor by its ID. It starts a tracing span, fetches the monitor, and records relevant events.
Parameters: - ctx context.Context: The context for the operation. - id string: The ID of the monitor to retrieve.
Returns: - *model.BalanceMonitor: A pointer to the BalanceMonitor model if found. - error: An error if the monitor could not be retrieved.
func (*Blnk) GetReconciliation ¶
func (s *Blnk) GetReconciliation(ctx context.Context, reconciliationID string) (*model.Reconciliation, error)
GetReconciliation retrieves a reconciliation by its ID. Parameters: - ctx: The context controlling the request. - reconciliationID: The ID of the reconciliation to retrieve. Returns: - *model.Reconciliation: The retrieved reconciliation object. - error: If the reconciliation cannot be found or if retrieval fails.
func (*Blnk) GetRefundableTransactionsByParentID ¶
func (l *Blnk) GetRefundableTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
GetRefundableTransactionsByParentID retrieves refundable transactions by their parent transaction ID. It starts a tracing span, fetches the transactions from the datasource, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - parentTransactionID string: The ID of the parent transaction. - batchSize int: The number of transactions to retrieve in a batch. - offset int64: The offset for pagination.
Returns: - []*model.Transaction: A slice of pointers to the retrieved Transaction models. - error: An error if the transactions could not be retrieved.
func (*Blnk) GetSearchClient ¶ added in v0.13.1
func (b *Blnk) GetSearchClient() *search.TypesenseClient
func (*Blnk) GetTransaction ¶
func (l *Blnk) GetTransaction(ctx context.Context, TransactionID string) (*model.Transaction, error)
GetTransaction retrieves a transaction by its ID from the datasource. It starts a tracing span, fetches the transaction, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - TransactionID string: The ID of the transaction to be retrieved.
Returns: - *model.Transaction: A pointer to the retrieved Transaction model. - error: An error if the transaction could not be retrieved.
func (*Blnk) GetTransactionByRef ¶
func (l *Blnk) GetTransactionByRef(ctx context.Context, reference string) (model.Transaction, error)
GetTransactionByRef retrieves a transaction by its reference from the datasource. It starts a tracing span, fetches the transaction by reference, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - reference string: The reference of the transaction to be retrieved.
Returns: - model.Transaction: The retrieved Transaction model. - error: An error if the transaction could not be retrieved.
func (*Blnk) GetTransactionLineage ¶ added in v0.13.0
func (l *Blnk) GetTransactionLineage(ctx context.Context, transactionID string) (*TransactionLineage, error)
GetTransactionLineage retrieves the lineage information for a transaction. It returns the fund allocation details and any shadow transactions created for the transaction.
Parameters: - ctx context.Context: The context for the operation. - transactionID string: The ID of the transaction to get lineage for.
Returns: - *TransactionLineage: The lineage information for the transaction. - error: An error if the lineage could not be retrieved.
func (*Blnk) ListAPIKeys ¶
ListAPIKeys retrieves all API keys for a specific owner
Parameters: - ctx: The context for the operation - ownerID: ID of the key owner
Returns: - []*model.APIKey: List of API keys - error: An error if the operation fails
func (*Blnk) ListMatchingRules ¶
ListMatchingRules retrieves all matching rules. Parameters: - ctx: The context for managing the request. Returns a list of matching rules, or an error if retrieval fails.
func (*Blnk) MultiSearch ¶
func (l *Blnk) MultiSearch(searchParams *api.MultiSearchSearchesParameter) (*api.MultiSearchResult, error)
MultiSearch performs a multi-search operation across collections.
func (*Blnk) PrepareLineageOutbox ¶ added in v0.13.0
func (l *Blnk) PrepareLineageOutbox(ctx context.Context, txn *model.Transaction, sourceBalance, destinationBalance *model.Balance) *model.LineageOutbox
PrepareLineageOutbox creates a LineageOutbox entry for atomic insertion with the transaction. This ensures lineage processing intent is captured in the same database transaction, guaranteeing no lineage work is lost even if subsequent async operations fail.
Parameters: - ctx context.Context: The context for the operation. - txn *model.Transaction: The transaction being processed. - sourceBalance *model.Balance: The source balance (may be nil). - destinationBalance *model.Balance: The destination balance (may be nil).
Returns: - *model.LineageOutbox: The outbox entry to insert, or nil if no lineage processing needed.
func (*Blnk) ProcessLineageFromOutbox ¶ added in v0.13.0
ProcessLineageFromOutbox processes a lineage outbox entry. This is called by the outbox worker to perform deferred lineage processing.
Parameters: - ctx context.Context: The context for the operation. - entry model.LineageOutbox: The outbox entry to process.
Returns: - error: An error if processing fails.
func (*Blnk) ProcessTransactionInBatches ¶
func (l *Blnk) ProcessTransactionInBatches(ctx context.Context, parentTransactionID string, amount *big.Int, maxWorkers int, streamMode bool, gt getTxns, tw transactionWorker) ([]*model.Transaction, error)
ProcessTransactionInBatches processes transactions in batches or streams them based on the provided mode. It starts a tracing span, initializes worker pools, fetches transactions, and processes them concurrently.
Parameters: - ctx context.Context: The context for the operation. - parentTransactionID string: The ID of the parent transaction. - amount *big.Int: The amount to be processed in the transaction. - maxWorkers int: The maximum number of workers to process transactions concurrently. - streamMode bool: A flag indicating whether to process transactions in streaming mode. - gt getTxns: A function to retrieve transactions in batches. - tw transactionWorker: A function to process transactions.
Returns: - []*model.Transaction: A slice of pointers to the processed Transaction models. - error: An error if the transactions could not be processed.
func (*Blnk) ProcessWebhook ¶
ProcessWebhook processes a webhook notification task from the queue.
Parameters: - _ context.Context: The context for the operation. - task *asynq.Task: The task containing the webhook notification data.
Returns: - error: An error if the webhook processing fails.
func (*Blnk) QueueTransaction ¶
func (l *Blnk) QueueTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error)
QueueTransaction processes and queues a transaction for execution. It handles both single transactions and split transactions, preparing them for processing by setting metadata, status, and managing their persistence and queueing.
Parameters: - ctx context.Context: The context for the operation. - transaction *model.Transaction: The transaction to be queued.
Returns: - *model.Transaction: A pointer to the queued Transaction model. - error: An error if the transaction could not be queued.
func (*Blnk) RecordTransaction ¶
func (l *Blnk) RecordTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error)
RecordTransaction records a transaction by validating, processing balances, and finalizing the transaction. It starts a tracing span, acquires a lock, and performs the necessary steps to record the transaction.
Parameters: - ctx context.Context: The context for the operation. - transaction *model.Transaction: The transaction to be recorded.
Returns: - *model.Transaction: A pointer to the recorded Transaction model. - error: An error if the transaction could not be recorded.
func (*Blnk) RecoverQueuedTransactions ¶ added in v0.13.2
RecoverQueuedTransactions triggers an immediate recovery of stuck queued transactions using the provided threshold. This is exposed for the manual trigger API endpoint.
func (*Blnk) RefundTransaction ¶
func (l *Blnk) RefundTransaction(ctx context.Context, transactionID string, skipQueue bool) (*model.Transaction, error)
RefundTransaction processes a refund for a given transaction by its ID. It starts a tracing span, retrieves the original transaction, validates its status, creates a new refund transaction, and queues it.
Parameters: - ctx context.Context: The context for the operation. - transactionID string: The ID of the transaction to be refunded.
Returns: - *model.Transaction: A pointer to the refunded Transaction model. - error: An error if the transaction could not be refunded.
func (*Blnk) RefundWorker ¶
func (l *Blnk) RefundWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount *big.Int)
RefundWorker processes refund transactions from the jobs channel and sends the results to the results channel. It starts a tracing span, processes each transaction, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - jobs <-chan *model.Transaction: A channel from which transactions are received for processing. - results chan<- BatchJobResult: A channel to which the results of the processing are sent. - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker. - amount float64: The amount to be processed in the transaction.
func (*Blnk) RejectTransaction ¶
func (l *Blnk) RejectTransaction(ctx context.Context, transaction *model.Transaction, reason string) (*model.Transaction, error)
RejectTransaction rejects a transaction by updating its status and recording the rejection reason. It starts a tracing span, updates the transaction status and metadata, persists the transaction, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - transaction *model.Transaction: The transaction to be rejected. - reason string: The reason for rejecting the transaction.
Returns: - *model.Transaction: A pointer to the rejected Transaction model. - error: An error if the transaction could not be recorded.
func (*Blnk) RevokeAPIKey ¶
RevokeAPIKey revokes an API key if it belongs to the specified owner
Parameters: - ctx: The context for the operation - id: ID of the API key to revoke - ownerID: ID of the key owner
Returns: - error: An error if the operation fails
func (*Blnk) Search ¶
func (l *Blnk) Search(collection string, query *api.SearchCollectionParams) (interface{}, error)
Search performs a search on the specified collection using the provided query parameters.
Parameters: - collection string: The name of the collection to search. - query *api.SearchCollectionParams: The search query parameters.
Returns: - interface{}: The search results. - error: An error if the search operation fails.
func (*Blnk) SendWebhook ¶
func (b *Blnk) SendWebhook(newWebhook NewWebhook) error
SendWebhook enqueues a webhook notification task using the Blnk instance's asynq client.
Parameters: - newWebhook NewWebhook: The webhook notification data to enqueue.
Returns: - error: An error if the task could not be enqueued.
func (*Blnk) StartInstantReconciliation ¶
func (s *Blnk) StartInstantReconciliation(ctx context.Context, externalTransactions []model.ExternalTransaction, strategy string, groupCriteria string, matchingRuleIDs []string, isDryRun bool, ) (string, error)
StartInstantReconciliation initiates a reconciliation process directly with provided transactions instead of loading them from an uploaded file. Parameters: - ctx: The context controlling the reconciliation process. - externalTransactions: The array of external transactions to reconcile. - strategy: The reconciliation strategy to be used (e.g., "one_to_one"). - groupCriteria: Criteria to group transactions (optional). - matchingRuleIDs: The IDs of the rules used for matching transactions. - isDryRun: If true, the reconciliation will not commit changes (useful for testing). Returns: - string: The ID of the reconciliation process. - error: If the reconciliation fails to start.
func (*Blnk) StartReconciliation ¶
func (s *Blnk) StartReconciliation(ctx context.Context, uploadID string, strategy string, groupCriteria string, matchingRuleIDs []string, isDryRun bool) (string, error)
StartReconciliation initiates the reconciliation process by creating a new reconciliation entry and starting the process asynchronously. The process is detached to run in the background. Parameters: - ctx: The context controlling the reconciliation process. - uploadID: The ID of the uploaded transaction file to reconcile. - strategy: The reconciliation strategy to be used (e.g., "one_to_one"). - groupCriteria: Criteria to group transactions (optional). - matchingRuleIDs: The IDs of the rules used for matching transactions. - isDryRun: If true, the reconciliation will not commit changes (useful for testing). Returns: - string: The ID of the reconciliation process. - error: If the reconciliation fails to start.
func (*Blnk) TakeBalanceSnapshots ¶
TakeBalanceSnapshots creates daily snapshots of balances in batches. It accepts a batch size parameter to control the number of balances processed at once, helping to manage memory usage for large datasets.
Parameters: - ctx context.Context: The context for managing the operation's lifecycle and cancellation - batchSize int: The number of balances to process in each batch
Returns: - int: The total number of snapshots created - error: An error if the snapshot creation fails
func (*Blnk) TokenizeAllPII ¶
TokenizeAllPII tokenizes all eligible PII fields in an identity.
Parameters: - identityID string: The ID of the identity.
Returns: - error: An error if any field could not be tokenized.
func (*Blnk) TokenizeIdentity ¶
TokenizeIdentity tokenizes all specified fields in an identity.
Parameters: - identityID string: The ID of the identity. - fields []string: The names of the fields to tokenize.
Returns: - error: An error if any field could not be tokenized.
func (*Blnk) TokenizeIdentityField ¶
TokenizeIdentityField tokenizes a specific field in an identity.
Parameters: - identityID string: The ID of the identity. - fieldName string: The name of the field to tokenize.
Returns: - error: An error if the field could not be tokenized.
func (*Blnk) UpdateBalanceIdentity ¶
UpdateBalanceIdentity updates only the identity_id associated with a balance. It validates that both the balance and the identity exist before applying the change.
Parameters: - balanceID string: The ID of the balance whose identity reference should be modified. - identityID string: The new identity ID to associate with the balance.
Returns: - error: An error is returned if either the balance or identity records are not found or the update fails.
func (*Blnk) UpdateIdentity ¶
UpdateIdentity updates an existing identity in the database.
Parameters: - identity *model.Identity: A pointer to the Identity model to be updated.
Returns: - error: An error if the identity could not be updated.
func (*Blnk) UpdateLastUsed ¶
UpdateLastUsed updates the last used timestamp of an API key
Parameters: - ctx: The context for the operation - id: ID of the API key to update
Returns: - error: An error if the operation fails
func (*Blnk) UpdateLedger ¶
UpdateLedger updates an existing ledger's name. It calls postLedgerActions after a successful update to handle indexing and webhooks.
Parameters: - id: A string representing the ID of the ledger to update. - name: A string representing the new name for the ledger.
Returns: - *model.Ledger: A pointer to the updated Ledger model. - error: An error if the ledger could not be updated.
func (*Blnk) UpdateMatchingRule ¶
func (s *Blnk) UpdateMatchingRule(ctx context.Context, rule model.MatchingRule) (*model.MatchingRule, error)
UpdateMatchingRule updates an existing matching rule. It retrieves the existing rule, validates the new data, and then updates the rule. Parameters: - ctx: The context for managing the request. - rule: The updated rule data. Returns the updated rule, or an error if validation or update fails.
func (*Blnk) UpdateMetadata ¶
func (l *Blnk) UpdateMetadata(ctx context.Context, entityID string, newMetadata map[string]interface{}) (map[string]interface{}, error)
UpdateMetadata updates the metadata for a given entity ID. It first determines the entity type, retrieves current metadata, merges it with new metadata, updates the entity with the merged metadata, and queues the updated entity for re-indexing in Typesense.
Parameters: - ctx: The context for the operation. - entityID: A string representing the ID of the entity to update. - newMetadata: A map containing the new metadata to merge.
Returns: - map[string]interface{}: The merged metadata after the update. - error: An error if the update operation fails.
func (*Blnk) UpdateMonitor ¶
UpdateMonitor updates an existing balance monitor. It starts a tracing span, updates the monitor, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - monitor *model.BalanceMonitor: A pointer to the BalanceMonitor model to be updated.
Returns: - error: An error if the monitor could not be updated.
func (*Blnk) UpdateTransactionStatus ¶
UpdateTransactionStatus updates the status of a transaction by its ID in the datasource. It starts a tracing span, updates the transaction status, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - id string: The ID of the transaction to be updated. - status string: The new status to be set for the transaction.
Returns: - error: An error if the transaction status could not be updated.
func (*Blnk) UploadExternalData ¶
func (s *Blnk) UploadExternalData(ctx context.Context, source string, reader io.Reader, filename string) (string, int, error)
UploadExternalData handles the process of uploading external data by detecting file type, parsing, and storing it. Parameters: - ctx: The context for controlling execution. - source: The source of the external data. - reader: An io.Reader for reading the data. - filename: The name of the file being uploaded. Returns: - string: The ID of the upload. - int: The total number of records processed. - error: If any step of the process fails.
func (*Blnk) VoidInflightTransaction ¶
func (l *Blnk) VoidInflightTransaction(ctx context.Context, transactionID string) (*model.Transaction, error)
VoidInflightTransaction voids an inflight transaction by validating it, calculating the remaining amount, and finalizing the void. It starts a tracing span, fetches and validates the inflight transaction, calculates the remaining amount, and finalizes the void.
Parameters: - ctx context.Context: The context for the operation. - transactionID string: The ID of the inflight transaction to be voided.
Returns: - *model.Transaction: A pointer to the voided Transaction model. - error: An error if the transaction could not be voided.
func (*Blnk) VoidWorker ¶
func (l *Blnk) VoidWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount *big.Int)
VoidWorker processes void transactions from the jobs channel and sends the results to the results channel. It starts a tracing span, processes each transaction, and records relevant events and errors.
Parameters: - ctx context.Context: The context for the operation. - jobs <-chan *model.Transaction: A channel from which transactions are received for processing. - results chan<- BatchJobResult: A channel to which the results of the processing are sent. - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker. - amount *big.Int: The amount to be processed in the transaction.
type LineageOutboxPayload ¶ added in v0.13.0
type LineageOutboxPayload struct {
Amount float64 `json:"amount"`
PreciseAmount string `json:"precise_amount"`
Currency string `json:"currency"`
Precision float64 `json:"precision"`
Reference string `json:"reference"`
SkipQueue bool `json:"skip_queue"`
Inflight bool `json:"inflight"`
}
LineageOutboxPayload contains the transaction data needed for deferred lineage processing.
type LineageOutboxProcessor ¶ added in v0.13.0
type LineageOutboxProcessor struct {
// contains filtered or unexported fields
}
LineageOutboxProcessor processes pending lineage outbox entries. It polls the database for pending entries and processes them asynchronously, ensuring that lineage work is never lost even if Redis/queue operations fail.
func NewLineageOutboxProcessor ¶ added in v0.13.0
func NewLineageOutboxProcessor(blnk *Blnk) *LineageOutboxProcessor
NewLineageOutboxProcessor creates a new outbox processor.
Parameters: - blnk *Blnk: The Blnk instance containing the datasource and processing logic.
Returns: - *LineageOutboxProcessor: The configured processor.
func (*LineageOutboxProcessor) IsRunning ¶ added in v0.13.0
func (p *LineageOutboxProcessor) IsRunning() bool
IsRunning returns whether the processor is currently running.
Returns: - bool: True if the processor is running.
func (*LineageOutboxProcessor) Start ¶ added in v0.13.0
func (p *LineageOutboxProcessor) Start(ctx context.Context)
Start begins processing outbox entries in the background. The processor will poll for pending entries at the configured interval.
Parameters: - ctx context.Context: The context for the operation. When cancelled, processing stops.
func (*LineageOutboxProcessor) Stop ¶ added in v0.13.0
func (p *LineageOutboxProcessor) Stop()
Stop gracefully stops the outbox processor. It signals the processor to stop and waits for pending work to complete.
func (*LineageOutboxProcessor) WithBatchSize ¶ added in v0.13.0
func (p *LineageOutboxProcessor) WithBatchSize(size int) *LineageOutboxProcessor
WithBatchSize sets the batch size for processing outbox entries.
Parameters: - size int: The number of entries to process per batch.
Returns: - *LineageOutboxProcessor: The processor for chaining.
func (*LineageOutboxProcessor) WithLockDuration ¶ added in v0.13.0
func (p *LineageOutboxProcessor) WithLockDuration(duration time.Duration) *LineageOutboxProcessor
WithLockDuration sets the lock duration for claimed entries.
Parameters: - duration time.Duration: The lock duration.
Returns: - *LineageOutboxProcessor: The processor for chaining.
func (*LineageOutboxProcessor) WithPollInterval ¶ added in v0.13.0
func (p *LineageOutboxProcessor) WithPollInterval(interval time.Duration) *LineageOutboxProcessor
WithPollInterval sets the interval between polls.
Parameters: - interval time.Duration: The poll interval.
Returns: - *LineageOutboxProcessor: The processor for chaining.
type LineageSource ¶ added in v0.13.0
LineageSource represents a source of funds available for allocation during lineage debit processing.
Fields: - BalanceID string: The ID of the shadow balance holding the funds. - Balance *big.Int: The available balance amount. - CreatedAt time.Time: The creation time of the lineage mapping, used for FIFO/LIFO ordering.
type NewWebhook ¶
type NewWebhook struct {
Event string `json:"event"` // The event type that triggered the webhook.
Payload interface{} `json:"data"` // The data associated with the event.
}
NewWebhook represents the structure of a webhook notification. It includes an event type and associated payload data.
type ProviderBreakdown ¶ added in v0.13.0
type ProviderBreakdown struct {
Provider string `json:"provider"`
Amount *big.Int `json:"amount"`
Available *big.Int `json:"available"`
Spent *big.Int `json:"spent"`
BalanceID string `json:"shadow_balance_id"`
}
ProviderBreakdown represents the fund breakdown for a specific provider in a balance's lineage.
Fields: - Provider string: The name/identifier of the fund provider. - Amount *big.Int: The total amount received from this provider. - Available *big.Int: The amount still available (not yet spent). - Spent *big.Int: The amount that has been debited. - BalanceID string: The ID of the shadow balance tracking this provider's funds.
type Queue ¶
type Queue struct {
Client *asynq.Client
Inspector *asynq.Inspector
// contains filtered or unexported fields
}
Queue represents a queue for handling various tasks.
func NewQueue ¶
func NewQueue(conf *config.Configuration, client *asynq.Client) *Queue
NewQueue initializes a new Queue instance with the provided configuration.
Parameters: - conf *config.Configuration: The configuration for the queue.
Returns: - *Queue: A pointer to the newly created Queue instance.
func (*Queue) Enqueue ¶
Enqueue enqueues a transaction to the Redis queue.
Parameters: - ctx context.Context: The context for the operation. - transaction *model.Transaction: The transaction to be enqueued.
Returns: - error: An error if the transaction could not be enqueued.
func (*Queue) GetTransactionFromQueue ¶
func (q *Queue) GetTransactionFromQueue(transactionID string) (*model.Transaction, error)
GetTransactionFromQueue retrieves a transaction from the queue by its ID.
Parameters: - transactionID string: The ID of the transaction to retrieve.
Returns: - *model.Transaction: A pointer to the Transaction model if found. - error: An error if the transaction could not be retrieved.
func (*Queue) QueueInflightExpiry ¶
QueueInflightExpiry handles queuing a transaction for inflight expiration. This method is separate from the main Enqueue to ensure expiration is handled regardless of whether the transaction is queued or processed immediately.
Parameters: - ctx context.Context: The context for the operation. - transaction *model.Transaction: The transaction to queue for expiration.
Returns: - error: An error if the expiration could not be queued.
type QueuedTransactionRecoveryProcessor ¶ added in v0.13.2
type QueuedTransactionRecoveryProcessor struct {
// contains filtered or unexported fields
}
func NewQueuedTransactionRecoveryProcessor ¶ added in v0.13.2
func NewQueuedTransactionRecoveryProcessor(blnk *Blnk) *QueuedTransactionRecoveryProcessor
func (*QueuedTransactionRecoveryProcessor) IsRunning ¶ added in v0.13.2
func (p *QueuedTransactionRecoveryProcessor) IsRunning() bool
func (*QueuedTransactionRecoveryProcessor) Start ¶ added in v0.13.2
func (p *QueuedTransactionRecoveryProcessor) Start(ctx context.Context)
func (*QueuedTransactionRecoveryProcessor) Stop ¶ added in v0.13.2
func (p *QueuedTransactionRecoveryProcessor) Stop()
type TransactionLineage ¶ added in v0.13.0
type TransactionLineage struct {
TransactionID string `json:"transaction_id"`
FundAllocation []map[string]interface{} `json:"fund_allocation,omitempty"`
ShadowTransactions []model.Transaction `json:"shadow_transactions"`
}
TransactionLineage represents the lineage information for a specific transaction.
Fields: - TransactionID string: The ID of the transaction being queried. - FundAllocation []map[string]interface{}: The allocation of funds by provider for debit transactions. - ShadowTransactions []model.Transaction: The shadow transactions created for this transaction.
type TransactionTypePayload ¶
type TransactionTypePayload struct {
Data model.Transaction
}
TransactionTypePayload represents the payload for a transaction type.
