diff options
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r-- | subcmds/forall.py | 90 |
1 files changed, 48 insertions, 42 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py index 287f2e04..e5fc9e80 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
@@ -15,7 +15,6 @@ | |||
15 | import errno | 15 | import errno |
16 | import functools | 16 | import functools |
17 | import io | 17 | import io |
18 | import multiprocessing | ||
19 | import os | 18 | import os |
20 | import re | 19 | import re |
21 | import signal | 20 | import signal |
@@ -26,7 +25,6 @@ from color import Coloring | |||
26 | from command import Command | 25 | from command import Command |
27 | from command import DEFAULT_LOCAL_JOBS | 26 | from command import DEFAULT_LOCAL_JOBS |
28 | from command import MirrorSafeCommand | 27 | from command import MirrorSafeCommand |
29 | from command import WORKER_BATCH_SIZE | ||
30 | from error import ManifestInvalidRevisionError | 28 | from error import ManifestInvalidRevisionError |
31 | from repo_logging import RepoLogger | 29 | from repo_logging import RepoLogger |
32 | 30 | ||
@@ -241,7 +239,6 @@ without iterating through the remaining projects. | |||
241 | cmd.insert(cmd.index(cn) + 1, "--color") | 239 | cmd.insert(cmd.index(cn) + 1, "--color") |
242 | 240 | ||
243 | mirror = self.manifest.IsMirror | 241 | mirror = self.manifest.IsMirror |
244 | rc = 0 | ||
245 | 242 | ||
246 | smart_sync_manifest_name = "smart_sync_override.xml" | 243 | smart_sync_manifest_name = "smart_sync_override.xml" |
247 | smart_sync_manifest_path = os.path.join( | 244 | smart_sync_manifest_path = os.path.join( |
@@ -264,32 +261,41 @@ without iterating through the remaining projects. | |||
264 | 261 | ||
265 | os.environ["REPO_COUNT"] = str(len(projects)) | 262 | os.environ["REPO_COUNT"] = str(len(projects)) |
266 | 263 | ||
264 | def _ProcessResults(_pool, _output, results): | ||
265 | rc = 0 | ||
266 | first = True | ||
267 | for r, output in results: | ||
268 | if output: | ||
269 | if first: | ||
270 | first = False | ||
271 | elif opt.project_header: | ||
272 | print() | ||
273 | # To simplify the DoWorkWrapper, take care of automatic | ||
274 | # newlines. | ||
275 | end = "\n" | ||
276 | if output[-1] == "\n": | ||
277 | end = "" | ||
278 | print(output, end=end) | ||
279 | rc = rc or r | ||
280 | if r != 0 and opt.abort_on_errors: | ||
281 | raise Exception("Aborting due to previous error") | ||
282 | return rc | ||
283 | |||
267 | try: | 284 | try: |
268 | config = self.manifest.manifestProject.config | 285 | config = self.manifest.manifestProject.config |
269 | with multiprocessing.Pool(opt.jobs, InitWorker) as pool: | 286 | with self.ParallelContext(): |
270 | results_it = pool.imap( | 287 | self.get_parallel_context()["projects"] = projects |
288 | rc = self.ExecuteInParallel( | ||
289 | opt.jobs, | ||
271 | functools.partial( | 290 | functools.partial( |
272 | DoWorkWrapper, mirror, opt, cmd, shell, config | 291 | self.DoWorkWrapper, mirror, opt, cmd, shell, config |
273 | ), | 292 | ), |
274 | enumerate(projects), | 293 | range(len(projects)), |
275 | chunksize=WORKER_BATCH_SIZE, | 294 | callback=_ProcessResults, |
295 | ordered=True, | ||
296 | initializer=self.InitWorker, | ||
297 | chunksize=1, | ||
276 | ) | 298 | ) |
277 | first = True | ||
278 | for r, output in results_it: | ||
279 | if output: | ||
280 | if first: | ||
281 | first = False | ||
282 | elif opt.project_header: | ||
283 | print() | ||
284 | # To simplify the DoWorkWrapper, take care of automatic | ||
285 | # newlines. | ||
286 | end = "\n" | ||
287 | if output[-1] == "\n": | ||
288 | end = "" | ||
289 | print(output, end=end) | ||
290 | rc = rc or r | ||
291 | if r != 0 and opt.abort_on_errors: | ||
292 | raise Exception("Aborting due to previous error") | ||
293 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): | 299 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): |
294 | # Catch KeyboardInterrupt raised inside and outside of workers | 300 | # Catch KeyboardInterrupt raised inside and outside of workers |
295 | rc = rc or errno.EINTR | 301 | rc = rc or errno.EINTR |
@@ -304,29 +310,29 @@ without iterating through the remaining projects. | |||
304 | if rc != 0: | 310 | if rc != 0: |
305 | sys.exit(rc) | 311 | sys.exit(rc) |
306 | 312 | ||
313 | @classmethod | ||
314 | def InitWorker(cls): | ||
315 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
307 | 316 | ||
308 | class WorkerKeyboardInterrupt(Exception): | 317 | @classmethod |
309 | """Keyboard interrupt exception for worker processes.""" | 318 | def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx): |
310 | 319 | """A wrapper around the DoWork() method. | |
311 | |||
312 | def InitWorker(): | ||
313 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
314 | 320 | ||
321 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
322 | different, ``Exception``-based exception to stop it flooding the console | ||
323 | with stacktraces and making the parent hang indefinitely. | ||
315 | 324 | ||
316 | def DoWorkWrapper(mirror, opt, cmd, shell, config, args): | 325 | """ |
317 | """A wrapper around the DoWork() method. | 326 | project = cls.get_parallel_context()["projects"][project_idx] |
327 | try: | ||
328 | return DoWork(project, mirror, opt, cmd, shell, project_idx, config) | ||
329 | except KeyboardInterrupt: | ||
330 | print("%s: Worker interrupted" % project.name) | ||
331 | raise WorkerKeyboardInterrupt() | ||
318 | 332 | ||
319 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
320 | different, ``Exception``-based exception to stop it flooding the console | ||
321 | with stacktraces and making the parent hang indefinitely. | ||
322 | 333 | ||
323 | """ | 334 | class WorkerKeyboardInterrupt(Exception): |
324 | cnt, project = args | 335 | """Keyboard interrupt exception for worker processes.""" |
325 | try: | ||
326 | return DoWork(project, mirror, opt, cmd, shell, cnt, config) | ||
327 | except KeyboardInterrupt: | ||
328 | print("%s: Worker interrupted" % project.name) | ||
329 | raise WorkerKeyboardInterrupt() | ||
330 | 336 | ||
331 | 337 | ||
332 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | 338 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): |