Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var AirflowDagProcessorLogParseTask = legacyparser.NewParserTaskFromParser( googlecloudclustercomposer_contract.AirflowDagProcessorManagerLogParserTaskID, airflowdagprocessor.NewAirflowDagProcessorParser("/home/airflow/gcs/dags/", googlecloudclustercomposer_contract.ComposerDagProcessorManagerLogQueryTaskID.Ref(), enum.LogTypeComposerEnvironment), 102000, true, []string{googlecloudclustercomposer_contract.InspectionTypeId}, )
AirflowDagProcessorLogParseTask parses Airflow DAG processor manager logs.
var AirflowSchedulerLogParseTask = legacyparser.NewParserTaskFromParser( googlecloudclustercomposer_contract.AirflowSchedulerLogParserTaskID, airflowscheduler.NewAirflowSchedulerParser(googlecloudclustercomposer_contract.ComposerSchedulerLogQueryTaskID.Ref(), enum.LogTypeComposerEnvironment), 100000, true, []string{googlecloudclustercomposer_contract.InspectionTypeId}, )
AirflowSchedulerLogParseTask parses Airflow scheduler logs.
var AirflowWorkerLogParseTask = legacyparser.NewParserTaskFromParser( googlecloudclustercomposer_contract.AirflowWorkerLogParserTaskID, airflowworker.NewAirflowWorkerParser(googlecloudclustercomposer_contract.ComposerWorkerLogQueryTaskID.Ref(), enum.LogTypeComposerEnvironment), 101000, true, []string{googlecloudclustercomposer_contract.InspectionTypeId}, )
AirflowWorkerLogParseTask parses Airflow worker logs.
var AutocompleteComposerClusterNamesTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerClusterNamesTaskID, []taskid.UntypedTaskReference{ googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref(), googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), }, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult]) (inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult], error) { projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref()) environment := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref()) dependencyDigest := fmt.Sprintf("%s-%s", projectID, environment) isWIP := projectID == "" || environment == "" if isWIP { return inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult]{ DependencyDigest: dependencyDigest, Value: &googlecloudk8scommon_contract.AutocompleteResult{ Values: []string{}, Error: "Project ID or Composer environment name is empty", }, }, nil } if environment != "" && dependencyDigest == prevValue.DependencyDigest { return prevValue, nil } clusterFinder := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref()) clusterName, err := clusterFinder.GetGKEClusterName(ctx, projectID, environment) if err != nil { if errors.Is(err, googlecloudclustercomposer_contract.ErrEnvironmentClusterNotFound) { return inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult]{ DependencyDigest: dependencyDigest, Value: &googlecloudk8scommon_contract.AutocompleteResult{ Values: []string{}, Error: `Not found. It works for the clusters existed in the past but make sure the cluster name is right if you believe the cluster should be there. Note: Composer 3 does not run on your GKE. Please remove all Kubernetes/GKE questies from the previous section.`, }, }, nil } return inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult]{ DependencyDigest: dependencyDigest, Value: &googlecloudk8scommon_contract.AutocompleteResult{ Values: []string{}, Error: "Failed to fetch the list GKE cluster. Please confirm if the Project ID is correct, or retry later", }, }, nil } return inspectiontaskbase.CacheableTaskResult[*googlecloudk8scommon_contract.AutocompleteResult]{ DependencyDigest: dependencyDigest, Value: &googlecloudk8scommon_contract.AutocompleteResult{ Values: []string{clusterName}, }, }, nil }, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId), coretask.WithSelectionPriority(1000), )
AutocompleteComposerClusterNamesTask is an implementation for googlecloudk8scommon_contract.AutocompleteClusterNamesTaskID the task returns GKE cluster name where the provided Composer environment is running.
var AutocompleteComposerEnvironmentNamesTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentNamesTaskID, []taskid.UntypedTaskReference{ googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherTaskID.Ref(), googlecloudcommon_contract.InputLocationsTaskID.Ref(), googlecloudcommon_contract.InputProjectIdTaskID.Ref(), }, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[[]string]) (inspectiontaskbase.CacheableTaskResult[[]string], error) { projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref()) location := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputLocationsTaskID.Ref()) dependencyDigest := fmt.Sprintf("%s-%s", projectID, location) if prevValue.DependencyDigest == dependencyDigest { return prevValue, nil } if projectID != "" && location != "" { composerEnvironmentFetcher := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherTaskID.Ref()) environments, err := composerEnvironmentFetcher.GetEnvironmentNames(ctx, projectID, location) if err != nil { return inspectiontaskbase.CacheableTaskResult[[]string]{ DependencyDigest: dependencyDigest, Value: []string{}, }, nil } return inspectiontaskbase.CacheableTaskResult[[]string]{ DependencyDigest: dependencyDigest, Value: environments, }, nil } return inspectiontaskbase.CacheableTaskResult[[]string]{ DependencyDigest: dependencyDigest, Value: []string{}, }, nil })
AutocompleteComposerEnvironmentNamesTask is the task that autocompletes composer environment names.
var ComposerClusterNamePrefixTask = coretask.NewTask(googlecloudclustercomposer_contract.ComposerClusterNamePrefixTaskID, []taskid.UntypedTaskReference{}, func(ctx context.Context) (string, error) { return "", nil }, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId))
ComposerClusterNamePrefixTask is the task that returns the composer cluster name prefix.
var ComposerDagProcessorManagerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask( googlecloudclustercomposer_contract.ComposerDagProcessorManagerLogQueryTaskID, "Composer Environment/DAG Processor Manager", enum.LogTypeComposerEnvironment, []taskid.UntypedTaskReference{ googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), }, &googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{}, createGenerator("dag-processor-manager"), generateQueryForComponent("sample-composer-environment", "test-project", "dag-processor-manager"), )
ComposerDagProcessorManagerLogQueryTask defines a task that gather Cloud Composer scheduler logs from Cloud Logging.
var ComposerEnvironmentClusterFinderTask = coretask.NewTask( googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID, []taskid.UntypedTaskReference{ googlecloudcommon_contract.APIClientFactoryTaskID.Ref(), }, func(ctx context.Context) (googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinder, error) { return &googlecloudclustercomposer_contract.EnvironmentClusterFinderImpl{}, nil }, )
ComposerEnvironmentClusterFinderTask injects ComposerEnvironmentClusterFinder implementation.
var ComposerEnvironmentListFetcherTask = coretask.NewTask( googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherTaskID, []taskid.UntypedTaskReference{ googlecloudcommon_contract.APIClientFactoryTaskID.Ref(), }, func(ctx context.Context) (googlecloudclustercomposer_contract.ComposerEnvironmentListFetcher, error) { return &googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherImpl{}, nil }, )
ComposerEnvironmentListFetcherTask injects ComposerEnvironmentListFetcher implementation.
var ComposerMonitoringLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask( googlecloudclustercomposer_contract.ComposerMonitoringLogQueryTaskID, "Composer Environment/Airflow Monitoring", enum.LogTypeComposerEnvironment, []taskid.UntypedTaskReference{ googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), }, &googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{}, createGenerator("airflow-monitoring"), generateQueryForComponent("sample-composer-environment", "test-project", "airflow-monitoring"), )
ComposerMonitoringLogQueryTask defines a task that gather Cloud Composer scheduler logs from Cloud Logging.
var ComposerSchedulerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask( googlecloudclustercomposer_contract.ComposerSchedulerLogQueryTaskID, "Composer Environment/Airflow Scheduler", enum.LogTypeComposerEnvironment, []taskid.UntypedTaskReference{ googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), }, &googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{}, createGenerator("airflow-scheduler"), generateQueryForComponent("sample-composer-environment", "test-project", "airflow-scheduler"), )
ComposerSchedulerLogQueryTask defines a task that gather Cloud Composer scheduler logs from Cloud Logging.
var ComposerWorkerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask( googlecloudclustercomposer_contract.ComposerWorkerLogQueryTaskID, "Composer Environment/Airflow Worker", enum.LogTypeComposerEnvironment, []taskid.UntypedTaskReference{ googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), }, &googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{}, createGenerator("airflow-worker"), generateQueryForComponent("sample-composer-environment", "test-project", "airflow-worker"), )
ComposerWorkerLogQueryTask defines a task that gather Cloud Composer scheduler logs from Cloud Logging.
var InputComposerEnvironmentNameTask = formtask.NewTextFormTaskBuilder(googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID, googlecloudcommon_contract.PriorityForResourceIdentifierGroup+4400, "Composer Environment Name").WithDependencies( []taskid.UntypedTaskReference{googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentNamesTaskID.Ref()}, ).WithSuggestionsFunc(func(ctx context.Context, value string, previousValues []string) ([]string, error) { environments := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentNamesTaskID.Ref()) return common.SortForAutocomplete(value, environments), nil }).Build()
InputComposerEnvironmentNameTask is the task that inputs composer environment name.
Functions ¶
func Register ¶
func Register(registry coreinspection.InspectionTaskRegistry) error
Register registers all googlecloudclustercomposer inspection tasks to the registry.
Types ¶
This section is empty.