From 72a4a1f980ab203ecbf8384c1e30351cb54c4cdb Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Mon, 23 Feb 2026 20:23:11 +0000 Subject: [PATCH 01/12] Compresses env vars to be processed by a startup script, updates swarming condition --- .../_internal/swarming/__init__.py | 14 +++- .../tests/core/swarming/swarming_test.py | 66 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 1c387122fac..9c056a37262 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -17,6 +17,7 @@ import uuid from google.protobuf import json_format +from json import dumps from clusterfuzz._internal.base import utils from clusterfuzz._internal.config import local_config @@ -34,9 +35,9 @@ def _requires_gpu() -> bool: def is_swarming_task(command: str, job_name: str): - """Returns True if the task is supposed to run on swarming.""" + """Returns True if the task is supposed to run on swarming. 
Currently swarming only supports Linux tasks.""" job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job or not _requires_gpu(): + if not job or not _requires_gpu() or not str(job.platform).upper().startswith('LINUX'): return False try: _get_new_task_spec(command, job_name, '') @@ -106,6 +107,8 @@ def _get_new_task_spec(command: str, job_name: str, key='DOCKER_IMAGE', value=instance_spec['docker_image'])) + task_environment.append(_compress_env_vars(task_environment)) + task_dimensions = [ swarming_pb2.StringPair(key='os', value=job.platform), # pylint: disable=no-member swarming_pb2.StringPair(key='pool', value=swarming_pool) # pylint: disable=no-member ] @@ -142,6 +145,13 @@ def _get_new_task_spec(command: str, job_name: str, return new_task_request +def _compress_env_vars(env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: + """ + Compresses all env variables into a single JSON string, which will be used to set up the + env variables in swarming bots that launch clusterfuzz using a docker container. 
+ """ + env_vars_dict = {pair.key: pair.value for pair in env_vars} + return swarming_pb2.StringPair(key='CF_BOT_VARS', value=dumps(env_vars_dict)) def push_swarming_task(command, download_url, job_type): """Schedules a task on swarming.""" diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index 037463f2a84..eaa5cd40d20 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -79,6 +79,11 @@ def test_get_spec_from_config_with_docker_image(self): value= 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' ), + swarming_pb2.StringPair( + key='CF_BOT_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id", "DOCKER_IMAGE": "gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654"}' + ), ], secret_bytes=base64.b64encode( 'https://download_url'.encode('utf-8')))) @@ -143,7 +148,12 @@ def test_get_spec_from_config_without_docker_image(self): swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), - swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2') + swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), + swarming_pb2.StringPair( + key='CF_BOT_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id", "ENV_VAR1": "VALUE1", "ENV_VAR2": "VALUE2"}' + ) ], env_prefixes=[ swarming_pb2.StringListPair( @@ -200,6 +210,11 @@ def test_get_spec_from_config_for_fuzz_task(self): value= 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' ), + swarming_pb2.StringPair( + key='CF_BOT_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id", "DOCKER_IMAGE": "gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654"}' + ), ], 
secret_bytes=base64.b64encode( 'https://download_url'.encode('utf-8')))) @@ -248,6 +263,11 @@ def test_push_swarming_task(self): value= 'gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654' ), + swarming_pb2.StringPair( + key='CF_BOT_VARS', + value= + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id", "DOCKER_IMAGE": "gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654"}' + ), ], secret_bytes=base64.b64encode( 'https://download_url'.encode('utf-8')))) @@ -264,3 +284,47 @@ def test_push_swarming_task(self): url=expected_url, data=json_format.MessageToJson(expected_new_task_request), headers=expected_headers) + + def test_compress_env_vars(self): + """Tests (Happy Path) that _compress_env_vars correctly compresses environment variables into a JSON string.""" + env_vars = [ + swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), + swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), + ] + + result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access + + expected_result = swarming_pb2.StringPair( + key='CF_BOT_VARS', + value='{"ENV_VAR1": "VALUE1", "ENV_VAR2": "VALUE2"}' + ) + self.assertEqual(result, expected_result) + + def test_compress_env_vars_empty(self): + """Tests that _compress_env_vars handles an empty list of environment variables safely.""" + env_vars = [] + + result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access + + expected_result = swarming_pb2.StringPair( + key='CF_BOT_VARS', + value='{}' + ) + self.assertEqual(result, expected_result) + + def test_compress_env_vars_duplicate_keys(self): + """Tests that _compress_env_vars handles duplicate keys by taking the last value.""" + env_vars = [ + swarming_pb2.StringPair(key='DUPLICATE_KEY', value='FIRST_VALUE'), + swarming_pb2.StringPair(key='OTHER_KEY', value='OTHER_VALUE'), + swarming_pb2.StringPair(key='DUPLICATE_KEY', value='LAST_VALUE'), + ] + + result = swarming._compress_env_vars(env_vars) 
# pylint: disable=protected-access + + expected_result = swarming_pb2.StringPair( + key='CF_BOT_VARS', + value='{"DUPLICATE_KEY": "LAST_VALUE", "OTHER_KEY": "OTHER_VALUE"}' + ) + self.assertEqual(result, expected_result) + From b1d346f9da6e49a16856197907a20160b1157c42 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 25 Feb 2026 02:51:46 +0000 Subject: [PATCH 02/12] Adds new Swarming api v2 protos --- .../_internal/protos/swarming_pb2.pyi | 689 ++++++++++++++++++ .../_internal/protos/swarming_pb2_grpc.py | 18 + .../_internal/swarming/__init__.py | 2 +- 3 files changed, 708 insertions(+), 1 deletion(-) create mode 100644 src/clusterfuzz/_internal/protos/swarming_pb2.pyi create mode 100644 src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.pyi b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi new file mode 100644 index 00000000000..fb91d5d240c --- /dev/null +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi @@ -0,0 +1,689 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This file is based on https://source.chromium.org/chromium/infra/infra/+/main:luci/appengine/swarming/proto/api_v2/swarming.proto +This includes necessary messages to construct a NewTaskRequest +""" +import builtins +import collections.abc +import google.protobuf.descriptor +import google.protobuf.internal.containers +import google.protobuf.internal.enum_type_wrapper +import google.protobuf.message +import sys +import typing + +if sys.version_info >= (3, 10): + import typing as typing_extensions +else: + import typing_extensions + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing_extensions.final +class StringPair(google.protobuf.message.Message): + """Messages + + Represents a mapping of string to a string. + + If the StringPair is itself repeated inside another message, the list + must be sorted by key and the keys must be unique. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + +global___StringPair = StringPair + +@typing_extensions.final +class StringListPair(google.protobuf.message.Message): + """Represents a mapping of string to a list of strings. + + If the StringListPair is itself repeated inside another message, the list + must be sorted by key and the keys must be unique. 
+ """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + @property + def value(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """All the values for this key. values must be sorted. Human readable. + + This string should make sense to a user in the context of 'key'. + """ + def __init__( + self, + *, + key: builtins.str = ..., + value: collections.abc.Iterable[builtins.str] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + +global___StringListPair = StringListPair + +@typing_extensions.final +class Digest(google.protobuf.message.Message): + """This is a [Digest][build.bazel.remote.execution.v2.Digest] of a blob on + RBE-CAS. See the explanations at the original definition. + pylint: disable=line-too-long + https://github.com/bazelbuild/remote-apis/blob/77cfb44a88577a7ade5dd2400425f6d50469ec6d/build/bazel/remote/execution/v2/remote_execution.proto#L753-L791 + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + HASH_FIELD_NUMBER: builtins.int + SIZE_BYTES_FIELD_NUMBER: builtins.int + hash: builtins.str + size_bytes: builtins.int + def __init__( + self, + *, + hash: builtins.str = ..., + size_bytes: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["hash", b"hash", "size_bytes", b"size_bytes"]) -> None: ... + +global___Digest = Digest + +@typing_extensions.final +class CASReference(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + CAS_INSTANCE_FIELD_NUMBER: builtins.int + DIGEST_FIELD_NUMBER: builtins.int + cas_instance: builtins.str + """Full name of RBE-CAS instance. `projects/{project_id}/instances/{instance}`. + e.g. 
projects/chromium-swarm/instances/default_instance + """ + @property + def digest(self) -> global___Digest: + """CAS Digest consists of hash and size bytes.""" + def __init__( + self, + *, + cas_instance: builtins.str = ..., + digest: global___Digest | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["digest", b"digest"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["cas_instance", b"cas_instance", "digest", b"digest"]) -> None: ... + +global___CASReference = CASReference + +@typing_extensions.final +class CipdPackage(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + PACKAGE_NAME_FIELD_NUMBER: builtins.int + VERSION_FIELD_NUMBER: builtins.int + PATH_FIELD_NUMBER: builtins.int + package_name: builtins.str + """A CIPD package to install in the run dir before task execution.\"\"\" + A template of a full CIPD package name, e.g. + "infra/tools/authutil/${platform}" + See also cipd.ALL_PARAMS. + """ + version: builtins.str + """Valid package version for all packages matched by package name.""" + path: builtins.str + """Path to dir, relative to the root dir, where to install the package. + If empty, the package will be installed a the root of the mapped directory. + If file names in the package and in the isolate clash, it will cause a + failure. + """ + def __init__( + self, + *, + package_name: builtins.str = ..., + version: builtins.str = ..., + path: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["package_name", b"package_name", "path", b"path", "version", b"version"]) -> None: ... 
+ +global___CipdPackage = CipdPackage + +@typing_extensions.final +class CipdInput(google.protobuf.message.Message): + """Defines CIPD packages to install in task run directory.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SERVER_FIELD_NUMBER: builtins.int + CLIENT_PACKAGE_FIELD_NUMBER: builtins.int + PACKAGES_FIELD_NUMBER: builtins.int + server: builtins.str + """URL of the CIPD server. Must start with "https://" or "http://". + This field or its subfields are optional if default cipd client is defined + in the server config. + """ + @property + def client_package(self) -> global___CipdPackage: + """CIPD package of CIPD client to use. + client_package.version is required. + This field is optional is default value is defined in the server config. + client_package.path must be empty. + """ + @property + def packages(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___CipdPackage]: + """List of CIPD packages to install.""" + def __init__( + self, + *, + server: builtins.str = ..., + client_package: global___CipdPackage | None = ..., + packages: collections.abc.Iterable[global___CipdPackage] | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["client_package", b"client_package"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["client_package", b"client_package", "packages", b"packages", "server", b"server"]) -> None: ... + +global___CipdInput = CipdInput + +@typing_extensions.final +class CacheEntry(google.protobuf.message.Message): + """Describes a named cache that should be present on the bot. + + A CacheEntry in a task specified that the task prefers the cache to be present + on the bot. A symlink to the cache directory is created at /|path|. + If cache is not present on the machine, the directory is empty. + If the tasks makes any changes to the contents of the cache directory, they + are persisted on the machine. 
If another task runs on the same machine and + requests the same named cache, even if mapped to a different path, it will see + the changes. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + NAME_FIELD_NUMBER: builtins.int + PATH_FIELD_NUMBER: builtins.int + name: builtins.str + """Unique name of the cache. Required. Length is limited to 4096.""" + path: builtins.str + """Relative path to the directory that will be linked to the named cache. + Required. + A path cannot be shared among multiple caches or CIPD installations. + A task will fail if a file/dir with the same name already exists. + """ + def __init__( + self, + *, + name: builtins.str = ..., + path: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["name", b"name", "path", b"path"]) -> None: ... + +global___CacheEntry = CacheEntry + +@typing_extensions.final +class Containment(google.protobuf.message.Message): + """Defines the type of "sandbox" to run the task process in. + + Unimplemented. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _ContainmentType: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _ContainmentTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Containment._ContainmentType.ValueType], builtins.type): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + NOT_SPECIFIED: Containment._ContainmentType.ValueType # 0 + """Historical value, not specified. Containment may or may not be used.""" + NONE: Containment._ContainmentType.ValueType # 1 + """No containment, the default for now.""" + AUTO: Containment._ContainmentType.ValueType # 2 + """Use the containment appropriate on the platform.""" + JOB_OBJECT: Containment._ContainmentType.ValueType # 3 + """Use Job Object on Windows. 
Will fail if used on other platforms.""" + + class ContainmentType(_ContainmentType, metaclass=_ContainmentTypeEnumTypeWrapper): ... + NOT_SPECIFIED: Containment.ContainmentType.ValueType # 0 + """Historical value, not specified. Containment may or may not be used.""" + NONE: Containment.ContainmentType.ValueType # 1 + """No containment, the default for now.""" + AUTO: Containment.ContainmentType.ValueType # 2 + """Use the containment appropriate on the platform.""" + JOB_OBJECT: Containment.ContainmentType.ValueType # 3 + """Use Job Object on Windows. Will fail if used on other platforms.""" + + LOWER_PRIORITY_FIELD_NUMBER: builtins.int + CONTAINMENT_TYPE_FIELD_NUMBER: builtins.int + LIMIT_PROCESSES_FIELD_NUMBER: builtins.int + LIMIT_TOTAL_COMMITTED_MEMORY_FIELD_NUMBER: builtins.int + lower_priority: builtins.bool + """Lowers the priority of the task process when started. Doesn't require + containment. This gives the bot a chance to survive when the task starts an + overwhelming number of children processes. + """ + containment_type: global___Containment.ContainmentType.ValueType + """Defines the type of containment used.""" + limit_processes: builtins.int + """The values below require a form of containment to be enforced. + + Limits the number of concurrent active processes. + """ + limit_total_committed_memory: builtins.int + """Limits the total amount of memory allocated by processes.""" + def __init__( + self, + *, + lower_priority: builtins.bool = ..., + containment_type: global___Containment.ContainmentType.ValueType = ..., + limit_processes: builtins.int = ..., + limit_total_committed_memory: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["containment_type", b"containment_type", "limit_processes", b"limit_processes", "limit_total_committed_memory", b"limit_total_committed_memory", "lower_priority", b"lower_priority"]) -> None: ... 
+ +global___Containment = Containment + +@typing_extensions.final +class TaskProperties(google.protobuf.message.Message): + """Important metadata about a particular task.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + CACHES_FIELD_NUMBER: builtins.int + CIPD_INPUT_FIELD_NUMBER: builtins.int + COMMAND_FIELD_NUMBER: builtins.int + RELATIVE_CWD_FIELD_NUMBER: builtins.int + DIMENSIONS_FIELD_NUMBER: builtins.int + ENV_FIELD_NUMBER: builtins.int + ENV_PREFIXES_FIELD_NUMBER: builtins.int + EXECUTION_TIMEOUT_SECS_FIELD_NUMBER: builtins.int + GRACE_PERIOD_SECS_FIELD_NUMBER: builtins.int + IDEMPOTENT_FIELD_NUMBER: builtins.int + CAS_INPUT_ROOT_FIELD_NUMBER: builtins.int + IO_TIMEOUT_SECS_FIELD_NUMBER: builtins.int + OUTPUTS_FIELD_NUMBER: builtins.int + SECRET_BYTES_FIELD_NUMBER: builtins.int + CONTAINMENT_FIELD_NUMBER: builtins.int + @property + def caches(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___CacheEntry]: + """Specifies named caches to map into the working directory. These caches + outlives the task, which can then be reused by tasks later used on this bot + that request the same named cache. + """ + @property + def cipd_input(self) -> global___CipdInput: + """CIPD packages to install. These packages are meant to be software that is + needed (a dependency) to the task being run. Unlike isolated files, the CIPD + packages do not expire from the server. + """ + @property + def command(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Command to run. This has priority over a command specified in the isolated + files. + """ + relative_cwd: builtins.str + """Relative working directory to start the 'command' in, defaults to the root + mapped directory or what is provided in the isolated file, if any. 
+ """ + @property + def dimensions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___StringPair]: + """Dimensions are what is used to determine which bot can run the task. The + bot must have all the matching dimensions, even for repeated keys with + multiple different values. It is a logical AND, all values must match. + + It should have been a StringListPair but this would be a breaking change. + """ + @property + def env(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___StringPair]: + """Environment variables to set when running the task.""" + @property + def env_prefixes(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___StringListPair]: + """Swarming-root relative paths to prepend to a given environment variable. + + These allow you to put certain subdirectories of the task into PATH, + PYTHONPATH, or other PATH-like environment variables. The order of + operations is: + * Turn slashes into native-platform slashes. + * Make the path absolute + * Prepend it to the current value of the envvar using the os-native list + separator (i.e. `;` on windows, `:` on POSIX). + + Each envvar can have multiple paths to prepend. They will be prepended in + the order seen here. + + For example, if env_prefixes was: + [("PATH", ["foo", "bar"]), + ("CUSTOMPATH", ["custom"])] + + The task would see: + PATH=/path/to/swarming/rundir/foo:/path/to/swarming/rundir/bar:$PATH + CUSTOMPATH=/path/to/swarming/rundir/custom + + The path should always be specified here with forward-slashes, and it must + not attempt to escape the swarming root (i.e. must not contain `..`). + + These are applied AFTER evaluating `env` entries. + """ + execution_timeout_secs: builtins.int + """Maximum number of seconds the task can run before its process is forcibly + terminated and the task results in TIMED_OUT. 
+ """ + grace_period_secs: builtins.int + """Number of second to give the child process after a SIGTERM before sending a + SIGKILL. See doc/Bot.md#timeout-handling + """ + idempotent: builtins.bool + """True if the task does not access any service through the network and is + believed to be 100% reproducible with the same outcome. In the case of a + successful task, previous results will be reused if possible. + """ + @property + def cas_input_root(self) -> global___CASReference: + """Digest of the input root uploaded to RBE-CAS. + This MUST be digest of [build.bazel.remote.execution.v2.Directory]. + """ + io_timeout_secs: builtins.int + """Maximum number of seconds the task may be silent (no output to stdout nor + stderr) before it is considered hung and it forcibly terminated early and + the task results in TIMED_OUT. + """ + @property + def outputs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Paths in the working directory to archive back.""" + secret_bytes: builtins.bytes + """Secret bytes to provide to the task. 
Cannot be retrieved back.""" + @property + def containment(self) -> global___Containment: + """Containment of the task processes.""" + def __init__( + self, + *, + caches: collections.abc.Iterable[global___CacheEntry] | None = ..., + cipd_input: global___CipdInput | None = ..., + command: collections.abc.Iterable[builtins.str] | None = ..., + relative_cwd: builtins.str = ..., + dimensions: collections.abc.Iterable[global___StringPair] | None = ..., + env: collections.abc.Iterable[global___StringPair] | None = ..., + env_prefixes: collections.abc.Iterable[global___StringListPair] | None = ..., + execution_timeout_secs: builtins.int = ..., + grace_period_secs: builtins.int = ..., + idempotent: builtins.bool = ..., + cas_input_root: global___CASReference | None = ..., + io_timeout_secs: builtins.int = ..., + outputs: collections.abc.Iterable[builtins.str] | None = ..., + secret_bytes: builtins.bytes = ..., + containment: global___Containment | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["cas_input_root", b"cas_input_root", "cipd_input", b"cipd_input", "containment", b"containment"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["caches", b"caches", "cas_input_root", b"cas_input_root", "cipd_input", b"cipd_input", "command", b"command", "containment", b"containment", "dimensions", b"dimensions", "env", b"env", "env_prefixes", b"env_prefixes", "execution_timeout_secs", b"execution_timeout_secs", "grace_period_secs", b"grace_period_secs", "idempotent", b"idempotent", "io_timeout_secs", b"io_timeout_secs", "outputs", b"outputs", "relative_cwd", b"relative_cwd", "secret_bytes", b"secret_bytes"]) -> None: ... + +global___TaskProperties = TaskProperties + +@typing_extensions.final +class TaskSlice(google.protobuf.message.Message): + """Defines a possible task execution for a task request to be run on the + Swarming infrastructure. + + This is one of the possible fallback on a task request. 
+ """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + PROPERTIES_FIELD_NUMBER: builtins.int + EXPIRATION_SECS_FIELD_NUMBER: builtins.int + WAIT_FOR_CAPACITY_FIELD_NUMBER: builtins.int + @property + def properties(self) -> global___TaskProperties: + """The property of the task to try to run. + + If there is no bot that can serve this properties.dimensions when this task + slice is enqueued, it is immediately denied. This can trigger if: + - There is no bot with these dimensions currently known. + - Bots that could run this task are either all dead or quarantined. + Swarming considers a bot dead if it hasn't pinged in the last N minutes + (currently 10 minutes). + """ + expiration_secs: builtins.int + """Maximum of seconds the task slice may stay PENDING. + + If this task request slice is not scheduled after waiting this long, the + next one will be processed. If this slice is the last one, the task state + will be set to EXPIRED. + """ + wait_for_capacity: builtins.bool + """When a task is scheduled and there are currently no bots available to run + the task, the TaskSlice can either be PENDING, or be denied immediately. + When denied, the next TaskSlice is enqueued, and if there's no following + TaskSlice, the task state is set to NO_RESOURCE. This should normally be + set to False to avoid unnecessary waiting. + """ + def __init__( + self, + *, + properties: global___TaskProperties | None = ..., + expiration_secs: builtins.int = ..., + wait_for_capacity: builtins.bool = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["properties", b"properties"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["expiration_secs", b"expiration_secs", "properties", b"properties", "wait_for_capacity", b"wait_for_capacity"]) -> None: ... 
+ +global___TaskSlice = TaskSlice + +@typing_extensions.final +class ResultDBCfg(google.protobuf.message.Message): + """Swarming:ResultDB integration configuration for a task. + See NewTaskRequest.resultdb for more details. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + ENABLE_FIELD_NUMBER: builtins.int + enable: builtins.bool + """If True and this task is not deduplicated, create + "task-{swarming_hostname}-{run_id}" invocation for this task, + provide its update token to the task subprocess via LUCI_CONTEXT + and finalize the invocation when the task is done. + If the task is deduplicated, then TaskResult.invocation_name will be the + invocation name of the original task. + Swarming:ResultDB integration is off by default, but it may change in the + future. + """ + def __init__( + self, + *, + enable: builtins.bool = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["enable", b"enable"]) -> None: ... + +global___ResultDBCfg = ResultDBCfg + +@typing_extensions.final +class NewTaskRequest(google.protobuf.message.Message): + """Description of a new task request as described by the client. + This message is used to create a new task. 
+ """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _PoolTaskTemplateField: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _PoolTaskTemplateFieldEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[NewTaskRequest._PoolTaskTemplateField.ValueType], builtins.type): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + AUTO: NewTaskRequest._PoolTaskTemplateField.ValueType # 0 + CANARY_PREFER: NewTaskRequest._PoolTaskTemplateField.ValueType # 1 + CANARY_NEVER: NewTaskRequest._PoolTaskTemplateField.ValueType # 2 + SKIP: NewTaskRequest._PoolTaskTemplateField.ValueType # 3 + + class PoolTaskTemplateField(_PoolTaskTemplateField, metaclass=_PoolTaskTemplateFieldEnumTypeWrapper): + """Controls the application of the pool's TaskTemplate to the creation of this + task. By default this will automatically select the pool's preference for + template, but you can also instruct swarming to prefer/prevent the + application of canary templates, as well as skipping the template + altogether. 
+ """ + + AUTO: NewTaskRequest.PoolTaskTemplateField.ValueType # 0 + CANARY_PREFER: NewTaskRequest.PoolTaskTemplateField.ValueType # 1 + CANARY_NEVER: NewTaskRequest.PoolTaskTemplateField.ValueType # 2 + SKIP: NewTaskRequest.PoolTaskTemplateField.ValueType # 3 + + EXPIRATION_SECS_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int + PARENT_TASK_ID_FIELD_NUMBER: builtins.int + PRIORITY_FIELD_NUMBER: builtins.int + PROPERTIES_FIELD_NUMBER: builtins.int + TASK_SLICES_FIELD_NUMBER: builtins.int + TAGS_FIELD_NUMBER: builtins.int + USER_FIELD_NUMBER: builtins.int + SERVICE_ACCOUNT_FIELD_NUMBER: builtins.int + PUBSUB_TOPIC_FIELD_NUMBER: builtins.int + PUBSUB_AUTH_TOKEN_FIELD_NUMBER: builtins.int + PUBSUB_USERDATA_FIELD_NUMBER: builtins.int + EVALUATE_ONLY_FIELD_NUMBER: builtins.int + POOL_TASK_TEMPLATE_FIELD_NUMBER: builtins.int + BOT_PING_TOLERANCE_SECS_FIELD_NUMBER: builtins.int + REQUEST_UUID_FIELD_NUMBER: builtins.int + RESULTDB_FIELD_NUMBER: builtins.int + REALM_FIELD_NUMBER: builtins.int + expiration_secs: builtins.int + """DEPRECATED. Use task_slices[0].expiration_secs.""" + name: builtins.str + """Task name for display purpose.""" + parent_task_id: builtins.str + """Parent Swarming run ID of the process requesting this task. This is to tell + the server about reentrancy: when a task creates children Swarming tasks, so + that the tree of tasks can be presented in the UI; the parent task will list + all the children tasks that were triggered. + """ + priority: builtins.int + """Task priority, the lower the more important.""" + @property + def properties(self) -> global___TaskProperties: + """DEPRECATED. Use task_slices[0].properties.""" + @property + def task_slices(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___TaskSlice]: + """Slice of TaskSlice, along their scheduling parameters. Cannot be used at the + same time as properties and expiration_secs. 
+ + This defines all the various possible task execution for a task request to + be run on the Swarming infrastructure. They are processed in order, and it + is guaranteed that at most one of these will be processed. + """ + @property + def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Tags are 'key:value' strings that describes what the task is about. This can + later be leveraged to search for kinds of tasks per tag. + """ + user: builtins.str + """User on which behalf this task is run, if relevant. Not validated.""" + service_account: builtins.str + """Defines what OAuth2 credentials the task uses when calling other services. + + Possible values are: + - 'none': do not use a task service account at all, this is the default. + - 'bot': use bot's own account, works only if bots authenticate with + OAuth2. + - : use this specific service account if it is allowed in the + pool (via 'allowed_service_account' pools.cfg setting) and configured + in the token server's service_accounts.cfg. + + Note that the service account name is specified outside of task properties, + and thus it is possible to have two tasks with different service accounts, + but identical properties hash (so one can be deduped). If this is unsuitable + use 'idempotent=False' or include a service account name in properties + separately. + """ + pubsub_topic: builtins.str + """Full topic name to post task state updates to, e.g. + "projects//topics/". + """ + pubsub_auth_token: builtins.str + """Secret string to put into "auth_token" attribute of PubSub message.""" + pubsub_userdata: builtins.str + """Will be but into "userdata" fields of PubSub message.""" + evaluate_only: builtins.bool + """Only evaluate the task, as if we were going to schedule it, but don't + actually schedule it. This will return the TaskRequest, but without + a task_id. 
+ """ + pool_task_template: global___NewTaskRequest.PoolTaskTemplateField.ValueType + bot_ping_tolerance_secs: builtins.int + """Maximum delay between bot pings before the bot is considered dead + while running a task. + """ + request_uuid: builtins.str + """This is used to make new task request idempotent in best effort. + If new request has request_uuid field, it checks memcache before scheduling + actual task to check there is already the task triggered by same request + previously. + """ + @property + def resultdb(self) -> global___ResultDBCfg: + """Configuration of Swarming:ResultDB integration.""" + realm: builtins.str + """Task realm. + See api/swarming.proto for more details. + """ + def __init__( + self, + *, + expiration_secs: builtins.int = ..., + name: builtins.str = ..., + parent_task_id: builtins.str = ..., + priority: builtins.int = ..., + properties: global___TaskProperties | None = ..., + task_slices: collections.abc.Iterable[global___TaskSlice] | None = ..., + tags: collections.abc.Iterable[builtins.str] | None = ..., + user: builtins.str = ..., + service_account: builtins.str = ..., + pubsub_topic: builtins.str = ..., + pubsub_auth_token: builtins.str = ..., + pubsub_userdata: builtins.str = ..., + evaluate_only: builtins.bool = ..., + pool_task_template: global___NewTaskRequest.PoolTaskTemplateField.ValueType = ..., + bot_ping_tolerance_secs: builtins.int = ..., + request_uuid: builtins.str = ..., + resultdb: global___ResultDBCfg | None = ..., + realm: builtins.str = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["properties", b"properties", "resultdb", b"resultdb"]) -> builtins.bool: ... 
+ def ClearField(self, field_name: typing_extensions.Literal["bot_ping_tolerance_secs", b"bot_ping_tolerance_secs", "evaluate_only", b"evaluate_only", "expiration_secs", b"expiration_secs", "name", b"name", "parent_task_id", b"parent_task_id", "pool_task_template", b"pool_task_template", "priority", b"priority", "properties", b"properties", "pubsub_auth_token", b"pubsub_auth_token", "pubsub_topic", b"pubsub_topic", "pubsub_userdata", b"pubsub_userdata", "realm", b"realm", "request_uuid", b"request_uuid", "resultdb", b"resultdb", "service_account", b"service_account", "tags", b"tags", "task_slices", b"task_slices", "user", b"user"]) -> None: ... + +global___NewTaskRequest = NewTaskRequest diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py new file mode 100644 index 00000000000..7553de3ca87 --- /dev/null +++ b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py @@ -0,0 +1,18 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
+"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 9c056a37262..fce67eb331b 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -37,7 +37,7 @@ def _requires_gpu() -> bool: def is_swarming_task(command: str, job_name: str): """Returns True if the task is supposed to run on swarming. Currently swarming only supports Linux tasks.""" job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job or not _requires_gpu() or not str(job.platform).upper().startswith('LINUX'): + if not job or not _requires_gpu(): return False try: _get_new_task_spec(command, job_name, '') From 622531863fac41a011776cb77fafc5862724196e Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 25 Feb 2026 03:22:23 +0000 Subject: [PATCH 03/12] Formats changes so they comply with lint --- .../_internal/swarming/__init__.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index fce67eb331b..f1deba628fd 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -14,10 +14,10 @@ """Swarming helpers.""" import base64 +from json import dumps import uuid from google.protobuf import json_format -from json import dumps from clusterfuzz._internal.base import utils from clusterfuzz._internal.config import local_config @@ -35,9 +35,10 @@ def _requires_gpu() -> bool: def is_swarming_task(command: str, job_name: str): - """Returns True if the task is supposed to run on swarming. Currently swarming only supports Linux tasks.""" + """Returns True if the task is supposed to run on swarming. 
+ Currently swarming only supports Linux tasks.""" job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job or not _requires_gpu(): + if not job or not _requires_gpu(): return False try: _get_new_task_spec(command, job_name, '') @@ -145,13 +146,17 @@ def _get_new_task_spec(command: str, job_name: str, return new_task_request -def _compress_env_vars(env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: + +def _compress_env_vars( + env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: # pylint: disable=no-member """ - Compresses all env variables into a single JSON string , which will be used to set up the - env variables in swarming bots that launch clusterfuzz using a docker container. + Compresses all env variables into a single JSON string , which will be used + to set up the env variables in swarming bots that launch clusterfuzz + using a docker container. """ env_vars_dict = {pair.key: pair.value for pair in env_vars} - return swarming_pb2.StringPair(key='CF_BOT_VARS', value=dumps(env_vars_dict)) + return swarming_pb2.StringPair(key='CF_BOT_VARS', value=dumps(env_vars_dict)) # pylint: disable=no-member + def push_swarming_task(command, download_url, job_type): """Schedules a task on swarming.""" From 004f1ab77ebe29ec5bfbef2a10b6b6df0149303f Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 25 Feb 2026 03:33:34 +0000 Subject: [PATCH 04/12] Formats test file that was left out of previous lint run --- .../tests/core/swarming/swarming_test.py | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index eaa5cd40d20..3b6ec8e15f9 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -291,25 +291,20 @@ def test_compress_env_vars(self): 
swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), ] - + result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access - + expected_result = swarming_pb2.StringPair( - key='CF_BOT_VARS', - value='{"ENV_VAR1": "VALUE1", "ENV_VAR2": "VALUE2"}' - ) + key='CF_BOT_VARS', value='{"ENV_VAR1": "VALUE1", "ENV_VAR2": "VALUE2"}') self.assertEqual(result, expected_result) def test_compress_env_vars_empty(self): """Tests that _compress_env_vars handles an empty list of environment variables safely.""" env_vars = [] - + result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access - - expected_result = swarming_pb2.StringPair( - key='CF_BOT_VARS', - value='{}' - ) + + expected_result = swarming_pb2.StringPair(key='CF_BOT_VARS', value='{}') self.assertEqual(result, expected_result) def test_compress_env_vars_duplicate_keys(self): @@ -319,12 +314,10 @@ def test_compress_env_vars_duplicate_keys(self): swarming_pb2.StringPair(key='OTHER_KEY', value='OTHER_VALUE'), swarming_pb2.StringPair(key='DUPLICATE_KEY', value='LAST_VALUE'), ] - + result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access - + expected_result = swarming_pb2.StringPair( key='CF_BOT_VARS', - value='{"DUPLICATE_KEY": "LAST_VALUE", "OTHER_KEY": "OTHER_VALUE"}' - ) + value='{"DUPLICATE_KEY": "LAST_VALUE", "OTHER_KEY": "OTHER_VALUE"}') self.assertEqual(result, expected_result) - From dd33866d21ffef43ae996417ef244fbc33a54e7b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 25 Feb 2026 22:24:48 +0000 Subject: [PATCH 05/12] Adds TODO comment --- src/clusterfuzz/_internal/swarming/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index f1deba628fd..fad06840a25 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ 
b/src/clusterfuzz/_internal/swarming/__init__.py @@ -35,10 +35,11 @@ def _requires_gpu() -> bool: def is_swarming_task(command: str, job_name: str): - """Returns True if the task is supposed to run on swarming. - Currently swarming only supports Linux tasks.""" + """Returns True if the task is supposed to run on swarming.""" + # TODO: b/487716733 - Allow clusterfuzz to trigger swarming tasks for MAC and Windows job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job or not _requires_gpu(): + if not job or not _requires_gpu() or str( + job.platform).upper() == 'MAC' or str(job.platform).upper() == 'WINDOWS': return False try: _get_new_task_spec(command, job_name, '') From a234fc9c77aef70109745535f3a60845723eb1b7 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Wed, 25 Feb 2026 22:33:20 +0000 Subject: [PATCH 06/12] Fixes format --- src/clusterfuzz/_internal/swarming/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index fad06840a25..bd89d460672 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -36,7 +36,7 @@ def _requires_gpu() -> bool: def is_swarming_task(command: str, job_name: str): """Returns True if the task is supposed to run on swarming.""" - # TODO: b/487716733 - Allow clusterfuzz to trigger swarming tasks for MAC and Windows + # TODO: b/487716733 - Trigger swarming tasks for MAC and Windows job = data_types.Job.query(data_types.Job.name == job_name).get() if not job or not _requires_gpu() or str( job.platform).upper() == 'MAC' or str(job.platform).upper() == 'WINDOWS': From 79c780e0c671d52ce07f6e5ac0f37b2ff4b9d7d0 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 26 Feb 2026 17:37:34 +0000 Subject: [PATCH 07/12] Renames new method for more clarity --- src/clusterfuzz/_internal/swarming/__init__.py | 4 ++-- 
.../tests/core/swarming/swarming_test.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index bd89d460672..036ef5a5c2c 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -109,7 +109,7 @@ def _get_new_task_spec(command: str, job_name: str, key='DOCKER_IMAGE', value=instance_spec['docker_image'])) - task_environment.append(_compress_env_vars(task_environment)) + task_environment.append(_env_vars_to_json(task_environment)) task_dimensions = [ swarming_pb2.StringPair(key='os', value=job.platform), # pylint: disable=no-member @@ -148,7 +148,7 @@ def _get_new_task_spec(command: str, job_name: str, return new_task_request -def _compress_env_vars( +def _env_vars_to_json( env_vars: list[swarming_pb2.StringPair]) -> swarming_pb2.StringPair: # pylint: disable=no-member """ Compresses all env variables into a single JSON string , which will be used diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index 3b6ec8e15f9..ad537c5b557 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -285,37 +285,37 @@ def test_push_swarming_task(self): data=json_format.MessageToJson(expected_new_task_request), headers=expected_headers) - def test_compress_env_vars(self): - """Tests (Happy Path) that _compress_env_vars correctly compresses environment variables into a JSON string.""" + def test_env_vars_to_json(self): + """Tests (Happy Path) that _env_vars_to_json correctly compresses environment variables into a JSON string.""" env_vars = [ swarming_pb2.StringPair(key='ENV_VAR1', value='VALUE1'), swarming_pb2.StringPair(key='ENV_VAR2', value='VALUE2'), ] - result = swarming._compress_env_vars(env_vars) # pylint: 
disable=protected-access + result = swarming._env_vars_to_json(env_vars) # pylint: disable=protected-access expected_result = swarming_pb2.StringPair( key='CF_BOT_VARS', value='{"ENV_VAR1": "VALUE1", "ENV_VAR2": "VALUE2"}') self.assertEqual(result, expected_result) - def test_compress_env_vars_empty(self): - """Tests that _compress_env_vars handles an empty list of environment variables safely.""" + def test_env_vars_to_json_empty(self): + """Tests that _env_vars_to_json handles an empty list of environment variables safely.""" env_vars = [] - result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access + result = swarming._env_vars_to_json(env_vars) # pylint: disable=protected-access expected_result = swarming_pb2.StringPair(key='CF_BOT_VARS', value='{}') self.assertEqual(result, expected_result) - def test_compress_env_vars_duplicate_keys(self): - """Tests that _compress_env_vars handles duplicate keys by taking the last value.""" + def test_env_vars_to_json_duplicate_keys(self): + """Tests that _env_vars_to_json handles duplicate keys by taking the last value.""" env_vars = [ swarming_pb2.StringPair(key='DUPLICATE_KEY', value='FIRST_VALUE'), swarming_pb2.StringPair(key='OTHER_KEY', value='OTHER_VALUE'), swarming_pb2.StringPair(key='DUPLICATE_KEY', value='LAST_VALUE'), ] - result = swarming._compress_env_vars(env_vars) # pylint: disable=protected-access + result = swarming._env_vars_to_json(env_vars) # pylint: disable=protected-access expected_result = swarming_pb2.StringPair( key='CF_BOT_VARS', From 5e7ad93c056737c91c2f19bc932b199c7a2920c1 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 26 Feb 2026 20:05:06 +0000 Subject: [PATCH 08/12] Updates proto's copyright date --- src/clusterfuzz/_internal/protos/swarming_pb2.py | 2 +- src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.py 
b/src/clusterfuzz/_internal/protos/swarming_pb2.py index 543c11cce9f..3cf6e532c7a 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2.py +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py index 7553de3ca87..e88584a711d 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py +++ b/src/clusterfuzz/_internal/protos/swarming_pb2_grpc.py @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 8d791634c2ec043a4e49924495a67f5a19f35e60 Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Thu, 26 Feb 2026 20:07:45 +0000 Subject: [PATCH 09/12] Reverts wrong date update for a proto --- src/clusterfuzz/_internal/protos/swarming_pb2.py | 2 +- src/clusterfuzz/_internal/protos/swarming_pb2.pyi | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.py b/src/clusterfuzz/_internal/protos/swarming_pb2.py index 3cf6e532c7a..543c11cce9f 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2.py +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.py @@ -1,4 +1,4 @@ -# Copyright 2026 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/src/clusterfuzz/_internal/protos/swarming_pb2.pyi b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi index fb91d5d240c..85af398cb93 100644 --- a/src/clusterfuzz/_internal/protos/swarming_pb2.pyi +++ b/src/clusterfuzz/_internal/protos/swarming_pb2.pyi @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2026 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 20af5e361dfe4788f6c3434c55adc95bbc12c1fa Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 27 Feb 2026 08:47:42 +0000 Subject: [PATCH 10/12] Refactors is_swarming_task conditional --- .../_internal/bot/tasks/task_types.py | 10 +++--- .../_internal/bot/tasks/utasks/__init__.py | 10 +++--- .../_internal/swarming/__init__.py | 33 +++++++++---------- .../tests/core/swarming/swarming_test.py | 16 ++++----- 4 files changed, 33 insertions(+), 36 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index 1d0e38a8ae7..aba7152b54d 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -115,8 +115,8 @@ def is_remote_utask(command, job): # Return True even if we can't query the db. 
return True - return batch_service.is_remote_task( - command, job) or swarming.is_swarming_task(command, job) + return batch_service.is_remote_task(command, + job) or swarming.is_swarming_task(job) def task_main_runs_on_uworker(): @@ -181,8 +181,10 @@ def execute(self, task_argument, job_type, uworker_env): if batch_service.is_remote_task(command, job_type): tasks.add_utask_main(command, download_url, job_type) else: - assert swarming.is_swarming_task(command, job_type) - swarming.push_swarming_task(command, download_url, job_type) + swarming_task = swarming.create_new_task_request(command, job_type, + download_url) + if swarming_task: + swarming.push_swarming_task(swarming_task) @logs.task_stage_context(logs.Stage.PREPROCESS) def preprocess(self, task_argument, job_type, uworker_env): diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 63924785a37..107ea5d907a 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -63,10 +63,9 @@ def _timestamp_now() -> Timestamp: return ts -def _get_execution_mode(utask_module, job_type): +def _get_execution_mode(job_type): """Determines whether this task in executed on swarming or batch.""" - command = task_utils.get_command_from_module(utask_module.__name__) - if swarming.is_swarming_task(command, job_type): + if swarming.is_swarming_task(job_type): return Mode.SWARMING return Mode.BATCH @@ -410,7 +409,7 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env): signed download URL for the uworker's input and the (unsigned) download URL for its output.""" with _MetricRecorder(_Subtask.PREPROCESS) as recorder: - execution_mode = _get_execution_mode(utask_module, job_type) + execution_mode = _get_execution_mode(job_type) uworker_input = _preprocess(utask_module, task_argument, job_type, uworker_env, recorder, execution_mode) if not uworker_input: @@ -501,8 
+500,7 @@ def tworker_postprocess(output_download_url) -> None: task_utils.reset_task_stage_env() utask_module = get_utask_module(uworker_output.uworker_input.module_name) - execution_mode = _get_execution_mode(utask_module, - uworker_output.uworker_input.job_type) + execution_mode = _get_execution_mode(uworker_output.uworker_input.job_type) recorder.set_task_details( utask_module, uworker_output.uworker_input.job_type, diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 036ef5a5c2c..6c48829d442 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -34,18 +34,15 @@ def _requires_gpu() -> bool: return bool(utils.string_is_true(requires_gpu)) -def is_swarming_task(command: str, job_name: str): +def is_swarming_task(job_name: str): """Returns True if the task is supposed to run on swarming.""" # TODO: b/487716733 - Trigger swarming tasks for MAC and Windows job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job or not _requires_gpu() or str( - job.platform).upper() == 'MAC' or str(job.platform).upper() == 'WINDOWS': - return False - try: - _get_new_task_spec(command, job_name, '') - return True - except ValueError: + swarming_config = _get_swarming_config() + platform_config = swarming_config.get('mapping').get(job.platform, None) + if not job or not _requires_gpu() or platform_config == None: return False + return True def _get_task_name(): @@ -57,10 +54,15 @@ def _get_swarming_config(): return local_config.SwarmingConfig() -def _get_new_task_spec(command: str, job_name: str, - download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member - """Gets the configured specifications for a swarming task.""" +def create_new_task_request(command: str, job_name: str, download_url: str + ) -> swarming_pb2.NewTaskRequest | None: # pylint: disable=no-member + """Gets the configured specifications for a swarming 
 task. + Returns None if the task shouldn't be executed on swarming""" job = data_types.Job.query(data_types.Job.name == job_name).get() + + if not is_swarming_task(job_name): + return None + config_name = job.platform swarming_config = _get_swarming_config() instance_spec = swarming_config.get('mapping').get(config_name, None) @@ -159,13 +161,8 @@ def _env_vars_to_json( return swarming_pb2.StringPair(key='CF_BOT_VARS', value=dumps(env_vars_dict)) # pylint: disable=no-member -def push_swarming_task(command, download_url, job_type): +def push_swarming_task(task_request: swarming_pb2.NewTaskRequest): # pylint: disable=no-member """Schedules a task on swarming.""" - job = data_types.Job.query(data_types.Job.name == job_type).get() - if not job: - raise ValueError('invalid job_name') - - task_spec = _get_new_task_spec(command, job_type, download_url) creds, _ = credentials.get_default() headers = { 'Accept': 'application/json', @@ -175,4 +172,4 @@ def push_swarming_task(command, download_url, job_type): swarming_server = _get_swarming_config().get('swarming_server') url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' utils.post_url( - url=url, data=json_format.MessageToJson(task_spec), headers=headers) + url=url, data=json_format.MessageToJson(task_request), headers=headers) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index ad537c5b557..32c7566044b 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -41,7 +41,7 @@ def test_get_spec_from_config_with_docker_image(self): """Tests that _get_new_task_spec works as expected.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access + spec = swarming.create_new_task_request( # pylint: disable=protected-access 'corpus_pruning', 
job.name, 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( name='task_name', @@ -96,14 +96,14 @@ def test_get_spec_from_config_raises_error_on_unknown_config(self): job = data_types.Job(name='some_job_name', platform='UNKNOWN-PLATFORM') job.put() with self.assertRaises(ValueError): - swarming._get_new_task_spec( # pylint: disable=protected-access + swarming.create_new_task_request( # pylint: disable=protected-access 'corpus_pruning', job.name, 'https://download_url') def test_get_spec_from_config_without_docker_image(self): """Tests that _get_new_task_spec works as expected (without a docker image).""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='MAC') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access + spec = swarming.create_new_task_request( # pylint: disable=protected-access 'corpus_pruning', job.name, 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( name='task_name', @@ -172,7 +172,7 @@ def test_get_spec_from_config_for_fuzz_task(self): """Tests that _get_new_task_spec works as expected for fuzz commands.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access + spec = swarming.create_new_task_request( # pylint: disable=protected-access 'fuzz', job.name, 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( name='task_name', @@ -225,7 +225,9 @@ def test_push_swarming_task(self): """Tests that push_swarming_task works as expected.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - swarming.push_swarming_task('fuzz', 'https://download_url', job.name) + test_request = swarming.create_new_task_request( + 'fuzz', 'https://download_url', job.name) + swarming.push_swarming_task(test_request) expected_new_task_request = swarming_pb2.NewTaskRequest( name='task_name', @@ -300,9 +302,7 @@ def test_env_vars_to_json(self): def 
test_env_vars_to_json_empty(self): """Tests that _env_vars_to_json handles an empty list of environment variables safely.""" - env_vars = [] - - result = swarming._env_vars_to_json(env_vars) # pylint: disable=protected-access + result = swarming._env_vars_to_json([]) # pylint: disable=protected-access expected_result = swarming_pb2.StringPair(key='CF_BOT_VARS', value='{}') self.assertEqual(result, expected_result) From 672d88058942dbd20eacb2184eaa0e20c2b6710b Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 27 Feb 2026 08:54:32 +0000 Subject: [PATCH 11/12] Fixes Lint issue --- src/clusterfuzz/_internal/swarming/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 6c48829d442..dde95b2d4c0 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -40,7 +40,7 @@ def is_swarming_task(job_name: str): job = data_types.Job.query(data_types.Job.name == job_name).get() swarming_config = _get_swarming_config() platform_config = swarming_config.get('mapping').get(job.platform, None) - if not job or not _requires_gpu() or platform_config == None: + if not job or not _requires_gpu() or platform_config is None: return False return True From e6f179cfe2554aba85056295371096d25e16d5ca Mon Sep 17 00:00:00 2001 From: Ivan Barba Date: Fri, 27 Feb 2026 09:22:26 +0000 Subject: [PATCH 12/12] Fixes UT --- .../_internal/tests/core/swarming/swarming_test.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index 32c7566044b..9b50d0f1632 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -32,9 +32,11 @@ class SwarmingTest(unittest.TestCase): def 
setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.base.utils.post_url', - 'clusterfuzz._internal.swarming._get_task_name' + 'clusterfuzz._internal.swarming._get_task_name', + 'clusterfuzz._internal.swarming._requires_gpu' ]) self.mock._get_task_name.return_value = 'task_name' # pylint: disable=protected-access + self.mock._requires_gpu.return_value = True # pylint: disable=protected-access self.maxDiff = None def test_get_spec_from_config_with_docker_image(self): @@ -91,13 +93,13 @@ def test_get_spec_from_config_with_docker_image(self): self.assertEqual(spec, expected_spec) - def test_get_spec_from_config_raises_error_on_unknown_config(self): - """Tests that _get_new_task_spec raises error when there's no mapping for the config.""" + def test_get_spec_from_config_returns_none_on_unknown_config(self): + """Tests that _get_new_task_spec returns None when there's no mapping for the config.""" job = data_types.Job(name='some_job_name', platform='UNKNOWN-PLATFORM') job.put() - with self.assertRaises(ValueError): - swarming.create_new_task_request( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') + spec = swarming.create_new_task_request( # pylint: disable=protected-access + 'corpus_pruning', job.name, 'https://download_url') + self.assertIsNone(spec) def test_get_spec_from_config_without_docker_image(self): """Tests that _get_new_task_spec works as expected (without a docker image)."""