diff options
Diffstat (limited to 'command.py')
-rw-r--r-- | command.py | 40 |
1 files changed, 40 insertions, 0 deletions
@@ -12,6 +12,7 @@ | |||
12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and |
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import multiprocessing | ||
15 | import os | 16 | import os |
16 | import optparse | 17 | import optparse |
17 | import platform | 18 | import platform |
@@ -21,6 +22,7 @@ import sys | |||
21 | from event_log import EventLog | 22 | from event_log import EventLog |
22 | from error import NoSuchProjectError | 23 | from error import NoSuchProjectError |
23 | from error import InvalidProjectGroupsError | 24 | from error import InvalidProjectGroupsError |
25 | import progress | ||
24 | 26 | ||
25 | 27 | ||
26 | # Number of projects to submit to a single worker process at a time. | 28 | # Number of projects to submit to a single worker process at a time. |
@@ -156,6 +158,44 @@ class Command(object): | |||
156 | """ | 158 | """ |
157 | raise NotImplementedError | 159 | raise NotImplementedError |
158 | 160 | ||
161 | @staticmethod | ||
162 | def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False): | ||
163 | """Helper for managing parallel execution boiler plate. | ||
164 | |||
165 | For subcommands that can easily split their work up. | ||
166 | |||
167 | Args: | ||
168 | jobs: How many parallel processes to use. | ||
169 | func: The function to apply to each of the |inputs|. Usually a | ||
170 | functools.partial for wrapping additional arguments. It will be run | ||
171 | in a separate process, so it must be pickalable, so nested functions | ||
172 | won't work. Methods on the subcommand Command class should work. | ||
173 | inputs: The list of items to process. Must be a list. | ||
174 | callback: The function to pass the results to for processing. It will be | ||
175 | executed in the main thread and process the results of |func| as they | ||
176 | become available. Thus it may be a local nested function. Its return | ||
177 | value is passed back directly. It takes three arguments: | ||
178 | - The processing pool (or None with one job). | ||
179 | - The |output| argument. | ||
180 | - An iterator for the results. | ||
181 | output: An output manager. May be progress.Progess or color.Coloring. | ||
182 | ordered: Whether the jobs should be processed in order. | ||
183 | |||
184 | Returns: | ||
185 | The |callback| function's results are returned. | ||
186 | """ | ||
187 | try: | ||
188 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | ||
189 | if len(inputs) == 1 or jobs == 1: | ||
190 | return callback(None, output, (func(x) for x in inputs)) | ||
191 | else: | ||
192 | with multiprocessing.Pool(jobs) as pool: | ||
193 | submit = pool.imap if ordered else pool.imap_unordered | ||
194 | return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE)) | ||
195 | finally: | ||
196 | if isinstance(output, progress.Progress): | ||
197 | output.end() | ||
198 | |||
159 | def _ResetPathToProjectMap(self, projects): | 199 | def _ResetPathToProjectMap(self, projects): |
160 | self._by_path = dict((p.worktree, p) for p in projects) | 200 | self._by_path = dict((p.worktree, p) for p in projects) |
161 | 201 | ||