Documentation
¶
Overview ¶
Internal methods for flowable.go.
Index ¶
- func RunEveryIntervalUntilFinish[T any](ctx context.Context, runFunc func() (finished bool, result T, err error), ...) (T, error)
- type CancelTableAdditionActivity
- func (a *CancelTableAdditionActivity) CleanupCurrentParentMirror(ctx context.Context, flowJobName string, workflowId string) error
- func (a *CancelTableAdditionActivity) CleanupIncompleteTablesInStats(ctx context.Context, flowJobName string, ...) error
- func (a *CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns(ctx context.Context, flowJobName string, workflowId string) ([]string, error)
- func (a *CancelTableAdditionActivity) GetFlowInfoFromCatalog(ctx context.Context, flowJobName string) (*protos.GetFlowInfoToCancelFromCatalogOutput, error)
- func (a *CancelTableAdditionActivity) GetTableOIDsFromCatalog(ctx context.Context, flowJobName string, tableMappings []*protos.TableMapping) (map[uint32]string, error)
- func (a *CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable(ctx context.Context, flowJobName string, sourcePeerName string, ...) error
- func (a *CancelTableAdditionActivity) StartNewCDCFlow(ctx context.Context, flowConfig *protos.FlowConnectionConfigsCore, ...) error
- func (a *CancelTableAdditionActivity) UpdateCdcJobEntry(ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, ...) error
- func (a *CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState(ctx context.Context, flowJobName string, workflowId string) error
- type CheckMetadataTablesResult
- type FlowableActivity
- func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) Alert(ctx context.Context, alert *protos.AlertInput) error
- func (a *FlowableActivity) CheckConnection(ctx context.Context, config *protos.SetupInput) error
- func (a *FlowableActivity) CheckMetadataTables(ctx context.Context, config *protos.SetupInput) (*CheckMetadataTablesResult, error)
- func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
- func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, runUUID string) error
- func (a *FlowableActivity) CreateNormalizedTable(ctx context.Context, config *protos.SetupNormalizedTableBatchInput) (*protos.SetupNormalizedTableBatchOutput, error)
- func (a *FlowableActivity) CreateRawTable(ctx context.Context, config *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
- func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (*protos.CreateTablesFromExistingOutput, error)
- func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error
- func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error
- func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error
- func (a *FlowableActivity) EnsurePullability(ctx context.Context, config *protos.EnsurePullabilityBatchInput) (*protos.EnsurePullabilityBatchOutput, error)
- func (a *FlowableActivity) GetFlowMetadata(ctx context.Context, input *protos.FlowContextMetadataInput) (*protos.FlowContextMetadata, error)
- func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ...) (*protos.QRepParitionResult, error)
- func (a *FlowableActivity) MigratePostgresTableOIDs(ctx context.Context, flowName string, oidToTableNameMapping map[uint32]string, ...) error
- func (a *FlowableActivity) PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error)
- func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) (bool, error)
- func (a *FlowableActivity) RecordMetricsAggregates(ctx context.Context) error
- func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error
- func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error
- func (a *FlowableActivity) RemoveFlowDetailsFromCatalog(ctx context.Context, req *model.RemoveFlowDetailsFromCatalogRequest) error
- func (a *FlowableActivity) RemoveTablesFromCatalog(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RemoveTablesFromPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RemoveTablesFromRawTable(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
- func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, config *protos.QRepConfig, ...) error
- func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, config *protos.QRepConfig, ...) (int64, error)
- func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos.FlowStatus) error
- func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error
- func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error
- func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error
- func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
- func (a *FlowableActivity) SetupTableSchema(ctx context.Context, config *protos.SetupTableSchemaBatchInput) error
- func (a *FlowableActivity) SyncFlow(ctx context.Context, config *protos.FlowConnectionConfigsCore, ...) error
- func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
- type MaintenanceActivity
- func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error
- func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error
- func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error
- func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error
- func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error
- func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error)
- func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error)
- func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error)
- func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error
- func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(ctx context.Context, skippedFlows map[string]struct{}) (*protos.MaintenanceMirrors, error)
- type PeerType
- type QRepStreamCloser
- type SlotSnapshotState
- type SnapshotActivity
- func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error
- func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(ctx context.Context, input *protos.FlowConnectionConfigsCore) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
- func (a *SnapshotActivity) GetPeerType(ctx context.Context, name string) (protos.DBType, error)
- func (a *SnapshotActivity) LoadTableSchema(ctx context.Context, flowName string, tableName string) (*protos.TableSchema, error)
- func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, flowName string, peer string, ...) error
- func (a *SnapshotActivity) SetupReplication(ctx context.Context, config *protos.SetupReplicationInput) (*protos.SetupReplicationOutput, error)
- func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error)
- type TxSnapshotState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CancelTableAdditionActivity ¶
type CancelTableAdditionActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
TemporalClient client.Client
OtelManager *otel_metrics.OtelManager
}
func (*CancelTableAdditionActivity) CleanupCurrentParentMirror ¶
func (*CancelTableAdditionActivity) CleanupIncompleteTablesInStats ¶
func (a *CancelTableAdditionActivity) CleanupIncompleteTablesInStats( ctx context.Context, flowJobName string, completedTables []*protos.TableMapping, ) error
func (*CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns ¶
func (a *CancelTableAdditionActivity) GetCompletedTablesFromQrepRuns( ctx context.Context, flowJobName string, workflowId string, ) ([]string, error)
GetCompletedTablesFromQrepRuns gets the list of tables in the addition request whose snapshot has completed.
func (*CancelTableAdditionActivity) GetFlowInfoFromCatalog ¶
func (a *CancelTableAdditionActivity) GetFlowInfoFromCatalog( ctx context.Context, flowJobName string, ) (*protos.GetFlowInfoToCancelFromCatalogOutput, error)
func (*CancelTableAdditionActivity) GetTableOIDsFromCatalog ¶
func (a *CancelTableAdditionActivity) GetTableOIDsFromCatalog( ctx context.Context, flowJobName string, tableMappings []*protos.TableMapping, ) (map[uint32]string, error)
func (*CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable ¶
func (a *CancelTableAdditionActivity) RemoveCancelledTablesFromPublicationIfApplicable( ctx context.Context, flowJobName string, sourcePeerName string, publicationNameInConfig string, finalListOfTables []*protos.TableMapping, ) error
func (*CancelTableAdditionActivity) StartNewCDCFlow ¶
func (a *CancelTableAdditionActivity) StartNewCDCFlow( ctx context.Context, flowConfig *protos.FlowConnectionConfigsCore, state *cdc_state.CDCFlowWorkflowState, workflowID string, ) error
func (*CancelTableAdditionActivity) UpdateCdcJobEntry ¶
func (a *CancelTableAdditionActivity) UpdateCdcJobEntry( ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, ) error
func (*CancelTableAdditionActivity) WaitForNewRunningMirrorToBeInRunningState ¶
type CheckMetadataTablesResult ¶
type CheckMetadataTablesResult struct {
NeedsSetupMetadataTables bool
}
type FlowableActivity ¶
type FlowableActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
OtelManager *otel_metrics.OtelManager
TemporalClient client.Client
}
func (*FlowableActivity) AddTablesToPublication ¶
func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, additionalTableMappings []*protos.TableMapping, ) error
func (*FlowableActivity) Alert ¶
func (a *FlowableActivity) Alert( ctx context.Context, alert *protos.AlertInput, ) error
func (*FlowableActivity) CheckConnection ¶
func (a *FlowableActivity) CheckConnection( ctx context.Context, config *protos.SetupInput, ) error
func (*FlowableActivity) CheckMetadataTables ¶
func (a *FlowableActivity) CheckMetadataTables( ctx context.Context, config *protos.SetupInput, ) (*CheckMetadataTablesResult, error)
func (*FlowableActivity) CleanupQRepFlow ¶
func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
func (*FlowableActivity) ConsolidateQRepPartitions ¶
func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, runUUID string, ) error
func (*FlowableActivity) CreateNormalizedTable ¶
func (a *FlowableActivity) CreateNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error)
CreateNormalizedTable creates normalized tables in destination.
func (*FlowableActivity) CreateRawTable ¶
func (a *FlowableActivity) CreateRawTable( ctx context.Context, config *protos.CreateRawTableInput, ) (*protos.CreateRawTableOutput, error)
CreateRawTable creates a raw table in the destination flowable.
func (*FlowableActivity) CreateTablesFromExisting ¶
func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( *protos.CreateTablesFromExistingOutput, error, )
func (*FlowableActivity) DeleteMirrorStats ¶
func (a *FlowableActivity) DeleteMirrorStats(ctx context.Context, flowName string) error
func (*FlowableActivity) DropFlowDestination ¶
func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error
func (*FlowableActivity) DropFlowSource ¶
func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error
func (*FlowableActivity) EnsurePullability ¶
func (a *FlowableActivity) EnsurePullability( ctx context.Context, config *protos.EnsurePullabilityBatchInput, ) (*protos.EnsurePullabilityBatchOutput, error)
func (*FlowableActivity) GetFlowMetadata ¶
func (a *FlowableActivity) GetFlowMetadata( ctx context.Context, input *protos.FlowContextMetadataInput, ) (*protos.FlowContextMetadata, error)
NOTE: this activity is used on the path between CDCFlowWorkflow start and the signal handler for running state. If it's unable to progress for whatever reason, the upgrades will break and very unpleasant manual recovery will be needed. If you have to modify it, do it carefully and think through the edge cases.
func (*FlowableActivity) GetQRepPartitions ¶
func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, runUUID string, ) (*protos.QRepParitionResult, error)
GetQRepPartitions returns the partitions for a given QRepConfig.
func (*FlowableActivity) MigratePostgresTableOIDs ¶
func (a *FlowableActivity) MigratePostgresTableOIDs( ctx context.Context, flowName string, oidToTableNameMapping map[uint32]string, tableMappings []*protos.TableMapping, ) error
MigratePostgresTableOIDs migrates the OIDs for source Postgres tables to the catalog's table_schema_mapping.
func (*FlowableActivity) PeerDBFullRefreshOverwriteMode ¶
func (*FlowableActivity) QRepHasNewRows ¶
func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition, ) (bool, error)
func (*FlowableActivity) RecordMetricsAggregates ¶
func (a *FlowableActivity) RecordMetricsAggregates(ctx context.Context) error
func (*FlowableActivity) RecordMetricsCritical ¶
func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error
func (*FlowableActivity) RecordSlotSizes ¶
func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error
func (*FlowableActivity) RemoveFlowDetailsFromCatalog ¶
func (a *FlowableActivity) RemoveFlowDetailsFromCatalog( ctx context.Context, req *model.RemoveFlowDetailsFromCatalogRequest, ) error
func (*FlowableActivity) RemoveTablesFromCatalog ¶
func (a *FlowableActivity) RemoveTablesFromCatalog( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tablesToRemove []*protos.TableMapping, ) error
func (*FlowableActivity) RemoveTablesFromPublication ¶
func (a *FlowableActivity) RemoveTablesFromPublication( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, removedTablesMapping []*protos.TableMapping, ) error
func (*FlowableActivity) RemoveTablesFromRawTable ¶
func (a *FlowableActivity) RemoveTablesFromRawTable( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tablesToRemove []*protos.TableMapping, ) error
func (*FlowableActivity) RenameTables ¶
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
func (*FlowableActivity) ReplicateQRepPartitions ¶
func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, config *protos.QRepConfig, partitions *protos.QRepPartitionBatch, runUUID string, ) error
func (*FlowableActivity) ReplicateXminPartition ¶
func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, runUUID string, ) (int64, error)
func (*FlowableActivity) ReportStatusMetric ¶
func (a *FlowableActivity) ReportStatusMetric(ctx context.Context, status protos.FlowStatus) error
func (*FlowableActivity) ScheduledTasks ¶
func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error
func (*FlowableActivity) SendWALHeartbeat ¶
func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error
func (*FlowableActivity) SetupMetadataTables ¶
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error
func (*FlowableActivity) SetupQRepMetadataTables ¶
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error
SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (*FlowableActivity) SetupTableSchema ¶
func (a *FlowableActivity) SetupTableSchema( ctx context.Context, config *protos.SetupTableSchemaBatchInput, ) error
SetupTableSchema populates table_schema_mapping.
func (*FlowableActivity) SyncFlow ¶
func (a *FlowableActivity) SyncFlow( ctx context.Context, config *protos.FlowConnectionConfigsCore, options *protos.SyncFlowOptions, ) error
func (*FlowableActivity) UpdateCDCConfigInCatalogActivity ¶
func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigsCore) error
type MaintenanceActivity ¶
type MaintenanceActivity struct {
CatalogPool shared.CatalogPool
Alerter *alerting.Alerter
TemporalClient client.Client
OtelManager *otel_metrics.OtelManager
}
func (*MaintenanceActivity) BackgroundAlerter ¶
func (a *MaintenanceActivity) BackgroundAlerter(ctx context.Context) error
func (*MaintenanceActivity) BackupAllPreviouslyRunningFlows ¶
func (a *MaintenanceActivity) BackupAllPreviouslyRunningFlows(ctx context.Context, mirrors *protos.MaintenanceMirrors) error
func (*MaintenanceActivity) CleanBackedUpFlows ¶
func (a *MaintenanceActivity) CleanBackedUpFlows(ctx context.Context) error
func (*MaintenanceActivity) DisableMaintenanceMode ¶
func (a *MaintenanceActivity) DisableMaintenanceMode(ctx context.Context) error
func (*MaintenanceActivity) EnableMaintenanceMode ¶
func (a *MaintenanceActivity) EnableMaintenanceMode(ctx context.Context) error
func (*MaintenanceActivity) GetAllMirrors ¶
func (a *MaintenanceActivity) GetAllMirrors(ctx context.Context) (*protos.MaintenanceMirrors, error)
func (*MaintenanceActivity) GetBackedUpFlows ¶
func (a *MaintenanceActivity) GetBackedUpFlows(ctx context.Context) (*protos.MaintenanceMirrors, error)
func (*MaintenanceActivity) PauseMirrorIfRunning ¶
func (a *MaintenanceActivity) PauseMirrorIfRunning(ctx context.Context, mirror *protos.MaintenanceMirror) (bool, error)
func (*MaintenanceActivity) ResumeMirror ¶
func (a *MaintenanceActivity) ResumeMirror(ctx context.Context, mirror *protos.MaintenanceMirror) error
func (*MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates ¶
func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates( ctx context.Context, skippedFlows map[string]struct{}, ) (*protos.MaintenanceMirrors, error)
type QRepStreamCloser ¶
type SlotSnapshotState ¶
type SlotSnapshotState struct {
// contains filtered or unexported fields
}
type SnapshotActivity ¶
type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool shared.CatalogPool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
}
func (*SnapshotActivity) CloseSlotKeepAlive ¶
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error
CloseSlotKeepAlive closes the slot signal.
func (*SnapshotActivity) GetDefaultPartitionKeyForTables ¶
func (a *SnapshotActivity) GetDefaultPartitionKeyForTables( ctx context.Context, input *protos.FlowConnectionConfigsCore, ) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
func (*SnapshotActivity) GetPeerType ¶
func (*SnapshotActivity) LoadTableSchema ¶
func (a *SnapshotActivity) LoadTableSchema( ctx context.Context, flowName string, tableName string, ) (*protos.TableSchema, error)
func (*SnapshotActivity) MaintainTx ¶
func (*SnapshotActivity) SetupReplication ¶
func (a *SnapshotActivity) SetupReplication( ctx context.Context, config *protos.SetupReplicationInput, ) (*protos.SetupReplicationOutput, error)
func (*SnapshotActivity) WaitForExportSnapshot ¶
func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error)
type TxSnapshotState ¶
type TxSnapshotState struct {
SnapshotName string
}