Documentation
¶
Overview ¶
Package jd provides a comprehensive framework for interacting with Job Distributor (JD) services in the Chainlink Deployments Framework ecosystem.
Overview ¶
The JD package enables seamless integration with Job Distributor services through gRPC communication. It supports multiple authentication mechanisms and provides a unified interface for job management operations.
Architecture ¶
The package consists of three main components:
1. JD Client (client.go) - Core gRPC client implementation 2. Client Provider (provider/client_provider.go) - Connects to existing JD services 3. CTF Provider (provider/ctf_provider.go) - Creates and manages JD Docker containers for testing
Basic Usage ¶
Simple Connection ¶
For basic connectivity without authentication:
import (
"context"
"google.golang.org/grpc/credentials/insecure"
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
)
config := jd.JDConfig{
GRPC: "localhost:9090",
WSRPC: "ws://localhost:9091"
}
client, err := jd.NewJDClient(config)
if err != nil {
log.Fatal(err)
}
// Use client for operations
import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
Provider Interface ¶
import (
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)
providerConfig := provider.ClientOffchainProviderConfig{
GRPC: "localhost:9090",
WSRPC: "ws://localhost:9091",
Creds: insecure.NewCredentials(),
}
prov, err := provider.NewClientOffchainProvider(providerConfig)
if err != nil {
log.Fatal(err)
}
// Initialize with environment
err = prov.Initialize(env)
if err != nil {
log.Fatal(err)
}
// Get client
client, err := prov.OffchainClient()
// client is compatible with the Offchain field in the Environment struct
CTF Provider (Testing) ¶
For testing scenarios where you need to spin up JD Docker containers:
import (
"testing"
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
)
func TestWithJD(t *testing.T) {
config := provider.CTFOffchainProviderConfig{
Image: "job-distributor:latest",
// OR use environment variable: CTF_JD_IMAGE
// Optional PostgreSQL configuration
PostgresPort: 5432,
PostgresHost: "localhost",
PostgresUser: "chainlink",
PostgresPassword: "chainlink",
PostgresDBName: "chainlink",
// Optional JD configuration
GRPCPort: 14231,
WebSocketRPCPort: 8080,
CSAEncryptionKey: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
JDSQLDumpPath: "./migrations.sql", // Optional
}
// Create CTF provider
jdProvider := provider.NewCTFOffchainProvider(t, config)
// Initialize (starts Docker containers with health check)
ctx := context.Background()
client, err := jdProvider.Initialize(ctx)
if err != nil {
t.Fatalf("Failed to initialize JD: %v", err)
}
// Use client for testing
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
t.Fatalf("Failed to list keypairs: %v", err)
}
t.Logf("JD service ready with %d keypairs", len(keypairs.Keypairs))
}
The CTF provider automatically: - Starts PostgreSQL container with proper schema - Starts JD container with correct configuration - Performs health checks using retry logic - Cleans up containers when tests complete
Environment Variable Configuration ¶
You can specify the JD Docker image via environment variable:
export CTF_JD_IMAGE=localhost:5001/job-distributor:latest
config := provider.CTFOffchainProviderConfig{
// Image field can be omitted when CTF_JD_IMAGE is set
}
Health Check ¶
The CTF provider includes built-in health checking that retries `GetKeypair` calls: - 10 retry attempts with 2-second delays - Ensures JD service is fully ready before returning
Authentication ¶
The package supports three authentication mechanisms:
1. No Authentication ¶
For development or internal networks:
config := jd.JDConfig{
GRPC: "localhost:9090",
Creds: insecure.NewCredentials(),
}
2. OAuth2 Authentication ¶
For services requiring OAuth2 Bearer tokens:
import "golang.org/x/oauth2"
tokenSource := oauth2.StaticTokenSource(&oauth2.Token{
AccessToken: "your-access-token",
TokenType: "Bearer",
})
config := jd.JDConfig{
GRPC: "secure.jobdistributor.com:443",
WSRPC: "wss://secure.jobdistributor.com:443/ws",
OAuth2: tokenSource,
Creds: credentials.NewTLS(&tls.Config{}),
}
Client Operations ¶
The JD client supports various operations through gRPC service interfaces:
CSA Key Management ¶
import csav1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/csa"
// List CSA keypairs
keypairs, err := client.ListKeypairs(ctx, &csav1.ListKeypairsRequest{})
if err != nil {
log.Printf("Failed to list CSA keypairs: %v", err)
}
for _, keypair := range keypairs.Keypairs {
log.Printf("CSA Public Key: %s", keypair.PublicKey)
}
Job Management ¶
import jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
// List existing jobs
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{})
if err != nil {
log.Printf("Failed to list jobs: %v", err)
}
for _, job := range jobs.Jobs {
log.Printf("Job ID: %s", job.Id)
}
// Propose a new job
jobSpec := &jobv1.ProposeJobRequest{
NodeIds: []string{"node-1", "node-2"},
Spec: "job specification here",
}
response, err := client.ProposeJob(ctx, jobSpec)
if err != nil {
log.Printf("Failed to propose job: %v", err)
}
Node Management ¶
import nodev1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
// List registered nodes
nodes, err := client.ListNodes(ctx, &nodev1.ListNodesRequest{})
if err != nil {
log.Printf("Failed to list nodes: %v", err)
}
for _, node := range nodes.Nodes {
log.Printf("Node ID: %s", node.Id)
}
Configuration Validation ¶
The provider automatically validates configurations:
config := provider.ClientOffchainProviderConfig{
GRPC: "", // Invalid - will cause validation error
}
prov, err := provider.NewClientOffchainProvider(config)
if err != nil {
// Handle validation error
log.Printf("Invalid configuration: %v", err)
}
Dry Run Mode ¶
The package includes a dry run client that provides safe testing capabilities without affecting real Job Distributor operations:
import (
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
// Create a real client for read operations
realClient, err := jd.NewJDClient(jd.JDConfig{
GRPC: "localhost:9090",
Creds: insecure.NewCredentials(),
})
if err != nil {
log.Fatal(err)
}
// Wrap with dry run client
dryRunClient := jd.NewDryRunJobDistributor(realClient, logger.DefaultLogger)
// Read operations work normally (forwarded to real backend)
jobs, err := dryRunClient.ListJobs(ctx, &jobv1.ListJobsRequest{})
// Write operations are simulated (logged but not executed)
response, err := dryRunClient.ProposeJob(ctx, &jobv1.ProposeJobRequest{
NodeId: "test-node",
Spec: "test job spec",
})
// Returns mock response without actually proposing the job
Dry Run with Provider (Recommended) ¶
For a cleaner approach, use the provider's functional option:
import (
"github.com/smartcontractkit/chainlink-deployments-framework/offchain/jd/provider"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)
// Create provider with dry run mode enabled
jdProvider := provider.NewClientOffchainProvider(
provider.ClientOffchainProviderConfig{
GRPC: "localhost:9090",
Creds: insecure.NewCredentials(),
},
provider.WithDryRun(logger),
)
// Initialize - returns a dry run client automatically
client, err := jdProvider.Initialize(ctx)
if err != nil {
log.Fatal(err)
}
// All operations now use dry run mode
jobs, err := client.ListJobs(ctx, &jobv1.ListJobsRequest{}) // Read: forwarded
response, err := client.ProposeJob(ctx, &jobv1.ProposeJobRequest{ // Write: simulated
NodeId: "test-node",
Spec: "test job spec",
})
Index ¶
- type CognitoAuth
- type CognitoClient
- type CognitoTokenSource
- type DryRunJobDistributor
- func (d *DryRunJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error)
- func (d *DryRunJobDistributor) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error)
- func (d *DryRunJobDistributor) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error)
- func (d *DryRunJobDistributor) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error)
- func (d *DryRunJobDistributor) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error)
- func (d *DryRunJobDistributor) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error)
- func (d *DryRunJobDistributor) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error)
- func (d *DryRunJobDistributor) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error)
- func (d *DryRunJobDistributor) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error)
- func (d *DryRunJobDistributor) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error)
- func (d *DryRunJobDistributor) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, ...) (*nodev1.ListNodeChainConfigsResponse, error)
- func (d *DryRunJobDistributor) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error)
- func (d *DryRunJobDistributor) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error)
- func (d *DryRunJobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error)
- func (d *DryRunJobDistributor) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error)
- func (d *DryRunJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error)
- func (d *DryRunJobDistributor) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error)
- func (d *DryRunJobDistributor) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error)
- type JDConfig
- type JobDistributor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CognitoAuth ¶ added in v0.37.0
type CognitoAuth struct {
AWSRegion string
AppClientID string
AppClientSecret string
Username string
Password string
}
CognitoAuth contains the Cognito authentication information required to generate a token from Cognito.
type CognitoClient ¶ added in v0.37.0
type CognitoClient interface {
InitiateAuth(
ctx context.Context,
params *cognitoidentityprovider.InitiateAuthInput,
optFns ...func(*cognitoidentityprovider.Options),
) (*cognitoidentityprovider.InitiateAuthOutput, error)
}
CognitoClient defines the interface for Cognito Identity Provider operations. This interface allows for mocking the AWS Cognito client in unit tests.
type CognitoTokenSource ¶ added in v0.37.0
type CognitoTokenSource struct {
// contains filtered or unexported fields
}
CognitoTokenSource provides a oath2 token that is used to authenticate with the Job Distributor.
func NewCognitoTokenSource ¶ added in v0.37.0
func NewCognitoTokenSource(auth CognitoAuth) *CognitoTokenSource
NewCognitoTokenSource creates a new CognitoTokenSource with the given CognitoAuth configuration. If client is nil, a real AWS Cognito client will be created when Authenticate is called.
func (*CognitoTokenSource) Authenticate ¶ added in v0.37.0
func (c *CognitoTokenSource) Authenticate(ctx context.Context) error
Authenticate performs user authentication against AWS Cognito using the USER_PASSWORD_AUTH flow.
Authentication results are cached in the authResult field and used by the Token() method to provide OAuth2 access tokens without re-authenticating.
func (*CognitoTokenSource) RefreshToken ¶ added in v0.38.0
func (c *CognitoTokenSource) RefreshToken(ctx context.Context) error
RefreshToken refreshes the access token using the stored refresh token via the REFRESH_TOKEN_AUTH flow.
This method uses the refresh token from the cached authentication result to obtain new access and ID tokens without reusing the user's credentials. This is more efficient than full re-authentication and follows OAuth2 best practices.
The method:
- Uses the cached refresh token to call InitiateAuth with REFRESH_TOKEN_AUTH flow
- Updates the cached authentication result with new tokens
- Calculates and stores the new token expiry time
Returns an error if the refresh fails (e.g., refresh token expired or invalid).
func (*CognitoTokenSource) Token ¶ added in v0.37.0
func (c *CognitoTokenSource) Token() (*oauth2.Token, error)
Token retrieves an OAuth2 access token for authenticating with the Job Distributor service.
This method implements a lazy loading pattern with automatic token refresh:
- If no authentication result is cached, it authenticates with AWS Cognito
- If the cached token has expired, it refreshes using the refresh token (REFRESH_TOKEN_AUTH flow)
- Otherwise, it returns the cached access token
- Returns the access token wrapped in an oauth2.Token struct
The method implements the oauth2.TokenSource interface, making it compatible with standard OAuth2 client libraries and HTTP clients that support token sources. Following OAuth2 best practices, it uses the refresh token when available to refresh the access token.
Note: This method uses context.Background() for authentication/refresh if no cached token exists or if the token needs to be refreshed. For more control over authentication context and timeout behavior, consider calling Authenticate() or RefreshToken() explicitly before calling Token().
Returns an OAuth2 token containing the Cognito access token
func (*CognitoTokenSource) TokenExpiresAt ¶ added in v0.38.0
func (c *CognitoTokenSource) TokenExpiresAt() time.Time
TokenExpiresAt returns the time when the cached token expires.
type DryRunJobDistributor ¶ added in v0.37.1
type DryRunJobDistributor struct {
// contains filtered or unexported fields
}
DryRunJobDistributor is a readonly JD client. Read operations are forwarded to the real backend, while write operations are ignored.
func NewDryRunJobDistributor ¶ added in v0.37.1
func NewDryRunJobDistributor(realBackend offchain.Client, lggr logger.Logger) *DryRunJobDistributor
NewDryRunJobDistributor creates a new DryRunJobDistributor.
func (*DryRunJobDistributor) BatchProposeJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) BatchProposeJob(ctx context.Context, in *jobv1.BatchProposeJobRequest, opts ...grpc.CallOption) (*jobv1.BatchProposeJobResponse, error)
BatchProposeJob simulates proposing multiple jobs in a batch to the Job Distributor without actually submitting them. In dry run mode, this returns an empty response indicating the batch operation was logged but not executed.
func (*DryRunJobDistributor) DeleteJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) DeleteJob(ctx context.Context, in *jobv1.DeleteJobRequest, opts ...grpc.CallOption) (*jobv1.DeleteJobResponse, error)
DeleteJob simulates deleting a job from the Job Distributor without actually executing the deletion. In dry run mode, this returns an empty response indicating the deletion was logged but not executed.
func (*DryRunJobDistributor) DisableNode ¶ added in v0.37.1
func (d *DryRunJobDistributor) DisableNode(ctx context.Context, in *nodev1.DisableNodeRequest, opts ...grpc.CallOption) (*nodev1.DisableNodeResponse, error)
DisableNode simulates disabling a node in the Job Distributor without actually executing the operation. In dry run mode, this returns an empty response indicating the node disable operation was logged but not executed.
func (*DryRunJobDistributor) EnableNode ¶ added in v0.37.1
func (d *DryRunJobDistributor) EnableNode(ctx context.Context, in *nodev1.EnableNodeRequest, opts ...grpc.CallOption) (*nodev1.EnableNodeResponse, error)
EnableNode simulates enabling a node in the Job Distributor without actually executing the operation. In dry run mode, this returns an empty response indicating the node enable operation was logged but not executed.
func (*DryRunJobDistributor) GetJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) GetJob(ctx context.Context, in *jobv1.GetJobRequest, opts ...grpc.CallOption) (*jobv1.GetJobResponse, error)
GetJob retrieves a specific job by its ID from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) GetKeypair ¶ added in v0.37.1
func (d *DryRunJobDistributor) GetKeypair(ctx context.Context, in *csav1.GetKeypairRequest, opts ...grpc.CallOption) (*csav1.GetKeypairResponse, error)
GetKeypair retrieves a specific CSA keypair from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) GetNode ¶ added in v0.37.1
func (d *DryRunJobDistributor) GetNode(ctx context.Context, in *nodev1.GetNodeRequest, opts ...grpc.CallOption) (*nodev1.GetNodeResponse, error)
GetNode retrieves information about a specific node from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) GetProposal ¶ added in v0.37.1
func (d *DryRunJobDistributor) GetProposal(ctx context.Context, in *jobv1.GetProposalRequest, opts ...grpc.CallOption) (*jobv1.GetProposalResponse, error)
GetProposal retrieves a specific job proposal by its ID from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ListJobs ¶ added in v0.37.1
func (d *DryRunJobDistributor) ListJobs(ctx context.Context, in *jobv1.ListJobsRequest, opts ...grpc.CallOption) (*jobv1.ListJobsResponse, error)
ListJobs retrieves a list of all jobs from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ListKeypairs ¶ added in v0.37.1
func (d *DryRunJobDistributor) ListKeypairs(ctx context.Context, in *csav1.ListKeypairsRequest, opts ...grpc.CallOption) (*csav1.ListKeypairsResponse, error)
ListKeypairs retrieves a list of all CSA keypairs from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ListNodeChainConfigs ¶ added in v0.37.1
func (d *DryRunJobDistributor) ListNodeChainConfigs(ctx context.Context, in *nodev1.ListNodeChainConfigsRequest, opts ...grpc.CallOption) (*nodev1.ListNodeChainConfigsResponse, error)
ListNodeChainConfigs retrieves chain configuration information for nodes from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ListNodes ¶ added in v0.37.1
func (d *DryRunJobDistributor) ListNodes(ctx context.Context, in *nodev1.ListNodesRequest, opts ...grpc.CallOption) (*nodev1.ListNodesResponse, error)
ListNodes retrieves a list of all nodes registered with the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ListProposals ¶ added in v0.37.1
func (d *DryRunJobDistributor) ListProposals(ctx context.Context, in *jobv1.ListProposalsRequest, opts ...grpc.CallOption) (*jobv1.ListProposalsResponse, error)
ListProposals retrieves a list of all job proposals from the Job Distributor. This operation is forwarded to the real backend since it's a read-only operation.
func (*DryRunJobDistributor) ProposeJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error)
ProposeJob simulates proposing a new job to the Job Distributor without actually submitting it. In dry run mode, this returns a mock proposal response with a dummy job ID indicating the job was not actually proposed to the node.
func (*DryRunJobDistributor) RegisterNode ¶ added in v0.37.1
func (d *DryRunJobDistributor) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeRequest, opts ...grpc.CallOption) (*nodev1.RegisterNodeResponse, error)
RegisterNode simulates registering a new node with the Job Distributor without actually executing the registration. In dry run mode, this returns an empty response indicating the node registration was logged but not executed.
func (*DryRunJobDistributor) RevokeJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) RevokeJob(ctx context.Context, in *jobv1.RevokeJobRequest, opts ...grpc.CallOption) (*jobv1.RevokeJobResponse, error)
RevokeJob simulates revoking a job from the Job Distributor without actually executing the revocation. In dry run mode, this returns an empty response indicating the revocation was logged but not executed.
func (*DryRunJobDistributor) UpdateJob ¶ added in v0.37.1
func (d *DryRunJobDistributor) UpdateJob(ctx context.Context, in *jobv1.UpdateJobRequest, opts ...grpc.CallOption) (*jobv1.UpdateJobResponse, error)
UpdateJob simulates updating an existing job in the Job Distributor without actually executing the update. In dry run mode, this returns an empty response indicating the update was logged but not executed.
func (*DryRunJobDistributor) UpdateNode ¶ added in v0.37.1
func (d *DryRunJobDistributor) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error)
UpdateNode simulates updating an existing node in the Job Distributor without actually executing the update. In dry run mode, this returns an empty response indicating the node update was logged but not executed.
type JDConfig ¶
type JDConfig struct {
GRPC string
WSRPC string
Creds credentials.TransportCredentials
Auth oauth2.TokenSource
}
JDConfig is the configuration for the Job Distributor client.
type JobDistributor ¶
type JobDistributor struct {
nodev1.NodeServiceClient
jobv1.JobServiceClient
csav1.CSAServiceClient
WSRPC string
}
JobDistributor is the client for the Job Distributor service.
func NewJDClient ¶
func NewJDClient(cfg JDConfig) (*JobDistributor, error)
NewJDClient creates a new Job Distributor client
func (*JobDistributor) GetCSAPublicKey ¶
func (jd *JobDistributor) GetCSAPublicKey(ctx context.Context) (string, error)
GetCSAPublicKey returns the public key for the CSA service
func (*JobDistributor) ProposeJob ¶
func (jd *JobDistributor) ProposeJob(ctx context.Context, in *jobv1.ProposeJobRequest, opts ...grpc.CallOption) (*jobv1.ProposeJobResponse, error)
ProposeJob proposes jobs through the jobService and accepts the proposed job on selected node based on ProposeJobRequest.NodeId