Documentation
¶
Index ¶
Constants ¶
const ( // NoHeadBlockWaitTime is the amount of // time we wait when no blocks have been // synced. NoHeadBlockWaitTime = 1 * time.Second // NoJobsWaitTime is the amount of time // we wait when no jobs are available // to process. NoJobsWaitTime = 10 * time.Second )
Variables ¶
var ( // ErrJobsUnretrievable is returned when an error // is returned when querying for jobs. ErrJobsUnretrievable = errors.New("unable to retrieve jobs") // ErrBroadcastsUnretrievable is returned when an error // is returned when querying for broadcasts. ErrBroadcastsUnretrievable = errors.New("unable to retrieve broadcasts") // ErrNoAvailableJobs is returned when it is not possible // to process any jobs. If this is returned, you should wait // and retry. ErrNoAvailableJobs = errors.New("no jobs available") // ErrReturnFundsComplete is returned when it is not possible // to process any more ReturnFundsWorkflows or when there is no provided // ReturnsFundsWorkflow. ErrReturnFundsComplete = errors.New("return funds complete") // ErrJobMissing is returned when the coordinator is invoked with // a broadcast complete call but the job that is affected does // not exist. ErrJobMissing = errors.New("job missing") // ErrDuplicateWorkflows is returned when 2 Workflows with the same name // are provided as an input to NewCoordinator. ErrDuplicateWorkflows = errors.New("duplicate workflows") // ErrIncorrectConcurrency is returned when CreateAccount or RequestFunds // have a concurrency greater than 1. ErrIncorrectConcurrency = errors.New("incorrect concurrency") // ErrInvalidConcurrency is returned when the concurrency of a Workflow // is <= 0. ErrInvalidConcurrency = errors.New("invalid concurrency") // ErrStalled is returned when the caller does not define // a CreateAccount and/or RequestFunds workflow and we run out // of available options (i.e. we can't do anything). ErrStalled = errors.New("processing stalled") // ErrNoWorkflows is returned when no workflows are provided // during initialization. ErrNoWorkflows = errors.New("no workflows") )
Functions ¶
This section is empty.
Types ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator faciliates the creation and processing of jobs.
func New ¶
func New( storage JobStorage, helper Helper, handler Handler, parser *parser.Parser, inputWorkflows []*job.Workflow, ) (*Coordinator, error)
New parses a slice of input Workflows and creates a new *Coordinator.
func (*Coordinator) BroadcastComplete ¶
func (c *Coordinator) BroadcastComplete( ctx context.Context, dbTx database.Transaction, jobIdentifier string, transaction *types.Transaction, ) error
BroadcastComplete is called by the broadcast coordinator when a transaction broadcast has completed. If the transaction is nil, then the transaction did not succeed.
func (*Coordinator) Process ¶
func (c *Coordinator) Process( ctx context.Context, ) error
Process creates and executes jobs until failure.
func (*Coordinator) ReturnFunds ¶
func (c *Coordinator) ReturnFunds( ctx context.Context, ) error
ReturnFunds attempts to execute the ReturnFunds workflow until it is no longer satisfiable. This is typically called on shutdown to return funds to a faucet.
type Handler ¶
type Handler interface {
TransactionCreated(
context.Context,
string,
*types.TransactionIdentifier,
) error
}
Handler is an interface called by the coordinator whenever an address is created or a transaction is created.
type Helper ¶
type Helper interface {
// HeadBlockExists returns a boolean indicating if a block
// has been synced by BlockStorage.
HeadBlockExists(context.Context) bool
// DatabaseTransaction returns a new database.Transaction.
// This is used to update jobs and enque them for broadcast atomically.
DatabaseTransaction(context.Context) database.Transaction
// StoreKey is called to persist a
// *types.AccountIdentifier + KeyPair.
StoreKey(
context.Context,
database.Transaction,
*types.AccountIdentifier,
*keys.KeyPair,
) error
// GetKey is called to get the *types.KeyPair
// associated with an address.
GetKey(
context.Context,
database.Transaction,
*types.AccountIdentifier,
) (*keys.KeyPair, error)
// AllAccounts returns a slice of all known *types.AccountIdentifier.
AllAccounts(
context.Context,
database.Transaction,
) ([]*types.AccountIdentifier, error)
// LockedAccounts is a slice of all *types.AccountIdentifier currently sending or receiving
// funds.
LockedAccounts(
context.Context,
database.Transaction,
) ([]*types.AccountIdentifier, error)
// Balance returns the balance
// for a provided address.
Balance(
context.Context,
database.Transaction,
*types.AccountIdentifier,
*types.Currency,
) (*types.Amount, error)
// Coins returns all *types.Coin owned by an address.
Coins(
context.Context,
database.Transaction,
*types.AccountIdentifier,
*types.Currency,
) ([]*types.Coin, error)
// BroadcastAll broadcasts all transactions considered ready for
// broadcast (unbroadcasted or stale).
BroadcastAll(context.Context) error
// Broadcast enqueues a particular intent for broadcast.
Broadcast(
context.Context,
database.Transaction,
string,
*types.NetworkIdentifier,
[]*types.Operation,
*types.TransactionIdentifier,
string,
int64,
) error
// Derive returns a new *types.AccountIdentifier for a provided publicKey.
Derive(
context.Context,
*types.NetworkIdentifier,
*types.PublicKey,
map[string]interface{},
) (*types.AccountIdentifier, map[string]interface{}, error)
// Preprocess calls the /construction/preprocess endpoint
// on an offline node.
Preprocess(
context.Context,
*types.NetworkIdentifier,
[]*types.Operation,
map[string]interface{},
) (map[string]interface{}, []*types.AccountIdentifier, error)
// Metadata calls the /construction/metadata endpoint
// using the online node.
Metadata(
context.Context,
*types.NetworkIdentifier,
map[string]interface{},
[]*types.PublicKey,
) (map[string]interface{}, []*types.Amount, error)
// Payloads calls the /construction/payloads endpoint
// using the offline node.
Payloads(
context.Context,
*types.NetworkIdentifier,
[]*types.Operation,
map[string]interface{},
[]*types.PublicKey,
) (string, []*types.SigningPayload, error)
// Parse calls the /construction/parse endpoint
// using the offline node.
Parse(
context.Context,
*types.NetworkIdentifier,
bool,
string,
) ([]*types.Operation, []*types.AccountIdentifier, map[string]interface{}, error)
// Combine calls the /construction/combine endpoint
// using the offline node.
Combine(
context.Context,
*types.NetworkIdentifier,
string,
[]*types.Signature,
) (string, error)
// Hash calls the /construction/hash endpoint
// using the offline node.
Hash(
context.Context,
*types.NetworkIdentifier,
string,
) (*types.TransactionIdentifier, error)
// Sign returns signatures for the provided
// payloads.
Sign(
context.Context,
[]*types.SigningPayload,
) ([]*types.Signature, error)
// SetBlob transactionally persists
// a key and value.
SetBlob(
ctx context.Context,
dbTx database.Transaction,
key string,
value []byte,
) error
// GetBlob transactionally retrieves
// a key and value.
GetBlob(
ctx context.Context,
dbTx database.Transaction,
key string,
) (bool, []byte, error)
}
Helper is used by the coordinator to process Jobs. It is a superset of functions required by the constructor/worker.Helper.
type JobStorage ¶
type JobStorage interface {
// Ready returns the jobs that are ready to be processed.
Ready(
context.Context,
database.Transaction,
) ([]*job.Job, error)
// Broadcasting returns all jobs that are broadcasting.
Broadcasting(
context.Context,
database.Transaction,
) ([]*job.Job, error)
// Processing returns the number of jobs processing
// for a particular workflow.
Processing(
context.Context,
database.Transaction,
string,
) ([]*job.Job, error)
// Update stores an updated *Job in storage
// and returns its UUID (which won't exist
// on first update).
Update(context.Context, database.Transaction, *job.Job) (string, error)
// Get fetches a *Job by Identifier. It returns an error
// if the identifier doesn't exist.
Get(context.Context, database.Transaction, string) (*job.Job, error)
}
JobStorage allows for the persistent and transactional storage of Jobs.