summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py90
1 files changed, 48 insertions, 42 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 287f2e04..e5fc9e80 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,7 +15,6 @@
15import errno 15import errno
16import functools 16import functools
17import io 17import io
18import multiprocessing
19import os 18import os
20import re 19import re
21import signal 20import signal
@@ -26,7 +25,6 @@ from color import Coloring
26from command import Command 25from command import Command
27from command import DEFAULT_LOCAL_JOBS 26from command import DEFAULT_LOCAL_JOBS
28from command import MirrorSafeCommand 27from command import MirrorSafeCommand
29from command import WORKER_BATCH_SIZE
30from error import ManifestInvalidRevisionError 28from error import ManifestInvalidRevisionError
31from repo_logging import RepoLogger 29from repo_logging import RepoLogger
32 30
@@ -241,7 +239,6 @@ without iterating through the remaining projects.
241 cmd.insert(cmd.index(cn) + 1, "--color") 239 cmd.insert(cmd.index(cn) + 1, "--color")
242 240
243 mirror = self.manifest.IsMirror 241 mirror = self.manifest.IsMirror
244 rc = 0
245 242
246 smart_sync_manifest_name = "smart_sync_override.xml" 243 smart_sync_manifest_name = "smart_sync_override.xml"
247 smart_sync_manifest_path = os.path.join( 244 smart_sync_manifest_path = os.path.join(
@@ -264,32 +261,41 @@ without iterating through the remaining projects.
264 261
265 os.environ["REPO_COUNT"] = str(len(projects)) 262 os.environ["REPO_COUNT"] = str(len(projects))
266 263
264 def _ProcessResults(_pool, _output, results):
265 rc = 0
266 first = True
267 for r, output in results:
268 if output:
269 if first:
270 first = False
271 elif opt.project_header:
272 print()
273 # To simplify the DoWorkWrapper, take care of automatic
274 # newlines.
275 end = "\n"
276 if output[-1] == "\n":
277 end = ""
278 print(output, end=end)
279 rc = rc or r
280 if r != 0 and opt.abort_on_errors:
281 raise Exception("Aborting due to previous error")
282 return rc
283
267 try: 284 try:
268 config = self.manifest.manifestProject.config 285 config = self.manifest.manifestProject.config
269 with multiprocessing.Pool(opt.jobs, InitWorker) as pool: 286 with self.ParallelContext():
270 results_it = pool.imap( 287 self.get_parallel_context()["projects"] = projects
288 rc = self.ExecuteInParallel(
289 opt.jobs,
271 functools.partial( 290 functools.partial(
272 DoWorkWrapper, mirror, opt, cmd, shell, config 291 self.DoWorkWrapper, mirror, opt, cmd, shell, config
273 ), 292 ),
274 enumerate(projects), 293 range(len(projects)),
275 chunksize=WORKER_BATCH_SIZE, 294 callback=_ProcessResults,
295 ordered=True,
296 initializer=self.InitWorker,
297 chunksize=1,
276 ) 298 )
277 first = True
278 for r, output in results_it:
279 if output:
280 if first:
281 first = False
282 elif opt.project_header:
283 print()
284 # To simplify the DoWorkWrapper, take care of automatic
285 # newlines.
286 end = "\n"
287 if output[-1] == "\n":
288 end = ""
289 print(output, end=end)
290 rc = rc or r
291 if r != 0 and opt.abort_on_errors:
292 raise Exception("Aborting due to previous error")
293 except (KeyboardInterrupt, WorkerKeyboardInterrupt): 299 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
294 # Catch KeyboardInterrupt raised inside and outside of workers 300 # Catch KeyboardInterrupt raised inside and outside of workers
295 rc = rc or errno.EINTR 301 rc = rc or errno.EINTR
@@ -304,29 +310,29 @@ without iterating through the remaining projects.
304 if rc != 0: 310 if rc != 0:
305 sys.exit(rc) 311 sys.exit(rc)
306 312
313 @classmethod
314 def InitWorker(cls):
315 signal.signal(signal.SIGINT, signal.SIG_IGN)
307 316
308class WorkerKeyboardInterrupt(Exception): 317 @classmethod
309 """Keyboard interrupt exception for worker processes.""" 318 def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx):
310 319 """A wrapper around the DoWork() method.
311
312def InitWorker():
313 signal.signal(signal.SIGINT, signal.SIG_IGN)
314 320
321 Catch the KeyboardInterrupt exceptions here and re-raise them as a
322 different, ``Exception``-based exception to stop it flooding the console
323 with stacktraces and making the parent hang indefinitely.
315 324
316def DoWorkWrapper(mirror, opt, cmd, shell, config, args): 325 """
317 """A wrapper around the DoWork() method. 326 project = cls.get_parallel_context()["projects"][project_idx]
327 try:
328 return DoWork(project, mirror, opt, cmd, shell, project_idx, config)
329 except KeyboardInterrupt:
330 print("%s: Worker interrupted" % project.name)
331 raise WorkerKeyboardInterrupt()
318 332
319 Catch the KeyboardInterrupt exceptions here and re-raise them as a
320 different, ``Exception``-based exception to stop it flooding the console
321 with stacktraces and making the parent hang indefinitely.
322 333
323 """ 334class WorkerKeyboardInterrupt(Exception):
324 cnt, project = args 335 """Keyboard interrupt exception for worker processes."""
325 try:
326 return DoWork(project, mirror, opt, cmd, shell, cnt, config)
327 except KeyboardInterrupt:
328 print("%s: Worker interrupted" % project.name)
329 raise WorkerKeyboardInterrupt()
330 336
331 337
332def DoWork(project, mirror, opt, cmd, shell, cnt, config): 338def DoWork(project, mirror, opt, cmd, shell, cnt, config):