Documentation
¶
Index ¶
- Constants
- Variables
- func AdminBatchRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error
- func AdminDecodeBase64(c *cli.Context) error
- func AdminDecodeProto(c *cli.Context) error
- func AdminDeleteWorkflow(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error
- func AdminDescribeExecution(c *cli.Context, clientFactory ClientFactory) error
- func AdminDescribeHistoryHost(c *cli.Context, clientFactory ClientFactory) error
- func AdminDescribeShard(c *cli.Context, clientFactory ClientFactory) error
- func AdminDescribeTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error
- func AdminForceUnloadTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error
- func AdminGetShardID(c *cli.Context) error
- func AdminImportWorkflow(c *cli.Context, clientFactory ClientFactory) error
- func AdminListClusterMembers(c *cli.Context, clientFactory ClientFactory) error
- func AdminListGossipMembers(c *cli.Context, clientFactory ClientFactory) error
- func AdminListShardTasks(c *cli.Context, clientFactory ClientFactory, ...) error
- func AdminListTaskQueueTasks(c *cli.Context, clientFactory ClientFactory) error
- func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error
- func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error
- func AdminRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory) error
- func AdminRemoveTask(c *cli.Context, clientFactory ClientFactory, ...) error
- func AdminReplicateWorkflow(c *cli.Context, clientFactory ClientFactory) error
- func AdminShardManagement(c *cli.Context, clientFactory ClientFactory) error
- func AdminShowWorkflow(c *cli.Context, clientFactory ClientFactory) error
- func NewCliApp(opts ...Option) *cli.App
- func StringToEnum(search string, candidates map[string]int32) (int32, error)
- type BoolFlagLookup
- type ClientFactory
- type ClientFactoryOption
- type DLQJobService
- type DLQMessage
- type DLQService
- type DLQServiceProvider
- type DLQV1Service
- type DLQV2Service
- type DefaultFrontendAddressProvider
- type HttpGetter
- type Option
- type Params
- type PredefinedTaskBlobDeserializer
- type Prompter
- type PrompterFactory
- type PrompterOption
- type PrompterParams
- type ProtoTaskBlobEncoder
- type TaskBlobEncoder
- type TaskBlobEncoderFn
- type TaskBlobProtoDeserializer
- type TaskPayload
Examples ¶
Constants ¶
const (
DefaultFrontendAddress = "127.0.0.1:7233"
)
Variables ¶
var ( FlagAddress = "address" FlagHistoryAddress = "history-address" FlagNamespaceID = "namespace-id" FlagNamespace = "namespace" FlagNamespaceAlias = []string{"n"} FlagShardID = "shard-id" FlagWorkflowID = "workflow-id" FlagWorkflowIDAlias = []string{"wid"} FlagRunID = "run-id" FlagRunIDAlias = []string{"rid"} FlagBusinessID = "business-id" FlagBusinessIDAlias = []string{"bid", FlagWorkflowID, FlagWorkflowIDAlias[0]} FlagArchetype = "archetype" FlagNumberOfShards = "number-of-shards" FlagMinEventID = "min-event-id" FlagMaxEventID = "max-event-id" FlagTaskQueue = "task-queue" FlagTaskQueueType = "task-queue-type" FlagContextTimeout = "context-timeout" FlagContextTimeoutAlias = []string{"ct"} FlagCluster = "cluster" FlagTargetCluster = "target-cluster" FlagPageSize = "pagesize" FlagFrom = "from" FlagPrintFullyDetail = "print-full" FlagPrintJSON = "print-json" FlagHeartbeatedWithin = "heartbeated-within" FlagInputFilename = "input-filename" FlagOutputFilename = "output-filename" FlagClusterMembershipRole = "role" FlagSkipErrorMode = "skip-errors" FlagTaskID = "task-id" FlagTaskCategory = "task-category" FlagTaskVisibilityTimestamp = "task-timestamp" FlagMinVisibilityTimestamp = "min-visibility-ts" FlagMaxVisibilityTimestamp = "max-visibility-ts" FlagEnableTLS = "tls" FlagTLSCertPath = "tls-cert-path" FlagTLSKeyPath = "tls-key-path" FlagTLSCaPath = "tls-ca-path" FlagTLSDisableHostVerification = "tls-disable-host-verification" FlagTLSServerName = "tls-server-name" FlagLastMessageID = "last-message-id" FlagJobToken = "job-token" FlagReason = "reason" FlagYes = "yes" FlagMore = "more" FlagMinEventVersion = "min-event-version" FlagMaxEventVersion = "max-event-version" FlagMinTaskID = "min-task-id" FlagMaxTaskID = "max-task-id" FlagSubqueue = "subqueue" FlagDLQType = "dlq-type" FlagQueueType = "queue-type" FlagDLQVersion = "dlq-version" FlagMaxMessageCount = "max-message-count" FlagProtoType = "type" FlagHexData = "hex-data" FlagHexFile = "hex-file" FlagBinaryFile = "binary-file" FlagBase64Data = "base64-data" FlagBase64File = "base64-file" FlagTaskCategoryID = "task-category-id" FlagEncoding = "encoding" FlagPartitionID = "partition-id" FlagStickyName = "sticky-name" FlagBuildIDs = "select-build-id" FlagUnversioned = "select-unversioned" FlagAllActive = "select-all-active" FlagFair = "fair" FlagMinPass = "min-pass" FlagVisibilityQuery = "query" FlagJobID = "job-id" FlagDecode = "decode" FlagScheduleID = "schedule-id" FlagScheduleIDAlias = []string{"sid"} FlagTarget = "target" )
Flags used to specify cli command line arguments
Functions ¶
func AdminBatchRefreshWorkflowTasks ¶
func AdminBatchRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error
AdminBatchRefreshWorkflowTasks starts a batch job to refresh workflow tasks for multiple workflows
func AdminDecodeBase64 ¶
func AdminDecodeProto ¶
func AdminDeleteWorkflow ¶
func AdminDeleteWorkflow(c *cli.Context, clientFactory ClientFactory, prompter *Prompter) error
AdminDeleteWorkflow force deletes a workflow's mutable state (both concrete and current), history, and visibility records as long as it's possible. It should only be used as a troubleshooting tool since no additional check will be done before the deletion. (e.g. if a child workflow has recorded its result in the parent workflow) Please use normal workflow delete command to gracefully delete a workflow execution.
func AdminDescribeExecution ¶
func AdminDescribeExecution(c *cli.Context, clientFactory ClientFactory) error
AdminDescribeExecution describes a Temporal execution (CHASM tree or workflow).
func AdminDescribeHistoryHost ¶
func AdminDescribeHistoryHost(c *cli.Context, clientFactory ClientFactory) error
AdminDescribeHistoryHost describes history host
func AdminDescribeShard ¶
func AdminDescribeShard(c *cli.Context, clientFactory ClientFactory) error
AdminDescribeShard describes shard by shard id
func AdminDescribeTaskQueuePartition ¶
func AdminDescribeTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error
AdminDescribeTaskQueuePartition displays task queue partition information
func AdminForceUnloadTaskQueuePartition ¶
func AdminForceUnloadTaskQueuePartition(c *cli.Context, clientFactory ClientFactory) error
AdminForceUnloadTaskQueuePartition forcefully unloads a task queue partition
func AdminImportWorkflow ¶
func AdminImportWorkflow(c *cli.Context, clientFactory ClientFactory) error
AdminImportWorkflow imports history
func AdminListClusterMembers ¶
func AdminListClusterMembers(c *cli.Context, clientFactory ClientFactory) error
AdminListClusterMembers outputs a list of cluster members
func AdminListGossipMembers ¶
func AdminListGossipMembers(c *cli.Context, clientFactory ClientFactory) error
AdminListGossipMembers outputs a list of gossip members
func AdminListShardTasks ¶
func AdminListShardTasks(c *cli.Context, clientFactory ClientFactory, registry tasks.TaskCategoryRegistry) error
AdminListShardTasks outputs a list of a tasks for given Shard and Task Category
func AdminListTaskQueueTasks ¶
func AdminListTaskQueueTasks(c *cli.Context, clientFactory ClientFactory) error
AdminListTaskQueueTasks displays task information
func AdminMigrateSchedule ¶
func AdminMigrateSchedule(c *cli.Context, clientFactory ClientFactory) error
AdminMigrateSchedule migrates a schedule between V1 (workflow-backed) and V2 (CHASM).
func AdminRebuildMutableState ¶
func AdminRebuildMutableState(c *cli.Context, clientFactory ClientFactory) error
AdminRebuildMutableState rebuild a workflow mutable state using persisted history events
func AdminRefreshWorkflowTasks ¶
func AdminRefreshWorkflowTasks(c *cli.Context, clientFactory ClientFactory) error
AdminRefreshWorkflowTasks refreshes all the tasks of a workflow
func AdminRemoveTask ¶
func AdminRemoveTask( c *cli.Context, clientFactory ClientFactory, taskCategoryRegistry tasks.TaskCategoryRegistry, ) error
AdminRemoveTask describes history host
func AdminReplicateWorkflow ¶
func AdminReplicateWorkflow( c *cli.Context, clientFactory ClientFactory, ) error
AdminReplicateWorkflow force replicates a workflow by generating replication tasks
func AdminShardManagement ¶
func AdminShardManagement(c *cli.Context, clientFactory ClientFactory) error
AdminShardManagement describes history host
func AdminShowWorkflow ¶
func AdminShowWorkflow(c *cli.Context, clientFactory ClientFactory) error
AdminShowWorkflow shows history
Types ¶
type BoolFlagLookup ¶
BoolFlagLookup can be satisfied by github.com/urfave/cli/v2.Context.
type ClientFactory ¶
type ClientFactory interface {
AdminClient(c *cli.Context) adminservice.AdminServiceClient
WorkflowClient(c *cli.Context) workflowservice.WorkflowServiceClient
}
ClientFactory is used to construct rpc clients
func NewClientFactory ¶
func NewClientFactory(opts ...ClientFactoryOption) ClientFactory
NewClientFactory creates a new ClientFactory
type ClientFactoryOption ¶
type ClientFactoryOption func(params *clientFactoryParams)
ClientFactoryOption is used to configure the ClientFactory via NewClientFactory.
func WithFrontendAddress ¶
func WithFrontendAddress(address string) ClientFactoryOption
WithFrontendAddress ensures that admin clients created by the factory will connect to the specified address.
type DLQJobService ¶
type DLQJobService struct {
// contains filtered or unexported fields
}
func (*DLQJobService) DescribeJob ¶
func (ac *DLQJobService) DescribeJob(c *cli.Context) error
type DLQMessage ¶
type DLQMessage struct {
// MessageID is the ID of the message within the DLQ. You can use this ID as an input to the `--last_message_id`
// flag for the `purge` and `merge` commands.
MessageID int64 `json:"message_id"`
// ShardID is only used for non-namespace replication tasks.
ShardID int32 `json:"shard_id"`
// Payload contains the parsed task metadata from the server.
Payload *TaskPayload `json:"payload"`
}
DLQMessage is used primarily to form the JSON output of the `read` command. It's only used for v2.
type DLQService ¶
type DLQServiceProvider ¶
type DLQServiceProvider struct {
// contains filtered or unexported fields
}
func NewDLQServiceProvider ¶
func NewDLQServiceProvider( clientFactory ClientFactory, taskBlobEncoder TaskBlobEncoder, taskCategoryRegistry tasks.TaskCategoryRegistry, writer io.Writer, prompterFactory PrompterFactory, ) *DLQServiceProvider
func (*DLQServiceProvider) GetDLQJobService ¶
func (p *DLQServiceProvider) GetDLQJobService() DLQJobService
GetDLQJobService returns a DLQJobService.
func (*DLQServiceProvider) GetDLQService ¶
func (p *DLQServiceProvider) GetDLQService( c *cli.Context, ) (DLQService, error)
GetDLQService returns a DLQService based on FlagDLQVersion.
type DLQV1Service ¶
type DLQV1Service struct {
// contains filtered or unexported fields
}
func NewDLQV1Service ¶
func NewDLQV1Service(clientFactory ClientFactory, prompter *Prompter, writer io.Writer) *DLQV1Service
func (*DLQV1Service) ListQueues ¶
func (ac *DLQV1Service) ListQueues(c *cli.Context) error
func (*DLQV1Service) MergeMessages ¶
func (ac *DLQV1Service) MergeMessages(c *cli.Context) error
func (*DLQV1Service) PurgeMessages ¶
func (ac *DLQV1Service) PurgeMessages(c *cli.Context) error
func (*DLQV1Service) ReadMessages ¶
func (ac *DLQV1Service) ReadMessages(c *cli.Context) (err error)
type DLQV2Service ¶
type DLQV2Service struct {
// contains filtered or unexported fields
}
DLQV2Service implements DLQService for persistence.QueueV2.
func NewDLQJobService ¶
func NewDLQJobService( clientFactory ClientFactory, writer io.Writer, ) *DLQV2Service
func NewDLQV2Service ¶
func NewDLQV2Service( category tasks.Category, sourceCluster string, targetCluster string, clientFactory ClientFactory, writer io.Writer, prompter *Prompter, taskBlobEncoder TaskBlobEncoder, ) *DLQV2Service
func (*DLQV2Service) ListQueues ¶
func (ac *DLQV2Service) ListQueues(c *cli.Context) (err error)
func (*DLQV2Service) MergeMessages ¶
func (ac *DLQV2Service) MergeMessages(c *cli.Context) error
func (*DLQV2Service) PurgeMessages ¶
func (ac *DLQV2Service) PurgeMessages(c *cli.Context) error
func (*DLQV2Service) ReadMessages ¶
func (ac *DLQV2Service) ReadMessages(c *cli.Context) (err error)
type DefaultFrontendAddressProvider ¶
type DefaultFrontendAddressProvider struct{}
DefaultFrontendAddressProvider uses FlagAddress to determine the frontend address, defaulting to DefaultFrontendAddress if FlagAddress is not set or is empty.
func (DefaultFrontendAddressProvider) GetFrontendAddress ¶
func (d DefaultFrontendAddressProvider) GetFrontendAddress(c *cli.Context) string
type HttpGetter ¶
HttpGetter defines http.Client.Get(...) as an interface so we can mock it
type Params ¶
type Params struct {
// ClientFactory creates Temporal service clients for tdbg to use.
ClientFactory ClientFactory
// TaskCategoryRegistry is used to determine which task categories are available for tdbg to use.
TaskCategoryRegistry tasks.TaskCategoryRegistry
// Writer is used to write output from tdbg. The default is os.Stdout.
Writer io.Writer
// ErrWriter is used to write errors from tdbg. The default is os.Stderr.
ErrWriter io.Writer
// TaskBlobEncoder is needed for custom task serialization. The default uses PredefinedTaskBlobDeserializer.
TaskBlobEncoder TaskBlobEncoder
}
Params which are customizable for the CLI application.
type PredefinedTaskBlobDeserializer ¶
type PredefinedTaskBlobDeserializer struct{}
PredefinedTaskBlobDeserializer is a TaskBlobProtoDeserializer that deserializes task blobs into the predefined task categories that are used by Temporal. If your server has custom categories, you'll want to build something on top of this.
func NewPredefinedTaskBlobDeserializer ¶
func NewPredefinedTaskBlobDeserializer() PredefinedTaskBlobDeserializer
NewPredefinedTaskBlobDeserializer returns a TaskBlobProtoDeserializer that works for the stock task categories of the server. You need to extend this if you have custom task categories.
func (PredefinedTaskBlobDeserializer) Deserialize ¶
func (d PredefinedTaskBlobDeserializer) Deserialize(categoryID int, blob *commonpb.DataBlob) (proto.Message, error)
Deserialize a task blob from one of the server's predefined task categories into a proto message.
type Prompter ¶
type Prompter struct {
// contains filtered or unexported fields
}
Prompter is a helper for prompting the user for confirmation.
func NewPrompter ¶
func NewPrompter(c BoolFlagLookup, opts ...PrompterOption) *Prompter
NewPrompter creates a new Prompter. In most cases, the first argument should be github.com/urfave/cli/v2.Context.
type PrompterFactory ¶
type PrompterFactory func(c BoolFlagLookup) *Prompter
func NewPrompterFactory ¶
func NewPrompterFactory(opts ...PrompterOption) PrompterFactory
type PrompterOption ¶
type PrompterOption func(*PrompterParams)
PrompterOption is used to override default PrompterParams.
type PrompterParams ¶
type PrompterParams struct {
// Reader defaults to os.Stdin.
Reader io.Reader
// Writer defaults to os.Stdout.
Writer io.Writer
// Exiter defaults to [os.Exit].
Exiter func(code int)
}
PrompterParams is used to configure a new Prompter.
type ProtoTaskBlobEncoder ¶
type ProtoTaskBlobEncoder struct {
// contains filtered or unexported fields
}
ProtoTaskBlobEncoder is a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob into a proto message, and then uses protojson to marshal the proto message into a human-readable format.
func NewProtoTaskBlobEncoder ¶
func NewProtoTaskBlobEncoder(deserializer TaskBlobProtoDeserializer) *ProtoTaskBlobEncoder
NewProtoTaskBlobEncoder returns a TaskBlobEncoder that uses a TaskBlobProtoDeserializer to deserialize the blob.
type TaskBlobEncoder ¶
type TaskBlobEncoder interface {
Encode(writer io.Writer, taskCategoryID int, blob *commonpb.DataBlob) error
}
TaskBlobEncoder takes a blob for a given task category and encodes it to a human-readable format. Here's a breakdown of the relationship between all the related types needed to implement a custom encoder: - NewCliApp accepts a list of Option objects. - Option objects modify Params. - Params contain a TaskBlobEncoder. - TaskBlobEncoder is implemented by ProtoTaskBlobEncoder. - ProtoTaskBlobEncoder uses protojson to marshal proto.Message objects from a TaskBlobProtoDeserializer. - TaskBlobProtoDeserializer is implemented by the stock PredefinedTaskBlobDeserializer. - PredefinedTaskBlobDeserializer deserializes commonpb.DataBlob objects into proto.Message objects.
Example ¶
package main
import (
"bytes"
"fmt"
"io"
"os"
"strconv"
commonpb "go.temporal.io/api/common/v1"
"github.com/hanzoai/tasks/service/history/tasks"
"github.com/hanzoai/tasks/tools/tdbg"
enumspb "go.temporal.io/api/enums/v1"
)
var customCategory = tasks.Category{}
func main() {
var output bytes.Buffer
app := tdbg.NewCliApp(func(params *tdbg.Params) {
params.Writer = &output
stockEncoder := params.TaskBlobEncoder
params.TaskBlobEncoder = tdbg.TaskBlobEncoderFn(func(
writer io.Writer,
taskCategoryID int,
blob *commonpb.DataBlob,
) error {
if taskCategoryID == customCategory.ID() {
_, err := writer.Write(append([]byte("hello, "), blob.Data...))
return err
}
return stockEncoder.Encode(writer, taskCategoryID, blob)
})
})
file, err := os.CreateTemp("", "*")
if err != nil {
panic(err)
}
defer func() {
if err := os.Remove(file.Name()); err != nil {
panic(err)
}
}()
_, err = file.Write([]byte("\"world\""))
if err != nil {
panic(err)
}
err = app.Run([]string{
"tdbg", "decode", "task",
"--" + tdbg.FlagEncoding, enumspb.ENCODING_TYPE_JSON.String(),
"--" + tdbg.FlagTaskCategoryID, strconv.Itoa(customCategory.ID()),
"--" + tdbg.FlagBinaryFile, file.Name(),
})
if err != nil {
panic(err)
}
fmt.Println(output.String())
}
Output: hello, "world"
type TaskBlobEncoderFn ¶
TaskBlobEncoderFn implements TaskBlobEncoder by calling a function.
type TaskBlobProtoDeserializer ¶
type TaskBlobProtoDeserializer interface {
Deserialize(taskCategoryID int, blob *commonpb.DataBlob) (proto.Message, error)
}
TaskBlobProtoDeserializer is used to deserialize task blobs into proto messages. This makes it easier to create an encoder if your tasks are all backed by protos. We separate this from the encoder because we don't want the encoder to be tied to protos as the wire format.
type TaskPayload ¶
type TaskPayload struct {
// contains filtered or unexported fields
}
TaskPayload implements both json.Marshaler and json.Unmarshaler. This allows us to pretty-print tasks using jsonpb when serializing and then store the raw bytes of the task payload for later use when deserializing. We need to store the raw bytes instead of immediately decoding to a concrete type because that logic is dynamic and can't depend solely on the task category ID in case there are additional task categories in use.
func (*TaskPayload) Bytes ¶
func (p *TaskPayload) Bytes() []byte
Bytes returns the raw bytes of the deserialized TaskPayload. This will return nil if the payload has not been deserialized yet.
func (*TaskPayload) MarshalJSON ¶
func (p *TaskPayload) MarshalJSON() ([]byte, error)
func (*TaskPayload) UnmarshalJSON ¶
func (p *TaskPayload) UnmarshalJSON(b []byte) error