diff options
-rw-r--r-- | command.py | 40 | ||||
-rw-r--r-- | subcmds/abandon.py | 26 | ||||
-rw-r--r-- | subcmds/branches.py | 16 | ||||
-rw-r--r-- | subcmds/checkout.py | 22 | ||||
-rw-r--r-- | subcmds/diff.py | 33 | ||||
-rw-r--r-- | subcmds/grep.py | 24 | ||||
-rw-r--r-- | subcmds/prune.py | 20 | ||||
-rw-r--r-- | subcmds/start.py | 26 | ||||
-rw-r--r-- | subcmds/status.py | 32 | ||||
-rw-r--r-- | subcmds/sync.py | 49 |
10 files changed, 145 insertions, 143 deletions
@@ -12,6 +12,7 @@ | |||
12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and |
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import multiprocessing | ||
15 | import os | 16 | import os |
16 | import optparse | 17 | import optparse |
17 | import platform | 18 | import platform |
@@ -21,6 +22,7 @@ import sys | |||
21 | from event_log import EventLog | 22 | from event_log import EventLog |
22 | from error import NoSuchProjectError | 23 | from error import NoSuchProjectError |
23 | from error import InvalidProjectGroupsError | 24 | from error import InvalidProjectGroupsError |
25 | import progress | ||
24 | 26 | ||
25 | 27 | ||
26 | # Number of projects to submit to a single worker process at a time. | 28 | # Number of projects to submit to a single worker process at a time. |
@@ -156,6 +158,44 @@ class Command(object): | |||
156 | """ | 158 | """ |
157 | raise NotImplementedError | 159 | raise NotImplementedError |
158 | 160 | ||
161 | @staticmethod | ||
162 | def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False): | ||
163 | """Helper for managing parallel execution boiler plate. | ||
164 | |||
165 | For subcommands that can easily split their work up. | ||
166 | |||
167 | Args: | ||
168 | jobs: How many parallel processes to use. | ||
169 | func: The function to apply to each of the |inputs|. Usually a | ||
170 | functools.partial for wrapping additional arguments. It will be run | ||
171 | in a separate process, so it must be pickalable, so nested functions | ||
172 | won't work. Methods on the subcommand Command class should work. | ||
173 | inputs: The list of items to process. Must be a list. | ||
174 | callback: The function to pass the results to for processing. It will be | ||
175 | executed in the main thread and process the results of |func| as they | ||
176 | become available. Thus it may be a local nested function. Its return | ||
177 | value is passed back directly. It takes three arguments: | ||
178 | - The processing pool (or None with one job). | ||
179 | - The |output| argument. | ||
180 | - An iterator for the results. | ||
181 | output: An output manager. May be progress.Progess or color.Coloring. | ||
182 | ordered: Whether the jobs should be processed in order. | ||
183 | |||
184 | Returns: | ||
185 | The |callback| function's results are returned. | ||
186 | """ | ||
187 | try: | ||
188 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | ||
189 | if len(inputs) == 1 or jobs == 1: | ||
190 | return callback(None, output, (func(x) for x in inputs)) | ||
191 | else: | ||
192 | with multiprocessing.Pool(jobs) as pool: | ||
193 | submit = pool.imap if ordered else pool.imap_unordered | ||
194 | return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE)) | ||
195 | finally: | ||
196 | if isinstance(output, progress.Progress): | ||
197 | output.end() | ||
198 | |||
159 | def _ResetPathToProjectMap(self, projects): | 199 | def _ResetPathToProjectMap(self, projects): |
160 | self._by_path = dict((p.worktree, p) for p in projects) | 200 | self._by_path = dict((p.worktree, p) for p in projects) |
161 | 201 | ||
diff --git a/subcmds/abandon.py b/subcmds/abandon.py index 1d22917e..c7c127d6 100644 --- a/subcmds/abandon.py +++ b/subcmds/abandon.py | |||
@@ -15,10 +15,9 @@ | |||
15 | from collections import defaultdict | 15 | from collections import defaultdict |
16 | import functools | 16 | import functools |
17 | import itertools | 17 | import itertools |
18 | import multiprocessing | ||
19 | import sys | 18 | import sys |
20 | 19 | ||
21 | from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE | 20 | from command import Command, DEFAULT_LOCAL_JOBS |
22 | from git_command import git | 21 | from git_command import git |
23 | from progress import Progress | 22 | from progress import Progress |
24 | 23 | ||
@@ -52,9 +51,9 @@ It is equivalent to "git branch -D <branchname>". | |||
52 | else: | 51 | else: |
53 | args.insert(0, "'All local branches'") | 52 | args.insert(0, "'All local branches'") |
54 | 53 | ||
55 | def _ExecuteOne(self, opt, nb, project): | 54 | def _ExecuteOne(self, all_branches, nb, project): |
56 | """Abandon one project.""" | 55 | """Abandon one project.""" |
57 | if opt.all: | 56 | if all_branches: |
58 | branches = project.GetBranches() | 57 | branches = project.GetBranches() |
59 | else: | 58 | else: |
60 | branches = [nb] | 59 | branches = [nb] |
@@ -72,7 +71,7 @@ It is equivalent to "git branch -D <branchname>". | |||
72 | success = defaultdict(list) | 71 | success = defaultdict(list) |
73 | all_projects = self.GetProjects(args[1:]) | 72 | all_projects = self.GetProjects(args[1:]) |
74 | 73 | ||
75 | def _ProcessResults(states): | 74 | def _ProcessResults(_pool, pm, states): |
76 | for (results, project) in states: | 75 | for (results, project) in states: |
77 | for branch, status in results.items(): | 76 | for branch, status in results.items(): |
78 | if status: | 77 | if status: |
@@ -81,17 +80,12 @@ It is equivalent to "git branch -D <branchname>". | |||
81 | err[branch].append(project) | 80 | err[branch].append(project) |
82 | pm.update() | 81 | pm.update() |
83 | 82 | ||
84 | pm = Progress('Abandon %s' % nb, len(all_projects), quiet=opt.quiet) | 83 | self.ExecuteInParallel( |
85 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 84 | opt.jobs, |
86 | if len(all_projects) == 1 or opt.jobs == 1: | 85 | functools.partial(self._ExecuteOne, opt.all, nb), |
87 | _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects) | 86 | all_projects, |
88 | else: | 87 | callback=_ProcessResults, |
89 | with multiprocessing.Pool(opt.jobs) as pool: | 88 | output=Progress('Abandon %s' % (nb,), len(all_projects), quiet=opt.quiet)) |
90 | states = pool.imap_unordered( | ||
91 | functools.partial(self._ExecuteOne, opt, nb), all_projects, | ||
92 | chunksize=WORKER_BATCH_SIZE) | ||
93 | _ProcessResults(states) | ||
94 | pm.end() | ||
95 | 89 | ||
96 | width = max(itertools.chain( | 90 | width = max(itertools.chain( |
97 | [25], (len(x) for x in itertools.chain(success, err)))) | 91 | [25], (len(x) for x in itertools.chain(success, err)))) |
diff --git a/subcmds/branches.py b/subcmds/branches.py index d5ea580c..2dc102bb 100644 --- a/subcmds/branches.py +++ b/subcmds/branches.py | |||
@@ -13,10 +13,10 @@ | |||
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import itertools | 15 | import itertools |
16 | import multiprocessing | ||
17 | import sys | 16 | import sys |
17 | |||
18 | from color import Coloring | 18 | from color import Coloring |
19 | from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE | 19 | from command import Command, DEFAULT_LOCAL_JOBS |
20 | 20 | ||
21 | 21 | ||
22 | class BranchColoring(Coloring): | 22 | class BranchColoring(Coloring): |
@@ -102,15 +102,19 @@ is shown, then the branch appears in all projects. | |||
102 | out = BranchColoring(self.manifest.manifestProject.config) | 102 | out = BranchColoring(self.manifest.manifestProject.config) |
103 | all_branches = {} | 103 | all_branches = {} |
104 | project_cnt = len(projects) | 104 | project_cnt = len(projects) |
105 | with multiprocessing.Pool(processes=opt.jobs) as pool: | ||
106 | project_branches = pool.imap_unordered( | ||
107 | expand_project_to_branches, projects, chunksize=WORKER_BATCH_SIZE) | ||
108 | 105 | ||
109 | for name, b in itertools.chain.from_iterable(project_branches): | 106 | def _ProcessResults(_pool, _output, results): |
107 | for name, b in itertools.chain.from_iterable(results): | ||
110 | if name not in all_branches: | 108 | if name not in all_branches: |
111 | all_branches[name] = BranchInfo(name) | 109 | all_branches[name] = BranchInfo(name) |
112 | all_branches[name].add(b) | 110 | all_branches[name].add(b) |
113 | 111 | ||
112 | self.ExecuteInParallel( | ||
113 | opt.jobs, | ||
114 | expand_project_to_branches, | ||
115 | projects, | ||
116 | callback=_ProcessResults) | ||
117 | |||
114 | names = sorted(all_branches) | 118 | names = sorted(all_branches) |
115 | 119 | ||
116 | if not names: | 120 | if not names: |
diff --git a/subcmds/checkout.py b/subcmds/checkout.py index 6b71a8fa..4d8009b1 100644 --- a/subcmds/checkout.py +++ b/subcmds/checkout.py | |||
@@ -13,10 +13,9 @@ | |||
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import functools | 15 | import functools |
16 | import multiprocessing | ||
17 | import sys | 16 | import sys |
18 | 17 | ||
19 | from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE | 18 | from command import Command, DEFAULT_LOCAL_JOBS |
20 | from progress import Progress | 19 | from progress import Progress |
21 | 20 | ||
22 | 21 | ||
@@ -50,7 +49,7 @@ The command is equivalent to: | |||
50 | success = [] | 49 | success = [] |
51 | all_projects = self.GetProjects(args[1:]) | 50 | all_projects = self.GetProjects(args[1:]) |
52 | 51 | ||
53 | def _ProcessResults(results): | 52 | def _ProcessResults(_pool, pm, results): |
54 | for status, project in results: | 53 | for status, project in results: |
55 | if status is not None: | 54 | if status is not None: |
56 | if status: | 55 | if status: |
@@ -59,17 +58,12 @@ The command is equivalent to: | |||
59 | err.append(project) | 58 | err.append(project) |
60 | pm.update() | 59 | pm.update() |
61 | 60 | ||
62 | pm = Progress('Checkout %s' % nb, len(all_projects), quiet=opt.quiet) | 61 | self.ExecuteInParallel( |
63 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 62 | opt.jobs, |
64 | if len(all_projects) == 1 or opt.jobs == 1: | 63 | functools.partial(self._ExecuteOne, nb), |
65 | _ProcessResults(self._ExecuteOne(nb, x) for x in all_projects) | 64 | all_projects, |
66 | else: | 65 | callback=_ProcessResults, |
67 | with multiprocessing.Pool(opt.jobs) as pool: | 66 | output=Progress('Checkout %s' % (nb,), len(all_projects), quiet=opt.quiet)) |
68 | results = pool.imap_unordered( | ||
69 | functools.partial(self._ExecuteOne, nb), all_projects, | ||
70 | chunksize=WORKER_BATCH_SIZE) | ||
71 | _ProcessResults(results) | ||
72 | pm.end() | ||
73 | 67 | ||
74 | if err: | 68 | if err: |
75 | for p in err: | 69 | for p in err: |
diff --git a/subcmds/diff.py b/subcmds/diff.py index cdc262e6..4966bb1a 100644 --- a/subcmds/diff.py +++ b/subcmds/diff.py | |||
@@ -14,9 +14,8 @@ | |||
14 | 14 | ||
15 | import functools | 15 | import functools |
16 | import io | 16 | import io |
17 | import multiprocessing | ||
18 | 17 | ||
19 | from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE | 18 | from command import DEFAULT_LOCAL_JOBS, PagedCommand |
20 | 19 | ||
21 | 20 | ||
22 | class Diff(PagedCommand): | 21 | class Diff(PagedCommand): |
@@ -36,7 +35,7 @@ to the Unix 'patch' command. | |||
36 | dest='absolute', action='store_true', | 35 | dest='absolute', action='store_true', |
37 | help='Paths are relative to the repository root') | 36 | help='Paths are relative to the repository root') |
38 | 37 | ||
39 | def _DiffHelper(self, absolute, project): | 38 | def _ExecuteOne(self, absolute, project): |
40 | """Obtains the diff for a specific project. | 39 | """Obtains the diff for a specific project. |
41 | 40 | ||
42 | Args: | 41 | Args: |
@@ -51,22 +50,20 @@ to the Unix 'patch' command. | |||
51 | return (ret, buf.getvalue()) | 50 | return (ret, buf.getvalue()) |
52 | 51 | ||
53 | def Execute(self, opt, args): | 52 | def Execute(self, opt, args): |
54 | ret = 0 | ||
55 | all_projects = self.GetProjects(args) | 53 | all_projects = self.GetProjects(args) |
56 | 54 | ||
57 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 55 | def _ProcessResults(_pool, _output, results): |
58 | if len(all_projects) == 1 or opt.jobs == 1: | 56 | ret = 0 |
59 | for project in all_projects: | 57 | for (state, output) in results: |
60 | if not project.PrintWorkTreeDiff(opt.absolute): | 58 | if output: |
59 | print(output, end='') | ||
60 | if not state: | ||
61 | ret = 1 | 61 | ret = 1 |
62 | else: | 62 | return ret |
63 | with multiprocessing.Pool(opt.jobs) as pool: | ||
64 | states = pool.imap(functools.partial(self._DiffHelper, opt.absolute), | ||
65 | all_projects, WORKER_BATCH_SIZE) | ||
66 | for (state, output) in states: | ||
67 | if output: | ||
68 | print(output, end='') | ||
69 | if not state: | ||
70 | ret = 1 | ||
71 | 63 | ||
72 | return ret | 64 | return self.ExecuteInParallel( |
65 | opt.jobs, | ||
66 | functools.partial(self._ExecuteOne, opt.absolute), | ||
67 | all_projects, | ||
68 | callback=_ProcessResults, | ||
69 | ordered=True) | ||
diff --git a/subcmds/grep.py b/subcmds/grep.py index 9a4a8a36..6cb1445a 100644 --- a/subcmds/grep.py +++ b/subcmds/grep.py | |||
@@ -13,11 +13,10 @@ | |||
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import functools | 15 | import functools |
16 | import multiprocessing | ||
17 | import sys | 16 | import sys |
18 | 17 | ||
19 | from color import Coloring | 18 | from color import Coloring |
20 | from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE | 19 | from command import DEFAULT_LOCAL_JOBS, PagedCommand |
21 | from error import GitError | 20 | from error import GitError |
22 | from git_command import GitCommand | 21 | from git_command import GitCommand |
23 | 22 | ||
@@ -173,7 +172,7 @@ contain a line that matches both expressions: | |||
173 | return (project, p.Wait(), p.stdout, p.stderr) | 172 | return (project, p.Wait(), p.stdout, p.stderr) |
174 | 173 | ||
175 | @staticmethod | 174 | @staticmethod |
176 | def _ProcessResults(out, full_name, have_rev, results): | 175 | def _ProcessResults(full_name, have_rev, _pool, out, results): |
177 | git_failed = False | 176 | git_failed = False |
178 | bad_rev = False | 177 | bad_rev = False |
179 | have_match = False | 178 | have_match = False |
@@ -256,18 +255,13 @@ contain a line that matches both expressions: | |||
256 | cmd_argv.extend(opt.revision) | 255 | cmd_argv.extend(opt.revision) |
257 | cmd_argv.append('--') | 256 | cmd_argv.append('--') |
258 | 257 | ||
259 | process_results = functools.partial( | 258 | git_failed, bad_rev, have_match = self.ExecuteInParallel( |
260 | self._ProcessResults, out, full_name, have_rev) | 259 | opt.jobs, |
261 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 260 | functools.partial(self._ExecuteOne, cmd_argv), |
262 | if len(projects) == 1 or opt.jobs == 1: | 261 | projects, |
263 | git_failed, bad_rev, have_match = process_results( | 262 | callback=functools.partial(self._ProcessResults, full_name, have_rev), |
264 | self._ExecuteOne(cmd_argv, x) for x in projects) | 263 | output=out, |
265 | else: | 264 | ordered=True) |
266 | with multiprocessing.Pool(opt.jobs) as pool: | ||
267 | results = pool.imap( | ||
268 | functools.partial(self._ExecuteOne, cmd_argv), projects, | ||
269 | chunksize=WORKER_BATCH_SIZE) | ||
270 | git_failed, bad_rev, have_match = process_results(results) | ||
271 | 265 | ||
272 | if git_failed: | 266 | if git_failed: |
273 | sys.exit(1) | 267 | sys.exit(1) |
diff --git a/subcmds/prune.py b/subcmds/prune.py index 4084c8b6..236b647f 100644 --- a/subcmds/prune.py +++ b/subcmds/prune.py | |||
@@ -13,10 +13,9 @@ | |||
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import itertools | 15 | import itertools |
16 | import multiprocessing | ||
17 | 16 | ||
18 | from color import Coloring | 17 | from color import Coloring |
19 | from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE | 18 | from command import DEFAULT_LOCAL_JOBS, PagedCommand |
20 | 19 | ||
21 | 20 | ||
22 | class Prune(PagedCommand): | 21 | class Prune(PagedCommand): |
@@ -36,18 +35,15 @@ class Prune(PagedCommand): | |||
36 | 35 | ||
37 | # NB: Should be able to refactor this module to display summary as results | 36 | # NB: Should be able to refactor this module to display summary as results |
38 | # come back from children. | 37 | # come back from children. |
39 | def _ProcessResults(results): | 38 | def _ProcessResults(_pool, _output, results): |
40 | return list(itertools.chain.from_iterable(results)) | 39 | return list(itertools.chain.from_iterable(results)) |
41 | 40 | ||
42 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 41 | all_branches = self.ExecuteInParallel( |
43 | if len(projects) == 1 or opt.jobs == 1: | 42 | opt.jobs, |
44 | all_branches = _ProcessResults(self._ExecuteOne(x) for x in projects) | 43 | self._ExecuteOne, |
45 | else: | 44 | projects, |
46 | with multiprocessing.Pool(opt.jobs) as pool: | 45 | callback=_ProcessResults, |
47 | results = pool.imap( | 46 | ordered=True) |
48 | self._ExecuteOne, projects, | ||
49 | chunksize=WORKER_BATCH_SIZE) | ||
50 | all_branches = _ProcessResults(results) | ||
51 | 47 | ||
52 | if not all_branches: | 48 | if not all_branches: |
53 | return | 49 | return |
diff --git a/subcmds/start.py b/subcmds/start.py index aa2f915a..ff2bae56 100644 --- a/subcmds/start.py +++ b/subcmds/start.py | |||
@@ -13,11 +13,10 @@ | |||
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import functools | 15 | import functools |
16 | import multiprocessing | ||
17 | import os | 16 | import os |
18 | import sys | 17 | import sys |
19 | 18 | ||
20 | from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE | 19 | from command import Command, DEFAULT_LOCAL_JOBS |
21 | from git_config import IsImmutable | 20 | from git_config import IsImmutable |
22 | from git_command import git | 21 | from git_command import git |
23 | import gitc_utils | 22 | import gitc_utils |
@@ -55,7 +54,7 @@ revision specified in the manifest. | |||
55 | if not git.check_ref_format('heads/%s' % nb): | 54 | if not git.check_ref_format('heads/%s' % nb): |
56 | self.OptionParser.error("'%s' is not a valid name" % nb) | 55 | self.OptionParser.error("'%s' is not a valid name" % nb) |
57 | 56 | ||
58 | def _ExecuteOne(self, opt, nb, project): | 57 | def _ExecuteOne(self, revision, nb, project): |
59 | """Start one project.""" | 58 | """Start one project.""" |
60 | # If the current revision is immutable, such as a SHA1, a tag or | 59 | # If the current revision is immutable, such as a SHA1, a tag or |
61 | # a change, then we can't push back to it. Substitute with | 60 | # a change, then we can't push back to it. Substitute with |
@@ -69,7 +68,7 @@ revision specified in the manifest. | |||
69 | 68 | ||
70 | try: | 69 | try: |
71 | ret = project.StartBranch( | 70 | ret = project.StartBranch( |
72 | nb, branch_merge=branch_merge, revision=opt.revision) | 71 | nb, branch_merge=branch_merge, revision=revision) |
73 | except Exception as e: | 72 | except Exception as e: |
74 | print('error: unable to checkout %s: %s' % (project.name, e), file=sys.stderr) | 73 | print('error: unable to checkout %s: %s' % (project.name, e), file=sys.stderr) |
75 | ret = False | 74 | ret = False |
@@ -123,23 +122,18 @@ revision specified in the manifest. | |||
123 | pm.update() | 122 | pm.update() |
124 | pm.end() | 123 | pm.end() |
125 | 124 | ||
126 | def _ProcessResults(results): | 125 | def _ProcessResults(_pool, pm, results): |
127 | for (result, project) in results: | 126 | for (result, project) in results: |
128 | if not result: | 127 | if not result: |
129 | err.append(project) | 128 | err.append(project) |
130 | pm.update() | 129 | pm.update() |
131 | 130 | ||
132 | pm = Progress('Starting %s' % nb, len(all_projects), quiet=opt.quiet) | 131 | self.ExecuteInParallel( |
133 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 132 | opt.jobs, |
134 | if len(all_projects) == 1 or opt.jobs == 1: | 133 | functools.partial(self._ExecuteOne, opt.revision, nb), |
135 | _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects) | 134 | all_projects, |
136 | else: | 135 | callback=_ProcessResults, |
137 | with multiprocessing.Pool(opt.jobs) as pool: | 136 | output=Progress('Starting %s' % (nb,), len(all_projects), quiet=opt.quiet)) |
138 | results = pool.imap_unordered( | ||
139 | functools.partial(self._ExecuteOne, opt, nb), all_projects, | ||
140 | chunksize=WORKER_BATCH_SIZE) | ||
141 | _ProcessResults(results) | ||
142 | pm.end() | ||
143 | 137 | ||
144 | if err: | 138 | if err: |
145 | for p in err: | 139 | for p in err: |
diff --git a/subcmds/status.py b/subcmds/status.py index dc223a00..1b48dcea 100644 --- a/subcmds/status.py +++ b/subcmds/status.py | |||
@@ -15,10 +15,9 @@ | |||
15 | import functools | 15 | import functools |
16 | import glob | 16 | import glob |
17 | import io | 17 | import io |
18 | import multiprocessing | ||
19 | import os | 18 | import os |
20 | 19 | ||
21 | from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE | 20 | from command import DEFAULT_LOCAL_JOBS, PagedCommand |
22 | 21 | ||
23 | from color import Coloring | 22 | from color import Coloring |
24 | import platform_utils | 23 | import platform_utils |
@@ -119,22 +118,23 @@ the following meanings: | |||
119 | 118 | ||
120 | def Execute(self, opt, args): | 119 | def Execute(self, opt, args): |
121 | all_projects = self.GetProjects(args) | 120 | all_projects = self.GetProjects(args) |
122 | counter = 0 | ||
123 | 121 | ||
124 | if opt.jobs == 1: | 122 | def _ProcessResults(_pool, _output, results): |
125 | for project in all_projects: | 123 | ret = 0 |
126 | state = project.PrintWorkTreeStatus(quiet=opt.quiet) | 124 | for (state, output) in results: |
125 | if output: | ||
126 | print(output, end='') | ||
127 | if state == 'CLEAN': | 127 | if state == 'CLEAN': |
128 | counter += 1 | 128 | ret += 1 |
129 | else: | 129 | return ret |
130 | with multiprocessing.Pool(opt.jobs) as pool: | 130 | |
131 | states = pool.imap(functools.partial(self._StatusHelper, opt.quiet), | 131 | counter = self.ExecuteInParallel( |
132 | all_projects, chunksize=WORKER_BATCH_SIZE) | 132 | opt.jobs, |
133 | for (state, output) in states: | 133 | functools.partial(self._StatusHelper, opt.quiet), |
134 | if output: | 134 | all_projects, |
135 | print(output, end='') | 135 | callback=_ProcessResults, |
136 | if state == 'CLEAN': | 136 | ordered=True) |
137 | counter += 1 | 137 | |
138 | if not opt.quiet and len(all_projects) == counter: | 138 | if not opt.quiet and len(all_projects) == counter: |
139 | print('nothing to commit (working directory clean)') | 139 | print('nothing to commit (working directory clean)') |
140 | 140 | ||
diff --git a/subcmds/sync.py b/subcmds/sync.py index 21166af5..4763fadc 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -51,7 +51,7 @@ import git_superproject | |||
51 | import gitc_utils | 51 | import gitc_utils |
52 | from project import Project | 52 | from project import Project |
53 | from project import RemoteSpec | 53 | from project import RemoteSpec |
54 | from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE | 54 | from command import Command, MirrorSafeCommand |
55 | from error import RepoChangedException, GitError, ManifestParseError | 55 | from error import RepoChangedException, GitError, ManifestParseError |
56 | import platform_utils | 56 | import platform_utils |
57 | from project import SyncBuffer | 57 | from project import SyncBuffer |
@@ -428,11 +428,12 @@ later is required to fix a server side protocol bug. | |||
428 | 428 | ||
429 | return (ret, fetched) | 429 | return (ret, fetched) |
430 | 430 | ||
431 | def _CheckoutOne(self, opt, project): | 431 | def _CheckoutOne(self, detach_head, force_sync, project): |
432 | """Checkout work tree for one project | 432 | """Checkout work tree for one project |
433 | 433 | ||
434 | Args: | 434 | Args: |
435 | opt: Program options returned from optparse. See _Options(). | 435 | detach_head: Whether to leave a detached HEAD. |
436 | force_sync: Force checking out of the repo. | ||
436 | project: Project object for the project to checkout. | 437 | project: Project object for the project to checkout. |
437 | 438 | ||
438 | Returns: | 439 | Returns: |
@@ -440,10 +441,10 @@ later is required to fix a server side protocol bug. | |||
440 | """ | 441 | """ |
441 | start = time.time() | 442 | start = time.time() |
442 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, | 443 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, |
443 | detach_head=opt.detach_head) | 444 | detach_head=detach_head) |
444 | success = False | 445 | success = False |
445 | try: | 446 | try: |
446 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) | 447 | project.Sync_LocalHalf(syncbuf, force_sync=force_sync) |
447 | success = syncbuf.Finish() | 448 | success = syncbuf.Finish() |
448 | except Exception as e: | 449 | except Exception as e: |
449 | print('error: Cannot checkout %s: %s: %s' % | 450 | print('error: Cannot checkout %s: %s: %s' % |
@@ -464,44 +465,32 @@ later is required to fix a server side protocol bug. | |||
464 | opt: Program options returned from optparse. See _Options(). | 465 | opt: Program options returned from optparse. See _Options(). |
465 | err_results: A list of strings, paths to git repos where checkout failed. | 466 | err_results: A list of strings, paths to git repos where checkout failed. |
466 | """ | 467 | """ |
467 | ret = True | ||
468 | jobs = opt.jobs_checkout if opt.jobs_checkout else self.jobs | ||
469 | |||
470 | # Only checkout projects with worktrees. | 468 | # Only checkout projects with worktrees. |
471 | all_projects = [x for x in all_projects if x.worktree] | 469 | all_projects = [x for x in all_projects if x.worktree] |
472 | 470 | ||
473 | pm = Progress('Checking out', len(all_projects), quiet=opt.quiet) | 471 | def _ProcessResults(pool, pm, results): |
474 | 472 | ret = True | |
475 | def _ProcessResults(results): | ||
476 | for (success, project, start, finish) in results: | 473 | for (success, project, start, finish) in results: |
477 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | 474 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, |
478 | start, finish, success) | 475 | start, finish, success) |
479 | # Check for any errors before running any more tasks. | 476 | # Check for any errors before running any more tasks. |
480 | # ...we'll let existing jobs finish, though. | 477 | # ...we'll let existing jobs finish, though. |
481 | if not success: | 478 | if not success: |
479 | ret = False | ||
482 | err_results.append(project.relpath) | 480 | err_results.append(project.relpath) |
483 | if opt.fail_fast: | 481 | if opt.fail_fast: |
484 | return False | 482 | if pool: |
483 | pool.close() | ||
484 | return ret | ||
485 | pm.update(msg=project.name) | 485 | pm.update(msg=project.name) |
486 | return True | 486 | return ret |
487 | |||
488 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | ||
489 | if len(all_projects) == 1 or jobs == 1: | ||
490 | if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects): | ||
491 | ret = False | ||
492 | else: | ||
493 | with multiprocessing.Pool(jobs) as pool: | ||
494 | results = pool.imap_unordered( | ||
495 | functools.partial(self._CheckoutOne, opt), | ||
496 | all_projects, | ||
497 | chunksize=WORKER_BATCH_SIZE) | ||
498 | if not _ProcessResults(results): | ||
499 | ret = False | ||
500 | pool.close() | ||
501 | |||
502 | pm.end() | ||
503 | 487 | ||
504 | return ret and not err_results | 488 | return self.ExecuteInParallel( |
489 | opt.jobs_checkout if opt.jobs_checkout else self.jobs, | ||
490 | functools.partial(self._CheckoutOne, opt.detach_head, opt.force_sync), | ||
491 | all_projects, | ||
492 | callback=_ProcessResults, | ||
493 | output=Progress('Checking out', len(all_projects), quiet=opt.quiet)) and not err_results | ||
505 | 494 | ||
506 | def _GCProjects(self, projects, opt, err_event): | 495 | def _GCProjects(self, projects, opt, err_event): |
507 | pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet) | 496 | pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet) |