diff options
author | Kuang-che Wu <kcwu@google.com> | 2024-10-22 21:04:41 +0800 |
---|---|---|
committer | LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2024-10-23 23:34:34 +0000 |
commit | 8da4861b3860c505e39341b4135c21f67569e4d8 (patch) | |
tree | 6f300266c91322df0e61953b84381e1f403074a5 | |
parent | 39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (diff) | |
download | git-repo-8da4861b3860c505e39341b4135c21f67569e4d8.tar.gz |
subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd9977e to reduce serialization overhead.
Below benchmarks are tested with 2.7k projects on my workstation
(warm cache). git tracing is disabled for benchmark.
(seconds) | v2.48 | v2.48 | this CL | this CL
| | -j32 | | -j32
-----------------------------------------------------------
with clean tree state:
branches (none) | 5.6 | 5.9 | 1.0 | 0.9
status (clean) | 21.3 | 9.4 | 19.4 | 4.7
diff (none) | 7.6 | 7.2 | 5.7 | 2.2
prune (none) | 5.7 | 6.1 | 1.3 | 1.2
abandon (none) | 19.4 | 18.6 | 0.9 | 0.8
upload (none) | 19.7 | 18.7 | 0.9 | 0.8
forall -c true | 7.5 | 7.6 | 0.6 | 0.6
forall -c "git log -1" | 11.3 | 11.1 | 0.6 | 0.6
with branches:
start BRANCH --all | 21.9 | 20.3 | 13.6 | 2.6
checkout BRANCH | 29.1 | 27.8 | 1.1 | 1.0
branches (2) | 28.0 | 28.6 | 1.5 | 1.3
abandon BRANCH | 29.2 | 27.5 | 9.7 | 2.2
Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
-rw-r--r-- | command.py | 10 | ||||
-rw-r--r-- | subcmds/abandon.py | 30 | ||||
-rw-r--r-- | subcmds/branches.py | 49 | ||||
-rw-r--r-- | subcmds/checkout.py | 34 | ||||
-rw-r--r-- | subcmds/diff.py | 27 | ||||
-rw-r--r-- | subcmds/forall.py | 90 | ||||
-rw-r--r-- | subcmds/grep.py | 49 | ||||
-rw-r--r-- | subcmds/prune.py | 20 | ||||
-rw-r--r-- | subcmds/start.py | 40 | ||||
-rw-r--r-- | subcmds/status.py | 27 | ||||
-rw-r--r-- | subcmds/upload.py | 24 |
11 files changed, 228 insertions, 172 deletions
@@ -268,8 +268,10 @@ class Command: | |||
268 | cls._parallel_context = None | 268 | cls._parallel_context = None |
269 | 269 | ||
270 | @classmethod | 270 | @classmethod |
271 | def _SetParallelContext(cls, context): | 271 | def _InitParallelWorker(cls, context, initializer): |
272 | cls._parallel_context = context | 272 | cls._parallel_context = context |
273 | if initializer: | ||
274 | initializer() | ||
273 | 275 | ||
274 | @classmethod | 276 | @classmethod |
275 | def ExecuteInParallel( | 277 | def ExecuteInParallel( |
@@ -281,6 +283,7 @@ class Command: | |||
281 | output=None, | 283 | output=None, |
282 | ordered=False, | 284 | ordered=False, |
283 | chunksize=WORKER_BATCH_SIZE, | 285 | chunksize=WORKER_BATCH_SIZE, |
286 | initializer=None, | ||
284 | ): | 287 | ): |
285 | """Helper for managing parallel execution boiler plate. | 288 | """Helper for managing parallel execution boiler plate. |
286 | 289 | ||
@@ -307,6 +310,7 @@ class Command: | |||
307 | ordered: Whether the jobs should be processed in order. | 310 | ordered: Whether the jobs should be processed in order. |
308 | chunksize: The number of jobs processed in batch by parallel | 311 | chunksize: The number of jobs processed in batch by parallel |
309 | workers. | 312 | workers. |
313 | initializer: Worker initializer. | ||
310 | 314 | ||
311 | Returns: | 315 | Returns: |
312 | The |callback| function's results are returned. | 316 | The |callback| function's results are returned. |
@@ -318,8 +322,8 @@ class Command: | |||
318 | else: | 322 | else: |
319 | with multiprocessing.Pool( | 323 | with multiprocessing.Pool( |
320 | jobs, | 324 | jobs, |
321 | initializer=cls._SetParallelContext, | 325 | initializer=cls._InitParallelWorker, |
322 | initargs=(cls._parallel_context,), | 326 | initargs=(cls._parallel_context, initializer), |
323 | ) as pool: | 327 | ) as pool: |
324 | submit = pool.imap if ordered else pool.imap_unordered | 328 | submit = pool.imap if ordered else pool.imap_unordered |
325 | return callback( | 329 | return callback( |
diff --git a/subcmds/abandon.py b/subcmds/abandon.py index e280d69e..3208be6b 100644 --- a/subcmds/abandon.py +++ b/subcmds/abandon.py | |||
@@ -70,8 +70,10 @@ It is equivalent to "git branch -D <branchname>". | |||
70 | else: | 70 | else: |
71 | args.insert(0, "'All local branches'") | 71 | args.insert(0, "'All local branches'") |
72 | 72 | ||
73 | def _ExecuteOne(self, all_branches, nb, project): | 73 | @classmethod |
74 | def _ExecuteOne(cls, all_branches, nb, project_idx): | ||
74 | """Abandon one project.""" | 75 | """Abandon one project.""" |
76 | project = cls.get_parallel_context()["projects"][project_idx] | ||
75 | if all_branches: | 77 | if all_branches: |
76 | branches = project.GetBranches() | 78 | branches = project.GetBranches() |
77 | else: | 79 | else: |
@@ -89,7 +91,7 @@ It is equivalent to "git branch -D <branchname>". | |||
89 | if status is not None: | 91 | if status is not None: |
90 | ret[name] = status | 92 | ret[name] = status |
91 | 93 | ||
92 | return (ret, project, errors) | 94 | return (ret, project_idx, errors) |
93 | 95 | ||
94 | def Execute(self, opt, args): | 96 | def Execute(self, opt, args): |
95 | nb = args[0].split() | 97 | nb = args[0].split() |
@@ -102,7 +104,8 @@ It is equivalent to "git branch -D <branchname>". | |||
102 | _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only) | 104 | _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only) |
103 | 105 | ||
104 | def _ProcessResults(_pool, pm, states): | 106 | def _ProcessResults(_pool, pm, states): |
105 | for results, project, errors in states: | 107 | for results, project_idx, errors in states: |
108 | project = all_projects[project_idx] | ||
106 | for branch, status in results.items(): | 109 | for branch, status in results.items(): |
107 | if status: | 110 | if status: |
108 | success[branch].append(project) | 111 | success[branch].append(project) |
@@ -111,15 +114,18 @@ It is equivalent to "git branch -D <branchname>". | |||
111 | aggregate_errors.extend(errors) | 114 | aggregate_errors.extend(errors) |
112 | pm.update(msg="") | 115 | pm.update(msg="") |
113 | 116 | ||
114 | self.ExecuteInParallel( | 117 | with self.ParallelContext(): |
115 | opt.jobs, | 118 | self.get_parallel_context()["projects"] = all_projects |
116 | functools.partial(self._ExecuteOne, opt.all, nb), | 119 | self.ExecuteInParallel( |
117 | all_projects, | 120 | opt.jobs, |
118 | callback=_ProcessResults, | 121 | functools.partial(self._ExecuteOne, opt.all, nb), |
119 | output=Progress( | 122 | range(len(all_projects)), |
120 | f"Abandon {nb}", len(all_projects), quiet=opt.quiet | 123 | callback=_ProcessResults, |
121 | ), | 124 | output=Progress( |
122 | ) | 125 | f"Abandon {nb}", len(all_projects), quiet=opt.quiet |
126 | ), | ||
127 | chunksize=1, | ||
128 | ) | ||
123 | 129 | ||
124 | width = max( | 130 | width = max( |
125 | itertools.chain( | 131 | itertools.chain( |
diff --git a/subcmds/branches.py b/subcmds/branches.py index 59b5cb28..08c6389c 100644 --- a/subcmds/branches.py +++ b/subcmds/branches.py | |||
@@ -98,6 +98,22 @@ is shown, then the branch appears in all projects. | |||
98 | """ | 98 | """ |
99 | PARALLEL_JOBS = DEFAULT_LOCAL_JOBS | 99 | PARALLEL_JOBS = DEFAULT_LOCAL_JOBS |
100 | 100 | ||
101 | @classmethod | ||
102 | def _ExpandProjectToBranches(cls, project_idx): | ||
103 | """Expands a project into a list of branch names & associated info. | ||
104 | |||
105 | Args: | ||
106 | project_idx: project.Project index | ||
107 | |||
108 | Returns: | ||
109 | List[Tuple[str, git_config.Branch, int]] | ||
110 | """ | ||
111 | branches = [] | ||
112 | project = cls.get_parallel_context()["projects"][project_idx] | ||
113 | for name, b in project.GetBranches().items(): | ||
114 | branches.append((name, b, project_idx)) | ||
115 | return branches | ||
116 | |||
101 | def Execute(self, opt, args): | 117 | def Execute(self, opt, args): |
102 | projects = self.GetProjects( | 118 | projects = self.GetProjects( |
103 | args, all_manifests=not opt.this_manifest_only | 119 | args, all_manifests=not opt.this_manifest_only |
@@ -107,17 +123,20 @@ is shown, then the branch appears in all projects. | |||
107 | project_cnt = len(projects) | 123 | project_cnt = len(projects) |
108 | 124 | ||
109 | def _ProcessResults(_pool, _output, results): | 125 | def _ProcessResults(_pool, _output, results): |
110 | for name, b in itertools.chain.from_iterable(results): | 126 | for name, b, project_idx in itertools.chain.from_iterable(results): |
127 | b.project = projects[project_idx] | ||
111 | if name not in all_branches: | 128 | if name not in all_branches: |
112 | all_branches[name] = BranchInfo(name) | 129 | all_branches[name] = BranchInfo(name) |
113 | all_branches[name].add(b) | 130 | all_branches[name].add(b) |
114 | 131 | ||
115 | self.ExecuteInParallel( | 132 | with self.ParallelContext(): |
116 | opt.jobs, | 133 | self.get_parallel_context()["projects"] = projects |
117 | expand_project_to_branches, | 134 | self.ExecuteInParallel( |
118 | projects, | 135 | opt.jobs, |
119 | callback=_ProcessResults, | 136 | self._ExpandProjectToBranches, |
120 | ) | 137 | range(len(projects)), |
138 | callback=_ProcessResults, | ||
139 | ) | ||
121 | 140 | ||
122 | names = sorted(all_branches) | 141 | names = sorted(all_branches) |
123 | 142 | ||
@@ -191,19 +210,3 @@ is shown, then the branch appears in all projects. | |||
191 | else: | 210 | else: |
192 | out.write(" in all projects") | 211 | out.write(" in all projects") |
193 | out.nl() | 212 | out.nl() |
194 | |||
195 | |||
196 | def expand_project_to_branches(project): | ||
197 | """Expands a project into a list of branch names & associated information. | ||
198 | |||
199 | Args: | ||
200 | project: project.Project | ||
201 | |||
202 | Returns: | ||
203 | List[Tuple[str, git_config.Branch]] | ||
204 | """ | ||
205 | branches = [] | ||
206 | for name, b in project.GetBranches().items(): | ||
207 | b.project = project | ||
208 | branches.append((name, b)) | ||
209 | return branches | ||
diff --git a/subcmds/checkout.py b/subcmds/checkout.py index 379bfa18..859ddf6c 100644 --- a/subcmds/checkout.py +++ b/subcmds/checkout.py | |||
@@ -20,7 +20,6 @@ from command import DEFAULT_LOCAL_JOBS | |||
20 | from error import GitError | 20 | from error import GitError |
21 | from error import RepoExitError | 21 | from error import RepoExitError |
22 | from progress import Progress | 22 | from progress import Progress |
23 | from project import Project | ||
24 | from repo_logging import RepoLogger | 23 | from repo_logging import RepoLogger |
25 | 24 | ||
26 | 25 | ||
@@ -30,7 +29,7 @@ logger = RepoLogger(__file__) | |||
30 | class CheckoutBranchResult(NamedTuple): | 29 | class CheckoutBranchResult(NamedTuple): |
31 | # Whether the Project is on the branch (i.e. branch exists and no errors) | 30 | # Whether the Project is on the branch (i.e. branch exists and no errors) |
32 | result: bool | 31 | result: bool |
33 | project: Project | 32 | project_idx: int |
34 | error: Exception | 33 | error: Exception |
35 | 34 | ||
36 | 35 | ||
@@ -62,15 +61,17 @@ The command is equivalent to: | |||
62 | if not args: | 61 | if not args: |
63 | self.Usage() | 62 | self.Usage() |
64 | 63 | ||
65 | def _ExecuteOne(self, nb, project): | 64 | @classmethod |
65 | def _ExecuteOne(cls, nb, project_idx): | ||
66 | """Checkout one project.""" | 66 | """Checkout one project.""" |
67 | error = None | 67 | error = None |
68 | result = None | 68 | result = None |
69 | project = cls.get_parallel_context()["projects"][project_idx] | ||
69 | try: | 70 | try: |
70 | result = project.CheckoutBranch(nb) | 71 | result = project.CheckoutBranch(nb) |
71 | except GitError as e: | 72 | except GitError as e: |
72 | error = e | 73 | error = e |
73 | return CheckoutBranchResult(result, project, error) | 74 | return CheckoutBranchResult(result, project_idx, error) |
74 | 75 | ||
75 | def Execute(self, opt, args): | 76 | def Execute(self, opt, args): |
76 | nb = args[0] | 77 | nb = args[0] |
@@ -83,22 +84,25 @@ The command is equivalent to: | |||
83 | 84 | ||
84 | def _ProcessResults(_pool, pm, results): | 85 | def _ProcessResults(_pool, pm, results): |
85 | for result in results: | 86 | for result in results: |
87 | project = all_projects[result.project_idx] | ||
86 | if result.error is not None: | 88 | if result.error is not None: |
87 | err.append(result.error) | 89 | err.append(result.error) |
88 | err_projects.append(result.project) | 90 | err_projects.append(project) |
89 | elif result.result: | 91 | elif result.result: |
90 | success.append(result.project) | 92 | success.append(project) |
91 | pm.update(msg="") | 93 | pm.update(msg="") |
92 | 94 | ||
93 | self.ExecuteInParallel( | 95 | with self.ParallelContext(): |
94 | opt.jobs, | 96 | self.get_parallel_context()["projects"] = all_projects |
95 | functools.partial(self._ExecuteOne, nb), | 97 | self.ExecuteInParallel( |
96 | all_projects, | 98 | opt.jobs, |
97 | callback=_ProcessResults, | 99 | functools.partial(self._ExecuteOne, nb), |
98 | output=Progress( | 100 | range(len(all_projects)), |
99 | f"Checkout {nb}", len(all_projects), quiet=opt.quiet | 101 | callback=_ProcessResults, |
100 | ), | 102 | output=Progress( |
101 | ) | 103 | f"Checkout {nb}", len(all_projects), quiet=opt.quiet |
104 | ), | ||
105 | ) | ||
102 | 106 | ||
103 | if err_projects: | 107 | if err_projects: |
104 | for p in err_projects: | 108 | for p in err_projects: |
diff --git a/subcmds/diff.py b/subcmds/diff.py index d9d72b40..7bb0cbbd 100644 --- a/subcmds/diff.py +++ b/subcmds/diff.py | |||
@@ -40,7 +40,8 @@ to the Unix 'patch' command. | |||
40 | help="paths are relative to the repository root", | 40 | help="paths are relative to the repository root", |
41 | ) | 41 | ) |
42 | 42 | ||
43 | def _ExecuteOne(self, absolute, local, project): | 43 | @classmethod |
44 | def _ExecuteOne(cls, absolute, local, project_idx): | ||
44 | """Obtains the diff for a specific project. | 45 | """Obtains the diff for a specific project. |
45 | 46 | ||
46 | Args: | 47 | Args: |
@@ -48,12 +49,13 @@ to the Unix 'patch' command. | |||
48 | local: a boolean, if True, the path is relative to the local | 49 | local: a boolean, if True, the path is relative to the local |
49 | (sub)manifest. If false, the path is relative to the outermost | 50 | (sub)manifest. If false, the path is relative to the outermost |
50 | manifest. | 51 | manifest. |
51 | project: Project to get status of. | 52 | project_idx: Project index to get status of. |
52 | 53 | ||
53 | Returns: | 54 | Returns: |
54 | The status of the project. | 55 | The status of the project. |
55 | """ | 56 | """ |
56 | buf = io.StringIO() | 57 | buf = io.StringIO() |
58 | project = cls.get_parallel_context()["projects"][project_idx] | ||
57 | ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local) | 59 | ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local) |
58 | return (ret, buf.getvalue()) | 60 | return (ret, buf.getvalue()) |
59 | 61 | ||
@@ -71,12 +73,15 @@ to the Unix 'patch' command. | |||
71 | ret = 1 | 73 | ret = 1 |
72 | return ret | 74 | return ret |
73 | 75 | ||
74 | return self.ExecuteInParallel( | 76 | with self.ParallelContext(): |
75 | opt.jobs, | 77 | self.get_parallel_context()["projects"] = all_projects |
76 | functools.partial( | 78 | return self.ExecuteInParallel( |
77 | self._ExecuteOne, opt.absolute, opt.this_manifest_only | 79 | opt.jobs, |
78 | ), | 80 | functools.partial( |
79 | all_projects, | 81 | self._ExecuteOne, opt.absolute, opt.this_manifest_only |
80 | callback=_ProcessResults, | 82 | ), |
81 | ordered=True, | 83 | range(len(all_projects)), |
82 | ) | 84 | callback=_ProcessResults, |
85 | ordered=True, | ||
86 | chunksize=1, | ||
87 | ) | ||
diff --git a/subcmds/forall.py b/subcmds/forall.py index 287f2e04..e5fc9e80 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
@@ -15,7 +15,6 @@ | |||
15 | import errno | 15 | import errno |
16 | import functools | 16 | import functools |
17 | import io | 17 | import io |
18 | import multiprocessing | ||
19 | import os | 18 | import os |
20 | import re | 19 | import re |
21 | import signal | 20 | import signal |
@@ -26,7 +25,6 @@ from color import Coloring | |||
26 | from command import Command | 25 | from command import Command |
27 | from command import DEFAULT_LOCAL_JOBS | 26 | from command import DEFAULT_LOCAL_JOBS |
28 | from command import MirrorSafeCommand | 27 | from command import MirrorSafeCommand |
29 | from command import WORKER_BATCH_SIZE | ||
30 | from error import ManifestInvalidRevisionError | 28 | from error import ManifestInvalidRevisionError |
31 | from repo_logging import RepoLogger | 29 | from repo_logging import RepoLogger |
32 | 30 | ||
@@ -241,7 +239,6 @@ without iterating through the remaining projects. | |||
241 | cmd.insert(cmd.index(cn) + 1, "--color") | 239 | cmd.insert(cmd.index(cn) + 1, "--color") |
242 | 240 | ||
243 | mirror = self.manifest.IsMirror | 241 | mirror = self.manifest.IsMirror |
244 | rc = 0 | ||
245 | 242 | ||
246 | smart_sync_manifest_name = "smart_sync_override.xml" | 243 | smart_sync_manifest_name = "smart_sync_override.xml" |
247 | smart_sync_manifest_path = os.path.join( | 244 | smart_sync_manifest_path = os.path.join( |
@@ -264,32 +261,41 @@ without iterating through the remaining projects. | |||
264 | 261 | ||
265 | os.environ["REPO_COUNT"] = str(len(projects)) | 262 | os.environ["REPO_COUNT"] = str(len(projects)) |
266 | 263 | ||
264 | def _ProcessResults(_pool, _output, results): | ||
265 | rc = 0 | ||
266 | first = True | ||
267 | for r, output in results: | ||
268 | if output: | ||
269 | if first: | ||
270 | first = False | ||
271 | elif opt.project_header: | ||
272 | print() | ||
273 | # To simplify the DoWorkWrapper, take care of automatic | ||
274 | # newlines. | ||
275 | end = "\n" | ||
276 | if output[-1] == "\n": | ||
277 | end = "" | ||
278 | print(output, end=end) | ||
279 | rc = rc or r | ||
280 | if r != 0 and opt.abort_on_errors: | ||
281 | raise Exception("Aborting due to previous error") | ||
282 | return rc | ||
283 | |||
267 | try: | 284 | try: |
268 | config = self.manifest.manifestProject.config | 285 | config = self.manifest.manifestProject.config |
269 | with multiprocessing.Pool(opt.jobs, InitWorker) as pool: | 286 | with self.ParallelContext(): |
270 | results_it = pool.imap( | 287 | self.get_parallel_context()["projects"] = projects |
288 | rc = self.ExecuteInParallel( | ||
289 | opt.jobs, | ||
271 | functools.partial( | 290 | functools.partial( |
272 | DoWorkWrapper, mirror, opt, cmd, shell, config | 291 | self.DoWorkWrapper, mirror, opt, cmd, shell, config |
273 | ), | 292 | ), |
274 | enumerate(projects), | 293 | range(len(projects)), |
275 | chunksize=WORKER_BATCH_SIZE, | 294 | callback=_ProcessResults, |
295 | ordered=True, | ||
296 | initializer=self.InitWorker, | ||
297 | chunksize=1, | ||
276 | ) | 298 | ) |
277 | first = True | ||
278 | for r, output in results_it: | ||
279 | if output: | ||
280 | if first: | ||
281 | first = False | ||
282 | elif opt.project_header: | ||
283 | print() | ||
284 | # To simplify the DoWorkWrapper, take care of automatic | ||
285 | # newlines. | ||
286 | end = "\n" | ||
287 | if output[-1] == "\n": | ||
288 | end = "" | ||
289 | print(output, end=end) | ||
290 | rc = rc or r | ||
291 | if r != 0 and opt.abort_on_errors: | ||
292 | raise Exception("Aborting due to previous error") | ||
293 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): | 299 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): |
294 | # Catch KeyboardInterrupt raised inside and outside of workers | 300 | # Catch KeyboardInterrupt raised inside and outside of workers |
295 | rc = rc or errno.EINTR | 301 | rc = rc or errno.EINTR |
@@ -304,29 +310,29 @@ without iterating through the remaining projects. | |||
304 | if rc != 0: | 310 | if rc != 0: |
305 | sys.exit(rc) | 311 | sys.exit(rc) |
306 | 312 | ||
313 | @classmethod | ||
314 | def InitWorker(cls): | ||
315 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
307 | 316 | ||
308 | class WorkerKeyboardInterrupt(Exception): | 317 | @classmethod |
309 | """Keyboard interrupt exception for worker processes.""" | 318 | def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx): |
310 | 319 | """A wrapper around the DoWork() method. | |
311 | |||
312 | def InitWorker(): | ||
313 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
314 | 320 | ||
321 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
322 | different, ``Exception``-based exception to stop it flooding the console | ||
323 | with stacktraces and making the parent hang indefinitely. | ||
315 | 324 | ||
316 | def DoWorkWrapper(mirror, opt, cmd, shell, config, args): | 325 | """ |
317 | """A wrapper around the DoWork() method. | 326 | project = cls.get_parallel_context()["projects"][project_idx] |
327 | try: | ||
328 | return DoWork(project, mirror, opt, cmd, shell, project_idx, config) | ||
329 | except KeyboardInterrupt: | ||
330 | print("%s: Worker interrupted" % project.name) | ||
331 | raise WorkerKeyboardInterrupt() | ||
318 | 332 | ||
319 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
320 | different, ``Exception``-based exception to stop it flooding the console | ||
321 | with stacktraces and making the parent hang indefinitely. | ||
322 | 333 | ||
323 | """ | 334 | class WorkerKeyboardInterrupt(Exception): |
324 | cnt, project = args | 335 | """Keyboard interrupt exception for worker processes.""" |
325 | try: | ||
326 | return DoWork(project, mirror, opt, cmd, shell, cnt, config) | ||
327 | except KeyboardInterrupt: | ||
328 | print("%s: Worker interrupted" % project.name) | ||
329 | raise WorkerKeyboardInterrupt() | ||
330 | 336 | ||
331 | 337 | ||
332 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | 338 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): |
diff --git a/subcmds/grep.py b/subcmds/grep.py index b677b6bd..918651d9 100644 --- a/subcmds/grep.py +++ b/subcmds/grep.py | |||
@@ -23,7 +23,6 @@ from error import GitError | |||
23 | from error import InvalidArgumentsError | 23 | from error import InvalidArgumentsError |
24 | from error import SilentRepoExitError | 24 | from error import SilentRepoExitError |
25 | from git_command import GitCommand | 25 | from git_command import GitCommand |
26 | from project import Project | ||
27 | from repo_logging import RepoLogger | 26 | from repo_logging import RepoLogger |
28 | 27 | ||
29 | 28 | ||
@@ -40,7 +39,7 @@ class GrepColoring(Coloring): | |||
40 | class ExecuteOneResult(NamedTuple): | 39 | class ExecuteOneResult(NamedTuple): |
41 | """Result from an execute instance.""" | 40 | """Result from an execute instance.""" |
42 | 41 | ||
43 | project: Project | 42 | project_idx: int |
44 | rc: int | 43 | rc: int |
45 | stdout: str | 44 | stdout: str |
46 | stderr: str | 45 | stderr: str |
@@ -262,8 +261,10 @@ contain a line that matches both expressions: | |||
262 | help="Show only file names not containing matching lines", | 261 | help="Show only file names not containing matching lines", |
263 | ) | 262 | ) |
264 | 263 | ||
265 | def _ExecuteOne(self, cmd_argv, project): | 264 | @classmethod |
265 | def _ExecuteOne(cls, cmd_argv, project_idx): | ||
266 | """Process one project.""" | 266 | """Process one project.""" |
267 | project = cls.get_parallel_context()["projects"][project_idx] | ||
267 | try: | 268 | try: |
268 | p = GitCommand( | 269 | p = GitCommand( |
269 | project, | 270 | project, |
@@ -274,7 +275,7 @@ contain a line that matches both expressions: | |||
274 | verify_command=True, | 275 | verify_command=True, |
275 | ) | 276 | ) |
276 | except GitError as e: | 277 | except GitError as e: |
277 | return ExecuteOneResult(project, -1, None, str(e), e) | 278 | return ExecuteOneResult(project_idx, -1, None, str(e), e) |
278 | 279 | ||
279 | try: | 280 | try: |
280 | error = None | 281 | error = None |
@@ -282,10 +283,12 @@ contain a line that matches both expressions: | |||
282 | except GitError as e: | 283 | except GitError as e: |
283 | rc = 1 | 284 | rc = 1 |
284 | error = e | 285 | error = e |
285 | return ExecuteOneResult(project, rc, p.stdout, p.stderr, error) | 286 | return ExecuteOneResult(project_idx, rc, p.stdout, p.stderr, error) |
286 | 287 | ||
287 | @staticmethod | 288 | @staticmethod |
288 | def _ProcessResults(full_name, have_rev, opt, _pool, out, results): | 289 | def _ProcessResults( |
290 | full_name, have_rev, opt, projects, _pool, out, results | ||
291 | ): | ||
289 | git_failed = False | 292 | git_failed = False |
290 | bad_rev = False | 293 | bad_rev = False |
291 | have_match = False | 294 | have_match = False |
@@ -293,9 +296,10 @@ contain a line that matches both expressions: | |||
293 | errors = [] | 296 | errors = [] |
294 | 297 | ||
295 | for result in results: | 298 | for result in results: |
299 | project = projects[result.project_idx] | ||
296 | if result.rc < 0: | 300 | if result.rc < 0: |
297 | git_failed = True | 301 | git_failed = True |
298 | out.project("--- project %s ---" % _RelPath(result.project)) | 302 | out.project("--- project %s ---" % _RelPath(project)) |
299 | out.nl() | 303 | out.nl() |
300 | out.fail("%s", result.stderr) | 304 | out.fail("%s", result.stderr) |
301 | out.nl() | 305 | out.nl() |
@@ -311,9 +315,7 @@ contain a line that matches both expressions: | |||
311 | ): | 315 | ): |
312 | bad_rev = True | 316 | bad_rev = True |
313 | else: | 317 | else: |
314 | out.project( | 318 | out.project("--- project %s ---" % _RelPath(project)) |
315 | "--- project %s ---" % _RelPath(result.project) | ||
316 | ) | ||
317 | out.nl() | 319 | out.nl() |
318 | out.fail("%s", result.stderr.strip()) | 320 | out.fail("%s", result.stderr.strip()) |
319 | out.nl() | 321 | out.nl() |
@@ -331,13 +333,13 @@ contain a line that matches both expressions: | |||
331 | rev, line = line.split(":", 1) | 333 | rev, line = line.split(":", 1) |
332 | out.write("%s", rev) | 334 | out.write("%s", rev) |
333 | out.write(":") | 335 | out.write(":") |
334 | out.project(_RelPath(result.project)) | 336 | out.project(_RelPath(project)) |
335 | out.write("/") | 337 | out.write("/") |
336 | out.write("%s", line) | 338 | out.write("%s", line) |
337 | out.nl() | 339 | out.nl() |
338 | elif full_name: | 340 | elif full_name: |
339 | for line in r: | 341 | for line in r: |
340 | out.project(_RelPath(result.project)) | 342 | out.project(_RelPath(project)) |
341 | out.write("/") | 343 | out.write("/") |
342 | out.write("%s", line) | 344 | out.write("%s", line) |
343 | out.nl() | 345 | out.nl() |
@@ -381,16 +383,19 @@ contain a line that matches both expressions: | |||
381 | cmd_argv.extend(opt.revision) | 383 | cmd_argv.extend(opt.revision) |
382 | cmd_argv.append("--") | 384 | cmd_argv.append("--") |
383 | 385 | ||
384 | git_failed, bad_rev, have_match, errors = self.ExecuteInParallel( | 386 | with self.ParallelContext(): |
385 | opt.jobs, | 387 | self.get_parallel_context()["projects"] = projects |
386 | functools.partial(self._ExecuteOne, cmd_argv), | 388 | git_failed, bad_rev, have_match, errors = self.ExecuteInParallel( |
387 | projects, | 389 | opt.jobs, |
388 | callback=functools.partial( | 390 | functools.partial(self._ExecuteOne, cmd_argv), |
389 | self._ProcessResults, full_name, have_rev, opt | 391 | range(len(projects)), |
390 | ), | 392 | callback=functools.partial( |
391 | output=out, | 393 | self._ProcessResults, full_name, have_rev, opt, projects |
392 | ordered=True, | 394 | ), |
393 | ) | 395 | output=out, |
396 | ordered=True, | ||
397 | chunksize=1, | ||
398 | ) | ||
394 | 399 | ||
395 | if git_failed: | 400 | if git_failed: |
396 | raise GrepCommandError( | 401 | raise GrepCommandError( |
diff --git a/subcmds/prune.py b/subcmds/prune.py index f99082a4..18bfc680 100644 --- a/subcmds/prune.py +++ b/subcmds/prune.py | |||
@@ -27,8 +27,10 @@ class Prune(PagedCommand): | |||
27 | """ | 27 | """ |
28 | PARALLEL_JOBS = DEFAULT_LOCAL_JOBS | 28 | PARALLEL_JOBS = DEFAULT_LOCAL_JOBS |
29 | 29 | ||
30 | def _ExecuteOne(self, project): | 30 | @classmethod |
31 | def _ExecuteOne(cls, project_idx): | ||
31 | """Process one project.""" | 32 | """Process one project.""" |
33 | project = cls.get_parallel_context()["projects"][project_idx] | ||
32 | return project.PruneHeads() | 34 | return project.PruneHeads() |
33 | 35 | ||
34 | def Execute(self, opt, args): | 36 | def Execute(self, opt, args): |
@@ -41,13 +43,15 @@ class Prune(PagedCommand): | |||
41 | def _ProcessResults(_pool, _output, results): | 43 | def _ProcessResults(_pool, _output, results): |
42 | return list(itertools.chain.from_iterable(results)) | 44 | return list(itertools.chain.from_iterable(results)) |
43 | 45 | ||
44 | all_branches = self.ExecuteInParallel( | 46 | with self.ParallelContext(): |
45 | opt.jobs, | 47 | self.get_parallel_context()["projects"] = projects |
46 | self._ExecuteOne, | 48 | all_branches = self.ExecuteInParallel( |
47 | projects, | 49 | opt.jobs, |
48 | callback=_ProcessResults, | 50 | self._ExecuteOne, |
49 | ordered=True, | 51 | range(len(projects)), |
50 | ) | 52 | callback=_ProcessResults, |
53 | ordered=True, | ||
54 | ) | ||
51 | 55 | ||
52 | if not all_branches: | 56 | if not all_branches: |
53 | return | 57 | return |
diff --git a/subcmds/start.py b/subcmds/start.py index 56008f42..6dca7e4e 100644 --- a/subcmds/start.py +++ b/subcmds/start.py | |||
@@ -21,7 +21,6 @@ from error import RepoExitError | |||
21 | from git_command import git | 21 | from git_command import git |
22 | from git_config import IsImmutable | 22 | from git_config import IsImmutable |
23 | from progress import Progress | 23 | from progress import Progress |
24 | from project import Project | ||
25 | from repo_logging import RepoLogger | 24 | from repo_logging import RepoLogger |
26 | 25 | ||
27 | 26 | ||
@@ -29,7 +28,7 @@ logger = RepoLogger(__file__) | |||
29 | 28 | ||
30 | 29 | ||
31 | class ExecuteOneResult(NamedTuple): | 30 | class ExecuteOneResult(NamedTuple): |
32 | project: Project | 31 | project_idx: int |
33 | error: Exception | 32 | error: Exception |
34 | 33 | ||
35 | 34 | ||
@@ -80,18 +79,20 @@ revision specified in the manifest. | |||
80 | if not git.check_ref_format("heads/%s" % nb): | 79 | if not git.check_ref_format("heads/%s" % nb): |
81 | self.OptionParser.error("'%s' is not a valid name" % nb) | 80 | self.OptionParser.error("'%s' is not a valid name" % nb) |
82 | 81 | ||
83 | def _ExecuteOne(self, revision, nb, project): | 82 | @classmethod |
83 | def _ExecuteOne(cls, revision, nb, default_revisionExpr, project_idx): | ||
84 | """Start one project.""" | 84 | """Start one project.""" |
85 | # If the current revision is immutable, such as a SHA1, a tag or | 85 | # If the current revision is immutable, such as a SHA1, a tag or |
86 | # a change, then we can't push back to it. Substitute with | 86 | # a change, then we can't push back to it. Substitute with |
87 | # dest_branch, if defined; or with manifest default revision instead. | 87 | # dest_branch, if defined; or with manifest default revision instead. |
88 | branch_merge = "" | 88 | branch_merge = "" |
89 | error = None | 89 | error = None |
90 | project = cls.get_parallel_context()["projects"][project_idx] | ||
90 | if IsImmutable(project.revisionExpr): | 91 | if IsImmutable(project.revisionExpr): |
91 | if project.dest_branch: | 92 | if project.dest_branch: |
92 | branch_merge = project.dest_branch | 93 | branch_merge = project.dest_branch |
93 | else: | 94 | else: |
94 | branch_merge = self.manifest.default.revisionExpr | 95 | branch_merge = default_revisionExpr |
95 | 96 | ||
96 | try: | 97 | try: |
97 | project.StartBranch( | 98 | project.StartBranch( |
@@ -100,7 +101,7 @@ revision specified in the manifest. | |||
100 | except Exception as e: | 101 | except Exception as e: |
101 | logger.error("error: unable to checkout %s: %s", project.name, e) | 102 | logger.error("error: unable to checkout %s: %s", project.name, e) |
102 | error = e | 103 | error = e |
103 | return ExecuteOneResult(project, error) | 104 | return ExecuteOneResult(project_idx, error) |
104 | 105 | ||
105 | def Execute(self, opt, args): | 106 | def Execute(self, opt, args): |
106 | nb = args[0] | 107 | nb = args[0] |
@@ -120,19 +121,28 @@ revision specified in the manifest. | |||
120 | def _ProcessResults(_pool, pm, results): | 121 | def _ProcessResults(_pool, pm, results): |
121 | for result in results: | 122 | for result in results: |
122 | if result.error: | 123 | if result.error: |
123 | err_projects.append(result.project) | 124 | project = all_projects[result.project_idx] |
125 | err_projects.append(project) | ||
124 | err.append(result.error) | 126 | err.append(result.error) |
125 | pm.update(msg="") | 127 | pm.update(msg="") |
126 | 128 | ||
127 | self.ExecuteInParallel( | 129 | with self.ParallelContext(): |
128 | opt.jobs, | 130 | self.get_parallel_context()["projects"] = all_projects |
129 | functools.partial(self._ExecuteOne, opt.revision, nb), | 131 | self.ExecuteInParallel( |
130 | all_projects, | 132 | opt.jobs, |
131 | callback=_ProcessResults, | 133 | functools.partial( |
132 | output=Progress( | 134 | self._ExecuteOne, |
133 | f"Starting {nb}", len(all_projects), quiet=opt.quiet | 135 | opt.revision, |
134 | ), | 136 | nb, |
135 | ) | 137 | self.manifest.default.revisionExpr, |
138 | ), | ||
139 | range(len(all_projects)), | ||
140 | callback=_ProcessResults, | ||
141 | output=Progress( | ||
142 | f"Starting {nb}", len(all_projects), quiet=opt.quiet | ||
143 | ), | ||
144 | chunksize=1, | ||
145 | ) | ||
136 | 146 | ||
137 | if err_projects: | 147 | if err_projects: |
138 | for p in err_projects: | 148 | for p in err_projects: |
diff --git a/subcmds/status.py b/subcmds/status.py index dac61ab6..cda73627 100644 --- a/subcmds/status.py +++ b/subcmds/status.py | |||
@@ -88,7 +88,8 @@ the following meanings: | |||
88 | "projects", | 88 | "projects", |
89 | ) | 89 | ) |
90 | 90 | ||
91 | def _StatusHelper(self, quiet, local, project): | 91 | @classmethod |
92 | def _StatusHelper(cls, quiet, local, project_idx): | ||
92 | """Obtains the status for a specific project. | 93 | """Obtains the status for a specific project. |
93 | 94 | ||
94 | Obtains the status for a project, redirecting the output to | 95 | Obtains the status for a project, redirecting the output to |
@@ -99,12 +100,13 @@ the following meanings: | |||
99 | local: a boolean, if True, the path is relative to the local | 100 | local: a boolean, if True, the path is relative to the local |
100 | (sub)manifest. If false, the path is relative to the outermost | 101 | (sub)manifest. If false, the path is relative to the outermost |
101 | manifest. | 102 | manifest. |
102 | project: Project to get status of. | 103 | project_idx: Project index to get status of. |
103 | 104 | ||
104 | Returns: | 105 | Returns: |
105 | The status of the project. | 106 | The status of the project. |
106 | """ | 107 | """ |
107 | buf = io.StringIO() | 108 | buf = io.StringIO() |
109 | project = cls.get_parallel_context()["projects"][project_idx] | ||
108 | ret = project.PrintWorkTreeStatus( | 110 | ret = project.PrintWorkTreeStatus( |
109 | quiet=quiet, output_redir=buf, local=local | 111 | quiet=quiet, output_redir=buf, local=local |
110 | ) | 112 | ) |
@@ -143,15 +145,18 @@ the following meanings: | |||
143 | ret += 1 | 145 | ret += 1 |
144 | return ret | 146 | return ret |
145 | 147 | ||
146 | counter = self.ExecuteInParallel( | 148 | with self.ParallelContext(): |
147 | opt.jobs, | 149 | self.get_parallel_context()["projects"] = all_projects |
148 | functools.partial( | 150 | counter = self.ExecuteInParallel( |
149 | self._StatusHelper, opt.quiet, opt.this_manifest_only | 151 | opt.jobs, |
150 | ), | 152 | functools.partial( |
151 | all_projects, | 153 | self._StatusHelper, opt.quiet, opt.this_manifest_only |
152 | callback=_ProcessResults, | 154 | ), |
153 | ordered=True, | 155 | range(len(all_projects)), |
154 | ) | 156 | callback=_ProcessResults, |
157 | ordered=True, | ||
158 | chunksize=1, | ||
159 | ) | ||
155 | 160 | ||
156 | if not opt.quiet and len(all_projects) == counter: | 161 | if not opt.quiet and len(all_projects) == counter: |
157 | print("nothing to commit (working directory clean)") | 162 | print("nothing to commit (working directory clean)") |
diff --git a/subcmds/upload.py b/subcmds/upload.py index 8039a1cd..6344ee31 100644 --- a/subcmds/upload.py +++ b/subcmds/upload.py | |||
@@ -713,16 +713,17 @@ Gerrit Code Review: https://www.gerritcodereview.com/ | |||
713 | merge_branch = p.stdout.strip() | 713 | merge_branch = p.stdout.strip() |
714 | return merge_branch | 714 | return merge_branch |
715 | 715 | ||
716 | @staticmethod | 716 | @classmethod |
717 | def _GatherOne(opt, project): | 717 | def _GatherOne(cls, opt, project_idx): |
718 | """Figure out the upload status for |project|.""" | 718 | """Figure out the upload status for |project|.""" |
719 | project = cls.get_parallel_context()["projects"][project_idx] | ||
719 | if opt.current_branch: | 720 | if opt.current_branch: |
720 | cbr = project.CurrentBranch | 721 | cbr = project.CurrentBranch |
721 | up_branch = project.GetUploadableBranch(cbr) | 722 | up_branch = project.GetUploadableBranch(cbr) |
722 | avail = [up_branch] if up_branch else None | 723 | avail = [up_branch] if up_branch else None |
723 | else: | 724 | else: |
724 | avail = project.GetUploadableBranches(opt.branch) | 725 | avail = project.GetUploadableBranches(opt.branch) |
725 | return (project, avail) | 726 | return (project_idx, avail) |
726 | 727 | ||
727 | def Execute(self, opt, args): | 728 | def Execute(self, opt, args): |
728 | projects = self.GetProjects( | 729 | projects = self.GetProjects( |
@@ -732,8 +733,9 @@ Gerrit Code Review: https://www.gerritcodereview.com/ | |||
732 | def _ProcessResults(_pool, _out, results): | 733 | def _ProcessResults(_pool, _out, results): |
733 | pending = [] | 734 | pending = [] |
734 | for result in results: | 735 | for result in results: |
735 | project, avail = result | 736 | project_idx, avail = result |
736 | if avail is None: | 737 | if avail is None: |
738 | project = projects[project_idx] | ||
737 | logger.error( | 739 | logger.error( |
738 | 'repo: error: %s: Unable to upload branch "%s". ' | 740 | 'repo: error: %s: Unable to upload branch "%s". ' |
739 | "You might be able to fix the branch by running:\n" | 741 | "You might be able to fix the branch by running:\n" |
@@ -746,12 +748,14 @@ Gerrit Code Review: https://www.gerritcodereview.com/ | |||
746 | pending.append(result) | 748 | pending.append(result) |
747 | return pending | 749 | return pending |
748 | 750 | ||
749 | pending = self.ExecuteInParallel( | 751 | with self.ParallelContext(): |
750 | opt.jobs, | 752 | self.get_parallel_context()["projects"] = projects |
751 | functools.partial(self._GatherOne, opt), | 753 | pending = self.ExecuteInParallel( |
752 | projects, | 754 | opt.jobs, |
753 | callback=_ProcessResults, | 755 | functools.partial(self._GatherOne, opt), |
754 | ) | 756 | range(len(projects)), |
757 | callback=_ProcessResults, | ||
758 | ) | ||
755 | 759 | ||
756 | if not pending: | 760 | if not pending: |
757 | if opt.branch is None: | 761 | if opt.branch is None: |