Workflows (internal API)
Base Workflow infrastructure.
See Workflows for a high level explanation of concepts used here.
- class debusine.server.workflows.base.Workflow(work_request: WorkRequest)[source]
Bases:
BaseServerTask[WD,DTD]Base class for workflow orchestrators.
This is the base API for running
WorkflowInstancelogic.- TASK_TYPE: TaskTypes = 'Workflow'
The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.
- __init__(work_request: WorkRequest)[source]
Instantiate a Workflow with its database instance.
- callback(work_request: WorkRequest) None[source]
Perform an orchestration step.
Called with the workflow callback work request (note that this is not the same as
self.work_request) when the workflow node becomes ready to execute.Called with a
WorkRequestof type internal/workflow to perform an orchestration step triggered by a workflow callback.This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same
WorkRequeststructure as calling it once.Subclasses may provide
callback_{step.replace('-', '_')}methods to implement workflow callbacks with the correspondingstepset in their workflow data.
- dynamic_task_data_type: type[DTD] = ~DTD
Class used as the in-memory representation of dynamic task data.
- ensure_dynamic_data(task_database: TaskDatabaseInterface) None[source]
Ensure that this workflow’s dynamic task data has been computed.
- get_input_artifacts_ids() list[int][source]
Return the list of input artifact IDs used by this task.
- lookup_singleton_collection(category: CollectionCategory, *, workspace: Workspace | None = None) Collection[source]
Look up a singleton collection related to this workflow.
- abstractmethod populate() None[source]
Create the initial WorkRequest structure.
This is called once, when the workflow first becomes runnable.
validate_input()will already have been called.This method is required to be idempotent: calling it multiple times with the same argument MUST result in the same
WorkRequeststructure as calling it once.
- static provides_artifact(work_request: WorkRequest, category: ArtifactCategory, name: str, *, data: dict[str, Any] | None = None, artifact_filters: dict[str, Any] | None = None) None[source]
Indicate work_request will provide an artifact.
- Parameters:
work_request – work request that will provide the artifact
category – category of the artifact that will be provided
name – name of this item in the workflow’s internal collection
data – add it to the data dictionary for the event reaction
artifact_filters – for the update-collection-with-artifacts action, to allow workflows to add filtering
- Raises:
LookupError – if a key in “data” starts with
promise_
Create an event reaction for
on_creationadding a promise: this work request will create an artifact.Create an event reaction for
on_successto update the collection with the relevant artifact.
- static requires_artifact(work_request: WorkRequest, lookup: int | str | LookupMultiple) None[source]
Indicate that work_request requires input (lookup).
- Parameters:
work_request – for each lookup result call
work_request.add_dependency(promise["promise_work_request_id"])lookup – resolve the lookup and iterate over the results (for PROMISES only)
- validate_input() None[source]
Thorough validation of input data.
This is run only once at workflow instantiation time, and can do slower things like database lookups to validate artifact or collection types.
- classmethod validate_template_data(data: dict[str, Any]) None[source]
Validate WorkflowTemplate data.
- work_request_ensure_child(*, task_type: TaskTypes = TaskTypes.WORKER, task_name: str, task_data: BaseTaskData, task_data_filter: Q | None = None, workflow_data: WorkRequestWorkflowData, relative_priority: int = 0) WorkRequest[source]
Create a child work request if one does not already exist.
- Parameters:
task_type – the task type for the child work request.
task_name – the task name for the child work request.
task_data – the task data for the child work request.
task_data_filter – if given, use these conditions to test whether an existing work request has matching task data; by default, look for work requests that exactly match the given
task_data.workflow_data – the workflow data for the child work request.
relative_priority – if creating a new work request, set the base priority of the child to the effective priority of this workflow plus this relative priority.
- Returns:
new or existing
WorkRequest.
- exception debusine.server.workflows.base.WorkflowRunError(work_request: WorkRequest, message: str, code: str)[source]
Bases:
ExceptionRunning a workflow orchestrator or callback failed.
- exception debusine.server.workflows.base.WorkflowValidationError[source]
Bases:
ExceptionRaised if a workflow fails to validate its inputs.
- debusine.server.workflows.base.orchestrate_workflow(work_request: WorkRequest) None[source]
Orchestrate a workflow in whatever way is appropriate.
For a workflow callback, run
callbackand mark the work request as completed. For a workflow, runpopulateand unblock workflow children, but leave the workflow running until all its children have finished. For any other work request, raise an error.
Utility functions for workflows.
- exception debusine.server.workflows.workflow_utils.ArtifactHasNoArchitecture[source]
Bases:
ExceptionRaised if it’s not possible to determine the artifact’s architecture.
- exception debusine.server.workflows.workflow_utils.ArtifactHasNoBinaryPackageName[source]
Bases:
ExceptionRaised if it’s not possible to determine the artifact’s binary name.
- debusine.server.workflows.workflow_utils.configure_for_overlay_suite(workflow: Workflow[Any, Any], *, extra_repositories: list[ExtraRepository] | None, vendor: str, codename: str, environment: int | str, backend: BackendType, architecture: str, try_variant: str) list[ExtraRepository] | None[source]
Return any needed extra repository to use an overlay suite.
- debusine.server.workflows.workflow_utils.filter_artifact_lookup_by_arch(workflow: Workflow[Any, Any], lookup: LookupMultiple, architectures: Iterable[str]) LookupMultiple[source]
Filter an artifact lookup by architecture.
- debusine.server.workflows.workflow_utils.follow_artifact_relation(artifact: Artifact, relation_type: Relations, category: ArtifactCategory) Artifact[source]
Follow relations from artifact to find an artifact of category.
- debusine.server.workflows.workflow_utils.get_architectures(workflow: Workflow[Any, Any], lookup: LookupMultiple) set[str][source]
Return set with all the architectures in the artifacts from the lookup.
The architectures are extracted from each lookup result using
lookup_result_architecture().
- debusine.server.workflows.workflow_utils.get_available_architectures(workflow: Workflow[Any, Any], *, vendor: str, codename: str) set[str][source]
Get architectures available for use with this vendor/codename.
- debusine.server.workflows.workflow_utils.get_source_package_names(results: Sequence[LookupResult], *, configuration_key: str, artifact_expected_categories: Collection[ArtifactCategory]) list[str][source]
Return a sorted list of source package names from results.
It ensures that:
The
LookupResultobjects contain either an artifact or promise.Artifacts belong to the artifact_expected_categories.
If
LookupResultis a promise: extracts the name from the promise datasource_package_name.
- Parameters:
results – A sequence of
LookupResultobjects representing artifacts to be processed. Each entry is expected to be either an artifact or a promise.configuration_key – A string used by
BaseTask.ensure_artifact_categories()for the exception message.artifact_expected_categories – valid
ArtifactCategorythat artifacts must belong to.
- Returns:
A sorted list of source package names.
- debusine.server.workflows.workflow_utils.locate_debian_source_package(configuration_key: str, artifact: Artifact) Artifact[source]
Accept a debian:upload or debian:source-package in a workflow.
Resolve to the debian:source-package.
- debusine.server.workflows.workflow_utils.locate_debian_source_package_lookup(workflow: Workflow[Any, Any], configuration_key: str, lookup: int | str) int | str[source]
Return a lookup to a debian:source-package.
If the specified lookup returns a debian:source-package, return it. If it returns a debian:upload, find the related debian:source-package and return a lookup to it.
- debusine.server.workflows.workflow_utils.lookup_result_architecture(result: LookupResult) str[source]
Get architecture from result of looking up an artifact.
- debusine.server.workflows.workflow_utils.lookup_result_artifact_category(result: LookupResult) str[source]
Get artifact category from result of looking up an artifact.
The result may be either an artifact or a promise.
- debusine.server.workflows.workflow_utils.lookup_result_binary_package_name(result: LookupResult) str[source]
Get binary package name from result of looking up an artifact.
- debusine.server.workflows.workflow_utils.source_package(workflow: Workflow[Any, Any]) Artifact[source]
Retrieve the source package artifact.
If
workflow.data.inputexists, useworkflow.data.input.source_artifact, otherwiseworkflow.data.source_artifact.If the source artifact is a debian:upload, returns its debian:source-package.