summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMike Frysinger <vapier@google.com>2021-03-01 00:56:38 -0500
committerMike Frysinger <vapier@google.com>2021-04-15 05:10:16 +0000
commitb5d075d04f1e555f85aad27e74f16073a50b2ae6 (patch)
treeb7342a0cd0a8d081cceb801b615bf8bbe1cc5647
parentb8bf291ddbe00731d441a34cbf1ec5b5f95f401b (diff)
downloadgit-repo-b5d075d04f1e555f85aad27e74f16073a50b2ae6.tar.gz
command: add a helper for the parallel execution boilerplate
Now that we have a bunch of subcommands doing parallel execution, a common pattern arises that we can factor out for most of them. We leave forall alone as it's a bit too complicated atm to cut over. Change-Id: I3617a4f7c66142bcd1ab030cb4cca698a65010ac Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/301942 Tested-by: Mike Frysinger <vapier@google.com> Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
-rw-r--r--command.py40
-rw-r--r--subcmds/abandon.py26
-rw-r--r--subcmds/branches.py16
-rw-r--r--subcmds/checkout.py22
-rw-r--r--subcmds/diff.py33
-rw-r--r--subcmds/grep.py24
-rw-r--r--subcmds/prune.py20
-rw-r--r--subcmds/start.py26
-rw-r--r--subcmds/status.py32
-rw-r--r--subcmds/sync.py49
10 files changed, 145 insertions, 143 deletions
diff --git a/command.py b/command.py
index be2d6a6e..9b1220dc 100644
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
12# See the License for the specific language governing permissions and 12# See the License for the specific language governing permissions and
13# limitations under the License. 13# limitations under the License.
14 14
15import multiprocessing
15import os 16import os
16import optparse 17import optparse
17import platform 18import platform
@@ -21,6 +22,7 @@ import sys
21from event_log import EventLog 22from event_log import EventLog
22from error import NoSuchProjectError 23from error import NoSuchProjectError
23from error import InvalidProjectGroupsError 24from error import InvalidProjectGroupsError
25import progress
24 26
25 27
26# Number of projects to submit to a single worker process at a time. 28# Number of projects to submit to a single worker process at a time.
@@ -156,6 +158,44 @@ class Command(object):
156 """ 158 """
157 raise NotImplementedError 159 raise NotImplementedError
158 160
161 @staticmethod
162 def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False):
163 """Helper for managing parallel execution boiler plate.
164
165 For subcommands that can easily split their work up.
166
167 Args:
168 jobs: How many parallel processes to use.
169 func: The function to apply to each of the |inputs|. Usually a
170 functools.partial for wrapping additional arguments. It will be run
171 in a separate process, so it must be pickalable, so nested functions
172 won't work. Methods on the subcommand Command class should work.
173 inputs: The list of items to process. Must be a list.
174 callback: The function to pass the results to for processing. It will be
175 executed in the main thread and process the results of |func| as they
176 become available. Thus it may be a local nested function. Its return
177 value is passed back directly. It takes three arguments:
178 - The processing pool (or None with one job).
179 - The |output| argument.
180 - An iterator for the results.
181 output: An output manager. May be progress.Progess or color.Coloring.
182 ordered: Whether the jobs should be processed in order.
183
184 Returns:
185 The |callback| function's results are returned.
186 """
187 try:
188 # NB: Multiprocessing is heavy, so don't spin it up for one job.
189 if len(inputs) == 1 or jobs == 1:
190 return callback(None, output, (func(x) for x in inputs))
191 else:
192 with multiprocessing.Pool(jobs) as pool:
193 submit = pool.imap if ordered else pool.imap_unordered
194 return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE))
195 finally:
196 if isinstance(output, progress.Progress):
197 output.end()
198
159 def _ResetPathToProjectMap(self, projects): 199 def _ResetPathToProjectMap(self, projects):
160 self._by_path = dict((p.worktree, p) for p in projects) 200 self._by_path = dict((p.worktree, p) for p in projects)
161 201
diff --git a/subcmds/abandon.py b/subcmds/abandon.py
index 1d22917e..c7c127d6 100644
--- a/subcmds/abandon.py
+++ b/subcmds/abandon.py
@@ -15,10 +15,9 @@
15from collections import defaultdict 15from collections import defaultdict
16import functools 16import functools
17import itertools 17import itertools
18import multiprocessing
19import sys 18import sys
20 19
21from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE 20from command import Command, DEFAULT_LOCAL_JOBS
22from git_command import git 21from git_command import git
23from progress import Progress 22from progress import Progress
24 23
@@ -52,9 +51,9 @@ It is equivalent to "git branch -D <branchname>".
52 else: 51 else:
53 args.insert(0, "'All local branches'") 52 args.insert(0, "'All local branches'")
54 53
55 def _ExecuteOne(self, opt, nb, project): 54 def _ExecuteOne(self, all_branches, nb, project):
56 """Abandon one project.""" 55 """Abandon one project."""
57 if opt.all: 56 if all_branches:
58 branches = project.GetBranches() 57 branches = project.GetBranches()
59 else: 58 else:
60 branches = [nb] 59 branches = [nb]
@@ -72,7 +71,7 @@ It is equivalent to "git branch -D <branchname>".
72 success = defaultdict(list) 71 success = defaultdict(list)
73 all_projects = self.GetProjects(args[1:]) 72 all_projects = self.GetProjects(args[1:])
74 73
75 def _ProcessResults(states): 74 def _ProcessResults(_pool, pm, states):
76 for (results, project) in states: 75 for (results, project) in states:
77 for branch, status in results.items(): 76 for branch, status in results.items():
78 if status: 77 if status:
@@ -81,17 +80,12 @@ It is equivalent to "git branch -D <branchname>".
81 err[branch].append(project) 80 err[branch].append(project)
82 pm.update() 81 pm.update()
83 82
84 pm = Progress('Abandon %s' % nb, len(all_projects), quiet=opt.quiet) 83 self.ExecuteInParallel(
85 # NB: Multiprocessing is heavy, so don't spin it up for one job. 84 opt.jobs,
86 if len(all_projects) == 1 or opt.jobs == 1: 85 functools.partial(self._ExecuteOne, opt.all, nb),
87 _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects) 86 all_projects,
88 else: 87 callback=_ProcessResults,
89 with multiprocessing.Pool(opt.jobs) as pool: 88 output=Progress('Abandon %s' % (nb,), len(all_projects), quiet=opt.quiet))
90 states = pool.imap_unordered(
91 functools.partial(self._ExecuteOne, opt, nb), all_projects,
92 chunksize=WORKER_BATCH_SIZE)
93 _ProcessResults(states)
94 pm.end()
95 89
96 width = max(itertools.chain( 90 width = max(itertools.chain(
97 [25], (len(x) for x in itertools.chain(success, err)))) 91 [25], (len(x) for x in itertools.chain(success, err))))
diff --git a/subcmds/branches.py b/subcmds/branches.py
index d5ea580c..2dc102bb 100644
--- a/subcmds/branches.py
+++ b/subcmds/branches.py
@@ -13,10 +13,10 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import itertools 15import itertools
16import multiprocessing
17import sys 16import sys
17
18from color import Coloring 18from color import Coloring
19from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE 19from command import Command, DEFAULT_LOCAL_JOBS
20 20
21 21
22class BranchColoring(Coloring): 22class BranchColoring(Coloring):
@@ -102,15 +102,19 @@ is shown, then the branch appears in all projects.
102 out = BranchColoring(self.manifest.manifestProject.config) 102 out = BranchColoring(self.manifest.manifestProject.config)
103 all_branches = {} 103 all_branches = {}
104 project_cnt = len(projects) 104 project_cnt = len(projects)
105 with multiprocessing.Pool(processes=opt.jobs) as pool:
106 project_branches = pool.imap_unordered(
107 expand_project_to_branches, projects, chunksize=WORKER_BATCH_SIZE)
108 105
109 for name, b in itertools.chain.from_iterable(project_branches): 106 def _ProcessResults(_pool, _output, results):
107 for name, b in itertools.chain.from_iterable(results):
110 if name not in all_branches: 108 if name not in all_branches:
111 all_branches[name] = BranchInfo(name) 109 all_branches[name] = BranchInfo(name)
112 all_branches[name].add(b) 110 all_branches[name].add(b)
113 111
112 self.ExecuteInParallel(
113 opt.jobs,
114 expand_project_to_branches,
115 projects,
116 callback=_ProcessResults)
117
114 names = sorted(all_branches) 118 names = sorted(all_branches)
115 119
116 if not names: 120 if not names:
diff --git a/subcmds/checkout.py b/subcmds/checkout.py
index 6b71a8fa..4d8009b1 100644
--- a/subcmds/checkout.py
+++ b/subcmds/checkout.py
@@ -13,10 +13,9 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import functools 15import functools
16import multiprocessing
17import sys 16import sys
18 17
19from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE 18from command import Command, DEFAULT_LOCAL_JOBS
20from progress import Progress 19from progress import Progress
21 20
22 21
@@ -50,7 +49,7 @@ The command is equivalent to:
50 success = [] 49 success = []
51 all_projects = self.GetProjects(args[1:]) 50 all_projects = self.GetProjects(args[1:])
52 51
53 def _ProcessResults(results): 52 def _ProcessResults(_pool, pm, results):
54 for status, project in results: 53 for status, project in results:
55 if status is not None: 54 if status is not None:
56 if status: 55 if status:
@@ -59,17 +58,12 @@ The command is equivalent to:
59 err.append(project) 58 err.append(project)
60 pm.update() 59 pm.update()
61 60
62 pm = Progress('Checkout %s' % nb, len(all_projects), quiet=opt.quiet) 61 self.ExecuteInParallel(
63 # NB: Multiprocessing is heavy, so don't spin it up for one job. 62 opt.jobs,
64 if len(all_projects) == 1 or opt.jobs == 1: 63 functools.partial(self._ExecuteOne, nb),
65 _ProcessResults(self._ExecuteOne(nb, x) for x in all_projects) 64 all_projects,
66 else: 65 callback=_ProcessResults,
67 with multiprocessing.Pool(opt.jobs) as pool: 66 output=Progress('Checkout %s' % (nb,), len(all_projects), quiet=opt.quiet))
68 results = pool.imap_unordered(
69 functools.partial(self._ExecuteOne, nb), all_projects,
70 chunksize=WORKER_BATCH_SIZE)
71 _ProcessResults(results)
72 pm.end()
73 67
74 if err: 68 if err:
75 for p in err: 69 for p in err:
diff --git a/subcmds/diff.py b/subcmds/diff.py
index cdc262e6..4966bb1a 100644
--- a/subcmds/diff.py
+++ b/subcmds/diff.py
@@ -14,9 +14,8 @@
14 14
15import functools 15import functools
16import io 16import io
17import multiprocessing
18 17
19from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE 18from command import DEFAULT_LOCAL_JOBS, PagedCommand
20 19
21 20
22class Diff(PagedCommand): 21class Diff(PagedCommand):
@@ -36,7 +35,7 @@ to the Unix 'patch' command.
36 dest='absolute', action='store_true', 35 dest='absolute', action='store_true',
37 help='Paths are relative to the repository root') 36 help='Paths are relative to the repository root')
38 37
39 def _DiffHelper(self, absolute, project): 38 def _ExecuteOne(self, absolute, project):
40 """Obtains the diff for a specific project. 39 """Obtains the diff for a specific project.
41 40
42 Args: 41 Args:
@@ -51,22 +50,20 @@ to the Unix 'patch' command.
51 return (ret, buf.getvalue()) 50 return (ret, buf.getvalue())
52 51
53 def Execute(self, opt, args): 52 def Execute(self, opt, args):
54 ret = 0
55 all_projects = self.GetProjects(args) 53 all_projects = self.GetProjects(args)
56 54
57 # NB: Multiprocessing is heavy, so don't spin it up for one job. 55 def _ProcessResults(_pool, _output, results):
58 if len(all_projects) == 1 or opt.jobs == 1: 56 ret = 0
59 for project in all_projects: 57 for (state, output) in results:
60 if not project.PrintWorkTreeDiff(opt.absolute): 58 if output:
59 print(output, end='')
60 if not state:
61 ret = 1 61 ret = 1
62 else: 62 return ret
63 with multiprocessing.Pool(opt.jobs) as pool:
64 states = pool.imap(functools.partial(self._DiffHelper, opt.absolute),
65 all_projects, WORKER_BATCH_SIZE)
66 for (state, output) in states:
67 if output:
68 print(output, end='')
69 if not state:
70 ret = 1
71 63
72 return ret 64 return self.ExecuteInParallel(
65 opt.jobs,
66 functools.partial(self._ExecuteOne, opt.absolute),
67 all_projects,
68 callback=_ProcessResults,
69 ordered=True)
diff --git a/subcmds/grep.py b/subcmds/grep.py
index 9a4a8a36..6cb1445a 100644
--- a/subcmds/grep.py
+++ b/subcmds/grep.py
@@ -13,11 +13,10 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import functools 15import functools
16import multiprocessing
17import sys 16import sys
18 17
19from color import Coloring 18from color import Coloring
20from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE 19from command import DEFAULT_LOCAL_JOBS, PagedCommand
21from error import GitError 20from error import GitError
22from git_command import GitCommand 21from git_command import GitCommand
23 22
@@ -173,7 +172,7 @@ contain a line that matches both expressions:
173 return (project, p.Wait(), p.stdout, p.stderr) 172 return (project, p.Wait(), p.stdout, p.stderr)
174 173
175 @staticmethod 174 @staticmethod
176 def _ProcessResults(out, full_name, have_rev, results): 175 def _ProcessResults(full_name, have_rev, _pool, out, results):
177 git_failed = False 176 git_failed = False
178 bad_rev = False 177 bad_rev = False
179 have_match = False 178 have_match = False
@@ -256,18 +255,13 @@ contain a line that matches both expressions:
256 cmd_argv.extend(opt.revision) 255 cmd_argv.extend(opt.revision)
257 cmd_argv.append('--') 256 cmd_argv.append('--')
258 257
259 process_results = functools.partial( 258 git_failed, bad_rev, have_match = self.ExecuteInParallel(
260 self._ProcessResults, out, full_name, have_rev) 259 opt.jobs,
261 # NB: Multiprocessing is heavy, so don't spin it up for one job. 260 functools.partial(self._ExecuteOne, cmd_argv),
262 if len(projects) == 1 or opt.jobs == 1: 261 projects,
263 git_failed, bad_rev, have_match = process_results( 262 callback=functools.partial(self._ProcessResults, full_name, have_rev),
264 self._ExecuteOne(cmd_argv, x) for x in projects) 263 output=out,
265 else: 264 ordered=True)
266 with multiprocessing.Pool(opt.jobs) as pool:
267 results = pool.imap(
268 functools.partial(self._ExecuteOne, cmd_argv), projects,
269 chunksize=WORKER_BATCH_SIZE)
270 git_failed, bad_rev, have_match = process_results(results)
271 265
272 if git_failed: 266 if git_failed:
273 sys.exit(1) 267 sys.exit(1)
diff --git a/subcmds/prune.py b/subcmds/prune.py
index 4084c8b6..236b647f 100644
--- a/subcmds/prune.py
+++ b/subcmds/prune.py
@@ -13,10 +13,9 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import itertools 15import itertools
16import multiprocessing
17 16
18from color import Coloring 17from color import Coloring
19from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE 18from command import DEFAULT_LOCAL_JOBS, PagedCommand
20 19
21 20
22class Prune(PagedCommand): 21class Prune(PagedCommand):
@@ -36,18 +35,15 @@ class Prune(PagedCommand):
36 35
37 # NB: Should be able to refactor this module to display summary as results 36 # NB: Should be able to refactor this module to display summary as results
38 # come back from children. 37 # come back from children.
39 def _ProcessResults(results): 38 def _ProcessResults(_pool, _output, results):
40 return list(itertools.chain.from_iterable(results)) 39 return list(itertools.chain.from_iterable(results))
41 40
42 # NB: Multiprocessing is heavy, so don't spin it up for one job. 41 all_branches = self.ExecuteInParallel(
43 if len(projects) == 1 or opt.jobs == 1: 42 opt.jobs,
44 all_branches = _ProcessResults(self._ExecuteOne(x) for x in projects) 43 self._ExecuteOne,
45 else: 44 projects,
46 with multiprocessing.Pool(opt.jobs) as pool: 45 callback=_ProcessResults,
47 results = pool.imap( 46 ordered=True)
48 self._ExecuteOne, projects,
49 chunksize=WORKER_BATCH_SIZE)
50 all_branches = _ProcessResults(results)
51 47
52 if not all_branches: 48 if not all_branches:
53 return 49 return
diff --git a/subcmds/start.py b/subcmds/start.py
index aa2f915a..ff2bae56 100644
--- a/subcmds/start.py
+++ b/subcmds/start.py
@@ -13,11 +13,10 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import functools 15import functools
16import multiprocessing
17import os 16import os
18import sys 17import sys
19 18
20from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE 19from command import Command, DEFAULT_LOCAL_JOBS
21from git_config import IsImmutable 20from git_config import IsImmutable
22from git_command import git 21from git_command import git
23import gitc_utils 22import gitc_utils
@@ -55,7 +54,7 @@ revision specified in the manifest.
55 if not git.check_ref_format('heads/%s' % nb): 54 if not git.check_ref_format('heads/%s' % nb):
56 self.OptionParser.error("'%s' is not a valid name" % nb) 55 self.OptionParser.error("'%s' is not a valid name" % nb)
57 56
58 def _ExecuteOne(self, opt, nb, project): 57 def _ExecuteOne(self, revision, nb, project):
59 """Start one project.""" 58 """Start one project."""
60 # If the current revision is immutable, such as a SHA1, a tag or 59 # If the current revision is immutable, such as a SHA1, a tag or
61 # a change, then we can't push back to it. Substitute with 60 # a change, then we can't push back to it. Substitute with
@@ -69,7 +68,7 @@ revision specified in the manifest.
69 68
70 try: 69 try:
71 ret = project.StartBranch( 70 ret = project.StartBranch(
72 nb, branch_merge=branch_merge, revision=opt.revision) 71 nb, branch_merge=branch_merge, revision=revision)
73 except Exception as e: 72 except Exception as e:
74 print('error: unable to checkout %s: %s' % (project.name, e), file=sys.stderr) 73 print('error: unable to checkout %s: %s' % (project.name, e), file=sys.stderr)
75 ret = False 74 ret = False
@@ -123,23 +122,18 @@ revision specified in the manifest.
123 pm.update() 122 pm.update()
124 pm.end() 123 pm.end()
125 124
126 def _ProcessResults(results): 125 def _ProcessResults(_pool, pm, results):
127 for (result, project) in results: 126 for (result, project) in results:
128 if not result: 127 if not result:
129 err.append(project) 128 err.append(project)
130 pm.update() 129 pm.update()
131 130
132 pm = Progress('Starting %s' % nb, len(all_projects), quiet=opt.quiet) 131 self.ExecuteInParallel(
133 # NB: Multiprocessing is heavy, so don't spin it up for one job. 132 opt.jobs,
134 if len(all_projects) == 1 or opt.jobs == 1: 133 functools.partial(self._ExecuteOne, opt.revision, nb),
135 _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects) 134 all_projects,
136 else: 135 callback=_ProcessResults,
137 with multiprocessing.Pool(opt.jobs) as pool: 136 output=Progress('Starting %s' % (nb,), len(all_projects), quiet=opt.quiet))
138 results = pool.imap_unordered(
139 functools.partial(self._ExecuteOne, opt, nb), all_projects,
140 chunksize=WORKER_BATCH_SIZE)
141 _ProcessResults(results)
142 pm.end()
143 137
144 if err: 138 if err:
145 for p in err: 139 for p in err:
diff --git a/subcmds/status.py b/subcmds/status.py
index dc223a00..1b48dcea 100644
--- a/subcmds/status.py
+++ b/subcmds/status.py
@@ -15,10 +15,9 @@
15import functools 15import functools
16import glob 16import glob
17import io 17import io
18import multiprocessing
19import os 18import os
20 19
21from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE 20from command import DEFAULT_LOCAL_JOBS, PagedCommand
22 21
23from color import Coloring 22from color import Coloring
24import platform_utils 23import platform_utils
@@ -119,22 +118,23 @@ the following meanings:
119 118
120 def Execute(self, opt, args): 119 def Execute(self, opt, args):
121 all_projects = self.GetProjects(args) 120 all_projects = self.GetProjects(args)
122 counter = 0
123 121
124 if opt.jobs == 1: 122 def _ProcessResults(_pool, _output, results):
125 for project in all_projects: 123 ret = 0
126 state = project.PrintWorkTreeStatus(quiet=opt.quiet) 124 for (state, output) in results:
125 if output:
126 print(output, end='')
127 if state == 'CLEAN': 127 if state == 'CLEAN':
128 counter += 1 128 ret += 1
129 else: 129 return ret
130 with multiprocessing.Pool(opt.jobs) as pool: 130
131 states = pool.imap(functools.partial(self._StatusHelper, opt.quiet), 131 counter = self.ExecuteInParallel(
132 all_projects, chunksize=WORKER_BATCH_SIZE) 132 opt.jobs,
133 for (state, output) in states: 133 functools.partial(self._StatusHelper, opt.quiet),
134 if output: 134 all_projects,
135 print(output, end='') 135 callback=_ProcessResults,
136 if state == 'CLEAN': 136 ordered=True)
137 counter += 1 137
138 if not opt.quiet and len(all_projects) == counter: 138 if not opt.quiet and len(all_projects) == counter:
139 print('nothing to commit (working directory clean)') 139 print('nothing to commit (working directory clean)')
140 140
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 21166af5..4763fadc 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -51,7 +51,7 @@ import git_superproject
51import gitc_utils 51import gitc_utils
52from project import Project 52from project import Project
53from project import RemoteSpec 53from project import RemoteSpec
54from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE 54from command import Command, MirrorSafeCommand
55from error import RepoChangedException, GitError, ManifestParseError 55from error import RepoChangedException, GitError, ManifestParseError
56import platform_utils 56import platform_utils
57from project import SyncBuffer 57from project import SyncBuffer
@@ -428,11 +428,12 @@ later is required to fix a server side protocol bug.
428 428
429 return (ret, fetched) 429 return (ret, fetched)
430 430
431 def _CheckoutOne(self, opt, project): 431 def _CheckoutOne(self, detach_head, force_sync, project):
432 """Checkout work tree for one project 432 """Checkout work tree for one project
433 433
434 Args: 434 Args:
435 opt: Program options returned from optparse. See _Options(). 435 detach_head: Whether to leave a detached HEAD.
436 force_sync: Force checking out of the repo.
436 project: Project object for the project to checkout. 437 project: Project object for the project to checkout.
437 438
438 Returns: 439 Returns:
@@ -440,10 +441,10 @@ later is required to fix a server side protocol bug.
440 """ 441 """
441 start = time.time() 442 start = time.time()
442 syncbuf = SyncBuffer(self.manifest.manifestProject.config, 443 syncbuf = SyncBuffer(self.manifest.manifestProject.config,
443 detach_head=opt.detach_head) 444 detach_head=detach_head)
444 success = False 445 success = False
445 try: 446 try:
446 project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) 447 project.Sync_LocalHalf(syncbuf, force_sync=force_sync)
447 success = syncbuf.Finish() 448 success = syncbuf.Finish()
448 except Exception as e: 449 except Exception as e:
449 print('error: Cannot checkout %s: %s: %s' % 450 print('error: Cannot checkout %s: %s: %s' %
@@ -464,44 +465,32 @@ later is required to fix a server side protocol bug.
464 opt: Program options returned from optparse. See _Options(). 465 opt: Program options returned from optparse. See _Options().
465 err_results: A list of strings, paths to git repos where checkout failed. 466 err_results: A list of strings, paths to git repos where checkout failed.
466 """ 467 """
467 ret = True
468 jobs = opt.jobs_checkout if opt.jobs_checkout else self.jobs
469
470 # Only checkout projects with worktrees. 468 # Only checkout projects with worktrees.
471 all_projects = [x for x in all_projects if x.worktree] 469 all_projects = [x for x in all_projects if x.worktree]
472 470
473 pm = Progress('Checking out', len(all_projects), quiet=opt.quiet) 471 def _ProcessResults(pool, pm, results):
474 472 ret = True
475 def _ProcessResults(results):
476 for (success, project, start, finish) in results: 473 for (success, project, start, finish) in results:
477 self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, 474 self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
478 start, finish, success) 475 start, finish, success)
479 # Check for any errors before running any more tasks. 476 # Check for any errors before running any more tasks.
480 # ...we'll let existing jobs finish, though. 477 # ...we'll let existing jobs finish, though.
481 if not success: 478 if not success:
479 ret = False
482 err_results.append(project.relpath) 480 err_results.append(project.relpath)
483 if opt.fail_fast: 481 if opt.fail_fast:
484 return False 482 if pool:
483 pool.close()
484 return ret
485 pm.update(msg=project.name) 485 pm.update(msg=project.name)
486 return True 486 return ret
487
488 # NB: Multiprocessing is heavy, so don't spin it up for one job.
489 if len(all_projects) == 1 or jobs == 1:
490 if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects):
491 ret = False
492 else:
493 with multiprocessing.Pool(jobs) as pool:
494 results = pool.imap_unordered(
495 functools.partial(self._CheckoutOne, opt),
496 all_projects,
497 chunksize=WORKER_BATCH_SIZE)
498 if not _ProcessResults(results):
499 ret = False
500 pool.close()
501
502 pm.end()
503 487
504 return ret and not err_results 488 return self.ExecuteInParallel(
489 opt.jobs_checkout if opt.jobs_checkout else self.jobs,
490 functools.partial(self._CheckoutOne, opt.detach_head, opt.force_sync),
491 all_projects,
492 callback=_ProcessResults,
493 output=Progress('Checking out', len(all_projects), quiet=opt.quiet)) and not err_results
505 494
506 def _GCProjects(self, projects, opt, err_event): 495 def _GCProjects(self, projects, opt, err_event):
507 pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet) 496 pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet)