summaryrefslogtreecommitdiffstats
path: root/command.py
diff options
context:
space:
mode:
Diffstat (limited to 'command.py')
-rw-r--r--command.py40
1 files changed, 40 insertions, 0 deletions
diff --git a/command.py b/command.py
index be2d6a6e..9b1220dc 100644
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
12# See the License for the specific language governing permissions and 12# See the License for the specific language governing permissions and
13# limitations under the License. 13# limitations under the License.
14 14
15import multiprocessing
15import os 16import os
16import optparse 17import optparse
17import platform 18import platform
@@ -21,6 +22,7 @@ import sys
21from event_log import EventLog 22from event_log import EventLog
22from error import NoSuchProjectError 23from error import NoSuchProjectError
23from error import InvalidProjectGroupsError 24from error import InvalidProjectGroupsError
25import progress
24 26
25 27
26# Number of projects to submit to a single worker process at a time. 28# Number of projects to submit to a single worker process at a time.
@@ -156,6 +158,44 @@ class Command(object):
156 """ 158 """
157 raise NotImplementedError 159 raise NotImplementedError
158 160
161 @staticmethod
162 def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False):
163 """Helper for managing parallel execution boiler plate.
164
165 For subcommands that can easily split their work up.
166
167 Args:
168 jobs: How many parallel processes to use.
169 func: The function to apply to each of the |inputs|. Usually a
170 functools.partial for wrapping additional arguments. It will be run
171 in a separate process, so it must be pickalable, so nested functions
172 won't work. Methods on the subcommand Command class should work.
173 inputs: The list of items to process. Must be a list.
174 callback: The function to pass the results to for processing. It will be
175 executed in the main thread and process the results of |func| as they
176 become available. Thus it may be a local nested function. Its return
177 value is passed back directly. It takes three arguments:
178 - The processing pool (or None with one job).
179 - The |output| argument.
180 - An iterator for the results.
181 output: An output manager. May be progress.Progess or color.Coloring.
182 ordered: Whether the jobs should be processed in order.
183
184 Returns:
185 The |callback| function's results are returned.
186 """
187 try:
188 # NB: Multiprocessing is heavy, so don't spin it up for one job.
189 if len(inputs) == 1 or jobs == 1:
190 return callback(None, output, (func(x) for x in inputs))
191 else:
192 with multiprocessing.Pool(jobs) as pool:
193 submit = pool.imap if ordered else pool.imap_unordered
194 return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE))
195 finally:
196 if isinstance(output, progress.Progress):
197 output.end()
198
159 def _ResetPathToProjectMap(self, projects): 199 def _ResetPathToProjectMap(self, projects):
160 self._by_path = dict((p.worktree, p) for p in projects) 200 self._by_path = dict((p.worktree, p) for p in projects)
161 201