summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
authorKuang-che Wu <kcwu@google.com>2024-10-22 21:04:41 +0800
committerLUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>2024-10-23 23:34:34 +0000
commit8da4861b3860c505e39341b4135c21f67569e4d8 (patch)
tree6f300266c91322df0e61953b84381e1f403074a5 /subcmds/forall.py
parent39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (diff)
downloadgit-repo-8da4861b3860c505e39341b4135c21f67569e4d8.tar.gz
subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd9977e to reduce serialization overhead. Below benchmarks are tested with 2.7k projects on my workstation (warm cache). git tracing is disabled for benchmark. (seconds) | v2.48 | v2.48 | this CL | this CL | | -j32 | | -j32 ----------------------------------------------------------- with clean tree state: branches (none) | 5.6 | 5.9 | 1.0 | 0.9 status (clean) | 21.3 | 9.4 | 19.4 | 4.7 diff (none) | 7.6 | 7.2 | 5.7 | 2.2 prune (none) | 5.7 | 6.1 | 1.3 | 1.2 abandon (none) | 19.4 | 18.6 | 0.9 | 0.8 upload (none) | 19.7 | 18.7 | 0.9 | 0.8 forall -c true | 7.5 | 7.6 | 0.6 | 0.6 forall -c "git log -1" | 11.3 | 11.1 | 0.6 | 0.6 with branches: start BRANCH --all | 21.9 | 20.3 | 13.6 | 2.6 checkout BRANCH | 29.1 | 27.8 | 1.1 | 1.0 branches (2) | 28.0 | 28.6 | 1.5 | 1.3 abandon BRANCH | 29.2 | 27.5 | 9.7 | 2.2 Bug: b/371638995 Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221 Reviewed-by: Josip Sokcevic <sokcevic@google.com> Tested-by: Kuang-che Wu <kcwu@google.com> Commit-Queue: Kuang-che Wu <kcwu@google.com>
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py90
1 files changed, 48 insertions, 42 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 287f2e04..e5fc9e80 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,7 +15,6 @@
15import errno 15import errno
16import functools 16import functools
17import io 17import io
18import multiprocessing
19import os 18import os
20import re 19import re
21import signal 20import signal
@@ -26,7 +25,6 @@ from color import Coloring
26from command import Command 25from command import Command
27from command import DEFAULT_LOCAL_JOBS 26from command import DEFAULT_LOCAL_JOBS
28from command import MirrorSafeCommand 27from command import MirrorSafeCommand
29from command import WORKER_BATCH_SIZE
30from error import ManifestInvalidRevisionError 28from error import ManifestInvalidRevisionError
31from repo_logging import RepoLogger 29from repo_logging import RepoLogger
32 30
@@ -241,7 +239,6 @@ without iterating through the remaining projects.
241 cmd.insert(cmd.index(cn) + 1, "--color") 239 cmd.insert(cmd.index(cn) + 1, "--color")
242 240
243 mirror = self.manifest.IsMirror 241 mirror = self.manifest.IsMirror
244 rc = 0
245 242
246 smart_sync_manifest_name = "smart_sync_override.xml" 243 smart_sync_manifest_name = "smart_sync_override.xml"
247 smart_sync_manifest_path = os.path.join( 244 smart_sync_manifest_path = os.path.join(
@@ -264,32 +261,41 @@ without iterating through the remaining projects.
264 261
265 os.environ["REPO_COUNT"] = str(len(projects)) 262 os.environ["REPO_COUNT"] = str(len(projects))
266 263
264 def _ProcessResults(_pool, _output, results):
265 rc = 0
266 first = True
267 for r, output in results:
268 if output:
269 if first:
270 first = False
271 elif opt.project_header:
272 print()
273 # To simplify the DoWorkWrapper, take care of automatic
274 # newlines.
275 end = "\n"
276 if output[-1] == "\n":
277 end = ""
278 print(output, end=end)
279 rc = rc or r
280 if r != 0 and opt.abort_on_errors:
281 raise Exception("Aborting due to previous error")
282 return rc
283
267 try: 284 try:
268 config = self.manifest.manifestProject.config 285 config = self.manifest.manifestProject.config
269 with multiprocessing.Pool(opt.jobs, InitWorker) as pool: 286 with self.ParallelContext():
270 results_it = pool.imap( 287 self.get_parallel_context()["projects"] = projects
288 rc = self.ExecuteInParallel(
289 opt.jobs,
271 functools.partial( 290 functools.partial(
272 DoWorkWrapper, mirror, opt, cmd, shell, config 291 self.DoWorkWrapper, mirror, opt, cmd, shell, config
273 ), 292 ),
274 enumerate(projects), 293 range(len(projects)),
275 chunksize=WORKER_BATCH_SIZE, 294 callback=_ProcessResults,
295 ordered=True,
296 initializer=self.InitWorker,
297 chunksize=1,
276 ) 298 )
277 first = True
278 for r, output in results_it:
279 if output:
280 if first:
281 first = False
282 elif opt.project_header:
283 print()
284 # To simplify the DoWorkWrapper, take care of automatic
285 # newlines.
286 end = "\n"
287 if output[-1] == "\n":
288 end = ""
289 print(output, end=end)
290 rc = rc or r
291 if r != 0 and opt.abort_on_errors:
292 raise Exception("Aborting due to previous error")
293 except (KeyboardInterrupt, WorkerKeyboardInterrupt): 299 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
294 # Catch KeyboardInterrupt raised inside and outside of workers 300 # Catch KeyboardInterrupt raised inside and outside of workers
295 rc = rc or errno.EINTR 301 rc = rc or errno.EINTR
@@ -304,29 +310,29 @@ without iterating through the remaining projects.
304 if rc != 0: 310 if rc != 0:
305 sys.exit(rc) 311 sys.exit(rc)
306 312
313 @classmethod
314 def InitWorker(cls):
315 signal.signal(signal.SIGINT, signal.SIG_IGN)
307 316
308class WorkerKeyboardInterrupt(Exception): 317 @classmethod
309 """Keyboard interrupt exception for worker processes.""" 318 def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx):
310 319 """A wrapper around the DoWork() method.
311
312def InitWorker():
313 signal.signal(signal.SIGINT, signal.SIG_IGN)
314 320
321 Catch the KeyboardInterrupt exceptions here and re-raise them as a
322 different, ``Exception``-based exception to stop it flooding the console
323 with stacktraces and making the parent hang indefinitely.
315 324
316def DoWorkWrapper(mirror, opt, cmd, shell, config, args): 325 """
317 """A wrapper around the DoWork() method. 326 project = cls.get_parallel_context()["projects"][project_idx]
327 try:
328 return DoWork(project, mirror, opt, cmd, shell, project_idx, config)
329 except KeyboardInterrupt:
330 print("%s: Worker interrupted" % project.name)
331 raise WorkerKeyboardInterrupt()
318 332
319 Catch the KeyboardInterrupt exceptions here and re-raise them as a
320 different, ``Exception``-based exception to stop it flooding the console
321 with stacktraces and making the parent hang indefinitely.
322 333
323 """ 334class WorkerKeyboardInterrupt(Exception):
324 cnt, project = args 335 """Keyboard interrupt exception for worker processes."""
325 try:
326 return DoWork(project, mirror, opt, cmd, shell, cnt, config)
327 except KeyboardInterrupt:
328 print("%s: Worker interrupted" % project.name)
329 raise WorkerKeyboardInterrupt()
330 336
331 337
332def DoWork(project, mirror, opt, cmd, shell, cnt, config): 338def DoWork(project, mirror, opt, cmd, shell, cnt, config):