diff options
author | Kuang-che Wu <kcwu@google.com> | 2024-10-22 21:04:41 +0800 |
---|---|---|
committer | LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2024-10-23 23:34:34 +0000 |
commit | 8da4861b3860c505e39341b4135c21f67569e4d8 (patch) | |
tree | 6f300266c91322df0e61953b84381e1f403074a5 /subcmds/forall.py | |
parent | 39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (diff) | |
download | git-repo-8da4861b3860c505e39341b4135c21f67569e4d8.tar.gz |
subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd9977e to reduce serialization overhead.
Below benchmarks are tested with 2.7k projects on my workstation
(warm cache). git tracing is disabled for benchmark.
(seconds) | v2.48 | v2.48 | this CL | this CL
| | -j32 | | -j32
-----------------------------------------------------------
with clean tree state:
branches (none) | 5.6 | 5.9 | 1.0 | 0.9
status (clean) | 21.3 | 9.4 | 19.4 | 4.7
diff (none) | 7.6 | 7.2 | 5.7 | 2.2
prune (none) | 5.7 | 6.1 | 1.3 | 1.2
abandon (none) | 19.4 | 18.6 | 0.9 | 0.8
upload (none) | 19.7 | 18.7 | 0.9 | 0.8
forall -c true | 7.5 | 7.6 | 0.6 | 0.6
forall -c "git log -1" | 11.3 | 11.1 | 0.6 | 0.6
with branches:
start BRANCH --all | 21.9 | 20.3 | 13.6 | 2.6
checkout BRANCH | 29.1 | 27.8 | 1.1 | 1.0
branches (2) | 28.0 | 28.6 | 1.5 | 1.3
abandon BRANCH | 29.2 | 27.5 | 9.7 | 2.2
Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r-- | subcmds/forall.py | 90 |
1 files changed, 48 insertions, 42 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py index 287f2e04..e5fc9e80 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
@@ -15,7 +15,6 @@ | |||
15 | import errno | 15 | import errno |
16 | import functools | 16 | import functools |
17 | import io | 17 | import io |
18 | import multiprocessing | ||
19 | import os | 18 | import os |
20 | import re | 19 | import re |
21 | import signal | 20 | import signal |
@@ -26,7 +25,6 @@ from color import Coloring | |||
26 | from command import Command | 25 | from command import Command |
27 | from command import DEFAULT_LOCAL_JOBS | 26 | from command import DEFAULT_LOCAL_JOBS |
28 | from command import MirrorSafeCommand | 27 | from command import MirrorSafeCommand |
29 | from command import WORKER_BATCH_SIZE | ||
30 | from error import ManifestInvalidRevisionError | 28 | from error import ManifestInvalidRevisionError |
31 | from repo_logging import RepoLogger | 29 | from repo_logging import RepoLogger |
32 | 30 | ||
@@ -241,7 +239,6 @@ without iterating through the remaining projects. | |||
241 | cmd.insert(cmd.index(cn) + 1, "--color") | 239 | cmd.insert(cmd.index(cn) + 1, "--color") |
242 | 240 | ||
243 | mirror = self.manifest.IsMirror | 241 | mirror = self.manifest.IsMirror |
244 | rc = 0 | ||
245 | 242 | ||
246 | smart_sync_manifest_name = "smart_sync_override.xml" | 243 | smart_sync_manifest_name = "smart_sync_override.xml" |
247 | smart_sync_manifest_path = os.path.join( | 244 | smart_sync_manifest_path = os.path.join( |
@@ -264,32 +261,41 @@ without iterating through the remaining projects. | |||
264 | 261 | ||
265 | os.environ["REPO_COUNT"] = str(len(projects)) | 262 | os.environ["REPO_COUNT"] = str(len(projects)) |
266 | 263 | ||
264 | def _ProcessResults(_pool, _output, results): | ||
265 | rc = 0 | ||
266 | first = True | ||
267 | for r, output in results: | ||
268 | if output: | ||
269 | if first: | ||
270 | first = False | ||
271 | elif opt.project_header: | ||
272 | print() | ||
273 | # To simplify the DoWorkWrapper, take care of automatic | ||
274 | # newlines. | ||
275 | end = "\n" | ||
276 | if output[-1] == "\n": | ||
277 | end = "" | ||
278 | print(output, end=end) | ||
279 | rc = rc or r | ||
280 | if r != 0 and opt.abort_on_errors: | ||
281 | raise Exception("Aborting due to previous error") | ||
282 | return rc | ||
283 | |||
267 | try: | 284 | try: |
268 | config = self.manifest.manifestProject.config | 285 | config = self.manifest.manifestProject.config |
269 | with multiprocessing.Pool(opt.jobs, InitWorker) as pool: | 286 | with self.ParallelContext(): |
270 | results_it = pool.imap( | 287 | self.get_parallel_context()["projects"] = projects |
288 | rc = self.ExecuteInParallel( | ||
289 | opt.jobs, | ||
271 | functools.partial( | 290 | functools.partial( |
272 | DoWorkWrapper, mirror, opt, cmd, shell, config | 291 | self.DoWorkWrapper, mirror, opt, cmd, shell, config |
273 | ), | 292 | ), |
274 | enumerate(projects), | 293 | range(len(projects)), |
275 | chunksize=WORKER_BATCH_SIZE, | 294 | callback=_ProcessResults, |
295 | ordered=True, | ||
296 | initializer=self.InitWorker, | ||
297 | chunksize=1, | ||
276 | ) | 298 | ) |
277 | first = True | ||
278 | for r, output in results_it: | ||
279 | if output: | ||
280 | if first: | ||
281 | first = False | ||
282 | elif opt.project_header: | ||
283 | print() | ||
284 | # To simplify the DoWorkWrapper, take care of automatic | ||
285 | # newlines. | ||
286 | end = "\n" | ||
287 | if output[-1] == "\n": | ||
288 | end = "" | ||
289 | print(output, end=end) | ||
290 | rc = rc or r | ||
291 | if r != 0 and opt.abort_on_errors: | ||
292 | raise Exception("Aborting due to previous error") | ||
293 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): | 299 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): |
294 | # Catch KeyboardInterrupt raised inside and outside of workers | 300 | # Catch KeyboardInterrupt raised inside and outside of workers |
295 | rc = rc or errno.EINTR | 301 | rc = rc or errno.EINTR |
@@ -304,29 +310,29 @@ without iterating through the remaining projects. | |||
304 | if rc != 0: | 310 | if rc != 0: |
305 | sys.exit(rc) | 311 | sys.exit(rc) |
306 | 312 | ||
313 | @classmethod | ||
314 | def InitWorker(cls): | ||
315 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
307 | 316 | ||
308 | class WorkerKeyboardInterrupt(Exception): | 317 | @classmethod |
309 | """Keyboard interrupt exception for worker processes.""" | 318 | def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx): |
310 | 319 | """A wrapper around the DoWork() method. | |
311 | |||
312 | def InitWorker(): | ||
313 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
314 | 320 | ||
321 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
322 | different, ``Exception``-based exception to stop it flooding the console | ||
323 | with stacktraces and making the parent hang indefinitely. | ||
315 | 324 | ||
316 | def DoWorkWrapper(mirror, opt, cmd, shell, config, args): | 325 | """ |
317 | """A wrapper around the DoWork() method. | 326 | project = cls.get_parallel_context()["projects"][project_idx] |
327 | try: | ||
328 | return DoWork(project, mirror, opt, cmd, shell, project_idx, config) | ||
329 | except KeyboardInterrupt: | ||
330 | print("%s: Worker interrupted" % project.name) | ||
331 | raise WorkerKeyboardInterrupt() | ||
318 | 332 | ||
319 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
320 | different, ``Exception``-based exception to stop it flooding the console | ||
321 | with stacktraces and making the parent hang indefinitely. | ||
322 | 333 | ||
323 | """ | 334 | class WorkerKeyboardInterrupt(Exception): |
324 | cnt, project = args | 335 | """Keyboard interrupt exception for worker processes.""" |
325 | try: | ||
326 | return DoWork(project, mirror, opt, cmd, shell, cnt, config) | ||
327 | except KeyboardInterrupt: | ||
328 | print("%s: Worker interrupted" % project.name) | ||
329 | raise WorkerKeyboardInterrupt() | ||
330 | 336 | ||
331 | 337 | ||
332 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | 338 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): |