From b5d075d04f1e555f85aad27e74f16073a50b2ae6 Mon Sep 17 00:00:00 2001 From: Mike Frysinger Date: Mon, 1 Mar 2021 00:56:38 -0500 Subject: command: add a helper for the parallel execution boilerplate Now that we have a bunch of subcommands doing parallel execution, a common pattern arises that we can factor out for most of them. We leave forall alone as it's a bit too complicated atm to cut over. Change-Id: I3617a4f7c66142bcd1ab030cb4cca698a65010ac Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/301942 Tested-by: Mike Frysinger Reviewed-by: Chris Mcdonald --- command.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) (limited to 'command.py') diff --git a/command.py b/command.py index be2d6a6e..9b1220dc 100644 --- a/command.py +++ b/command.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import multiprocessing import os import optparse import platform @@ -21,6 +22,7 @@ import sys from event_log import EventLog from error import NoSuchProjectError from error import InvalidProjectGroupsError +import progress # Number of projects to submit to a single worker process at a time. @@ -156,6 +158,44 @@ class Command(object): """ raise NotImplementedError + @staticmethod + def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False): + """Helper for managing parallel execution boiler plate. + + For subcommands that can easily split their work up. + + Args: + jobs: How many parallel processes to use. + func: The function to apply to each of the |inputs|. Usually a + functools.partial for wrapping additional arguments. It will be run + in a separate process, so it must be pickalable, so nested functions + won't work. Methods on the subcommand Command class should work. + inputs: The list of items to process. Must be a list. + callback: The function to pass the results to for processing. It will be + executed in the main thread and process the results of |func| as they + become available. Thus it may be a local nested function. Its return + value is passed back directly. It takes three arguments: + - The processing pool (or None with one job). + - The |output| argument. + - An iterator for the results. + output: An output manager. May be progress.Progess or color.Coloring. + ordered: Whether the jobs should be processed in order. + + Returns: + The |callback| function's results are returned. + """ + try: + # NB: Multiprocessing is heavy, so don't spin it up for one job. + if len(inputs) == 1 or jobs == 1: + return callback(None, output, (func(x) for x in inputs)) + else: + with multiprocessing.Pool(jobs) as pool: + submit = pool.imap if ordered else pool.imap_unordered + return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE)) + finally: + if isinstance(output, progress.Progress): + output.end() + def _ResetPathToProjectMap(self, projects): self._by_path = dict((p.worktree, p) for p in projects) -- cgit v1.2.3-54-g00ecf