Documentation
¶
Index ¶
- Variables
- func ConnectDB(dsConfig config.DataSourceConfig) (*sql.DB, error)
- type Datasource
- func (d Datasource) ClaimPendingOutboxEntries(ctx context.Context, batchSize int, lockDuration time.Duration) ([]model.LineageOutbox, error)
- func (d *Datasource) Close() error
- func (s *Datasource) CreateAPIKey(ctx context.Context, name, ownerID string, scopes []string, ...) (*model.APIKey, error)
- func (d Datasource) CreateAccount(account model.Account) (model.Account, error)
- func (d Datasource) CreateBalance(balance model.Balance) (model.Balance, error)
- func (d Datasource) CreateIdentity(identity model.Identity) (model.Identity, error)
- func (d Datasource) CreateLedger(ledger model.Ledger) (model.Ledger, error)
- func (d Datasource) CreateMonitor(monitor model.BalanceMonitor) (model.BalanceMonitor, error)
- func (d Datasource) DeleteAccount(id string) error
- func (d Datasource) DeleteIdentity(id string) error
- func (d Datasource) DeleteLineageMapping(ctx context.Context, id int64) error
- func (d Datasource) DeleteMatchingRule(ctx context.Context, id string) error
- func (d Datasource) DeleteMonitor(id string) error
- func (d Datasource) FetchAndGroupExternalTransactions(ctx context.Context, uploadID string, groupCriteria string, batchSize int, ...) (map[string][]*model.Transaction, error)
- func (s *Datasource) GetAPIKey(ctx context.Context, key string) (*model.APIKey, error)
- func (d Datasource) GetAccountByID(id string, include []string) (*model.Account, error)
- func (d Datasource) GetAccountByNumber(number string) (*model.Account, error)
- func (d Datasource) GetAllAccounts() ([]model.Account, error)
- func (d Datasource) GetAllAccountsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Account, error)
- func (d Datasource) GetAllAccountsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Account, *int64, error)
- func (d Datasource) GetAllBalances(limit, offset int) ([]model.Balance, error)
- func (d Datasource) GetAllBalancesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Balance, error)
- func (d Datasource) GetAllBalancesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Balance, *int64, error)
- func (d Datasource) GetAllIdentities() ([]model.Identity, error)
- func (d Datasource) GetAllIdentitiesPaginated(limit, offset int) ([]model.Identity, error)
- func (d Datasource) GetAllIdentitiesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Identity, error)
- func (d Datasource) GetAllIdentitiesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Identity, *int64, error)
- func (d Datasource) GetAllLedgers(limit, offset int) ([]model.Ledger, error)
- func (d Datasource) GetAllLedgersWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Ledger, error)
- func (d Datasource) GetAllLedgersWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Ledger, *int64, error)
- func (d Datasource) GetAllMonitors() ([]model.BalanceMonitor, error)
- func (d Datasource) GetAllTransactions(ctx context.Context, limit, offset int) ([]model.Transaction, error)
- func (d Datasource) GetAllTransactionsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Transaction, error)
- func (d Datasource) GetAllTransactionsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, ...) ([]model.Transaction, *int64, error)
- func (d Datasource) GetBalanceAtTime(ctx context.Context, balanceID string, targetTime time.Time, fromSource bool) (*model.Balance, error)
- func (d Datasource) GetBalanceByID(id string, include []string, withQueued bool) (*model.Balance, error)
- func (d Datasource) GetBalanceByIDLite(id string) (*model.Balance, error)
- func (d Datasource) GetBalanceByIndicator(indicator, currency string) (*model.Balance, error)
- func (d Datasource) GetBalanceMonitors(balanceID string) ([]model.BalanceMonitor, error)
- func (d Datasource) GetBalancesByIDsLite(ctx context.Context, ids []string) (map[string]*model.Balance, error)
- func (d Datasource) GetExternalTransactionsByReconciliationID(ctx context.Context, reconciliationID string) ([]*model.ExternalTransaction, error)
- func (d Datasource) GetExternalTransactionsPaginated(ctx context.Context, uploadID string, batchSize int, offset int64) ([]*model.ExternalTransaction, error)
- func (d Datasource) GetIdentityByID(id string) (*model.Identity, error)
- func (d Datasource) GetInflightTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
- func (d Datasource) GetLedgerByID(id string) (*model.Ledger, error)
- func (d Datasource) GetLineageMappingByProvider(ctx context.Context, balanceID, provider string) (*model.LineageMapping, error)
- func (d Datasource) GetLineageMappings(ctx context.Context, balanceID string) ([]model.LineageMapping, error)
- func (d Datasource) GetMatchesByReconciliationID(ctx context.Context, reconciliationID string) ([]*model.Match, error)
- func (d Datasource) GetMatchingRule(ctx context.Context, id string) (*model.MatchingRule, error)
- func (d Datasource) GetMatchingRules(ctx context.Context) ([]*model.MatchingRule, error)
- func (d Datasource) GetMonitorByID(id string) (*model.BalanceMonitor, error)
- func (d Datasource) GetOutboxByTransactionID(ctx context.Context, transactionID string) (*model.LineageOutbox, error)
- func (d Datasource) GetQueuedAmounts(ctx context.Context, balanceID string) (debit, credit *big.Int, err error)
- func (d Datasource) GetReconciliation(ctx context.Context, id string) (*model.Reconciliation, error)
- func (d Datasource) GetReconciliationsByUploadID(ctx context.Context, uploadID string) ([]*model.Reconciliation, error)
- func (d Datasource) GetRefundableTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
- func (d Datasource) GetSourceDestination(sourceId, destinationId string) ([]*model.Balance, error)
- func (d Datasource) GetStuckQueuedTransactions(ctx context.Context, threshold time.Duration, batchSize int) ([]*model.Transaction, error)
- func (d Datasource) GetTotalCommittedTransactions(ctx context.Context, parentID string) (*big.Int, error)
- func (d Datasource) GetTransaction(ctx context.Context, id string) (*model.Transaction, error)
- func (d Datasource) GetTransactionByRef(ctx context.Context, reference string) (model.Transaction, error)
- func (d Datasource) GetTransactionsByCriteria(ctx context.Context, minAmount, maxAmount *float64, currency *string, ...) ([]*model.Transaction, error)
- func (d Datasource) GetTransactionsByParent(ctx context.Context, parentID string, limit int, offset int64) ([]*model.Transaction, error)
- func (d Datasource) GetTransactionsByShadowFor(ctx context.Context, parentTransactionID string) ([]model.Transaction, error)
- func (d Datasource) GetTransactionsPaginated(ctx context.Context, _ string, batchSize int, offset int64) ([]*model.Transaction, error)
- func (d Datasource) GroupTransactions(ctx context.Context, groupCriteria string, batchSize int, offset int64) (map[string][]*model.Transaction, error)
- func (d Datasource) HasPendingCreditOutbox(ctx context.Context, balanceID string) (bool, error)
- func (d Datasource) InsertLineageOutbox(ctx context.Context, outbox *model.LineageOutbox) error
- func (d Datasource) InsertLineageOutboxInTx(ctx context.Context, tx *sql.Tx, outbox *model.LineageOutbox) error
- func (d Datasource) IsParentTransactionVoid(ctx context.Context, parentID string) (bool, error)
- func (d Datasource) IsTransactionRefunded(ctx context.Context, transaction *model.Transaction) (bool, error)
- func (s *Datasource) ListAPIKeys(ctx context.Context, ownerID string) ([]*model.APIKey, error)
- func (d Datasource) LoadReconciliationProgress(ctx context.Context, reconciliationID string) (model.ReconciliationProgress, error)
- func (d Datasource) MarkOutboxCompleted(ctx context.Context, id int64) error
- func (d Datasource) MarkOutboxFailed(ctx context.Context, id int64, errMsg string) error
- func (d Datasource) RecordExternalTransaction(ctx context.Context, tx *model.ExternalTransaction, uploadID string) error
- func (d Datasource) RecordMatch(ctx context.Context, match *model.Match) error
- func (d Datasource) RecordMatches(ctx context.Context, reconciliationID string, matches []model.Match) error
- func (d Datasource) RecordMatchingRule(ctx context.Context, rule *model.MatchingRule) error
- func (d Datasource) RecordReconciliation(ctx context.Context, rec *model.Reconciliation) error
- func (d Datasource) RecordTransaction(ctx context.Context, txn *model.Transaction) (*model.Transaction, error)
- func (d Datasource) RecordTransactionWithBalances(ctx context.Context, txn *model.Transaction, ...) (*model.Transaction, error)
- func (d Datasource) RecordTransactionWithBalancesAndOutbox(ctx context.Context, txn *model.Transaction, ...) (*model.Transaction, error)
- func (d Datasource) RecordUnmatched(ctx context.Context, reconciliationID string, results []string) error
- func (s *Datasource) RevokeAPIKey(ctx context.Context, id, ownerID string) error
- func (d Datasource) SaveReconciliationProgress(ctx context.Context, reconciliationID string, ...) error
- func (d Datasource) TakeBalanceSnapshots(ctx context.Context, batchSize int) (int, error)
- func (d Datasource) TransactionExistsByIDOrParentID(ctx context.Context, id string) (bool, error)
- func (d Datasource) TransactionExistsByRef(ctx context.Context, reference string) (bool, error)
- func (d Datasource) UpdateAccount(account *model.Account) error
- func (d Datasource) UpdateBalance(balance *model.Balance) error
- func (d Datasource) UpdateBalanceIdentity(balanceID string, identityID string) error
- func (d *Datasource) UpdateBalanceMetadata(ctx context.Context, id string, metadata map[string]interface{}) error
- func (d Datasource) UpdateBalances(ctx context.Context, sourceBalance, destinationBalance *model.Balance) error
- func (d Datasource) UpdateIdentity(identity *model.Identity) error
- func (d *Datasource) UpdateIdentityMetadata(id string, metadata map[string]interface{}) error
- func (s *Datasource) UpdateLastUsed(ctx context.Context, id string) error
- func (d Datasource) UpdateLedger(id, name string) (*model.Ledger, error)
- func (d *Datasource) UpdateLedgerMetadata(id string, metadata map[string]interface{}) error
- func (d Datasource) UpdateMatchingRule(ctx context.Context, rule *model.MatchingRule) error
- func (d Datasource) UpdateMonitor(monitor *model.BalanceMonitor) error
- func (d Datasource) UpdateReconciliationStatus(ctx context.Context, id string, status string, ...) error
- func (d *Datasource) UpdateTransactionMetadata(ctx context.Context, id string, metadata map[string]interface{}) error
- func (d Datasource) UpdateTransactionStatus(ctx context.Context, id string, status string) error
- func (d Datasource) UpsertLineageMapping(ctx context.Context, mapping model.LineageMapping) error
- type IDataSource
Constants ¶
This section is empty.
Variables ¶
var ( ErrAPIKeyNotFound = errors.New("api key not found") ErrInvalidAPIKey = errors.New("invalid api key") )
Functions ¶
Types ¶
type Datasource ¶
func GetDBConnection ¶
func GetDBConnection(configuration *config.Configuration) (*Datasource, error)
GetDBConnection ensures a single database connection instance.
func (Datasource) ClaimPendingOutboxEntries ¶ added in v0.13.0
func (d Datasource) ClaimPendingOutboxEntries(ctx context.Context, batchSize int, lockDuration time.Duration) ([]model.LineageOutbox, error)
ClaimPendingOutboxEntries claims a batch of pending outbox entries for processing. It uses SELECT FOR UPDATE SKIP LOCKED to allow concurrent processors.
func (*Datasource) Close ¶ added in v0.13.2
func (d *Datasource) Close() error
Close closes the underlying database connection pool.
func (*Datasource) CreateAPIKey ¶
func (s *Datasource) CreateAPIKey(ctx context.Context, name, ownerID string, scopes []string, expiresAt time.Time) (*model.APIKey, error)
CreateAPIKey creates a new API key with the specified parameters and stores it in the database. The key is hashed using bcrypt before storage for security. The plain text key is returned ONLY during creation and should be displayed to the user immediately as it cannot be retrieved later.
Parameters: - ctx: Context for managing the request lifecycle and cancellation. - name: A human-readable name for the API key. - ownerID: The ID of the user or entity that owns this API key. - scopes: A slice of permission scopes that this API key grants access to. - expiresAt: The timestamp when this API key will expire.
Returns: - *model.APIKey: The created API key object with the PLAIN TEXT key (store this immediately, it won't be retrievable later). - error: An error if the API key creation fails during generation or database insertion.
func (Datasource) CreateAccount ¶
CreateAccount inserts a new Account into the database. This function handles metadata serialization and database insertion. Parameters: - account: The account model containing fields such as name, number, bank name, currency, ledger ID, identity ID, and balance ID. Returns: - model.Account: The created account with the assigned account ID and creation timestamp. - error: Returns an error if any issue occurs while marshalling metadata or executing the database query.
func (Datasource) CreateBalance ¶
CreateBalance inserts a new balance record into the `blnk.balances` table in the database. It handles the generation of a unique balance ID, default values for fields, and any necessary error handling.
Parameters: - balance: A model.Balance object containing the balance information to be created.
Returns: - model.Balance: The created balance with its ID and timestamp populated. - error: Returns an APIError in case of failures such as database conflicts or other issues.
func (Datasource) CreateIdentity ¶
CreateIdentity inserts a new identity record into the database. It generates a unique IdentityID, sets the creation timestamp, and stores the identity metadata. Parameters: - identity: The identity object to be inserted. Returns: - The created identity object, or an error if the creation fails.
func (Datasource) CreateLedger ¶
CreateLedger inserts a new ledger record into the database, ensuring metadata is properly marshaled into JSON format. It assigns a unique ledger ID with a suffix and captures the current timestamp as the creation time.
Parameters: - ledger: The ledger data to be inserted into the database.
Returns: - model.Ledger: The created ledger object including the generated LedgerID and creation timestamp. - error: An error if the ledger creation fails, including specific database error handling for conflicts.
func (Datasource) CreateMonitor ¶
func (d Datasource) CreateMonitor(monitor model.BalanceMonitor) (model.BalanceMonitor, error)
CreateMonitor creates a new BalanceMonitor record in the database. This function generates a unique MonitorID for the monitor, sets the creation timestamp, and inserts the monitor's data into the `blnk.balance_monitors` table.
Parameters:
- monitor: A model.BalanceMonitor object containing details of the monitor to be created. It includes fields like balance ID, field to monitor, operator, value, precision, precise_value, description, callback URL, etc.
Returns: - model.BalanceMonitor: The newly created BalanceMonitor object with updated MonitorID and CreatedAt timestamp. - error: If any errors occur during the creation process, an `APIError` is returned.
func (Datasource) DeleteAccount ¶
func (d Datasource) DeleteAccount(id string) error
DeleteAccount deletes a specific account from the database. It removes the account with the given account ID from the accounts table. Parameters: - id: The unique ID of the account to be deleted. Returns: - An error if the deletion fails, otherwise returns nil.
func (Datasource) DeleteIdentity ¶
func (d Datasource) DeleteIdentity(id string) error
DeleteIdentity deletes a specific identity record from the database. It executes the SQL delete query based on the provided identity ID. Parameters: - id: The ID of the identity to be deleted. Returns: - An error if the deletion fails, or nil if successful.
func (Datasource) DeleteLineageMapping ¶ added in v0.13.0
func (d Datasource) DeleteLineageMapping(ctx context.Context, id int64) error
DeleteLineageMapping deletes a lineage mapping by its ID.
func (Datasource) DeleteMatchingRule ¶
func (d Datasource) DeleteMatchingRule(ctx context.Context, id string) error
DeleteMatchingRule deletes a specific matching rule from the database. Parameters: - ctx: Context for managing the request and tracing. - id: The ID of the matching rule to be deleted. Returns: - An error wrapped in an APIError if the operation fails, or nil if the deletion is successful.
func (Datasource) DeleteMonitor ¶
func (d Datasource) DeleteMonitor(id string) error
DeleteMonitor deletes a balance monitor from the database by its monitor ID. It removes the monitor from the `blnk.balance_monitors` table.
Parameters: - id: The ID of the monitor to be deleted.
Returns: - error: If the deletion fails or the monitor is not found, an appropriate `APIError` is returned.
func (Datasource) FetchAndGroupExternalTransactions ¶
func (d Datasource) FetchAndGroupExternalTransactions(ctx context.Context, uploadID string, groupCriteria string, batchSize int, offset int64) (map[string][]*model.Transaction, error)
FetchAndGroupExternalTransactions retrieves external transactions from the database based on a specific grouping criterion and paginates the results. The function first checks if the results are available in cache, and if not, fetches the data from the database, groups it by the specified criterion, and stores the result in cache. Parameters: - ctx: Context for managing the request and tracing. - uploadID: The ID of the upload to filter external transactions. - groupCriteria: The field by which to group the transactions (e.g., "amount", "currency"). - batchSize: The number of transactions to retrieve. - offset: The pagination offset. Returns: - A map of grouped transactions where the key is the group criterion value and the value is a slice of transactions, or an error wrapped in an APIError if any issues occur.
func (*Datasource) GetAPIKey ¶
GetAPIKey retrieves an API key from the database using its key string. The provided key's prefix is used for efficient lookup, then bcrypt verifies the full key. Results are cached for 5 minutes to improve performance. This function is typically used for authentication and authorization purposes.
Parameters: - ctx: Context for managing the request lifecycle and cancellation. - key: The PLAIN TEXT API key string to authenticate.
Returns: - *model.APIKey: The API key object if found (with hashed key, not plain text). - error: Returns ErrAPIKeyNotFound if the key doesn't exist, or other database errors if the query fails.
func (Datasource) GetAccountByID ¶
GetAccountByID retrieves an account by its ID from the database. It uses a transaction to ensure consistency and can include additional related entities like balance, identity, or ledger if specified in the `include` parameter. Parameters: - id: The ID of the account to retrieve. - include: A list of related entities to include in the query result. Returns: - A pointer to the retrieved Account or an error if something goes wrong.
func (Datasource) GetAccountByNumber ¶
func (d Datasource) GetAccountByNumber(number string) (*model.Account, error)
GetAccountByNumber retrieves an account based on its number. It queries the database for an account with the given number and returns the account details if found. Parameters: - number: The account number to search for. Returns: - A pointer to the Account object if found, or an error if the account is not found or a query error occurs.
func (Datasource) GetAllAccounts ¶
func (d Datasource) GetAllAccounts() ([]model.Account, error)
GetAllAccounts retrieves all accounts from the database. It returns a list of Account objects, each populated with metadata and account details. Returns: - A slice of Account objects or an error if the query or scan fails.
func (Datasource) GetAllAccountsWithFilter ¶ added in v0.13.2
func (d Datasource) GetAllAccountsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Account, error)
GetAllAccountsWithFilter retrieves accounts with advanced filtering support. It delegates to GetAllAccountsWithFilterAndOptions with nil options.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - limit: The maximum number of accounts to return. - offset: The offset to start fetching accounts from (for pagination).
Returns: - []model.Account: A slice of accounts matching the filter criteria. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllAccountsWithFilterAndOptions ¶ added in v0.13.2
func (d Datasource) GetAllAccountsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Account, *int64, error)
GetAllAccountsWithFilterAndOptions retrieves accounts with filtering, sorting, and optional count. It uses the filter package to build SQL WHERE and ORDER BY conditions.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - opts: Query options including sorting and count settings. - limit: The maximum number of accounts to return. - offset: The offset to start fetching accounts from (for pagination).
Returns: - []model.Account: A slice of accounts matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllBalances ¶
func (d Datasource) GetAllBalances(limit, offset int) ([]model.Balance, error)
GetAllBalances retrieves a limited set of balances from the database, up to 20 records. It processes each balance by scanning the query result, converting numerical fields to big.Int, and parsing metadata from JSON format. The function returns a slice of Balance objects or an error if any issues occur during the database query or data processing.
Parameters: - limit: The maximum number of balances to return (e.g., 20). - offset: The offset to start fetching balances from (for pagination).
Returns: - []model.Balance: A slice of Balance objects containing balance information such as balance amount, credit balance, debit balance, and metadata. - error: An error if any occurs during the query execution, data retrieval, or JSON parsing.
func (Datasource) GetAllBalancesWithFilter ¶ added in v0.13.2
func (d Datasource) GetAllBalancesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Balance, error)
GetAllBalancesWithFilter retrieves balances with advanced filtering support. It delegates to GetAllBalancesWithFilterAndOptions with nil options.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - limit: The maximum number of balances to return. - offset: The offset to start fetching balances from (for pagination).
Returns: - []model.Balance: A slice of balances matching the filter criteria. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllBalancesWithFilterAndOptions ¶ added in v0.13.2
func (d Datasource) GetAllBalancesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Balance, *int64, error)
GetAllBalancesWithFilterAndOptions retrieves balances with filtering, sorting, and optional count. It uses the filter package to build SQL WHERE and ORDER BY conditions.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - opts: Query options including sorting and count settings. - limit: The maximum number of balances to return. - offset: The offset to start fetching balances from (for pagination).
Returns: - []model.Balance: A slice of balances matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllIdentities ¶
func (d Datasource) GetAllIdentities() ([]model.Identity, error)
GetAllIdentities retrieves all identities from the database. It executes a query to fetch all identity records, parses the result into Identity structs, and handles metadata unmarshalling. Returns: - A slice of Identity objects if successful, or an error if any operation fails.
func (Datasource) GetAllIdentitiesPaginated ¶ added in v0.13.1
func (d Datasource) GetAllIdentitiesPaginated(limit, offset int) ([]model.Identity, error)
GetAllIdentitiesPaginated retrieves identities from the database with pagination support. Parameters: - limit: The maximum number of identities to return. - offset: The offset to start fetching identities from (for pagination). Returns: - A slice of Identity objects if successful, or an error if any operation fails.
func (Datasource) GetAllIdentitiesWithFilter ¶ added in v0.13.2
func (d Datasource) GetAllIdentitiesWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Identity, error)
GetAllIdentitiesWithFilter retrieves identities with advanced filtering support. It delegates to GetAllIdentitiesWithFilterAndOptions with nil options.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - limit: The maximum number of identities to return. - offset: The offset to start fetching identities from (for pagination).
Returns: - []model.Identity: A slice of identities matching the filter criteria. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllIdentitiesWithFilterAndOptions ¶ added in v0.13.2
func (d Datasource) GetAllIdentitiesWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Identity, *int64, error)
GetAllIdentitiesWithFilterAndOptions retrieves identities with filtering, sorting, and optional count. It uses the filter package to build SQL WHERE and ORDER BY conditions.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - opts: Query options including sorting and count settings. - limit: The maximum number of identities to return. - offset: The offset to start fetching identities from (for pagination).
Returns: - []model.Identity: A slice of identities matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllLedgers ¶
func (d Datasource) GetAllLedgers(limit, offset int) ([]model.Ledger, error)
GetAllLedgers retrieves a paginated list of ledger records from the database, unmarshaling their metadata from JSON format. This method supports pagination and can be used to efficiently retrieve all ledgers over multiple requests.
Parameters: - limit: The maximum number of ledgers to return (e.g., 20). - offset: The offset to start fetching ledgers from (for pagination).
Returns: - []model.Ledger: A slice of ledgers retrieved from the database. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllLedgersWithFilter ¶ added in v0.13.2
func (d Datasource) GetAllLedgersWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Ledger, error)
GetAllLedgersWithFilter retrieves ledgers with advanced filtering support. It delegates to GetAllLedgersWithFilterAndOptions with nil options.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - limit: The maximum number of ledgers to return. - offset: The offset to start fetching ledgers from (for pagination).
Returns: - []model.Ledger: A slice of ledgers matching the filter criteria. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllLedgersWithFilterAndOptions ¶ added in v0.13.2
func (d Datasource) GetAllLedgersWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Ledger, *int64, error)
GetAllLedgersWithFilterAndOptions retrieves ledgers with filtering, sorting, and optional count. It uses the filter package to build SQL WHERE and ORDER BY conditions.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - opts: Query options including sorting and count settings. - limit: The maximum number of ledgers to return. - offset: The offset to start fetching ledgers from (for pagination).
Returns: - []model.Ledger: A slice of ledgers matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllMonitors ¶
func (d Datasource) GetAllMonitors() ([]model.BalanceMonitor, error)
GetAllMonitors retrieves all balance monitors from the database. It queries the `blnk.balance_monitors` table and returns a list of all monitors.
Returns: - []model.BalanceMonitor: A slice of BalanceMonitor objects if the query is successful. - error: If an error occurs during the query or while scanning the result set, an `APIError` is returned.
func (Datasource) GetAllTransactions ¶
func (d Datasource) GetAllTransactions(ctx context.Context, limit, offset int) ([]model.Transaction, error)
GetAllTransactions retrieves all transactions from the database, ordered by creation date in descending order. It traces the operation using OpenTelemetry and returns an error if the retrieval or processing fails. Parameters: - ctx: Context for managing the request and tracing. Returns: - A slice of transactions or an error if the retrieval fails.
func (Datasource) GetAllTransactionsWithFilter ¶ added in v0.13.2
func (d Datasource) GetAllTransactionsWithFilter(ctx context.Context, filters *filter.QueryFilterSet, limit, offset int) ([]model.Transaction, error)
GetAllTransactionsWithFilter retrieves transactions with advanced filtering support. It delegates to GetAllTransactionsWithFilterAndOptions with nil options.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - limit: The maximum number of transactions to return. - offset: The offset to start fetching transactions from (for pagination).
Returns: - []model.Transaction: A slice of transactions matching the filter criteria. - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetAllTransactionsWithFilterAndOptions ¶ added in v0.13.2
func (d Datasource) GetAllTransactionsWithFilterAndOptions(ctx context.Context, filters *filter.QueryFilterSet, opts *filter.QueryOptions, limit, offset int) ([]model.Transaction, *int64, error)
GetAllTransactionsWithFilterAndOptions retrieves transactions with filtering, sorting, and optional count. It uses the filter package to build SQL WHERE and ORDER BY conditions.
Parameters: - ctx: Context for the database operation. - filters: A QueryFilterSet containing the filter conditions. - opts: Query options including sorting and count settings. - limit: The maximum number of transactions to return. - offset: The offset to start fetching transactions from (for pagination).
Returns: - []model.Transaction: A slice of transactions matching the filter criteria. - *int64: Optional total count of matching records (if opts.IncludeCount is true). - error: An error if the query fails or if there's an issue processing the results.
func (Datasource) GetBalanceAtTime ¶
func (d Datasource) GetBalanceAtTime(ctx context.Context, balanceID string, targetTime time.Time, fromSource bool) (*model.Balance, error)
GetBalanceAtTime retrieves the balance state at a specific point in time. It finds the most recent snapshot before the target time and applies any subsequent transactions to calculate the exact balance state. If fromSource is true, it skips using snapshots and calculates directly from all transactions.
Parameters: - ctx: Context for the database operations - balanceID: The ID of the balance to query - targetTime: The point in time for which to get the balance state - fromSource: If true, calculation is done from all transactions rather than using snapshots
Returns: - *Balance: The calculated balance state at the target time - error: An APIError if any issues occur during the operation
func (Datasource) GetBalanceByID ¶
func (d Datasource) GetBalanceByID(id string, include []string, withQueued bool) (*model.Balance, error)
GetBalanceByID retrieves a balance by its ID from the database, along with optional related data such as identity or ledger, based on the `include` parameter. The method starts a transaction, executes the query, and processes the result.
Parameters: - id: The unique ID of the balance to retrieve. - include: A slice of strings that specifies which related data to include in the result. Possible values include "identity" and "ledger". - withQueued: A boolean that specifies whether to include queued amounts in the result.
Returns: - *model.Balance: A pointer to the retrieved Balance object. - error: Returns an APIError in case of errors such as database failures or if the balance is not found.
func (Datasource) GetBalanceByIDLite ¶
func (d Datasource) GetBalanceByIDLite(id string) (*model.Balance, error)
GetBalanceByIDLite retrieves a balance by its unique ID with a lighter set of fields. This version avoids loading additional related data like identity and ledger.
Parameters: - id: The ID of the balance to retrieve.
Returns: - *model.Balance: A pointer to the retrieved Balance object. - error: Returns an APIError in case of errors such as database failures or if the balance is not found.
func (Datasource) GetBalanceByIndicator ¶
func (d Datasource) GetBalanceByIndicator(indicator, currency string) (*model.Balance, error)
GetBalanceByIndicator retrieves a balance from the database using the specified indicator and currency. The function scans the query result into a Balance object and converts various fields from int64 to big.Int. It returns the balance if found, or an error if the balance does not exist.
Parameters: - indicator: A unique identifier associated with the balance (e.g., an account identifier). - currency: The currency in which the balance is denominated.
Returns: - *model.Balance: The retrieved balance object or an empty Balance object if not found. - error: An error if any issues occur during the query execution or data retrieval.
func (Datasource) GetBalanceMonitors ¶
func (d Datasource) GetBalanceMonitors(balanceID string) ([]model.BalanceMonitor, error)
GetBalanceMonitors retrieves all balance monitors associated with a specific balance ID from the database. It queries the `blnk.balance_monitors` table to find all monitors linked to the provided `balanceID`.
Parameters: - balanceID: The ID of the balance for which monitors are being retrieved.
Returns: - []model.BalanceMonitor: A slice of BalanceMonitor objects associated with the balance ID. - error: If an error occurs during the query or while scanning the result set, an `APIError` is returned.
func (Datasource) GetBalancesByIDsLite ¶ added in v0.13.0
func (d Datasource) GetBalancesByIDsLite(ctx context.Context, ids []string) (map[string]*model.Balance, error)
GetBalancesByIDsLite retrieves multiple balances by their IDs in a single query. Returns a map of balance_id to Balance for easy lookup. Balances that are not found are simply not included in the result map.
Parameters: - ctx context.Context: The context for the operation. - ids []string: The list of balance IDs to retrieve.
Returns: - map[string]*model.Balance: A map of balance_id to Balance. - error: Returns an error in case of database failures.
func (Datasource) GetExternalTransactionsByReconciliationID ¶
func (d Datasource) GetExternalTransactionsByReconciliationID(ctx context.Context, reconciliationID string) ([]*model.ExternalTransaction, error)
GetExternalTransactionsByReconciliationID fetches all external transactions associated with a given reconciliation ID. Parameters: - ctx: Context for managing request and tracing. - reconciliationID: The ID of the reconciliation to fetch external transactions for. Returns: - A slice of ExternalTransaction pointers or an error wrapped in an APIError if the operation fails.
func (Datasource) GetExternalTransactionsPaginated ¶
func (d Datasource) GetExternalTransactionsPaginated(ctx context.Context, uploadID string, batchSize int, offset int64) ([]*model.ExternalTransaction, error)
GetExternalTransactionsPaginated retrieves external transactions based on the provided upload ID with pagination. It first checks the cache, and if the data is not available, it fetches from the database and caches the result for 5 minutes. Parameters: - ctx: Context for managing the request and tracing. - uploadID: The ID of the upload to filter external transactions. - batchSize: The number of transactions to retrieve per batch. - offset: The starting point to retrieve transactions from. Returns: - A slice of ExternalTransaction objects or an APIError if the operation fails.
func (Datasource) GetIdentityByID ¶
func (d Datasource) GetIdentityByID(id string) (*model.Identity, error)
GetIdentityByID retrieves an identity from the database based on the given identity ID. It starts a transaction, executes a query to fetch the identity details, and commits the transaction upon success. Parameters: - id: The ID of the identity to be retrieved. Returns: - A pointer to the Identity object if found, or an error if the identity is not found or the query fails.
func (Datasource) GetInflightTransactionsByParentID ¶
func (d Datasource) GetInflightTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
GetInflightTransactionsByParentID retrieves all inflight transactions associated with a given parent transaction ID. It supports pagination via batchSize and offset. Transactions with status 'INFLIGHT' are fetched. If no INFLIGHT transactions exist, then transactions with status 'QUEUED' and meta_data.inflight=true are considered. Parameters: - ctx: Context for managing request and tracing. - parentTransactionID: The ID of the parent transaction to filter by. - batchSize: Number of transactions to retrieve in one batch. - offset: Number of transactions to skip before retrieving the batch. Returns: - A slice of inflight transactions or an error if retrieval fails.
func (Datasource) GetLedgerByID ¶
func (d Datasource) GetLedgerByID(id string) (*model.Ledger, error)
GetLedgerByID retrieves a ledger record from the database by its ID. It handles cases where the ledger is not found and unmarshals the metadata from JSON format.
Parameters: - id: The unique ID of the ledger to retrieve.
Returns: - *model.Ledger: The ledger object, if found. - error: An error if the ledger is not found or if the query fails.
func (Datasource) GetLineageMappingByProvider ¶ added in v0.13.0
func (d Datasource) GetLineageMappingByProvider(ctx context.Context, balanceID, provider string) (*model.LineageMapping, error)
GetLineageMappingByProvider retrieves a specific lineage mapping for a balance and provider.
func (Datasource) GetLineageMappings ¶ added in v0.13.0
func (d Datasource) GetLineageMappings(ctx context.Context, balanceID string) ([]model.LineageMapping, error)
GetLineageMappings retrieves all lineage mappings for a given balance ID.
func (Datasource) GetMatchesByReconciliationID ¶
func (d Datasource) GetMatchesByReconciliationID(ctx context.Context, reconciliationID string) ([]*model.Match, error)
GetMatchesByReconciliationID retrieves all matches associated with a given reconciliation ID. It joins the matches table with external transactions to filter matches by reconciliation ID. Parameters: - ctx: Context for managing request and tracing. - reconciliationID: The ID of the reconciliation to filter matches. Returns: - A slice of Match structs representing the matched transactions. - An error if the operation fails, wrapped in an APIError for consistency.
func (Datasource) GetMatchingRule ¶
func (d Datasource) GetMatchingRule(ctx context.Context, id string) (*model.MatchingRule, error)
GetMatchingRule retrieves a specific matching rule from the database by its rule ID. Parameters: - ctx: Context for managing the request and tracing. - id: The ID of the matching rule to be retrieved. Returns: - A pointer to the MatchingRule object if found, or an APIError if the operation fails.
func (Datasource) GetMatchingRules ¶
func (d Datasource) GetMatchingRules(ctx context.Context) ([]*model.MatchingRule, error)
GetMatchingRules retrieves all matching rules from the database. Parameters: - ctx: Context for managing the request and tracing. Returns: - A slice of MatchingRule pointers or an error wrapped in an APIError if the operation fails.
func (Datasource) GetMonitorByID ¶
func (d Datasource) GetMonitorByID(id string) (*model.BalanceMonitor, error)
GetMonitorByID retrieves a BalanceMonitor by its unique MonitorID from the database. It queries the `blnk.balance_monitors` table and maps the result into a model.BalanceMonitor object.
Parameters: - id: The MonitorID of the monitor to retrieve.
Returns: - *model.BalanceMonitor: A pointer to the BalanceMonitor object if found. - error: If the monitor is not found or if any errors occur during the query, an `APIError` is returned.
func (Datasource) GetOutboxByTransactionID ¶ added in v0.13.0
func (d Datasource) GetOutboxByTransactionID(ctx context.Context, transactionID string) (*model.LineageOutbox, error)
GetOutboxByTransactionID retrieves an outbox entry by its transaction ID.
func (Datasource) GetQueuedAmounts ¶
func (d Datasource) GetQueuedAmounts(ctx context.Context, balanceID string) (debit, credit *big.Int, err error)
GetQueuedAmounts retrieves the total queued debit and credit amounts for a given balance ID. It only includes transactions with status 'QUEUED' that don't have child transactions with status 'APPLIED' or 'REJECTED'. This ensures that queued transactions that have been processed (either applied or rejected) are excluded from the totals. Parameters: - ctx: Context for managing the request and tracing. - balanceID: The ID of the balance to retrieve queued amounts for. Returns: - The total debit and credit amounts as big.Int values, or an error if the retrieval fails.
func (Datasource) GetReconciliation ¶
func (d Datasource) GetReconciliation(ctx context.Context, id string) (*model.Reconciliation, error)
GetReconciliation fetches a reconciliation record from the database based on its ID. Parameters: - ctx: Context for managing request and tracing. - id: The reconciliation ID to search for. Returns: - A pointer to the reconciliation record if found, or an error if not found or if a failure occurs.
func (Datasource) GetReconciliationsByUploadID ¶
func (d Datasource) GetReconciliationsByUploadID(ctx context.Context, uploadID string) ([]*model.Reconciliation, error)
GetReconciliationsByUploadID retrieves all reconciliations associated with a specific upload ID, ordered by the start date in descending order. Parameters: - ctx: Context for managing request and tracing. - uploadID: The upload ID to filter the reconciliations. Returns: - A slice of Reconciliation pointers and an error if the query fails.
func (Datasource) GetRefundableTransactionsByParentID ¶
func (d Datasource) GetRefundableTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
GetRefundableTransactionsByParentID retrieves transactions associated with a given parent transaction ID that are eligible for refunds. Refundable transactions are those with status 'APPLIED' or 'VOID'. It supports pagination with batchSize and offset. Parameters: - ctx: Context for managing request and tracing. - parentTransactionID: The ID of the parent transaction to filter by. - batchSize: Number of transactions to retrieve in one batch. - offset: Number of transactions to skip before retrieving the batch. Returns: - A slice of refundable transactions or an error if retrieval fails.
func (Datasource) GetSourceDestination ¶
func (d Datasource) GetSourceDestination(sourceId, destinationId string) ([]*model.Balance, error)
GetSourceDestination retrieves balances for both the source and destination by their IDs. It queries the database using a stored procedure `blnk.get_balances_by_id`, which takes the sourceId and destinationId as inputs. The function processes each balance, converting balance fields to big.Int and parsing the metadata from JSON format. It returns a slice of pointers to Balance objects or an error if any issues occur during the query or data processing.
Parameters: - sourceId: The ID of the source balance to retrieve. - destinationId: The ID of the destination balance to retrieve.
Returns: - []*model.Balance: A slice of pointers to Balance objects containing the source and destination balances with their details such as balance amount, credit balance, debit balance, and metadata. - error: An error if any occurs during the query execution, data retrieval, or JSON parsing.
func (Datasource) GetStuckQueuedTransactions ¶ added in v0.13.2
func (d Datasource) GetStuckQueuedTransactions(ctx context.Context, threshold time.Duration, batchSize int) ([]*model.Transaction, error)
GetStuckQueuedTransactions retrieves QUEUED transactions that are older than the threshold and have no child transactions, indicating they were never picked up by Redis processing.
func (Datasource) GetTotalCommittedTransactions ¶
func (d Datasource) GetTotalCommittedTransactions(ctx context.Context, parentID string) (*big.Int, error)
GetTotalCommittedTransactions calculates the total committed transaction amounts for a given parent transaction. It uses OpenTelemetry for tracing and returns the total or an error if the retrieval fails. Parameters: - ctx: Context for managing the request and tracing. - parentID: The ID of the parent transaction to retrieve totals for. Returns: - The total committed amount as *big.Int, or 0 if no transactions are found, along with an error if the retrieval fails.
func (Datasource) GetTransaction ¶
func (d Datasource) GetTransaction(ctx context.Context, id string) (*model.Transaction, error)
GetTransaction retrieves a transaction by its ID from the database. It logs the transaction retrieval using OpenTelemetry tracing. Parameters: - ctx: Context for managing the request and tracing. - id: The unique transaction ID. Returns: - The retrieved transaction if successful, or an error if retrieval fails.
func (Datasource) GetTransactionByRef ¶
func (d Datasource) GetTransactionByRef(ctx context.Context, reference string) (model.Transaction, error)
GetTransactionByRef retrieves a transaction from the database using the provided reference. It traces the operation using OpenTelemetry and returns the transaction or an error. Parameters: - ctx: Context for managing the request and tracing. - reference: The reference of the transaction to retrieve. Returns: - A model.Transaction representing the transaction or an error if the retrieval fails.
func (Datasource) GetTransactionsByCriteria ¶ added in v0.11.4
func (d Datasource) GetTransactionsByCriteria(ctx context.Context, minAmount, maxAmount *float64, currency *string, minDate, maxDate *time.Time, limit int, offset int64) ([]*model.Transaction, error)
GetTransactionsByCriteria retrieves transactions based on specified criteria (amount range, currency, date range). It supports pagination via limit and offset.
func (Datasource) GetTransactionsByParent ¶
func (d Datasource) GetTransactionsByParent(ctx context.Context, parentID string, limit int, offset int64) ([]*model.Transaction, error)
GetTransactionsByParent retrieves all transactions associated with a given parent transaction ID. It supports pagination via limit and offset parameters. Parameters: - ctx: Context for managing request and tracing. - parentID: The ID of the parent transaction to filter by. - limit: Maximum number of transactions to retrieve. - offset: Number of transactions to skip before retrieving the batch. Returns: - A slice of transactions or an error if retrieval fails.
func (Datasource) GetTransactionsByShadowFor ¶ added in v0.13.0
func (d Datasource) GetTransactionsByShadowFor(ctx context.Context, parentTransactionID string) ([]model.Transaction, error)
func (Datasource) GetTransactionsPaginated ¶
func (d Datasource) GetTransactionsPaginated(ctx context.Context, _ string, batchSize int, offset int64) ([]*model.Transaction, error)
GetTransactionsPaginated retrieves a batch of transactions from the database with pagination support and caches the result. If the data is found in cache, it is returned from there; otherwise, it is fetched from the database and then cached. Parameters: - ctx: Context for managing request and tracing. - batchSize: Number of transactions to retrieve in one batch. - offset: Number of transactions to skip before retrieving the batch. Returns: - A slice of transactions, or an error if the retrieval or caching fails.
func (Datasource) GroupTransactions ¶
func (d Datasource) GroupTransactions(ctx context.Context, groupCriteria string, batchSize int, offset int64) (map[string][]*model.Transaction, error)
GroupTransactions retrieves and groups transactions from the database based on a specified column (groupCriteria). It supports pagination and caches the grouped results for efficiency. If the data is found in the cache, it returns the cached data. Parameters: - ctx: Context for managing request and tracing. - groupCriteria: Column to group transactions by (e.g., "currency", "status"). - batchSize: Number of transactions to retrieve in one batch. - offset: Number of transactions to skip before retrieving the batch. Returns: - A map of grouped transactions, or an error if retrieval or grouping fails.
func (Datasource) HasPendingCreditOutbox ¶ added in v0.13.0
HasPendingCreditOutbox checks if there are pending credit outbox entries for a given balance. This is used to detect race conditions where a debit is being processed before credits are complete.
func (Datasource) InsertLineageOutbox ¶ added in v0.13.0
func (d Datasource) InsertLineageOutbox(ctx context.Context, outbox *model.LineageOutbox) error
InsertLineageOutbox inserts a lineage outbox entry directly (not within a transaction). Use this for queueing work like shadow commit/void that happens after the main transaction commits.
func (Datasource) InsertLineageOutboxInTx ¶ added in v0.13.0
func (d Datasource) InsertLineageOutboxInTx(ctx context.Context, tx *sql.Tx, outbox *model.LineageOutbox) error
InsertLineageOutboxInTx inserts a lineage outbox entry within an existing database transaction. This ensures the outbox entry is committed atomically with the main transaction.
func (Datasource) IsParentTransactionVoid ¶
IsParentTransactionVoid checks if a parent transaction has a status of 'VOID'. It uses OpenTelemetry to trace the operation and returns a boolean indicating whether any child transaction linked to the parent has a 'VOID' status. Parameters: - ctx: Context for managing the request and tracing. - parentID: The unique ID of the parent transaction. Returns: - A boolean indicating whether the parent transaction is void, or an error if the check fails.
func (Datasource) IsTransactionRefunded ¶
func (d Datasource) IsTransactionRefunded(ctx context.Context, transaction *model.Transaction) (bool, error)
IsTransactionRefunded checks if a transaction has already been refunded by looking for a transaction that has the inverse source/destination and references the original transaction as its parent. Parameters: - ctx: Context for managing request and tracing. - transaction: The original transaction to check for refunds. Returns: - bool: true if the transaction has been refunded, false otherwise - error: an error if the check fails
func (*Datasource) ListAPIKeys ¶
ListAPIKeys retrieves all API keys belonging to a specific owner from the database. The results are ordered by creation date in descending order (newest first). Note: The Key field will contain hashed values. For user display, show only a preview (e.g., last 4 characters of the key ID or a masked version).
Parameters: - ctx: Context for managing the request lifecycle and cancellation. - ownerID: The ID of the owner whose API keys should be retrieved.
Returns: - []*model.APIKey: A slice of API key objects belonging to the specified owner. - error: An error if the database query fails or if there are issues scanning the results.
func (Datasource) LoadReconciliationProgress ¶
func (d Datasource) LoadReconciliationProgress(ctx context.Context, reconciliationID string) (model.ReconciliationProgress, error)
LoadReconciliationProgress retrieves the progress of a reconciliation process from the database. If the reconciliation progress is not found, it returns an empty ReconciliationProgress object. Parameters: - ctx: Context for managing the request and tracing. - reconciliationID: The ID of the reconciliation whose progress is being retrieved. Returns: - A ReconciliationProgress object containing the current progress, or an error wrapped in an APIError if any issues occur.
func (Datasource) MarkOutboxCompleted ¶ added in v0.13.0
func (d Datasource) MarkOutboxCompleted(ctx context.Context, id int64) error
MarkOutboxCompleted marks an outbox entry as completed.
func (Datasource) MarkOutboxFailed ¶ added in v0.13.0
MarkOutboxFailed marks an outbox entry as failed and increments the attempt counter. If attempts exceed max_attempts, the status becomes 'failed', otherwise it returns to 'pending' for retry.
func (Datasource) RecordExternalTransaction ¶
func (d Datasource) RecordExternalTransaction(ctx context.Context, tx *model.ExternalTransaction, uploadID string) error
RecordExternalTransaction inserts a new external transaction into the database. It associates the transaction with an upload ID and records its details. Parameters: - ctx: Context for managing request and tracing. - tx: Pointer to the ExternalTransaction struct containing transaction data. - uploadID: The ID of the upload batch this transaction belongs to. Returns: - An error if the operation fails, wrapped in an APIError for consistency.
func (Datasource) RecordMatch ¶
RecordMatch inserts a single match into the database. It is used to record the match between external and internal transactions during reconciliation. Parameters: - ctx: Context for managing request and tracing. - match: A pointer to the Match struct containing details of the external and internal transactions, and their match status. Returns: - An error if the operation fails, wrapped in an APIError for consistency.
func (Datasource) RecordMatches ¶
func (d Datasource) RecordMatches(ctx context.Context, reconciliationID string, matches []model.Match) error
RecordMatches batches the saving of match records associated with a specific reconciliation ID. It uses a database transaction to ensure atomicity and consistency of the batch insert operation. Parameters: - ctx: Context for managing request and tracing. - reconciliationID: The ID of the reconciliation the matches are associated with. - matches: A slice of Match objects to be recorded. Returns: - An error if the operation fails, wrapped in an APIError for consistency.
func (Datasource) RecordMatchingRule ¶
func (d Datasource) RecordMatchingRule(ctx context.Context, rule *model.MatchingRule) error
RecordMatchingRule saves a new matching rule to the database. Parameters: - ctx: Context for managing the request and tracing. - rule: A pointer to the MatchingRule object containing rule details. Returns: - An error wrapped in an APIError if the operation fails.
func (Datasource) RecordReconciliation ¶
func (d Datasource) RecordReconciliation(ctx context.Context, rec *model.Reconciliation) error
RecordReconciliation saves a reconciliation record to the database. Parameters: - ctx: Context for managing request and tracing. - rec: The reconciliation data to be stored. Returns: - An error if the reconciliation record fails to save, otherwise nil.
func (Datasource) RecordTransaction ¶
func (d Datasource) RecordTransaction(ctx context.Context, txn *model.Transaction) (*model.Transaction, error)
RecordTransaction records a new transaction in the database. It logs the transaction details using OpenTelemetry tracing. Parameters: - ctx: Context for managing the request and tracing. - txn: The transaction object containing details to be recorded. Returns: - The recorded transaction if successful, or an error if the recording fails.
func (Datasource) RecordTransactionWithBalances ¶ added in v0.13.0
func (d Datasource) RecordTransactionWithBalances(ctx context.Context, txn *model.Transaction, sourceBalance, destinationBalance *model.Balance) (*model.Transaction, error)
RecordTransactionWithBalances atomically records a transaction and updates both source and destination balances within a single database transaction. This ensures that either all operations succeed together, or none of them are committed, preventing inconsistent ledger states.
Parameters: - ctx: Context for managing the request and tracing. - txn: The transaction object containing details to be recorded. - sourceBalance: The source balance to be updated. - destinationBalance: The destination balance to be updated.
Returns: - The recorded transaction if successful, or an error if any operation fails.
func (Datasource) RecordTransactionWithBalancesAndOutbox ¶ added in v0.13.0
func (d Datasource) RecordTransactionWithBalancesAndOutbox(ctx context.Context, txn *model.Transaction, sourceBalance, destinationBalance *model.Balance, outbox *model.LineageOutbox) (*model.Transaction, error)
RecordTransactionWithBalancesAndOutbox atomically records a transaction, updates balances, and optionally inserts a lineage outbox entry within a single database transaction. This ensures that the lineage processing intent is captured atomically with the main transaction, guaranteeing no lineage work is lost even if subsequent async operations fail.
Parameters: - ctx: Context for managing the request and tracing. - txn: The transaction object containing details to be recorded. - sourceBalance: The source balance to be updated. - destinationBalance: The destination balance to be updated. - outbox: Optional lineage outbox entry to insert atomically (can be nil if no lineage processing needed).
Returns: - The recorded transaction if successful, or an error if any operation fails.
func (Datasource) RecordUnmatched ¶
func (d Datasource) RecordUnmatched(ctx context.Context, reconciliationID string, results []string) error
RecordUnmatched batches the saving of unmatched external transactions related to a specific reconciliation. It uses a database transaction to ensure atomicity and consistency of the batch insert operation. Parameters: - ctx: Context for managing request and tracing. - reconciliationID: The ID of the reconciliation the unmatched transactions are associated with. - results: A slice of external transaction IDs (as strings) representing unmatched transactions. Returns: - An error if the operation fails, wrapped in an APIError for consistency.
func (*Datasource) RevokeAPIKey ¶
func (s *Datasource) RevokeAPIKey(ctx context.Context, id, ownerID string) error
RevokeAPIKey marks an API key as revoked in the database, preventing its future use. The function updates the is_revoked flag to true and sets the revoked_at timestamp. It also invalidates the cache entry for the revoked key to ensure immediate effect. Only the owner of the API key can revoke it.
Parameters: - ctx: Context for managing the request lifecycle and cancellation. - id: The unique identifier of the API key to revoke. - ownerID: The ID of the owner, used to ensure only the owner can revoke their keys.
Returns:
- error: Returns ErrAPIKeyNotFound if the key doesn't exist or doesn't belong to the owner, or other database errors if the update operation fails.
func (Datasource) SaveReconciliationProgress ¶
func (d Datasource) SaveReconciliationProgress(ctx context.Context, reconciliationID string, progress model.ReconciliationProgress) error
SaveReconciliationProgress saves the progress of a reconciliation process to the database. If a record with the given reconciliation ID already exists, it updates the progress information. The function ensures that the reconciliation progress is stored and updated properly. Parameters: - ctx: Context for managing the request and tracing. - reconciliationID: The ID of the reconciliation to track progress. - progress: A ReconciliationProgress object containing the number of processed transactions and the last processed external transaction ID. Returns: - An error if the operation fails, wrapped in an APIError if needed.
func (Datasource) TakeBalanceSnapshots ¶
TakeBalanceSnapshots creates daily snapshots of balances in batches. It uses the PostgreSQL function to process balances in chunks to avoid memory issues with large datasets.
Parameters: - ctx: Context for the operation, allowing for timeouts and cancellation - batchSize: The number of balances to process in each batch
Returns: - int: The total number of snapshots created - error: Returns an APIError if the operation fails
func (Datasource) TransactionExistsByIDOrParentID ¶
TransactionExistsByIDOrParentID checks if a transaction exists either by its direct ID or as a parent transaction ID for other transactions. Parameters: - ctx: Context for managing the request and tracing. - id: The ID to search for in both transaction_id and parent_transaction fields. Returns: - A boolean indicating whether the transaction exists in either capacity, and an error if the check fails.
func (Datasource) TransactionExistsByRef ¶
TransactionExistsByRef checks if a transaction with a given reference exists in the database. It uses OpenTelemetry to trace the operation and returns a boolean indicating whether the transaction exists. Parameters: - ctx: Context for managing the request and tracing. - reference: The reference of the transaction to check for existence. Returns: - A boolean indicating whether the transaction exists, or an error if the check fails.
func (Datasource) UpdateAccount ¶
func (d Datasource) UpdateAccount(account *model.Account) error
UpdateAccount updates a specific account in the database. It updates the account's name, number, bank name, and metadata based on the account ID. Parameters: - account: A pointer to the Account object containing the updated account information. Returns: - An error if the update fails, otherwise returns nil.
func (Datasource) UpdateBalance ¶
func (d Datasource) UpdateBalance(balance *model.Balance) error
UpdateBalance updates an existing balance entry in the database. This method takes a balance object and updates the corresponding fields in the database, based on the provided balance ID. It handles both the balance data and the associated metadata.
Parameters: - balance: A pointer to the balance object containing the updated balance information. This includes fields such as `balance`, `credit_balance`, `debit_balance`, `currency`, `currency_multiplier`, and `meta_data`.
Returns: - error: If the update operation encounters an error, such as a database failure or if the balance ID is not found, an `APIError` is returned.
func (Datasource) UpdateBalanceIdentity ¶
func (d Datasource) UpdateBalanceIdentity(balanceID string, identityID string) error
UpdateBalanceIdentity updates the identity_id of a balance entry in the database.
Parameters: - balanceID: The unique identifier of the balance whose identity reference is to be updated. - identityID: The identity ID to be associated with the balance.
Returns: - error: An error is returned if the balance or identity does not exist or the database operation fails.
func (*Datasource) UpdateBalanceMetadata ¶
func (d *Datasource) UpdateBalanceMetadata(ctx context.Context, id string, metadata map[string]interface{}) error
UpdateBalanceMetadata updates the metadata for a specific balance in the database. It marshals the metadata map to JSON before storing it.
Parameters: - ctx: The context for the database operation. - id: The ID of the balance to update. - metadata: The new metadata to store.
Returns: - error: An error if the update operation fails.
func (Datasource) UpdateBalances ¶
func (d Datasource) UpdateBalances(ctx context.Context, sourceBalance, destinationBalance *model.Balance) error
UpdateBalances updates both the source and destination balances in a single transaction. The function begins a database transaction, updates the balances, and commits the transaction if all updates succeed. In case of any failure, the transaction is rolled back to ensure data integrity.
Parameters: - ctx: The context to manage the lifecycle of the transaction. - sourceBalance: A pointer to the source balance object that needs to be updated. - destinationBalance: A pointer to the destination balance object that needs to be updated.
Returns: - error: Returns an error if there is a failure to start the transaction, update any of the balances, or commit the transaction.
func (Datasource) UpdateIdentity ¶
func (d Datasource) UpdateIdentity(identity *model.Identity) error
UpdateIdentity updates a specific identity record in the database. It marshals the identity metadata, constructs an SQL update query, and checks the result. Parameters: - identity: A pointer to the Identity object containing the updated details. Returns: - An error if the update fails, or nil if successful.
func (*Datasource) UpdateIdentityMetadata ¶
func (d *Datasource) UpdateIdentityMetadata(id string, metadata map[string]interface{}) error
UpdateIdentityMetadata updates the metadata for a specific identity in the database. It marshals the metadata map to JSON before storing it.
Parameters: - id: The ID of the identity to update. - metadata: The new metadata to store.
Returns: - error: An error if the update operation fails.
func (*Datasource) UpdateLastUsed ¶
func (s *Datasource) UpdateLastUsed(ctx context.Context, id string) error
UpdateLastUsed updates the last_used_at timestamp for an API key to the current time. This function is typically called during authentication to track API key usage patterns. Note: This does NOT invalidate the cache to avoid performance overhead on every request. The cached version will still work for authentication, and the timestamp will be updated in the database for auditing purposes.
Parameters: - ctx: Context for managing the request lifecycle and cancellation. - id: The unique identifier of the API key to update.
Returns: - error: An error if the database update operation fails.
func (Datasource) UpdateLedger ¶
func (d Datasource) UpdateLedger(id, name string) (*model.Ledger, error)
UpdateLedger updates an existing ledger's name in the database. It validates that the ledger exists and updates only the name field.
Parameters: - id: The unique ID of the ledger to update. - name: The new name for the ledger.
Returns: - *model.Ledger: The updated ledger object. - error: An error if the ledger is not found or if the update fails.
func (*Datasource) UpdateLedgerMetadata ¶
func (d *Datasource) UpdateLedgerMetadata(id string, metadata map[string]interface{}) error
UpdateLedgerMetadata updates the metadata for a specific ledger in the database. It marshals the metadata map to JSON before storing it.
Parameters: - id: The ID of the ledger to update. - metadata: The new metadata to store.
Returns: - error: An error if the update operation fails.
func (Datasource) UpdateMatchingRule ¶
func (d Datasource) UpdateMatchingRule(ctx context.Context, rule *model.MatchingRule) error
UpdateMatchingRule updates a specific matching rule in the database. Parameters: - ctx: Context for managing the request and tracing. - rule: The matching rule to be updated, including the RuleID, name, description, and criteria. Returns: - An error wrapped in an APIError if the operation fails, or nil if the update is successful.
func (Datasource) UpdateMonitor ¶
func (d Datasource) UpdateMonitor(monitor *model.BalanceMonitor) error
UpdateMonitor updates an existing balance monitor in the database. It updates fields such as `balance_id`, `field`, `operator`, `value`, `description`, and `call_back_url` for the monitor identified by `monitor_id`.
Parameters: - monitor: A pointer to the `BalanceMonitor` object containing the updated values.
Returns: - error: If the update fails, an appropriate `APIError` is returned.
func (Datasource) UpdateReconciliationStatus ¶
func (d Datasource) UpdateReconciliationStatus(ctx context.Context, id string, status string, matchedCount, unmatchedCount int) error
UpdateReconciliationStatus updates the status, matched transactions, unmatched transactions, and completed_at timestamp of a reconciliation in the database. Parameters: - ctx: Context for managing request and tracing. - id: The reconciliation ID to update. - status: The new status of the reconciliation. - matchedCount: The number of matched transactions. - unmatchedCount: The number of unmatched transactions. Returns: - An error if the update fails or the reconciliation is not found.
func (*Datasource) UpdateTransactionMetadata ¶
func (d *Datasource) UpdateTransactionMetadata(ctx context.Context, id string, metadata map[string]interface{}) error
UpdateTransactionMetadata updates the metadata for a specific transaction in the database. It merges the provided metadata with existing metadata for each matching transaction. The update applies to both the transaction with the provided ID and any transactions where this ID is set as the parent_transaction.
Parameters: - ctx: The context for the database operation. - id: The ID of the transaction to update. - metadata: The new metadata to merge with existing metadata.
Returns: - error: An error if the update operation fails.
func (Datasource) UpdateTransactionStatus ¶
UpdateTransactionStatus updates the status of a transaction in the database. It traces the operation using OpenTelemetry and returns an error if the update fails or if the transaction is not found. Parameters: - ctx: Context for managing the request and tracing. - id: The ID of the transaction to update. - status: The new status to set for the transaction. Returns: - An error if the update fails or if the transaction is not found.
func (Datasource) UpsertLineageMapping ¶ added in v0.13.0
func (d Datasource) UpsertLineageMapping(ctx context.Context, mapping model.LineageMapping) error
UpsertLineageMapping creates or updates a lineage mapping between a main balance and its shadow balances. If a mapping already exists for the same balance_id and provider, it updates the existing record.
type IDataSource ¶
type IDataSource interface {
// contains filtered or unexported methods
}
IDataSource defines the interface for data source operations, grouping related functionalities.
func NewDataSource ¶
func NewDataSource(configuration *config.Configuration) (IDataSource, error)
NewDataSource initializes a new database connection.