From 8add62325dbe4df60cde1af6b093d99e79685140 Mon Sep 17 00:00:00 2001 From: Chris McDonald Date: Wed, 9 Dec 2020 14:27:59 -0700 Subject: Add parallelism to 'branches' command Spread the operation of querying which local branches exist across a pool of processes and build the name map of projects -> branches as these tasks finish rather than blocking on the entire query. The search operations are submitted in batches to reduce the overhead of interprocess communication. The `chunksize` argument used to control this batch size was selected by incrementing through powers of two until it stopped being faster. Change-Id: Ie3d7f799ee8e83e5058536caf53e2979175408b7 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/291342 Tested-by: Chris Mcdonald Reviewed-by: Mike Frysinger --- subcmds/branches.py | 46 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) (limited to 'subcmds/branches.py') diff --git a/subcmds/branches.py b/subcmds/branches.py index 9709f7f0..2b1f8075 100644 --- a/subcmds/branches.py +++ b/subcmds/branches.py @@ -15,10 +15,20 @@ # limitations under the License. from __future__ import print_function +import itertools +import multiprocessing import sys from color import Coloring from command import Command +# Number of projects to submit to a single worker process at a time. +# This number represents a tradeoff between the overhead of IPC and finer +# grained opportunity for parallelism. This particular value was chosen by +# iterating through powers of two until the overall performance no longer +# improved. The performance of this batch size is not a function of the +# number of cores on the system. +WORKER_BATCH_SIZE = 32 + class BranchColoring(Coloring): def __init__(self, config): @@ -97,20 +107,32 @@ is shown, then the branch appears in all projects. """ + def _Options(self, p): + """Add flags to CLI parser for this subcommand.""" + default_jobs = min(multiprocessing.cpu_count(), 8) + p.add_option( + '-j', + '--jobs', + type=int, + default=default_jobs, + help='Number of worker processes to spawn ' + '(default: %s)' % default_jobs) + def Execute(self, opt, args): projects = self.GetProjects(args) out = BranchColoring(self.manifest.manifestProject.config) all_branches = {} project_cnt = len(projects) + with multiprocessing.Pool(processes=opt.jobs) as pool: + project_branches = pool.imap_unordered( + expand_project_to_branches, projects, chunksize=WORKER_BATCH_SIZE) - for project in projects: - for name, b in project.GetBranches().items(): - b.project = project + for name, b in itertools.chain.from_iterable(project_branches): if name not in all_branches: all_branches[name] = BranchInfo(name) all_branches[name].add(b) - names = list(sorted(all_branches)) + names = sorted(all_branches) if not names: print(' (no branches)', file=sys.stderr) @@ -180,3 +202,19 @@ is shown, then the branch appears in all projects. else: out.write(' in all projects') out.nl() + + +def expand_project_to_branches(project): + """Expands a project into a list of branch names & associated information. + + Args: + project: project.Project + + Returns: + List[Tuple[str, git_config.Branch]] + """ + branches = [] + for name, b in project.GetBranches().items(): + b.project = project + branches.append((name, b)) + return branches -- cgit v1.2.3-54-g00ecf