From b5d075d04f1e555f85aad27e74f16073a50b2ae6 Mon Sep 17 00:00:00 2001 From: Mike Frysinger Date: Mon, 1 Mar 2021 00:56:38 -0500 Subject: command: add a helper for the parallel execution boilerplate Now that we have a bunch of subcommands doing parallel execution, a common pattern arises that we can factor out for most of them. We leave forall alone as it's a bit too complicated atm to cut over. Change-Id: I3617a4f7c66142bcd1ab030cb4cca698a65010ac Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/301942 Tested-by: Mike Frysinger Reviewed-by: Chris Mcdonald --- subcmds/sync.py | 49 +++++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 30 deletions(-) (limited to 'subcmds/sync.py') diff --git a/subcmds/sync.py b/subcmds/sync.py index 21166af5..4763fadc 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py @@ -51,7 +51,7 @@ import git_superproject import gitc_utils from project import Project from project import RemoteSpec -from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE +from command import Command, MirrorSafeCommand from error import RepoChangedException, GitError, ManifestParseError import platform_utils from project import SyncBuffer @@ -428,11 +428,12 @@ later is required to fix a server side protocol bug. return (ret, fetched) - def _CheckoutOne(self, opt, project): + def _CheckoutOne(self, detach_head, force_sync, project): """Checkout work tree for one project Args: - opt: Program options returned from optparse. See _Options(). + detach_head: Whether to leave a detached HEAD. + force_sync: Force checking out of the repo. project: Project object for the project to checkout. Returns: @@ -440,10 +441,10 @@ later is required to fix a server side protocol bug. """ start = time.time() syncbuf = SyncBuffer(self.manifest.manifestProject.config, - detach_head=opt.detach_head) + detach_head=detach_head) success = False try: - project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) + project.Sync_LocalHalf(syncbuf, force_sync=force_sync) success = syncbuf.Finish() except Exception as e: print('error: Cannot checkout %s: %s: %s' % @@ -464,44 +465,32 @@ later is required to fix a server side protocol bug. opt: Program options returned from optparse. See _Options(). err_results: A list of strings, paths to git repos where checkout failed. """ - ret = True - jobs = opt.jobs_checkout if opt.jobs_checkout else self.jobs - # Only checkout projects with worktrees. all_projects = [x for x in all_projects if x.worktree] - pm = Progress('Checking out', len(all_projects), quiet=opt.quiet) - - def _ProcessResults(results): + def _ProcessResults(pool, pm, results): + ret = True for (success, project, start, finish) in results: self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, start, finish, success) # Check for any errors before running any more tasks. # ...we'll let existing jobs finish, though. if not success: + ret = False err_results.append(project.relpath) if opt.fail_fast: - return False + if pool: + pool.close() + return ret pm.update(msg=project.name) - return True - - # NB: Multiprocessing is heavy, so don't spin it up for one job. - if len(all_projects) == 1 or jobs == 1: - if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects): - ret = False - else: - with multiprocessing.Pool(jobs) as pool: - results = pool.imap_unordered( - functools.partial(self._CheckoutOne, opt), - all_projects, - chunksize=WORKER_BATCH_SIZE) - if not _ProcessResults(results): - ret = False - pool.close() - - pm.end() + return ret - return ret and not err_results + return self.ExecuteInParallel( + opt.jobs_checkout if opt.jobs_checkout else self.jobs, + functools.partial(self._CheckoutOne, opt.detach_head, opt.force_sync), + all_projects, + callback=_ProcessResults, + output=Progress('Checking out', len(all_projects), quiet=opt.quiet)) and not err_results def _GCProjects(self, projects, opt, err_event): pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet) -- cgit v1.2.3-54-g00ecf