author    Kuang-che Wu <kcwu@google.com>    2024-10-22 21:04:41 +0800
committer LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>    2024-10-23 23:34:34 +0000
commit    8da4861b3860c505e39341b4135c21f67569e4d8 (patch)
tree      6f300266c91322df0e61953b84381e1f403074a5
parent    39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (diff)
download  git-repo-8da4861b3860c505e39341b4135c21f67569e4d8.tar.gz
subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd9977e to reduce serialization overhead.

The benchmarks below were run with 2.7k projects on my workstation
(warm cache); git tracing was disabled for the benchmark runs.

(seconds)               | v2.48 | v2.48 | this CL | this CL
                        |       |  -j32 |         |    -j32
------------------------------------------------------------
with clean tree state:
branches (none)         |   5.6 |   5.9 |     1.0 |     0.9
status (clean)          |  21.3 |   9.4 |    19.4 |     4.7
diff (none)             |   7.6 |   7.2 |     5.7 |     2.2
prune (none)            |   5.7 |   6.1 |     1.3 |     1.2
abandon (none)          |  19.4 |  18.6 |     0.9 |     0.8
upload (none)           |  19.7 |  18.7 |     0.9 |     0.8
forall -c true          |   7.5 |   7.6 |     0.6 |     0.6
forall -c "git log -1"  |  11.3 |  11.1 |     0.6 |     0.6

with branches:
start BRANCH --all      |  21.9 |  20.3 |    13.6 |     2.6
checkout BRANCH         |  29.1 |  27.8 |     1.1 |     1.0
branches (2)            |  28.0 |  28.6 |     1.5 |     1.3
abandon BRANCH          |  29.2 |  27.5 |     9.7 |     2.2

Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
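
The recurring change in every hunk below is to stop pickling Project objects
for each pool task: the command publishes its project list once through the
parallel context, workers receive that context via the pool initializer, and
each task then carries only a small integer index. A minimal standalone sketch
of that idea (illustrative names, not repo's actual classes):

    import multiprocessing

    _context = {}


    def _init_worker(context):
        # Runs once per worker process: the shared dict arrives a single time
        # (via initargs) instead of being pickled with every submitted task.
        _context.update(context)


    def _work(project_idx):
        # Each task carries only a small int; the heavy object is looked up
        # in the worker-local context.
        project = _context["projects"][project_idx]
        return project_idx, len(project)  # stand-in for real per-project work


    if __name__ == "__main__":
        projects = ["x" * 10_000 for _ in range(2700)]  # stand-ins for Project objects
        with multiprocessing.Pool(
            4, initializer=_init_worker, initargs=({"projects": projects},)
        ) as pool:
            results = list(pool.imap_unordered(_work, range(len(projects)), chunksize=1))
        print(len(results))
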
-rw-r--r--  command.py           10
-rw-r--r--  subcmds/abandon.py   30
-rw-r--r--  subcmds/branches.py  49
-rw-r--r--  subcmds/checkout.py  34
-rw-r--r--  subcmds/diff.py      27
-rw-r--r--  subcmds/forall.py    90
-rw-r--r--  subcmds/grep.py      49
-rw-r--r--  subcmds/prune.py     20
-rw-r--r--  subcmds/start.py     40
-rw-r--r--  subcmds/status.py    27
-rw-r--r--  subcmds/upload.py    24
11 files changed, 228 insertions(+), 172 deletions(-)
diff --git a/command.py b/command.py
index 2a2ce138..22115ac2 100644
--- a/command.py
+++ b/command.py
@@ -268,8 +268,10 @@ class Command:
268 cls._parallel_context = None 268 cls._parallel_context = None
269 269
270 @classmethod 270 @classmethod
271 def _SetParallelContext(cls, context): 271 def _InitParallelWorker(cls, context, initializer):
272 cls._parallel_context = context 272 cls._parallel_context = context
273 if initializer:
274 initializer()
273 275
274 @classmethod 276 @classmethod
275 def ExecuteInParallel( 277 def ExecuteInParallel(
@@ -281,6 +283,7 @@ class Command:
281 output=None, 283 output=None,
282 ordered=False, 284 ordered=False,
283 chunksize=WORKER_BATCH_SIZE, 285 chunksize=WORKER_BATCH_SIZE,
286 initializer=None,
284 ): 287 ):
285 """Helper for managing parallel execution boiler plate. 288 """Helper for managing parallel execution boiler plate.
286 289
@@ -307,6 +310,7 @@ class Command:
307 ordered: Whether the jobs should be processed in order. 310 ordered: Whether the jobs should be processed in order.
308 chunksize: The number of jobs processed in batch by parallel 311 chunksize: The number of jobs processed in batch by parallel
309 workers. 312 workers.
313 initializer: Worker initializer.
310 314
311 Returns: 315 Returns:
312 The |callback| function's results are returned. 316 The |callback| function's results are returned.
@@ -318,8 +322,8 @@ class Command:
318 else: 322 else:
319 with multiprocessing.Pool( 323 with multiprocessing.Pool(
320 jobs, 324 jobs,
321 initializer=cls._SetParallelContext, 325 initializer=cls._InitParallelWorker,
322 initargs=(cls._parallel_context,), 326 initargs=(cls._parallel_context, initializer),
323 ) as pool: 327 ) as pool:
324 submit = pool.imap if ordered else pool.imap_unordered 328 submit = pool.imap if ordered else pool.imap_unordered
325 return callback( 329 return callback(
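
The command.py hunk renames _SetParallelContext to _InitParallelWorker and
threads an optional per-command initializer through to the pool, so a
subcommand (forall, below) can run its own worker setup right after the
context is installed. Trimmed down to just that mechanism, the helper behaves
roughly like this sketch (the real method also handles jobs == 1, ordered
output, result callbacks, and configurable chunk sizes):

    import multiprocessing


    class Command:
        _parallel_context = None

        @classmethod
        def _InitParallelWorker(cls, context, initializer):
            # Install the shared context first, then run the optional
            # per-command hook inside the same worker process.
            cls._parallel_context = context
            if initializer:
                initializer()

        @classmethod
        def ExecuteInParallel(cls, jobs, func, inputs, chunksize=1, initializer=None):
            with multiprocessing.Pool(
                jobs,
                initializer=cls._InitParallelWorker,
                initargs=(cls._parallel_context, initializer),
            ) as pool:
                return list(pool.imap_unordered(func, inputs, chunksize=chunksize))
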
diff --git a/subcmds/abandon.py b/subcmds/abandon.py
index e280d69e..3208be6b 100644
--- a/subcmds/abandon.py
+++ b/subcmds/abandon.py
@@ -70,8 +70,10 @@ It is equivalent to "git branch -D <branchname>".
70 else: 70 else:
71 args.insert(0, "'All local branches'") 71 args.insert(0, "'All local branches'")
72 72
73 def _ExecuteOne(self, all_branches, nb, project): 73 @classmethod
74 def _ExecuteOne(cls, all_branches, nb, project_idx):
74 """Abandon one project.""" 75 """Abandon one project."""
76 project = cls.get_parallel_context()["projects"][project_idx]
75 if all_branches: 77 if all_branches:
76 branches = project.GetBranches() 78 branches = project.GetBranches()
77 else: 79 else:
@@ -89,7 +91,7 @@ It is equivalent to "git branch -D <branchname>".
89 if status is not None: 91 if status is not None:
90 ret[name] = status 92 ret[name] = status
91 93
92 return (ret, project, errors) 94 return (ret, project_idx, errors)
93 95
94 def Execute(self, opt, args): 96 def Execute(self, opt, args):
95 nb = args[0].split() 97 nb = args[0].split()
@@ -102,7 +104,8 @@ It is equivalent to "git branch -D <branchname>".
102 _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only) 104 _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only)
103 105
104 def _ProcessResults(_pool, pm, states): 106 def _ProcessResults(_pool, pm, states):
105 for results, project, errors in states: 107 for results, project_idx, errors in states:
108 project = all_projects[project_idx]
106 for branch, status in results.items(): 109 for branch, status in results.items():
107 if status: 110 if status:
108 success[branch].append(project) 111 success[branch].append(project)
@@ -111,15 +114,18 @@ It is equivalent to "git branch -D <branchname>".
111 aggregate_errors.extend(errors) 114 aggregate_errors.extend(errors)
112 pm.update(msg="") 115 pm.update(msg="")
113 116
114 self.ExecuteInParallel( 117 with self.ParallelContext():
115 opt.jobs, 118 self.get_parallel_context()["projects"] = all_projects
116 functools.partial(self._ExecuteOne, opt.all, nb), 119 self.ExecuteInParallel(
117 all_projects, 120 opt.jobs,
118 callback=_ProcessResults, 121 functools.partial(self._ExecuteOne, opt.all, nb),
119 output=Progress( 122 range(len(all_projects)),
120 f"Abandon {nb}", len(all_projects), quiet=opt.quiet 123 callback=_ProcessResults,
121 ), 124 output=Progress(
122 ) 125 f"Abandon {nb}", len(all_projects), quiet=opt.quiet
126 ),
127 chunksize=1,
128 )
123 129
124 width = max( 130 width = max(
125 itertools.chain( 131 itertools.chain(
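
On the subcommand side, abandon shows the caller pattern that the remaining
files repeat: publish the projects in the parallel context, submit
range(len(all_projects)) instead of the projects themselves, and map
project_idx back to a Project in the parent-side callback. A condensed view
of that shape, mirroring the hunk above (it leans on the
ParallelContext/ExecuteInParallel helpers from command.py and elides the
Progress output and error aggregation):

    def Execute(self, opt, args):
        all_projects = self.GetProjects(args)
        success, aggregate_errors = [], []

        def _ProcessResults(_pool, _output, results):
            # Workers return indices; the parent maps them back to Project
            # objects it already holds, so results stay cheap to unpickle too.
            for ret, project_idx, errors in results:
                if ret:
                    success.append(all_projects[project_idx])
                aggregate_errors.extend(errors)

        with self.ParallelContext():
            # Published once, before the pool exists, so workers get it via
            # the pool initializer rather than with every task.
            self.get_parallel_context()["projects"] = all_projects
            self.ExecuteInParallel(
                opt.jobs,
                functools.partial(self._ExecuteOne, opt.all, nb),
                range(len(all_projects)),
                callback=_ProcessResults,
                chunksize=1,
            )
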
diff --git a/subcmds/branches.py b/subcmds/branches.py
index 59b5cb28..08c6389c 100644
--- a/subcmds/branches.py
+++ b/subcmds/branches.py
@@ -98,6 +98,22 @@ is shown, then the branch appears in all projects.
98""" 98"""
99 PARALLEL_JOBS = DEFAULT_LOCAL_JOBS 99 PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
100 100
101 @classmethod
102 def _ExpandProjectToBranches(cls, project_idx):
103 """Expands a project into a list of branch names & associated info.
104
105 Args:
106 project_idx: project.Project index
107
108 Returns:
109 List[Tuple[str, git_config.Branch, int]]
110 """
111 branches = []
112 project = cls.get_parallel_context()["projects"][project_idx]
113 for name, b in project.GetBranches().items():
114 branches.append((name, b, project_idx))
115 return branches
116
101 def Execute(self, opt, args): 117 def Execute(self, opt, args):
102 projects = self.GetProjects( 118 projects = self.GetProjects(
103 args, all_manifests=not opt.this_manifest_only 119 args, all_manifests=not opt.this_manifest_only
@@ -107,17 +123,20 @@ is shown, then the branch appears in all projects.
107 project_cnt = len(projects) 123 project_cnt = len(projects)
108 124
109 def _ProcessResults(_pool, _output, results): 125 def _ProcessResults(_pool, _output, results):
110 for name, b in itertools.chain.from_iterable(results): 126 for name, b, project_idx in itertools.chain.from_iterable(results):
127 b.project = projects[project_idx]
111 if name not in all_branches: 128 if name not in all_branches:
112 all_branches[name] = BranchInfo(name) 129 all_branches[name] = BranchInfo(name)
113 all_branches[name].add(b) 130 all_branches[name].add(b)
114 131
115 self.ExecuteInParallel( 132 with self.ParallelContext():
116 opt.jobs, 133 self.get_parallel_context()["projects"] = projects
117 expand_project_to_branches, 134 self.ExecuteInParallel(
118 projects, 135 opt.jobs,
119 callback=_ProcessResults, 136 self._ExpandProjectToBranches,
120 ) 137 range(len(projects)),
138 callback=_ProcessResults,
139 )
121 140
122 names = sorted(all_branches) 141 names = sorted(all_branches)
123 142
@@ -191,19 +210,3 @@ is shown, then the branch appears in all projects.
191 else: 210 else:
192 out.write(" in all projects") 211 out.write(" in all projects")
193 out.nl() 212 out.nl()
194
195
196def expand_project_to_branches(project):
197 """Expands a project into a list of branch names & associated information.
198
199 Args:
200 project: project.Project
201
202 Returns:
203 List[Tuple[str, git_config.Branch]]
204 """
205 branches = []
206 for name, b in project.GetBranches().items():
207 b.project = project
208 branches.append((name, b))
209 return branches
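
branches.py also folds the old module-level expand_project_to_branches helper
into the _ExpandProjectToBranches classmethod so it can reach the worker-side
context, and it stops setting b.project inside the worker; the parent
re-attaches the Project in _ProcessResults. A small illustration of why
deferring that assignment keeps the returned payload light (FakeProject and
FakeBranch are hypothetical stand-ins, not repo's classes):

    import pickle


    class FakeProject:
        # Stand-in for project.Project and the state it drags along.
        def __init__(self, name):
            self.name = name
            self.big_state = ["config"] * 10_000


    class FakeBranch:
        # Stand-in for git_config.Branch: a small record that previously
        # carried a back-reference to its (much larger) Project.
        def __init__(self, name, project=None):
            self.name = name
            self.project = project


    project = FakeProject("platform/build")
    old_payload = pickle.dumps(("main", FakeBranch("main", project)))  # worker -> parent, before
    new_payload = pickle.dumps(("main", FakeBranch("main"), 7))        # name, branch, project index
    print(len(old_payload), len(new_payload))  # the index-based tuple is tiny
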
diff --git a/subcmds/checkout.py b/subcmds/checkout.py
index 379bfa18..859ddf6c 100644
--- a/subcmds/checkout.py
+++ b/subcmds/checkout.py
@@ -20,7 +20,6 @@ from command import DEFAULT_LOCAL_JOBS
20from error import GitError 20from error import GitError
21from error import RepoExitError 21from error import RepoExitError
22from progress import Progress 22from progress import Progress
23from project import Project
24from repo_logging import RepoLogger 23from repo_logging import RepoLogger
25 24
26 25
@@ -30,7 +29,7 @@ logger = RepoLogger(__file__)
30class CheckoutBranchResult(NamedTuple): 29class CheckoutBranchResult(NamedTuple):
31 # Whether the Project is on the branch (i.e. branch exists and no errors) 30 # Whether the Project is on the branch (i.e. branch exists and no errors)
32 result: bool 31 result: bool
33 project: Project 32 project_idx: int
34 error: Exception 33 error: Exception
35 34
36 35
@@ -62,15 +61,17 @@ The command is equivalent to:
62 if not args: 61 if not args:
63 self.Usage() 62 self.Usage()
64 63
65 def _ExecuteOne(self, nb, project): 64 @classmethod
65 def _ExecuteOne(cls, nb, project_idx):
66 """Checkout one project.""" 66 """Checkout one project."""
67 error = None 67 error = None
68 result = None 68 result = None
69 project = cls.get_parallel_context()["projects"][project_idx]
69 try: 70 try:
70 result = project.CheckoutBranch(nb) 71 result = project.CheckoutBranch(nb)
71 except GitError as e: 72 except GitError as e:
72 error = e 73 error = e
73 return CheckoutBranchResult(result, project, error) 74 return CheckoutBranchResult(result, project_idx, error)
74 75
75 def Execute(self, opt, args): 76 def Execute(self, opt, args):
76 nb = args[0] 77 nb = args[0]
@@ -83,22 +84,25 @@ The command is equivalent to:
83 84
84 def _ProcessResults(_pool, pm, results): 85 def _ProcessResults(_pool, pm, results):
85 for result in results: 86 for result in results:
87 project = all_projects[result.project_idx]
86 if result.error is not None: 88 if result.error is not None:
87 err.append(result.error) 89 err.append(result.error)
88 err_projects.append(result.project) 90 err_projects.append(project)
89 elif result.result: 91 elif result.result:
90 success.append(result.project) 92 success.append(project)
91 pm.update(msg="") 93 pm.update(msg="")
92 94
93 self.ExecuteInParallel( 95 with self.ParallelContext():
94 opt.jobs, 96 self.get_parallel_context()["projects"] = all_projects
95 functools.partial(self._ExecuteOne, nb), 97 self.ExecuteInParallel(
96 all_projects, 98 opt.jobs,
97 callback=_ProcessResults, 99 functools.partial(self._ExecuteOne, nb),
98 output=Progress( 100 range(len(all_projects)),
99 f"Checkout {nb}", len(all_projects), quiet=opt.quiet 101 callback=_ProcessResults,
100 ), 102 output=Progress(
101 ) 103 f"Checkout {nb}", len(all_projects), quiet=opt.quiet
104 ),
105 )
102 106
103 if err_projects: 107 if err_projects:
104 for p in err_projects: 108 for p in err_projects:
diff --git a/subcmds/diff.py b/subcmds/diff.py
index d9d72b40..7bb0cbbd 100644
--- a/subcmds/diff.py
+++ b/subcmds/diff.py
@@ -40,7 +40,8 @@ to the Unix 'patch' command.
40 help="paths are relative to the repository root", 40 help="paths are relative to the repository root",
41 ) 41 )
42 42
43 def _ExecuteOne(self, absolute, local, project): 43 @classmethod
44 def _ExecuteOne(cls, absolute, local, project_idx):
44 """Obtains the diff for a specific project. 45 """Obtains the diff for a specific project.
45 46
46 Args: 47 Args:
@@ -48,12 +49,13 @@ to the Unix 'patch' command.
48 local: a boolean, if True, the path is relative to the local 49 local: a boolean, if True, the path is relative to the local
49 (sub)manifest. If false, the path is relative to the outermost 50 (sub)manifest. If false, the path is relative to the outermost
50 manifest. 51 manifest.
51 project: Project to get status of. 52 project_idx: Project index to get status of.
52 53
53 Returns: 54 Returns:
54 The status of the project. 55 The status of the project.
55 """ 56 """
56 buf = io.StringIO() 57 buf = io.StringIO()
58 project = cls.get_parallel_context()["projects"][project_idx]
57 ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local) 59 ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local)
58 return (ret, buf.getvalue()) 60 return (ret, buf.getvalue())
59 61
@@ -71,12 +73,15 @@ to the Unix 'patch' command.
71 ret = 1 73 ret = 1
72 return ret 74 return ret
73 75
74 return self.ExecuteInParallel( 76 with self.ParallelContext():
75 opt.jobs, 77 self.get_parallel_context()["projects"] = all_projects
76 functools.partial( 78 return self.ExecuteInParallel(
77 self._ExecuteOne, opt.absolute, opt.this_manifest_only 79 opt.jobs,
78 ), 80 functools.partial(
79 all_projects, 81 self._ExecuteOne, opt.absolute, opt.this_manifest_only
80 callback=_ProcessResults, 82 ),
81 ordered=True, 83 range(len(all_projects)),
82 ) 84 callback=_ProcessResults,
85 ordered=True,
86 chunksize=1,
87 )
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 287f2e04..e5fc9e80 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,7 +15,6 @@
15import errno 15import errno
16import functools 16import functools
17import io 17import io
18import multiprocessing
19import os 18import os
20import re 19import re
21import signal 20import signal
@@ -26,7 +25,6 @@ from color import Coloring
26from command import Command 25from command import Command
27from command import DEFAULT_LOCAL_JOBS 26from command import DEFAULT_LOCAL_JOBS
28from command import MirrorSafeCommand 27from command import MirrorSafeCommand
29from command import WORKER_BATCH_SIZE
30from error import ManifestInvalidRevisionError 28from error import ManifestInvalidRevisionError
31from repo_logging import RepoLogger 29from repo_logging import RepoLogger
32 30
@@ -241,7 +239,6 @@ without iterating through the remaining projects.
241 cmd.insert(cmd.index(cn) + 1, "--color") 239 cmd.insert(cmd.index(cn) + 1, "--color")
242 240
243 mirror = self.manifest.IsMirror 241 mirror = self.manifest.IsMirror
244 rc = 0
245 242
246 smart_sync_manifest_name = "smart_sync_override.xml" 243 smart_sync_manifest_name = "smart_sync_override.xml"
247 smart_sync_manifest_path = os.path.join( 244 smart_sync_manifest_path = os.path.join(
@@ -264,32 +261,41 @@ without iterating through the remaining projects.
264 261
265 os.environ["REPO_COUNT"] = str(len(projects)) 262 os.environ["REPO_COUNT"] = str(len(projects))
266 263
264 def _ProcessResults(_pool, _output, results):
265 rc = 0
266 first = True
267 for r, output in results:
268 if output:
269 if first:
270 first = False
271 elif opt.project_header:
272 print()
273 # To simplify the DoWorkWrapper, take care of automatic
274 # newlines.
275 end = "\n"
276 if output[-1] == "\n":
277 end = ""
278 print(output, end=end)
279 rc = rc or r
280 if r != 0 and opt.abort_on_errors:
281 raise Exception("Aborting due to previous error")
282 return rc
283
267 try: 284 try:
268 config = self.manifest.manifestProject.config 285 config = self.manifest.manifestProject.config
269 with multiprocessing.Pool(opt.jobs, InitWorker) as pool: 286 with self.ParallelContext():
270 results_it = pool.imap( 287 self.get_parallel_context()["projects"] = projects
288 rc = self.ExecuteInParallel(
289 opt.jobs,
271 functools.partial( 290 functools.partial(
272 DoWorkWrapper, mirror, opt, cmd, shell, config 291 self.DoWorkWrapper, mirror, opt, cmd, shell, config
273 ), 292 ),
274 enumerate(projects), 293 range(len(projects)),
275 chunksize=WORKER_BATCH_SIZE, 294 callback=_ProcessResults,
295 ordered=True,
296 initializer=self.InitWorker,
297 chunksize=1,
276 ) 298 )
277 first = True
278 for r, output in results_it:
279 if output:
280 if first:
281 first = False
282 elif opt.project_header:
283 print()
284 # To simplify the DoWorkWrapper, take care of automatic
285 # newlines.
286 end = "\n"
287 if output[-1] == "\n":
288 end = ""
289 print(output, end=end)
290 rc = rc or r
291 if r != 0 and opt.abort_on_errors:
292 raise Exception("Aborting due to previous error")
293 except (KeyboardInterrupt, WorkerKeyboardInterrupt): 299 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
294 # Catch KeyboardInterrupt raised inside and outside of workers 300 # Catch KeyboardInterrupt raised inside and outside of workers
295 rc = rc or errno.EINTR 301 rc = rc or errno.EINTR
@@ -304,29 +310,29 @@ without iterating through the remaining projects.
304 if rc != 0: 310 if rc != 0:
305 sys.exit(rc) 311 sys.exit(rc)
306 312
313 @classmethod
314 def InitWorker(cls):
315 signal.signal(signal.SIGINT, signal.SIG_IGN)
307 316
308class WorkerKeyboardInterrupt(Exception): 317 @classmethod
309 """Keyboard interrupt exception for worker processes.""" 318 def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx):
310 319 """A wrapper around the DoWork() method.
311
312def InitWorker():
313 signal.signal(signal.SIGINT, signal.SIG_IGN)
314 320
321 Catch the KeyboardInterrupt exceptions here and re-raise them as a
322 different, ``Exception``-based exception to stop it flooding the console
323 with stacktraces and making the parent hang indefinitely.
315 324
316def DoWorkWrapper(mirror, opt, cmd, shell, config, args): 325 """
317 """A wrapper around the DoWork() method. 326 project = cls.get_parallel_context()["projects"][project_idx]
327 try:
328 return DoWork(project, mirror, opt, cmd, shell, project_idx, config)
329 except KeyboardInterrupt:
330 print("%s: Worker interrupted" % project.name)
331 raise WorkerKeyboardInterrupt()
318 332
319 Catch the KeyboardInterrupt exceptions here and re-raise them as a
320 different, ``Exception``-based exception to stop it flooding the console
321 with stacktraces and making the parent hang indefinitely.
322 333
323 """ 334class WorkerKeyboardInterrupt(Exception):
324 cnt, project = args 335 """Keyboard interrupt exception for worker processes."""
325 try:
326 return DoWork(project, mirror, opt, cmd, shell, cnt, config)
327 except KeyboardInterrupt:
328 print("%s: Worker interrupted" % project.name)
329 raise WorkerKeyboardInterrupt()
330 336
331 337
332def DoWork(project, mirror, opt, cmd, shell, cnt, config): 338def DoWork(project, mirror, opt, cmd, shell, cnt, config):
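
forall.py drops its hand-rolled multiprocessing.Pool in favor of
ExecuteInParallel, passing its SIGINT-ignoring InitWorker through the new
initializer hook and moving result printing into a _ProcessResults callback.
The signal handling itself is unchanged; its purpose is roughly what this
standalone sketch shows:

    import multiprocessing
    import signal


    def init_worker():
        # Workers ignore SIGINT so a Ctrl-C is handled once, in the parent,
        # instead of every child raising KeyboardInterrupt and printing a
        # traceback (the DoWorkWrapper docstring describes the same goal).
        signal.signal(signal.SIGINT, signal.SIG_IGN)


    def work(i):
        return i * i


    if __name__ == "__main__":
        with multiprocessing.Pool(4, initializer=init_worker) as pool:
            try:
                print(sum(pool.imap_unordered(work, range(10))))
            except KeyboardInterrupt:
                # Only the parent sees the interrupt and shuts the pool down.
                pool.terminate()
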
diff --git a/subcmds/grep.py b/subcmds/grep.py
index b677b6bd..918651d9 100644
--- a/subcmds/grep.py
+++ b/subcmds/grep.py
@@ -23,7 +23,6 @@ from error import GitError
23from error import InvalidArgumentsError 23from error import InvalidArgumentsError
24from error import SilentRepoExitError 24from error import SilentRepoExitError
25from git_command import GitCommand 25from git_command import GitCommand
26from project import Project
27from repo_logging import RepoLogger 26from repo_logging import RepoLogger
28 27
29 28
@@ -40,7 +39,7 @@ class GrepColoring(Coloring):
40class ExecuteOneResult(NamedTuple): 39class ExecuteOneResult(NamedTuple):
41 """Result from an execute instance.""" 40 """Result from an execute instance."""
42 41
43 project: Project 42 project_idx: int
44 rc: int 43 rc: int
45 stdout: str 44 stdout: str
46 stderr: str 45 stderr: str
@@ -262,8 +261,10 @@ contain a line that matches both expressions:
262 help="Show only file names not containing matching lines", 261 help="Show only file names not containing matching lines",
263 ) 262 )
264 263
265 def _ExecuteOne(self, cmd_argv, project): 264 @classmethod
265 def _ExecuteOne(cls, cmd_argv, project_idx):
266 """Process one project.""" 266 """Process one project."""
267 project = cls.get_parallel_context()["projects"][project_idx]
267 try: 268 try:
268 p = GitCommand( 269 p = GitCommand(
269 project, 270 project,
@@ -274,7 +275,7 @@ contain a line that matches both expressions:
274 verify_command=True, 275 verify_command=True,
275 ) 276 )
276 except GitError as e: 277 except GitError as e:
277 return ExecuteOneResult(project, -1, None, str(e), e) 278 return ExecuteOneResult(project_idx, -1, None, str(e), e)
278 279
279 try: 280 try:
280 error = None 281 error = None
@@ -282,10 +283,12 @@ contain a line that matches both expressions:
282 except GitError as e: 283 except GitError as e:
283 rc = 1 284 rc = 1
284 error = e 285 error = e
285 return ExecuteOneResult(project, rc, p.stdout, p.stderr, error) 286 return ExecuteOneResult(project_idx, rc, p.stdout, p.stderr, error)
286 287
287 @staticmethod 288 @staticmethod
288 def _ProcessResults(full_name, have_rev, opt, _pool, out, results): 289 def _ProcessResults(
290 full_name, have_rev, opt, projects, _pool, out, results
291 ):
289 git_failed = False 292 git_failed = False
290 bad_rev = False 293 bad_rev = False
291 have_match = False 294 have_match = False
@@ -293,9 +296,10 @@ contain a line that matches both expressions:
293 errors = [] 296 errors = []
294 297
295 for result in results: 298 for result in results:
299 project = projects[result.project_idx]
296 if result.rc < 0: 300 if result.rc < 0:
297 git_failed = True 301 git_failed = True
298 out.project("--- project %s ---" % _RelPath(result.project)) 302 out.project("--- project %s ---" % _RelPath(project))
299 out.nl() 303 out.nl()
300 out.fail("%s", result.stderr) 304 out.fail("%s", result.stderr)
301 out.nl() 305 out.nl()
@@ -311,9 +315,7 @@ contain a line that matches both expressions:
311 ): 315 ):
312 bad_rev = True 316 bad_rev = True
313 else: 317 else:
314 out.project( 318 out.project("--- project %s ---" % _RelPath(project))
315 "--- project %s ---" % _RelPath(result.project)
316 )
317 out.nl() 319 out.nl()
318 out.fail("%s", result.stderr.strip()) 320 out.fail("%s", result.stderr.strip())
319 out.nl() 321 out.nl()
@@ -331,13 +333,13 @@ contain a line that matches both expressions:
331 rev, line = line.split(":", 1) 333 rev, line = line.split(":", 1)
332 out.write("%s", rev) 334 out.write("%s", rev)
333 out.write(":") 335 out.write(":")
334 out.project(_RelPath(result.project)) 336 out.project(_RelPath(project))
335 out.write("/") 337 out.write("/")
336 out.write("%s", line) 338 out.write("%s", line)
337 out.nl() 339 out.nl()
338 elif full_name: 340 elif full_name:
339 for line in r: 341 for line in r:
340 out.project(_RelPath(result.project)) 342 out.project(_RelPath(project))
341 out.write("/") 343 out.write("/")
342 out.write("%s", line) 344 out.write("%s", line)
343 out.nl() 345 out.nl()
@@ -381,16 +383,19 @@ contain a line that matches both expressions:
381 cmd_argv.extend(opt.revision) 383 cmd_argv.extend(opt.revision)
382 cmd_argv.append("--") 384 cmd_argv.append("--")
383 385
384 git_failed, bad_rev, have_match, errors = self.ExecuteInParallel( 386 with self.ParallelContext():
385 opt.jobs, 387 self.get_parallel_context()["projects"] = projects
386 functools.partial(self._ExecuteOne, cmd_argv), 388 git_failed, bad_rev, have_match, errors = self.ExecuteInParallel(
387 projects, 389 opt.jobs,
388 callback=functools.partial( 390 functools.partial(self._ExecuteOne, cmd_argv),
389 self._ProcessResults, full_name, have_rev, opt 391 range(len(projects)),
390 ), 392 callback=functools.partial(
391 output=out, 393 self._ProcessResults, full_name, have_rev, opt, projects
392 ordered=True, 394 ),
393 ) 395 output=out,
396 ordered=True,
397 chunksize=1,
398 )
394 399
395 if git_failed: 400 if git_failed:
396 raise GrepCommandError( 401 raise GrepCommandError(
diff --git a/subcmds/prune.py b/subcmds/prune.py
index f99082a4..18bfc680 100644
--- a/subcmds/prune.py
+++ b/subcmds/prune.py
@@ -27,8 +27,10 @@ class Prune(PagedCommand):
27""" 27"""
28 PARALLEL_JOBS = DEFAULT_LOCAL_JOBS 28 PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
29 29
30 def _ExecuteOne(self, project): 30 @classmethod
31 def _ExecuteOne(cls, project_idx):
31 """Process one project.""" 32 """Process one project."""
33 project = cls.get_parallel_context()["projects"][project_idx]
32 return project.PruneHeads() 34 return project.PruneHeads()
33 35
34 def Execute(self, opt, args): 36 def Execute(self, opt, args):
@@ -41,13 +43,15 @@ class Prune(PagedCommand):
41 def _ProcessResults(_pool, _output, results): 43 def _ProcessResults(_pool, _output, results):
42 return list(itertools.chain.from_iterable(results)) 44 return list(itertools.chain.from_iterable(results))
43 45
44 all_branches = self.ExecuteInParallel( 46 with self.ParallelContext():
45 opt.jobs, 47 self.get_parallel_context()["projects"] = projects
46 self._ExecuteOne, 48 all_branches = self.ExecuteInParallel(
47 projects, 49 opt.jobs,
48 callback=_ProcessResults, 50 self._ExecuteOne,
49 ordered=True, 51 range(len(projects)),
50 ) 52 callback=_ProcessResults,
53 ordered=True,
54 )
51 55
52 if not all_branches: 56 if not all_branches:
53 return 57 return
diff --git a/subcmds/start.py b/subcmds/start.py
index 56008f42..6dca7e4e 100644
--- a/subcmds/start.py
+++ b/subcmds/start.py
@@ -21,7 +21,6 @@ from error import RepoExitError
21from git_command import git 21from git_command import git
22from git_config import IsImmutable 22from git_config import IsImmutable
23from progress import Progress 23from progress import Progress
24from project import Project
25from repo_logging import RepoLogger 24from repo_logging import RepoLogger
26 25
27 26
@@ -29,7 +28,7 @@ logger = RepoLogger(__file__)
29 28
30 29
31class ExecuteOneResult(NamedTuple): 30class ExecuteOneResult(NamedTuple):
32 project: Project 31 project_idx: int
33 error: Exception 32 error: Exception
34 33
35 34
@@ -80,18 +79,20 @@ revision specified in the manifest.
80 if not git.check_ref_format("heads/%s" % nb): 79 if not git.check_ref_format("heads/%s" % nb):
81 self.OptionParser.error("'%s' is not a valid name" % nb) 80 self.OptionParser.error("'%s' is not a valid name" % nb)
82 81
83 def _ExecuteOne(self, revision, nb, project): 82 @classmethod
83 def _ExecuteOne(cls, revision, nb, default_revisionExpr, project_idx):
84 """Start one project.""" 84 """Start one project."""
85 # If the current revision is immutable, such as a SHA1, a tag or 85 # If the current revision is immutable, such as a SHA1, a tag or
86 # a change, then we can't push back to it. Substitute with 86 # a change, then we can't push back to it. Substitute with
87 # dest_branch, if defined; or with manifest default revision instead. 87 # dest_branch, if defined; or with manifest default revision instead.
88 branch_merge = "" 88 branch_merge = ""
89 error = None 89 error = None
90 project = cls.get_parallel_context()["projects"][project_idx]
90 if IsImmutable(project.revisionExpr): 91 if IsImmutable(project.revisionExpr):
91 if project.dest_branch: 92 if project.dest_branch:
92 branch_merge = project.dest_branch 93 branch_merge = project.dest_branch
93 else: 94 else:
94 branch_merge = self.manifest.default.revisionExpr 95 branch_merge = default_revisionExpr
95 96
96 try: 97 try:
97 project.StartBranch( 98 project.StartBranch(
@@ -100,7 +101,7 @@ revision specified in the manifest.
100 except Exception as e: 101 except Exception as e:
101 logger.error("error: unable to checkout %s: %s", project.name, e) 102 logger.error("error: unable to checkout %s: %s", project.name, e)
102 error = e 103 error = e
103 return ExecuteOneResult(project, error) 104 return ExecuteOneResult(project_idx, error)
104 105
105 def Execute(self, opt, args): 106 def Execute(self, opt, args):
106 nb = args[0] 107 nb = args[0]
@@ -120,19 +121,28 @@ revision specified in the manifest.
120 def _ProcessResults(_pool, pm, results): 121 def _ProcessResults(_pool, pm, results):
121 for result in results: 122 for result in results:
122 if result.error: 123 if result.error:
123 err_projects.append(result.project) 124 project = all_projects[result.project_idx]
125 err_projects.append(project)
124 err.append(result.error) 126 err.append(result.error)
125 pm.update(msg="") 127 pm.update(msg="")
126 128
127 self.ExecuteInParallel( 129 with self.ParallelContext():
128 opt.jobs, 130 self.get_parallel_context()["projects"] = all_projects
129 functools.partial(self._ExecuteOne, opt.revision, nb), 131 self.ExecuteInParallel(
130 all_projects, 132 opt.jobs,
131 callback=_ProcessResults, 133 functools.partial(
132 output=Progress( 134 self._ExecuteOne,
133 f"Starting {nb}", len(all_projects), quiet=opt.quiet 135 opt.revision,
134 ), 136 nb,
135 ) 137 self.manifest.default.revisionExpr,
138 ),
139 range(len(all_projects)),
140 callback=_ProcessResults,
141 output=Progress(
142 f"Starting {nb}", len(all_projects), quiet=opt.quiet
143 ),
144 chunksize=1,
145 )
136 146
137 if err_projects: 147 if err_projects:
138 for p in err_projects: 148 for p in err_projects:
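
start.py needs one extra adjustment: _ExecuteOne used to read
self.manifest.default.revisionExpr, and as an instance method its pickled
callable carried the command object along with every task; as a classmethod
it now takes the default revision as a plain default_revisionExpr string
bound via functools.partial. A hypothetical before/after comparison of what
the pickled callable carries (FakeCommand is an illustrative stand-in):

    import functools
    import pickle


    class FakeCommand:
        def __init__(self):
            self.manifest = {"projects": ["p"] * 5_000}  # stand-in for heavy state

        def bound_worker(self, project_idx):
            # Old shape: a bound method; pickling it drags `self` (and
            # everything hanging off it) across the pipe for each task.
            return len(self.manifest["projects"]), project_idx

        @classmethod
        def class_worker(cls, default_rev, project_idx):
            # New shape: a classmethod plus plain-value arguments; it pickles
            # by reference to the class, with no instance state attached.
            return default_rev, project_idx


    cmd = FakeCommand()
    old = pickle.dumps(functools.partial(cmd.bound_worker))
    new = pickle.dumps(functools.partial(FakeCommand.class_worker, "refs/heads/main"))
    print(len(old), len(new))  # the bound-method partial is much larger
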
diff --git a/subcmds/status.py b/subcmds/status.py
index dac61ab6..cda73627 100644
--- a/subcmds/status.py
+++ b/subcmds/status.py
@@ -88,7 +88,8 @@ the following meanings:
88 "projects", 88 "projects",
89 ) 89 )
90 90
91 def _StatusHelper(self, quiet, local, project): 91 @classmethod
92 def _StatusHelper(cls, quiet, local, project_idx):
92 """Obtains the status for a specific project. 93 """Obtains the status for a specific project.
93 94
94 Obtains the status for a project, redirecting the output to 95 Obtains the status for a project, redirecting the output to
@@ -99,12 +100,13 @@ the following meanings:
99 local: a boolean, if True, the path is relative to the local 100 local: a boolean, if True, the path is relative to the local
100 (sub)manifest. If false, the path is relative to the outermost 101 (sub)manifest. If false, the path is relative to the outermost
101 manifest. 102 manifest.
102 project: Project to get status of. 103 project_idx: Project index to get status of.
103 104
104 Returns: 105 Returns:
105 The status of the project. 106 The status of the project.
106 """ 107 """
107 buf = io.StringIO() 108 buf = io.StringIO()
109 project = cls.get_parallel_context()["projects"][project_idx]
108 ret = project.PrintWorkTreeStatus( 110 ret = project.PrintWorkTreeStatus(
109 quiet=quiet, output_redir=buf, local=local 111 quiet=quiet, output_redir=buf, local=local
110 ) 112 )
@@ -143,15 +145,18 @@ the following meanings:
143 ret += 1 145 ret += 1
144 return ret 146 return ret
145 147
146 counter = self.ExecuteInParallel( 148 with self.ParallelContext():
147 opt.jobs, 149 self.get_parallel_context()["projects"] = all_projects
148 functools.partial( 150 counter = self.ExecuteInParallel(
149 self._StatusHelper, opt.quiet, opt.this_manifest_only 151 opt.jobs,
150 ), 152 functools.partial(
151 all_projects, 153 self._StatusHelper, opt.quiet, opt.this_manifest_only
152 callback=_ProcessResults, 154 ),
153 ordered=True, 155 range(len(all_projects)),
154 ) 156 callback=_ProcessResults,
157 ordered=True,
158 chunksize=1,
159 )
155 160
156 if not opt.quiet and len(all_projects) == counter: 161 if not opt.quiet and len(all_projects) == counter:
157 print("nothing to commit (working directory clean)") 162 print("nothing to commit (working directory clean)")
diff --git a/subcmds/upload.py b/subcmds/upload.py
index 8039a1cd..6344ee31 100644
--- a/subcmds/upload.py
+++ b/subcmds/upload.py
@@ -713,16 +713,17 @@ Gerrit Code Review: https://www.gerritcodereview.com/
713 merge_branch = p.stdout.strip() 713 merge_branch = p.stdout.strip()
714 return merge_branch 714 return merge_branch
715 715
716 @staticmethod 716 @classmethod
717 def _GatherOne(opt, project): 717 def _GatherOne(cls, opt, project_idx):
718 """Figure out the upload status for |project|.""" 718 """Figure out the upload status for |project|."""
719 project = cls.get_parallel_context()["projects"][project_idx]
719 if opt.current_branch: 720 if opt.current_branch:
720 cbr = project.CurrentBranch 721 cbr = project.CurrentBranch
721 up_branch = project.GetUploadableBranch(cbr) 722 up_branch = project.GetUploadableBranch(cbr)
722 avail = [up_branch] if up_branch else None 723 avail = [up_branch] if up_branch else None
723 else: 724 else:
724 avail = project.GetUploadableBranches(opt.branch) 725 avail = project.GetUploadableBranches(opt.branch)
725 return (project, avail) 726 return (project_idx, avail)
726 727
727 def Execute(self, opt, args): 728 def Execute(self, opt, args):
728 projects = self.GetProjects( 729 projects = self.GetProjects(
@@ -732,8 +733,9 @@ Gerrit Code Review: https://www.gerritcodereview.com/
732 def _ProcessResults(_pool, _out, results): 733 def _ProcessResults(_pool, _out, results):
733 pending = [] 734 pending = []
734 for result in results: 735 for result in results:
735 project, avail = result 736 project_idx, avail = result
736 if avail is None: 737 if avail is None:
738 project = projects[project_idx]
737 logger.error( 739 logger.error(
738 'repo: error: %s: Unable to upload branch "%s". ' 740 'repo: error: %s: Unable to upload branch "%s". '
739 "You might be able to fix the branch by running:\n" 741 "You might be able to fix the branch by running:\n"
@@ -746,12 +748,14 @@ Gerrit Code Review: https://www.gerritcodereview.com/
746 pending.append(result) 748 pending.append(result)
747 return pending 749 return pending
748 750
749 pending = self.ExecuteInParallel( 751 with self.ParallelContext():
750 opt.jobs, 752 self.get_parallel_context()["projects"] = projects
751 functools.partial(self._GatherOne, opt), 753 pending = self.ExecuteInParallel(
752 projects, 754 opt.jobs,
753 callback=_ProcessResults, 755 functools.partial(self._GatherOne, opt),
754 ) 756 range(len(projects)),
757 callback=_ProcessResults,
758 )
755 759
756 if not pending: 760 if not pending:
757 if opt.branch is None: 761 if opt.branch is None: