diff --git a/src/clusterfuzz/_internal/bot/tasks/setup.py b/src/clusterfuzz/_internal/bot/tasks/setup.py index 2080e461116..0a32cc76384 100644 --- a/src/clusterfuzz/_internal/bot/tasks/setup.py +++ b/src/clusterfuzz/_internal/bot/tasks/setup.py @@ -508,7 +508,7 @@ def update_data_bundle( file_host.create_directory( worker_data_bundle_directory, create_intermediates=True) - result = untrusted_corpus_manager.RemoteGSUtilRunner().rsync( + result = untrusted_corpus_manager.RemoteGCloudStorageRunner().rsync( data_bundle_corpus.gcs_url, worker_data_bundle_directory, delete=False) diff --git a/src/clusterfuzz/_internal/bot/untrusted_runner/corpus_manager.py b/src/clusterfuzz/_internal/bot/untrusted_runner/corpus_manager.py index e291972de50..cfd93b1972a 100644 --- a/src/clusterfuzz/_internal/bot/untrusted_runner/corpus_manager.py +++ b/src/clusterfuzz/_internal/bot/untrusted_runner/corpus_manager.py @@ -19,8 +19,8 @@ from . import remote_process_host -class RemoteGSUtilRunner(gsutil.GSUtilRunner): - """Remote GSUtil runner.""" +class RemoteGCloudStorageRunner(gsutil.GCloudStorageRunner): + """Remote GCloud Storage runner.""" def __init__(self): super().__init__(process_runner=remote_process_host.RemoteProcessRunner) @@ -36,4 +36,4 @@ def __init__(self, fuzzer_name, fuzzer_executable_name, quarantine=False): quarantine, # Never log results for remote corpora since the state is on the worker. 
log_results=False, - gsutil_runner_func=RemoteGSUtilRunner) + gcloud_storage_runner_func=RemoteGCloudStorageRunner) diff --git a/src/clusterfuzz/_internal/bot/untrusted_runner/environment.py b/src/clusterfuzz/_internal/bot/untrusted_runner/environment.py index 10a20ac1240..53083e44f43 100644 --- a/src/clusterfuzz/_internal/bot/untrusted_runner/environment.py +++ b/src/clusterfuzz/_internal/bot/untrusted_runner/environment.py @@ -39,6 +39,7 @@ r'^FUZZ_TARGET$', r'^FUZZ_TEST_TIMEOUT$', r'^GSUTIL_PATH$', + r'^GCLOUD_PATH$', r'^JOB_NAME$', r'^LOCAL_DEVELOPMENT$', r'^MSAN_OPTIONS$', diff --git a/src/clusterfuzz/_internal/cron/chrome_tests_syncer.py b/src/clusterfuzz/_internal/cron/chrome_tests_syncer.py index 844039313bc..76a49b44a7d 100644 --- a/src/clusterfuzz/_internal/cron/chrome_tests_syncer.py +++ b/src/clusterfuzz/_internal/cron/chrome_tests_syncer.py @@ -240,7 +240,7 @@ def filter_members(member, path): remote_archive = f'gs://autozilli/autozilli-{archive_suffix}.tgz' logs.info(f'Processing {remote_archive}') local_archive = os.path.join(fuzzilli_tests_directory, 'tmp.tgz') - gsutil.GSUtilRunner().download_file(remote_archive, local_archive) + gsutil.GCloudStorageRunner().download_file(remote_archive, local_archive) # Extract relevant files. 
with tarfile.open(local_archive) as tar: @@ -333,7 +333,8 @@ def sync_tests(tests_archive_bucket: str, tests_archive_name: str, '*.svn*', ], cwd=tests_directory) - gsutil.GSUtilRunner().upload_file(tests_archive_local, tests_archive_remote) + gsutil.GCloudStorageRunner().upload_file(tests_archive_local, + tests_archive_remote) logs.info('Sync complete.') monitoring_metrics.CHROME_TEST_SYNCER_SUCCESS.increment() diff --git a/src/clusterfuzz/_internal/cron/job_exporter.py b/src/clusterfuzz/_internal/cron/job_exporter.py index aedbe3ec400..28cb906be11 100644 --- a/src/clusterfuzz/_internal/cron/job_exporter.py +++ b/src/clusterfuzz/_internal/cron/job_exporter.py @@ -52,12 +52,12 @@ class GCloudCLIRSync(RSyncClient): Unsuitable for unit testing.""" def __init__(self): - self._runner = gsutil.GSUtilRunner() + self._runner = gsutil.GCloudStorageRunner() def rsync(self, source: str, target: str): """Rsyncs a source to a target destination. Returns True if successful, False if there was any failure. Considers successful - any gsutil execution with a 0 return code.""" + any gcloud execution with a 0 return code.""" rsync_process_output = self._runner.rsync(source, target) return_code = rsync_process_output.return_code return return_code == 0 diff --git a/src/clusterfuzz/_internal/fuzzing/corpus_manager.py b/src/clusterfuzz/_internal/fuzzing/corpus_manager.py index c489bedfc93..561235501c5 100644 --- a/src/clusterfuzz/_internal/fuzzing/corpus_manager.py +++ b/src/clusterfuzz/_internal/fuzzing/corpus_manager.py @@ -36,11 +36,11 @@ # Disable "invalid-name" because fixing the issue will cause pylint to # complain the None assignment is incorrectly named. - DEFAULT_GSUTIL_RUNNER = gsutil.GSUtilRunner # pylint: disable=invalid-name + DEFAULT_GCLOUD_STORAGE_RUNNER = gsutil.GCloudStorageRunner # pylint: disable=invalid-name except: # This is expected to fail on App Engine. 
gsutil = None - DEFAULT_GSUTIL_RUNNER = None + DEFAULT_GCLOUD_STORAGE_RUNNER = None BACKUP_ARCHIVE_FORMAT = 'zip' CORPUS_FILES_SYNC_TIMEOUT = 60 * 60 @@ -61,9 +61,9 @@ COPY_BUFFER_SIZE = 16 * 1024 -def _rsync_errors_below_threshold(gsutil_result, max_errors): +def _rsync_errors_below_threshold(gcloud_result, max_errors): """Check if the number of errors during rsync is lower than our threshold.""" - match = re.search(RSYNC_ERROR_REGEX, gsutil_result.output, re.MULTILINE) + match = re.search(RSYNC_ERROR_REGEX, gcloud_result.output, re.MULTILINE) if not match: return False @@ -71,23 +71,23 @@ def _rsync_errors_below_threshold(gsutil_result, max_errors): # Ignore NotFoundException(s) since they can happen when files can get deleted # e.g. when pruning task is updating corpus. - error_count -= gsutil_result.output.count(b'NotFoundException') - error_count -= gsutil_result.output.count(b'No such file or directory') + error_count -= gcloud_result.output.count(b'NotFoundException') + error_count -= gcloud_result.output.count(b'No such file or directory') return error_count <= max_errors -def _handle_rsync_result(gsutil_result, max_errors): +def _handle_rsync_result(gcloud_result, max_errors): """Handle rsync result.""" - if gsutil_result.return_code == 0: + if gcloud_result.return_code == 0: sync_succeeded = True else: - logs.warning('gsutil rsync got non-zero:\n' + logs.warning('gcloud rsync got non-zero:\n' 'Command: %s\n' - 'Output: %s\n' % (gsutil_result.command, gsutil_result.output)) - sync_succeeded = _rsync_errors_below_threshold(gsutil_result, max_errors) + 'Output: %s\n' % (gcloud_result.command, gcloud_result.output)) + sync_succeeded = _rsync_errors_below_threshold(gcloud_result, max_errors) - return sync_succeeded and not gsutil_result.timed_out + return sync_succeeded and not gcloud_result.timed_out def _count_corpus_files(directory): @@ -153,7 +153,7 @@ def __init__(self, bucket_name, bucket_path='/', log_results=True, - 
gsutil_runner_func=DEFAULT_GSUTIL_RUNNER): + gcloud_storage_runner_func=DEFAULT_GCLOUD_STORAGE_RUNNER): """Inits the GcsCorpus. Args: @@ -163,7 +163,7 @@ def __init__(self, self._bucket_name = bucket_name self._bucket_path = bucket_path self._log_results = log_results - self._gsutil_runner = gsutil_runner_func() + self._gcloud_storage_runner = gcloud_storage_runner_func() @property def bucket_name(self): @@ -174,7 +174,7 @@ def bucket_path(self): return self._bucket_path def get_gcs_url(self): - """Build corpus GCS URL for gsutil. + """Build corpus GCS URL for gcloud storage. Returns: A string giving the GCS URL. """ @@ -195,7 +195,7 @@ def rsync_from_disk(self, Args: directory: Path to directory to sync from. - timeout: Timeout for gsutil. + timeout: Timeout for gcloud storage. delete: Whether or not to delete files on GCS that don't exist locally. Returns: @@ -203,8 +203,8 @@ def rsync_from_disk(self, """ corpus_gcs_url = self.get_gcs_url() legalize_corpus_files(directory) - result = self._gsutil_runner.rsync( - directory, corpus_gcs_url, timeout, delete=delete) + result = self._gcloud_storage_runner.rsync( + directory, corpus_gcs_url, timeout=timeout, delete=delete) # Allow a small number of files to fail to be synced. return _handle_rsync_result(result, max_errors=MAX_SYNC_ERRORS) @@ -213,11 +213,11 @@ def rsync_to_disk(self, directory, timeout=CORPUS_FILES_SYNC_TIMEOUT, delete=True): - """Run gsutil to download corpus files from GCS. + """Run gcloud storage to download corpus files from GCS. Args: directory: Path to directory to sync to. - timeout: Timeout for gsutil. + timeout: Timeout for gcloud storage. delete: Whether or not to delete files on disk that don't exist locally. 
Returns: @@ -226,8 +226,8 @@ def rsync_to_disk(self, shell.create_directory(directory, create_intermediates=True) corpus_gcs_url = self.get_gcs_url() - result = self._gsutil_runner.rsync(corpus_gcs_url, directory, timeout, - delete) + result = self._gcloud_storage_runner.rsync( + corpus_gcs_url, directory, timeout=timeout, delete=delete) # Allow a small number of files to fail to be synced. return _handle_rsync_result(result, max_errors=MAX_SYNC_ERRORS) @@ -248,7 +248,7 @@ def upload_files(self, file_paths, timeout=CORPUS_FILES_SYNC_TIMEOUT): # legal on Windows. file_paths = legalize_filenames(file_paths) gcs_url = self.get_gcs_url() - return self._gsutil_runner.upload_files_to_url( + return self._gcloud_storage_runner.upload_files_to_url( file_paths, gcs_url, timeout=timeout) @@ -261,7 +261,7 @@ def __init__(self, quarantine=False, log_results=True, include_regressions=False, - gsutil_runner_func=DEFAULT_GSUTIL_RUNNER): + gcloud_storage_runner_func=DEFAULT_GCLOUD_STORAGE_RUNNER): """Inits the FuzzTargetCorpus. Args: @@ -292,7 +292,7 @@ def __init__(self, sync_corpus_bucket_name, f'/{self._engine}/{self._project_qualified_target_name}', log_results=log_results, - gsutil_runner_func=gsutil_runner_func, + gcloud_storage_runner_func=gcloud_storage_runner_func, ) self._regressions_corpus = GcsCorpus( @@ -300,7 +300,8 @@ def __init__(self, f'/{self._engine}/{self._project_qualified_target_name}' f'{REGRESSIONS_GCS_PATH_SUFFIX}', log_results=log_results, - gsutil_runner_func=gsutil_runner_func) if include_regressions else None + gcloud_storage_runner_func=gcloud_storage_runner_func + ) if include_regressions else None @property def engine(self): @@ -320,7 +321,7 @@ def rsync_from_disk(self, Args: directory: Path to directory to sync to. - timeout: Timeout for gsutil. + timeout: Timeout for gcloud storage. delete: Whether or not to delete files on GCS that don't exist locally. 
Returns: @@ -339,13 +340,13 @@ def rsync_to_disk(self, directory, timeout=CORPUS_FILES_SYNC_TIMEOUT, delete=True): - """Run gsutil to download corpus files from GCS. + """Run gcloud storage to download corpus files from GCS. Overridden to have additional logging. Args: directory: Path to directory to sync to. - timeout: Timeout for gsutil. + timeout: Timeout for gcloud storage. delete: Whether or not to delete files on disk that don't exist locally. Returns: @@ -413,7 +414,7 @@ def rsync_from_disk(self, Args: directory: Path to directory to sync to. - timeout: Timeout for gsutil. + timeout: Timeout for gcloud storage. delete: Whether or not to delete files on GCS that don't exist locally. Returns: @@ -613,7 +614,7 @@ def _get_regressions_corpus_gcs_url(bucket_name, bucket_path): def _get_gcs_url(bucket_name, bucket_path, suffix=''): - """Build corpus GCS URL for gsutil. + """Build corpus GCS URL for gcloud storage. Returns: A string giving the GCS URL. """ @@ -661,7 +662,7 @@ def sync_data_bundle_corpus_to_disk(data_bundle_corpus, directory): if (not task_types.task_main_runs_on_uworker() and not environment.is_uworker()): # Fast path for when we don't need an untrusted worker to run a task. - return gsutil.GSUtilRunner().rsync( + return gsutil.GCloudStorageRunner().rsync( data_bundle_corpus.gcs_url, directory, delete=False).return_code == 0 results = storage.download_signed_urls(data_bundle_corpus.corpus_urls, directory) diff --git a/src/clusterfuzz/_internal/google_cloud_utils/gsutil.py b/src/clusterfuzz/_internal/google_cloud_utils/gsutil.py index 0a02ea6a71e..8a2448ba3c1 100755 --- a/src/clusterfuzz/_internal/google_cloud_utils/gsutil.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/gsutil.py @@ -11,86 +11,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-"""Functions for running gsutil.""" +"""Functions for running gcloud storage.""" import os import shutil -from clusterfuzz._internal.base import utils from clusterfuzz._internal.google_cloud_utils import storage from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.system import environment from clusterfuzz._internal.system import new_process -# Default timeout for a GSUtil sync. +# Default timeout for a rsync. FILES_SYNC_TIMEOUT = 5 * 60 * 60 -def use_gcloud_for_command(command): - """Returns whether to use gcloud storage for the given command.""" - return bool(environment.get_value(f'USE_GCLOUD_STORAGE_{command.upper()}')) - - def get_gcloud_path(): """Get path to gcloud executable.""" gcloud_executable = 'gcloud' if environment.platform() == 'WINDOWS': gcloud_executable += '.cmd' + gcloud_storage_dir = environment.get_value('GCLOUD_PATH') + if not gcloud_storage_dir: + # Fallback to older GSUTIL_PATH, if needed. + gcloud_storage_dir = environment.get_value('GSUTIL_PATH') + + if gcloud_storage_dir: + return os.path.join(gcloud_storage_dir, gcloud_executable) + # Try searching the binary in path. gcloud_absolute_path = shutil.which(gcloud_executable) if gcloud_absolute_path: return gcloud_absolute_path - # TEMP: use GSUTIL_PATH as fallback to find the gcloud bin directory. - # This seems to only be used by mac and android. - gcloud_directory = environment.get_value('GSUTIL_PATH') - if gcloud_directory: - gcloud_absolute_path = os.path.join(gcloud_directory, gcloud_executable) - if os.path.exists(gcloud_absolute_path): - logs.info(f'Using gcloud executable from GSUTIL_PATH: {gcloud_directory}') - return gcloud_absolute_path - - logs.error('Cannot locate gcloud in PATH or it does not exist.') + logs.error('Cannot locate gcloud in PATH, set GCLOUD_PATH to directory ' + 'containing gcloud binary.') return None -def get_gsutil_path(): - """Get path to gsutil executable. - - Returns: - Path to gsutil executable on the system. 
- """ - gsutil_executable = 'gsutil' - if environment.platform() == 'WINDOWS': - gsutil_executable += '.cmd' - - gsutil_directory = environment.get_value('GSUTIL_PATH') - if not gsutil_directory: - # Try searching the binary in path. - gsutil_absolute_path = shutil.which(gsutil_executable) - if gsutil_absolute_path: - return gsutil_absolute_path - - logs.error('Cannot locate gsutil in PATH, set GSUTIL_PATH to directory ' - 'containing gsutil binary.') - return None - - logs.info(f'Using gsutil executable from GSUTIL_PATH: {gsutil_directory}') - gsutil_absolute_path = os.path.join(gsutil_directory, gsutil_executable) - return gsutil_absolute_path - - -def _multiprocessing_args(): - """Get multiprocessing args for gsutil.""" - if utils.cpu_count() == 1: - # GSUtil's default thread count is 5 as it assumes the common configuration - # is many CPUs (GSUtil uses num_cpu processes). - return ['-o', 'GSUtil:parallel_thread_count=16'] - - return [] - - def _filter_path(path, write=False): """Filters path if needed. In local development environment, this uses local paths from an emulated GCS instead of real GCS. 
`write` indicates whether if @@ -114,47 +72,41 @@ def _filter_path(path, write=False): return local_path -class GSUtilRunner: - """GSUtil/gcloud storage runner.""" +class GCloudStorageRunner: + """Gcloud storage runner.""" def __init__(self, process_runner=new_process.ProcessRunner): - self._process_runner = process_runner - - def _get_runner_and_args(self, use_gcloud_storage, quiet=False): - """Get the process runner and default arguments.""" - if use_gcloud_storage: - executable_path = get_gcloud_path() - default_args = ['storage'] - runner = self._process_runner( - executable_path=executable_path, default_args=default_args) - additional_args = [] - else: - executable_path = get_gsutil_path() - default_args = ['-m'] - default_args.extend(_multiprocessing_args()) - runner = self._process_runner( - executable_path=executable_path, default_args=default_args) - - # gcloud storage does not have a -q flag, it is a global gcloud flag - # --quiet, but that suppresses all output, which is not what we want. - # gsutil's -q suppresses the "Copying..." summary, which is desired. - additional_args = ['-q'] if quiet else [] - - return runner, additional_args - - def run_gsutil(self, arguments, use_gcloud_storage, quiet=False, **kwargs): - """Run GSUtil or gcloud storage.""" - runner, additional_args = self._get_runner_and_args(use_gcloud_storage, - quiet) + """Initialize a gcloud process runner. + + For gcloud storage, all operations run in parallel by default, so no need + to add -m. Also, as gcloud storage handles the distribution of threads + dynamically, it is smart enough to not underwhelm the thread pool, + so we don't need to set the thread count for a single cpu. + """ + self.gcloud_runner = process_runner( + executable_path=get_gcloud_path(), default_args=['storage']) + + def run_gcloud_storage(self, arguments, quiet=True, verbose=True, **kwargs): + """Run a gcloud storage command.""" + + # Enable user intended output to console. 
Useful for logging as this is + # stored at result from subprocess. + additional_args = (['--user-output-enabled'] + if verbose else ['--no-user-output-enabled']) + # Disable all interactive prompts. + additional_args += ['-q'] if quiet else [] + cmd = arguments[0] if arguments else 'unknown' arguments = additional_args + arguments - tool_name = 'gcloud storage' if use_gcloud_storage else 'gsutil' + # Get some info for logging. + tool_name = 'gcloud storage' arg_str = ' '.join(arguments) try: cwd = os.getcwd() except OSError: cwd = 'unknown' + logs.info( f'Running {cmd} with {tool_name}.', tool_name=tool_name, @@ -162,14 +114,8 @@ def run_gsutil(self, arguments, use_gcloud_storage, quiet=False, **kwargs): cwd=cwd, arguments=arg_str) - env = os.environ.copy() - if not use_gcloud_storage and 'PYTHONPATH' in env: - # GSUtil may be on Python 3, and our PYTHONPATH breaks it because we're on - # Python 2. - env.pop('PYTHONPATH') - try: - result = runner.run_and_wait(arguments, env=env, **kwargs) + result = self.gcloud_runner.run_and_wait(arguments, **kwargs) logs.info( f'Finished running {cmd} with {tool_name}.', tool_name=tool_name, @@ -194,39 +140,48 @@ def rsync(self, destination, timeout=FILES_SYNC_TIMEOUT, delete=True, - exclusion_pattern=None): - """Rsync with gsutil or gcloud storage.""" - use_gcloud = use_gcloud_for_command('rsync') - if use_gcloud: - command = ['rsync'] - # gcloud storage rsync is recursive by default. - if delete: - command.append('--delete-unmatched-destination-objects') - if exclusion_pattern: - command.extend(['--exclude', exclusion_pattern]) - else: - command = ['rsync', '-r'] - if delete: - command.append('-d') - if exclusion_pattern: - command.extend(['-x', exclusion_pattern]) - - command.extend([ + exclusion_pattern=None, + recursive=True): + """Synchronize content of two buckets/dirs using gcloud rsync. + + For instance, to download corpus files from a GCS url. + + Args: + source: Source to rsync from. 
+      destination: Destination to rsync to.
+      timeout: Timeout for gcloud storage.
+      delete: Whether to delete files on destination that don't exist in source.
+      exclusion_pattern: Regex for objects to not be included in rsync.
+      recursive: Whether to recursively sync the content from the directories.
+
+    Returns:
+      Result from the process that executed the gcloud command.
+    """
+    rsync_command = [
+        'rsync',
         _filter_path(source, write=True),
-        _filter_path(destination, write=True),
-    ])
+        _filter_path(destination, write=True)
+    ]
 
-    return self.run_gsutil(command, use_gcloud, timeout=timeout, quiet=True)
+    if recursive:
+      rsync_command.append('--recursive')
+    if delete:
+      rsync_command.append('--delete-unmatched-destination-objects')
+    if exclusion_pattern:
+      rsync_command.extend(['--exclude', exclusion_pattern])
+
+    return self.run_gcloud_storage(
+        rsync_command, timeout=timeout, verbose=False)
 
   def download_file(self, gcs_url, file_path, timeout=None):
     """Download a file from GCS."""
-    use_gcloud = use_gcloud_for_command('cp')
     command = ['cp', _filter_path(gcs_url), file_path]
-    result = self.run_gsutil(command, use_gcloud, timeout=timeout)
+    result = self.run_gcloud_storage(command, timeout=timeout)
     if result.return_code:
-      logs.error('GSUtilRunner.download_file failed:\nCommand: %s\n'
-                 'Url: %s\n'
-                 'Output %s' % (result.command, gcs_url, result.output))
+      logs.error('GCloudStorageRunner.download_file failed:\n'
+                 f'Command: {result.command}\n'
+                 f'Url: {gcs_url}\n'
+                 f'Output {result.output}')
 
     return result.return_code == 0
 
@@ -235,100 +190,78 @@ def upload_file(self,
                   gcs_url,
                   timeout=None,
                   gzip=False,
-                  metadata=None):
-    """Upload a single file to a given GCS url."""
+                  metadata=None,
+                  custom_metadata=None):
+    """Upload a single file to a GCS url and update its metadata if needed."""
     if not file_path or not gcs_url:
       return False
 
-    use_gcloud = use_gcloud_for_command('cp')
-    if use_gcloud:
-      # For gcloud, setting metadata is a separate step after uploading.
- cp_command = ['cp'] - if gzip: - cp_command.append('--gzip-in-flight-all') - - cp_command.extend([file_path, _filter_path(gcs_url, write=True)]) - result = self.run_gsutil(cp_command, use_gcloud, timeout=timeout) + cp_command = ['cp'] + if gzip: + cp_command.append('--gzip-local-all') + cp_command.extend([file_path, _filter_path(gcs_url, write=True)]) + + result = self.run_gcloud_storage(cp_command, timeout=timeout) + if result.return_code != 0: + logs.error('GCloudStorageRunner.upload_file (cp step) failed:\n' + f'Command: {result.command}\n' + f'Filename: {file_path}\n' + f'Output: {result.output}') + return False + # For gcloud, setting metadata is a separate step after uploading. + metadata_args = [] + if metadata: + # Metadata dict assumes only standard headers like 'content-type'. + metadata_args.extend([f'--{k}={v}' for k, v in metadata.items()]) + + if custom_metadata: + # Custom metadata dict is assumed to contain only custom metadata keys. + custom_metadata_args = ','.join( + [f'{k}={v}' for k, v in custom_metadata.items()]) + metadata_args.append(f'--update-custom-metadata={custom_metadata_args}') + + if metadata_args: + update_command = ['objects', 'update', _filter_path(gcs_url, write=True)] + update_command.extend(metadata_args) + result = self.run_gcloud_storage(update_command, timeout=timeout) if result.return_code != 0: - logs.error('GSUtilRunner.upload_file (cp step) failed:\nCommand: %s\n' - 'Filename: %s\n' - 'Output: %s' % (result.command, file_path, result.output)) + logs.error( + 'GCloudStorageRunner.upload_file (update metadata step) failed:\n' + f'Command: {result.command}\n' + f'Filename: {file_path}\n' + f'Output: {result.output}') return False - if metadata: - update_command = [ - 'objects', 'update', - _filter_path(gcs_url, write=True) - ] - # The metadata dict is assumed to contain only custom metadata keys, - # not standard headers like 'Content-Type'. 
- metadata_args = [f'{k}={v}' for k, v in metadata.items()] - update_command.append('--update-custom-metadata') - # pylint: disable=line-too-long - update_command.append(','.join(metadata_args)) - - result = self.run_gsutil(update_command, use_gcloud, timeout=timeout) - if result.return_code != 0: - logs.error( - 'GSUtilRunner.upload_file (update metadata step) failed:\nCommand: %s\n' - 'Filename: %s\n' - 'Output: %s' % (result.command, file_path, result.output)) - return False - - return True - - # gsutil can set metadata during cp. - command = [] - if metadata: - for key, value in metadata.items(): - # gsutil uses headers for metadata. For custom metadata, the - # convention is 'x-goog-meta-'. The caller is responsible for this. - # pylint: disable=line-too-long - command.extend(['-h', f'{key}:{value}']) - - command.append('cp') - if gzip: - command.append('-Z') - - command.extend([file_path, _filter_path(gcs_url, write=True)]) - result = self.run_gsutil(command, use_gcloud, timeout=timeout) - - if result.return_code: - logs.error('GSUtilRunner.upload_file failed:\nCommand: %s\n' - 'Filename: %s\n' - 'Output: %s' % (result.command, file_path, result.output)) - return False - return True def upload_files_to_url(self, file_paths, gcs_url, timeout=None): - """Upload files to the given GCS url.""" + """Upload files to the given GCS url. + + Args: + file_paths: A sequence of file paths to upload. + gcs_url: GCS URL to upload files to. + timeout: Timeout for gcloud storage. + + Returns: + Result from the process that executed the gcloud command. 
+ """ if not file_paths or not gcs_url: return False - use_gcloud = use_gcloud_for_command('cp') - if use_gcloud: - command = [ - 'cp', '--read-paths-from-stdin', - _filter_path(gcs_url, write=True) - ] - else: - command = ['cp', '-I', _filter_path(gcs_url, write=True)] - + command = [ + 'cp', '--read-paths-from-stdin', + _filter_path(gcs_url, write=True) + ] filenames_buffer = '\n'.join(file_paths) - result = self.run_gsutil( - command, - use_gcloud, - input_data=filenames_buffer.encode('utf-8'), - timeout=timeout) - + result = self.run_gcloud_storage( + command, input_data=filenames_buffer.encode('utf-8'), timeout=timeout) # Check result of command execution, log output if command failed. if result.return_code: - logs.error( - 'GSUtilRunner.upload_files_to_url failed:\nCommand: %s\n' - 'Filenames:%s\n' - 'Output: %s' % (result.command, filenames_buffer, result.output)) + logs.error('GCloudStorageRunner.upload_files_to_url failed:\n' + f'Command: {result.command}\n' + f'Filenames: {filenames_buffer}\n' + f'Output: {result.output}') return result.return_code == 0 diff --git a/src/clusterfuzz/_internal/tests/core/bot/fuzzers/libFuzzer/libfuzzer_test.py b/src/clusterfuzz/_internal/tests/core/bot/fuzzers/libFuzzer/libfuzzer_test.py index 5ae921ca57d..9138c9ea354 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/fuzzers/libFuzzer/libfuzzer_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/fuzzers/libFuzzer/libfuzzer_test.py @@ -30,7 +30,7 @@ BOT_NAME = 'test-bot' BUILD_DIR = '/fake/build_dir' FUZZ_INPUTS_DISK = '/fake/inputs-disk' -GSUTIL_PATH = '/fake/gsutil_path' +GCLOUD_PATH = '/fake/gcloud_path' FAKE_ROOT_DIR = '/fake_root' SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__)) diff --git a/src/clusterfuzz/_internal/tests/core/bot/testcase_manager_test.py b/src/clusterfuzz/_internal/tests/core/bot/testcase_manager_test.py index 74b2616188f..8bd08159128 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/testcase_manager_test.py +++ 
b/src/clusterfuzz/_internal/tests/core/bot/testcase_manager_test.py @@ -111,8 +111,8 @@ def setUp(self): def test_upload_with_timestamp_from_stats(self): """Log name should be generated using timestamp value from the stats.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud self.fs.create_file( self.testcase_path + '.stats2', @@ -138,8 +138,8 @@ def test_upload_with_timestamp_from_stats(self): def test_upload_with_hostname(self): """Log name should be generated using current (mocked) timestamp value.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud environment.set_value('BOT_NAME', 'hostname.company.com') self.fs.create_file( @@ -160,8 +160,8 @@ def test_upload_with_hostname(self): def test_upload_with_hostname_and_serial(self): """Log name should be generated using current (mocked) timestamp value.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud environment.set_value('BOT_NAME', 'hostname.company.com') environment.set_value('OS_OVERRIDE', 'ANDROID_KERNEL') environment.set_value('ANDROID_SERIAL', '123456789') @@ -185,8 +185,8 @@ def test_upload_with_hostname_and_serial(self): def test_upload_without_timestamp(self): """Log name should be generated using current (mocked) timestamp value.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud self.fs.create_file( self.testcase_path + '.stats2', @@ -209,8 +209,8 @@ def test_upload_without_component_revisions(self): """Log should contain message on empty component revisions.""" self.mock.get_component_range_list.return_value = [] - mock_gsutil = 
mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud self.fs.create_file( self.testcase_path + '.stats2', diff --git a/src/clusterfuzz/_internal/tests/core/fuzzing/corpus_manager_test.py b/src/clusterfuzz/_internal/tests/core/fuzzing/corpus_manager_test.py index a29a4379203..17c227f2568 100644 --- a/src/clusterfuzz/_internal/tests/core/fuzzing/corpus_manager_test.py +++ b/src/clusterfuzz/_internal/tests/core/fuzzing/corpus_manager_test.py @@ -53,7 +53,7 @@ def setUp(self): self.mock.Popen.return_value.communicate.return_value = (None, None) self.mock._count_corpus_files.return_value = 1 # pylint: disable=protected-access - os.environ['GSUTIL_PATH'] = '/gsutil_path' + os.environ['GCLOUD_PATH'] = '/gcloud_path' @mock.patch( 'clusterfuzz._internal.google_cloud_utils.storage.list_blobs', @@ -65,16 +65,18 @@ def test_rsync_to_disk(self, _): self.assertTrue(corpus.rsync_to_disk('/dir', timeout=60)) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', '-o', 'GSUtil:parallel_thread_count=16', - '-q', 'rsync', '-r', '-d', 'gs://bucket/', '/dir' + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', 'gs://bucket/', '/dir', '--recursive', + '--delete-unmatched-destination-objects' ]) self.mock.cpu_count.return_value = 2 corpus = corpus_manager.GcsCorpus('bucket') self.assertTrue(corpus.rsync_to_disk('/dir', timeout=60)) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', '-q', 'rsync', '-r', '-d', 'gs://bucket/', - '/dir' + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', 'gs://bucket/', '/dir', '--recursive', + '--delete-unmatched-destination-objects' ]) def test_rsync_from_disk(self): @@ -84,16 +86,18 @@ def test_rsync_from_disk(self): self.assertTrue(corpus.rsync_from_disk('/dir')) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', 
'-o', 'GSUtil:parallel_thread_count=16', - '-q', 'rsync', '-r', '-d', '/dir', 'gs://bucket/' + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', '/dir', 'gs://bucket/', '--recursive', + '--delete-unmatched-destination-objects' ]) self.mock.cpu_count.return_value = 2 corpus = corpus_manager.GcsCorpus('bucket') self.assertTrue(corpus.rsync_from_disk('/dir')) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', '-q', 'rsync', '-r', '-d', '/dir', - 'gs://bucket/' + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', '/dir', 'gs://bucket/', '--recursive', + '--delete-unmatched-destination-objects' ]) def test_upload_files(self): @@ -105,8 +109,8 @@ def test_upload_files(self): self.assertTrue(corpus.upload_files(['/dir/a', '/dir/b'])) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', '-o', 'GSUtil:parallel_thread_count=16', - 'cp', '-I', 'gs://bucket/' + '/gcloud_path/gcloud', 'storage', '--user-output-enabled', '-q', 'cp', + '--read-paths-from-stdin', 'gs://bucket/' ]) mock_popen.communicate.assert_called_with(b'/dir/a\n/dir/b') @@ -114,8 +118,10 @@ def test_upload_files(self): self.mock.cpu_count.return_value = 2 corpus = corpus_manager.GcsCorpus('bucket') self.assertTrue(corpus.upload_files(['/dir/a', '/dir/b'])) - self.assertEqual(self.mock.Popen.call_args[0][0], - ['/gsutil_path/gsutil', '-m', 'cp', '-I', 'gs://bucket/']) + self.assertEqual(self.mock.Popen.call_args[0][0], [ + '/gcloud_path/gcloud', 'storage', '--user-output-enabled', '-q', 'cp', + '--read-paths-from-stdin', 'gs://bucket/' + ]) class RsyncErrorHandlingTest(unittest.TestCase): @@ -124,7 +130,7 @@ class RsyncErrorHandlingTest(unittest.TestCase): def setUp(self): test_helpers.patch(self, [ 'clusterfuzz._internal.fuzzing.corpus_manager._count_corpus_files', - 'clusterfuzz._internal.google_cloud_utils.gsutil.GSUtilRunner.run_gsutil', + 
'clusterfuzz._internal.google_cloud_utils.gsutil.GCloudStorageRunner.run_gcloud_storage', 'clusterfuzz._internal.google_cloud_utils.storage.copy_file_from', 'clusterfuzz._internal.google_cloud_utils.storage.list_blobs', 'clusterfuzz._internal.google_cloud_utils.storage.exists', @@ -139,7 +145,7 @@ def test_rsync_error_below_threshold(self): b'CommandException: 10 files/objects could not be copied/removed.\n') self.mock._count_corpus_files.return_value = 10 # pylint: disable=protected-access - self.mock.run_gsutil.return_value = new_process.ProcessResult( + self.mock.run_gcloud_storage.return_value = new_process.ProcessResult( command=['/fake'], return_code=1, output=output, @@ -150,7 +156,7 @@ def test_rsync_error_below_threshold(self): corpus = corpus_manager.GcsCorpus('bucket') self.assertTrue(corpus.rsync_to_disk('/dir', timeout=60)) - self.mock.run_gsutil.return_value = new_process.ProcessResult( + self.mock.run_gcloud_storage.return_value = new_process.ProcessResult( command=['/fake'], return_code=1, output=output, @@ -168,7 +174,7 @@ def test_rsync_error_below_threshold_with_not_found_errors(self): b'CommandException: 200 files/objects could not be copied/removed.\n') self.mock._count_corpus_files.return_value = 10 # pylint: disable=protected-access - self.mock.run_gsutil.return_value = new_process.ProcessResult( + self.mock.run_gcloud_storage.return_value = new_process.ProcessResult( command=['/fake'], return_code=1, output=output, @@ -179,7 +185,7 @@ def test_rsync_error_below_threshold_with_not_found_errors(self): corpus = corpus_manager.GcsCorpus('bucket') self.assertTrue(corpus.rsync_to_disk('/dir', timeout=60)) - self.mock.run_gsutil.return_value = new_process.ProcessResult( + self.mock.run_gcloud_storage.return_value = new_process.ProcessResult( command=['/fake'], return_code=1, output=output, @@ -195,7 +201,7 @@ def test_rsync_error_above_threshold(self): b'blah\n' b'CommandException: 11 files/objects could not be copied/removed.\n') - 
self.mock.run_gsutil.return_value = new_process.ProcessResult( + self.mock.run_gcloud_storage.return_value = new_process.ProcessResult( command=['/fake'], return_code=1, output=output, @@ -214,7 +220,7 @@ def setUp(self): """Setup for fuzz target corpus test.""" test_helpers.patch_environ(self) - os.environ['GSUTIL_PATH'] = '/gsutil_path' + os.environ['GCLOUD_PATH'] = '/gcloud_path' os.environ['CORPUS_BUCKET'] = 'bucket' test_helpers.patch(self, [ @@ -247,14 +253,9 @@ def test_rsync_to_disk(self): corpus = corpus_manager.FuzzTargetCorpus('libFuzzer', 'fuzzer') self.assertTrue(corpus.rsync_to_disk('/dir', timeout=60)) self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', - '-m', - '-q', - 'rsync', - '-r', - '-d', - 'gs://bucket/libFuzzer/fuzzer/', - '/dir', + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', 'gs://bucket/libFuzzer/fuzzer/', '/dir', '--recursive', + '--delete-unmatched-destination-objects' ]) def test_rsync_to_disk_with_regressions(self): @@ -267,23 +268,25 @@ def test_rsync_to_disk_with_regressions(self): self.assertEqual(commands, [ [ - '/gsutil_path/gsutil', - '-m', + '/gcloud_path/gcloud', + 'storage', + '--no-user-output-enabled', '-q', 'rsync', - '-r', - '-d', 'gs://bucket/libFuzzer/fuzzer/', '/dir', + '--recursive', + '--delete-unmatched-destination-objects', ], [ - '/gsutil_path/gsutil', - '-m', + '/gcloud_path/gcloud', + 'storage', + '--no-user-output-enabled', '-q', 'rsync', - '-r', 'gs://bucket/libFuzzer/fuzzer_regressions/', '/dir/regressions', + '--recursive', ], ]) @@ -292,8 +295,9 @@ def test_rsync_from_disk(self): corpus = corpus_manager.FuzzTargetCorpus('libFuzzer', 'fuzzer') self.assertTrue(corpus.rsync_from_disk('/dir')) self.assertEqual(self.mock.Popen.call_args_list[0][0][0], [ - '/gsutil_path/gsutil', '-m', '-q', 'rsync', '-r', '-d', '/dir', - 'gs://bucket/libFuzzer/fuzzer/' + '/gcloud_path/gcloud', 'storage', '--no-user-output-enabled', '-q', + 'rsync', '/dir', 
'gs://bucket/libFuzzer/fuzzer/', '--recursive', + '--delete-unmatched-destination-objects' ]) def test_upload_files(self): @@ -305,7 +309,8 @@ def test_upload_files(self): mock_popen.communicate.assert_called_with(b'/dir/a\n/dir/b') self.assertEqual(self.mock.Popen.call_args[0][0], [ - '/gsutil_path/gsutil', '-m', 'cp', '-I', 'gs://bucket/libFuzzer/fuzzer/' + '/gcloud_path/gcloud', 'storage', '--user-output-enabled', '-q', 'cp', + '--read-paths-from-stdin', 'gs://bucket/libFuzzer/fuzzer/' ]) @@ -325,7 +330,7 @@ def setUp(self): test_utils.set_up_pyfakefs(self) self.fs.create_dir('/dir') - os.environ['GSUTIL_PATH'] = '/gsutil_path' + os.environ['GCLOUD_PATH'] = '/gcloud_path' os.environ['CORPUS_BUCKET'] = 'bucket' test_helpers.patch(self, [ diff --git a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/gsutil_test.py b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/gsutil_test.py index e4b1271b099..cba044f1dc1 100644 --- a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/gsutil_test.py +++ b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/gsutil_test.py @@ -11,12 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-"""Tests for gsutil.""" +"""Tests for gcloud storage.""" import os -from unittest import mock -from parameterized import parameterized from pyfakefs import fake_filesystem_unittest from clusterfuzz._internal.google_cloud_utils import gsutil @@ -25,378 +23,306 @@ from clusterfuzz._internal.tests.test_libs import test_utils -class GSUtilRunnerTest(fake_filesystem_unittest.TestCase): - """GSUtilRunner tests.""" +class GCloudStorageRunnerTest(fake_filesystem_unittest.TestCase): + """GCloudStorageRunner tests.""" def setUp(self): test_helpers.patch_environ(self) - patcher = mock.patch( - 'clusterfuzz._internal.system.new_process.ProcessRunner.run_and_wait') - self.mock_run_and_wait = patcher.start() - self.addCleanup(patcher.stop) - + test_helpers.patch( + self, + ['clusterfuzz._internal.system.new_process.ProcessRunner.run_and_wait']) test_utils.set_up_pyfakefs(self) + self.gcloud_runner_obj = gsutil.GCloudStorageRunner() - @parameterized.expand([(True,), (False,)]) - def test_rsync(self, use_gcloud_storage): - """Test rsync.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - gsutil_runner_obj.rsync('gs://source_bucket/source_path', - 'gs://target_bucket/target_path') - if use_gcloud_storage: - expected_args = [ - 'rsync', '--delete-unmatched-destination-objects', - 'gs://source_bucket/source_path', 'gs://target_bucket/target_path' - ] - else: - expected_args = [ - '-q', 'rsync', '-r', '-d', 'gs://source_bucket/source_path', - 'gs://target_bucket/target_path' - ] - - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=18000, env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_rsync_local_gcs(self, use_gcloud_storage): - """Test rsync.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' + def _default_args(self, verbose=True, quiet=True): + 
additional_args = ['--user-output-enabled'] if verbose else [ + '--no-user-output-enabled' + ] + additional_args += ['-q'] if quiet else [] + return additional_args + + def test_rsync(self): + """Test remote rsync.""" + self.gcloud_runner_obj.rsync('gs://source_bucket/source_path', + 'gs://target_bucket/target_path') + expected_args = [ + 'rsync', 'gs://source_bucket/source_path', + 'gs://target_bucket/target_path', '--recursive', + '--delete-unmatched-destination-objects' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=18000) + + def test_rsync_local_gcs(self): + """Test rsync locally.""" os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' self.fs.create_dir('/local/source_bucket') self.fs.create_dir('/local/target_bucket') - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - gsutil_runner_obj.rsync('gs://source_bucket/source_path', - 'gs://target_bucket/target_path') - if use_gcloud_storage: - expected_args = [ - 'rsync', '--delete-unmatched-destination-objects', - '/local/source_bucket/objects/source_path', - '/local/target_bucket/objects/target_path' - ] - else: - expected_args = [ - '-q', 'rsync', '-r', '-d', '/local/source_bucket/objects/source_path', - '/local/target_bucket/objects/target_path' - ] - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=18000, env=mock.ANY) + self.gcloud_runner_obj.rsync('gs://source_bucket/source_path', + 'gs://target_bucket/target_path') + + expected_args = [ + 'rsync', '/local/source_bucket/objects/source_path', + '/local/target_bucket/objects/target_path', '--recursive', + '--delete-unmatched-destination-objects' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=18000) 
self.assertTrue(os.path.exists('/local/target_bucket/objects')) - @parameterized.expand([(True,), (False,)]) - def test_rsync_with_timeout(self, use_gcloud_storage): - """Test rsync.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - gsutil_runner_obj.rsync( + def test_rsync_with_timeout(self): + """Test remote rsync with timeout.""" + self.gcloud_runner_obj.rsync( 'gs://source_bucket/source_path', 'gs://target_bucket/target_path', timeout=1337) - if use_gcloud_storage: - expected_args = [ - 'rsync', '--delete-unmatched-destination-objects', - 'gs://source_bucket/source_path', 'gs://target_bucket/target_path' - ] - else: - expected_args = [ - '-q', 'rsync', '-r', '-d', 'gs://source_bucket/source_path', - 'gs://target_bucket/target_path' - ] - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=1337, env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_rsync_no_delete(self, use_gcloud_storage): - """Test rsync.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - gsutil_runner_obj.rsync( + expected_args = [ + 'rsync', 'gs://source_bucket/source_path', + 'gs://target_bucket/target_path', '--recursive', + '--delete-unmatched-destination-objects' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=1337) + + def test_rsync_local_gcs_with_timeout(self): + """Test rsync locally with timeout.""" + os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' + self.fs.create_dir('/local/source_bucket') + self.fs.create_dir('/local/target_bucket') + self.gcloud_runner_obj.rsync( + 'gs://source_bucket/source_path', + 
'gs://target_bucket/target_path', + timeout=1337) + + expected_args = [ + 'rsync', '/local/source_bucket/objects/source_path', + '/local/target_bucket/objects/target_path', '--recursive', + '--delete-unmatched-destination-objects' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=1337) + self.assertTrue(os.path.exists('/local/target_bucket/objects')) + + def test_rsync_no_delete(self): + """Test remote rsync without delete.""" + self.gcloud_runner_obj.rsync( 'gs://source_bucket/source_path', 'gs://target_bucket/target_path', delete=False) - if use_gcloud_storage: - expected_args = [ - 'rsync', 'gs://source_bucket/source_path', - 'gs://target_bucket/target_path' - ] - else: - expected_args = [ - '-q', 'rsync', '-r', 'gs://source_bucket/source_path', - 'gs://target_bucket/target_path' - ] - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=18000, env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_rsync_with_exclusion(self, use_gcloud_storage): - """Test rsync.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - gsutil_runner_obj.rsync( + expected_args = [ + 'rsync', 'gs://source_bucket/source_path', + 'gs://target_bucket/target_path', '--recursive' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=18000) + + def test_rsync_local_gcs_without_delete(self): + """Test rsync locally without delete.""" + os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' + self.fs.create_dir('/local/source_bucket') + self.fs.create_dir('/local/target_bucket') + self.gcloud_runner_obj.rsync( + 'gs://source_bucket/source_path', + 
'gs://target_bucket/target_path', + delete=False) + + expected_args = [ + 'rsync', '/local/source_bucket/objects/source_path', + '/local/target_bucket/objects/target_path', '--recursive' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=18000) + self.assertTrue(os.path.exists('/local/target_bucket/objects')) + + def test_rsync_with_exclusion(self): + """Test remote rsync with exclusion pattern.""" + self.gcloud_runner_obj.rsync( 'gs://source_bucket/source_path', 'gs://target_bucket/target_path', - timeout=1337, - delete=False, exclusion_pattern='"*.txt$"') - if use_gcloud_storage: - expected_args = [ - 'rsync', '--exclude', '"*.txt$"', 'gs://source_bucket/source_path', - 'gs://target_bucket/target_path' - ] - else: - expected_args = [ - '-q', 'rsync', '-r', '-x', '"*.txt$"', - 'gs://source_bucket/source_path', 'gs://target_bucket/target_path' - ] - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=1337, env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_download_file(self, use_gcloud_storage): - """Test download_file.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( + expected_args = [ + 'rsync', 'gs://source_bucket/source_path', + 'gs://target_bucket/target_path', '--recursive', + '--delete-unmatched-destination-objects', '--exclude', '"*.txt$"' + ] + expected_args = self._default_args(verbose=False) + expected_args + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=18000) + + def test_download_file(self): + """Test remote download_file.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.download_file('gs://source_bucket/source_path', 
- '/target_path')) - self.mock_run_and_wait.assert_called_with( - ['cp', 'gs://source_bucket/source_path', '/target_path'], - timeout=None, - env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_download_file_local_gcs(self, use_gcloud_storage): - """Test download_file.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' + self.gcloud_runner_obj.download_file('gs://source_bucket/source_path', + '/target_path')) + expected_args = self._default_args() + [ + 'cp', 'gs://source_bucket/source_path', '/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=None) + + def test_download_file_local_gcs(self): + """Test download_file locally.""" os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.download_file('gs://source_bucket/source_path', - '/target_path')) - self.mock_run_and_wait.assert_called_with( - ['cp', '/local/source_bucket/objects/source_path', '/target_path'], - timeout=None, - env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_upload_file(self, use_gcloud_storage): - """Test upload_file.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( + self.gcloud_runner_obj.download_file('gs://source_bucket/source_path', + '/target_path')) + expected_args = self._default_args() + [ + 'cp', '/local/source_bucket/objects/source_path', '/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=None) + + def test_upload_file(self): + """Test remote upload_file.""" + self.mock.run_and_wait.return_value = 
new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.upload_file('/source_path', - 'gs://target_bucket/target_path')) - self.mock_run_and_wait.assert_called_with( - ['cp', '/source_path', 'gs://target_bucket/target_path'], - timeout=None, - env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_upload_file_local_gcs(self, use_gcloud_storage): - """Test upload_file.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' + self.gcloud_runner_obj.upload_file('/source_path', + 'gs://target_bucket/target_path')) + expected_args = self._default_args() + [ + 'cp', '/source_path', 'gs://target_bucket/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=None) + + def test_upload_file_local_gcs(self): + """Test upload_file locally.""" os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' self.fs.create_dir('/local/target_bucket') - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( + + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.upload_file('/source_path', - 'gs://target_bucket/target_path')) - self.mock_run_and_wait.assert_called_with( - ['cp', '/source_path', '/local/target_bucket/objects/target_path'], - timeout=None, - env=mock.ANY) + self.gcloud_runner_obj.upload_file('/source_path', + 'gs://target_bucket/target_path')) + expected_args = self._default_args() + [ + 'cp', '/source_path', '/local/target_bucket/objects/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=None) self.assertTrue(os.path.exists('/local/target_bucket/objects')) - @parameterized.expand([(True,), (False,)]) - def test_upload_file_with_metadata(self, use_gcloud_storage): - """Test upload_file with metadata.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if 
use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - - self.mock_run_and_wait.return_value = new_process.ProcessResult( + def test_upload_file_with_metadata(self): + """Test remote upload_file with metadata, gzip and timeout.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) - metadata = {'key1': 'value1', 'key2': 'value2'} + header_metadata = {'content-type': '"new-type"'} + custom_metadata = {'key1': 'value1', 'key2': 'value2'} self.assertTrue( - gsutil_runner_obj.upload_file( - '/source_path', 'gs://target_bucket/target_path', - metadata=metadata)) - - if use_gcloud_storage: - self.assertEqual(2, self.mock_run_and_wait.call_count) - cp_call_args = self.mock_run_and_wait.call_args_list[0].args[0] - self.assertEqual(['cp', '/source_path', 'gs://target_bucket/target_path'], - cp_call_args) - - update_call_args = self.mock_run_and_wait.call_args_list[1].args[0] - self.assertEqual('objects', update_call_args[0]) - self.assertEqual('update', update_call_args[1]) - self.assertEqual('gs://target_bucket/target_path', update_call_args[2]) - self.assertEqual('--update-custom-metadata', update_call_args[3]) - self.assertIn('key1=value1', update_call_args[4]) - self.assertIn('key2=value2', update_call_args[4]) - self.assertIn(',', update_call_args[4]) - else: - self.assertEqual(1, self.mock_run_and_wait.call_count) - called_args = self.mock_run_and_wait.call_args.args[0] - # Can't guarantee order of -h flags, so check for presence. 
- self.assertIn('-h', called_args) - self.assertIn('key1:value1', called_args) - self.assertIn('key2:value2', called_args) - self.assertIn('cp', called_args) - self.assertEqual('/source_path', called_args[-2]) - self.assertEqual('gs://target_bucket/target_path', called_args[-1]) - - @parameterized.expand([(True,), (False,)]) - def test_rsync_failure(self, use_gcloud_storage): - """Test rsync failure.""" - os.environ['USE_GCLOUD_STORAGE_RSYNC'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - mock_result = new_process.ProcessResult(return_code=1, output='Fake error') - self.mock_run_and_wait.return_value = mock_result - result = gsutil_runner_obj.rsync('gs://source_bucket/source_path', - 'gs://target_bucket/target_path') - self.assertEqual(mock_result, result) - - @parameterized.expand([(True,), (False,)]) - def test_download_file_failure(self, use_gcloud_storage): - """Test download_file failure.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=1, output='Fake error') - self.assertFalse( - gsutil_runner_obj.download_file('gs://source_bucket/source_path', - '/target_path')) - - @parameterized.expand([(True,), (False,)]) - def test_upload_file_failure(self, use_gcloud_storage): - """Test upload_file failure.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=1, output='Fake error') - self.assertFalse( - gsutil_runner_obj.upload_file('/source_path', - 'gs://target_bucket/target_path')) - - @parameterized.expand([(True,), (False,)]) - def test_upload_files_to_url_failure(self, use_gcloud_storage): - """Test upload_files_to_url failure.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = 
gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=1, output='Fake error') - self.assertFalse( - gsutil_runner_obj.upload_files_to_url( - ['/source_path1', '/source_path2'], - 'gs://target_bucket/target_path')) - - @parameterized.expand([(True,), (False,)]) - def test_upload_files_to_url_empty(self, use_gcloud_storage): - """Test upload_files_to_url with empty file list.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.assertFalse( - gsutil_runner_obj.upload_files_to_url([], - 'gs://target_bucket/target_path')) - self.assertEqual(0, self.mock_run_and_wait.call_count) - - @parameterized.expand([(True,), (False,)]) - def test_upload_file_with_options(self, use_gcloud_storage): - """Test upload_file.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( - return_code=0) - self.assertTrue( - gsutil_runner_obj.upload_file( + self.gcloud_runner_obj.upload_file( '/source_path', 'gs://target_bucket/target_path', - timeout=1337, + metadata=header_metadata, + custom_metadata=custom_metadata, gzip=True, - metadata={'a': 'b'})) - - if use_gcloud_storage: - self.mock_run_and_wait.assert_any_call( - [ - 'cp', '--gzip-in-flight-all', '/source_path', - 'gs://target_bucket/target_path' - ], - timeout=1337, - env=mock.ANY) - else: - expected_args = [ - '-h', 'a:b', 'cp', '-Z', '/source_path', - 'gs://target_bucket/target_path' - ] - self.mock_run_and_wait.assert_called_with( - expected_args, timeout=1337, env=mock.ANY) - - @parameterized.expand([(True,), (False,)]) - def test_upload_files_to_url(self, use_gcloud_storage): - """Test upload_files_to_url.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = 
new_process.ProcessResult( + timeout=1337)) + self.assertEqual(2, self.mock.run_and_wait.call_count) + + # gcloud storage cp call + expected_args = self._default_args() + [ + 'cp', '--gzip-local-all', '/source_path', + 'gs://target_bucket/target_path' + ] + self.mock.run_and_wait.assert_any_call( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=1337) + + # gcloud storage objects update call (metadata) + expected_args = self._default_args() + [ + 'objects', 'update', 'gs://target_bucket/target_path', + '--content-type="new-type"', + '--update-custom-metadata=key1=value1,key2=value2' + ] + self.mock.run_and_wait.assert_any_call( + self.gcloud_runner_obj.gcloud_runner, expected_args, timeout=1337) + + def test_upload_files_to_url(self): + """Test remote upload_files_to_url.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.upload_files_to_url( + self.gcloud_runner_obj.upload_files_to_url( ['/source_path1', '/source_path2'], 'gs://target_bucket/target_path')) - if use_gcloud_storage: - expected_args = [ - 'cp', '--read-paths-from-stdin', 'gs://target_bucket/target_path' - ] - else: - expected_args = ['cp', '-I', 'gs://target_bucket/target_path'] - self.mock_run_and_wait.assert_called_with( + expected_args = self._default_args() + [ + 'cp', '--read-paths-from-stdin', 'gs://target_bucket/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, input_data=b'/source_path1\n/source_path2', - timeout=None, - env=mock.ANY) + timeout=None) - @parameterized.expand([(True,), (False,)]) - def test_upload_files_to_url_local_gcs(self, use_gcloud_storage): - """Test upload_files_to_url.""" - os.environ['USE_GCLOUD_STORAGE_CP'] = '1' if use_gcloud_storage else '0' + def test_upload_files_to_url_local_gcs(self): + """Test upload_files_to_url locally.""" os.environ['LOCAL_GCS_BUCKETS_PATH'] = '/local' 
self.fs.create_dir('/local/target_bucket') - gsutil_runner_obj = gsutil.GSUtilRunner() - self.mock_run_and_wait.return_value = new_process.ProcessResult( + self.mock.run_and_wait.return_value = new_process.ProcessResult( return_code=0) self.assertTrue( - gsutil_runner_obj.upload_files_to_url( + self.gcloud_runner_obj.upload_files_to_url( ['/source_path1', '/source_path2'], - 'gs://target_bucket/target_path')) - if use_gcloud_storage: - expected_args = [ - 'cp', '--read-paths-from-stdin', - '/local/target_bucket/objects/target_path' - ] - else: - expected_args = ['cp', '-I', '/local/target_bucket/objects/target_path'] - self.mock_run_and_wait.assert_called_with( + 'gs://target_bucket/target_path', + timeout=1337)) + expected_args = self._default_args() + [ + 'cp', '--read-paths-from-stdin', + '/local/target_bucket/objects/target_path' + ] + self.mock.run_and_wait.assert_called_with( + self.gcloud_runner_obj.gcloud_runner, expected_args, input_data=b'/source_path1\n/source_path2', - timeout=None, - env=mock.ANY) + timeout=1337) self.assertTrue(os.path.exists('/local/target_bucket/objects')) + + def test_upload_files_to_url_empty(self): + """Test upload_files_to_url with empty file list.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( + return_code=0) + self.assertFalse( + self.gcloud_runner_obj.upload_files_to_url( + [], 'gs://target_bucket/target_path')) + self.assertEqual(0, self.mock.run_and_wait.call_count) + + def test_rsync_failure(self): + """Test rsync failure.""" + mock_result = new_process.ProcessResult(return_code=1, output='Fake error') + self.mock.run_and_wait.return_value = mock_result + result = self.gcloud_runner_obj.rsync('gs://source_bucket/source_path', + 'gs://target_bucket/target_path') + self.assertEqual(mock_result, result) + + def test_download_file_failure(self): + """Test download_file failure.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( + return_code=1, output='Fake error') + self.assertFalse( 
+ self.gcloud_runner_obj.download_file('gs://source_bucket/source_path', + '/target_path')) + + def test_upload_file_failure(self): + """Test upload_file failure.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( + return_code=1, output='Fake error') + self.assertFalse( + self.gcloud_runner_obj.upload_file('/source_path', + 'gs://target_bucket/target_path')) + + def test_upload_files_to_url_failure(self): + """Test upload_files_to_url failure.""" + self.mock.run_and_wait.return_value = new_process.ProcessResult( + return_code=1, output='Fake error') + self.assertFalse( + self.gcloud_runner_obj.upload_files_to_url( + ['/source_path1', '/source_path2'], + 'gs://target_bucket/target_path')) diff --git a/src/clusterfuzz/_internal/tests/core/metrics/fuzzer_logs_test.py b/src/clusterfuzz/_internal/tests/core/metrics/fuzzer_logs_test.py index e1f5de09b0e..a7caace94e8 100644 --- a/src/clusterfuzz/_internal/tests/core/metrics/fuzzer_logs_test.py +++ b/src/clusterfuzz/_internal/tests/core/metrics/fuzzer_logs_test.py @@ -42,8 +42,8 @@ def setUp(self): def test_upload_to_logs(self): """Test a simple call to upload_to_logs.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud fuzzer_logs.upload_to_logs('fake-gcs-bucket', 'fake content') self.mock.write_data.assert_called_once_with( 'fake content', @@ -51,8 +51,8 @@ def test_upload_to_logs(self): def test_upload_to_logs_with_all_arguments(self): """Test a call to upload_to_logs with all arguments being passed.""" - mock_gsutil = mock.MagicMock() - self.mock.write_data.return_value = mock_gsutil + mock_gcloud = mock.MagicMock() + self.mock.write_data.return_value = mock_gcloud fuzzer_logs.upload_to_logs( 'gcs-bucket', 'fake content', diff --git a/src/clusterfuzz/_internal/tests/test_libs/untrusted_runner_helpers.py b/src/clusterfuzz/_internal/tests/test_libs/untrusted_runner_helpers.py index 
2074d02399d..5f046169cfa 100644 --- a/src/clusterfuzz/_internal/tests/test_libs/untrusted_runner_helpers.py +++ b/src/clusterfuzz/_internal/tests/test_libs/untrusted_runner_helpers.py @@ -195,6 +195,7 @@ def setUp(self): worker_fuzz_inputs = file_host.rebase_to_worker_root(fuzz_inputs) shell.remove_directory(worker_fuzz_inputs, recreate=True) + environment.set_value('GCLOUD_PATH', os.path.dirname(_which('gcloud'))) environment.set_value('GSUTIL_PATH', os.path.dirname(_which('gsutil'))) test_utils.setup_pubsub('test-clusterfuzz')