-rw-r--r--  command.py           40
-rw-r--r--  subcmds/abandon.py   26
-rw-r--r--  subcmds/branches.py  16
-rw-r--r--  subcmds/checkout.py  22
-rw-r--r--  subcmds/diff.py      33
-rw-r--r--  subcmds/grep.py      24
-rw-r--r--  subcmds/prune.py     20
-rw-r--r--  subcmds/start.py     26
-rw-r--r--  subcmds/status.py    32
-rw-r--r--  subcmds/sync.py      49
10 files changed, 145 insertions(+), 143 deletions(-)
diff --git a/command.py b/command.py
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import multiprocessing
 import os
 import optparse
 import platform
@@ -21,6 +22,7 @@ import sys
 from event_log import EventLog
 from error import NoSuchProjectError
 from error import InvalidProjectGroupsError
+import progress
 
 
 # Number of projects to submit to a single worker process at a time.
@@ -156,6 +158,44 @@ class Command(object):
     """
     raise NotImplementedError
 
+  @staticmethod
+  def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False):
+    """Helper for managing parallel execution boilerplate.
+
+    For subcommands that can easily split their work up.
+
+    Args:
+      jobs: How many parallel processes to use.
+      func: The function to apply to each of the |inputs|.  Usually a
+          functools.partial for wrapping additional arguments.  It will be run
+          in a separate process, so it must be picklable; nested functions
+          won't work.  Methods on the subcommand Command class should work.
+      inputs: The list of items to process.  Must be a list.
+      callback: The function to pass the results to for processing.  It will be
+          executed in the main thread and process the results of |func| as they
+          become available.  Thus it may be a local nested function.  Its return
+          value is passed back directly.  It takes three arguments:
+          - The processing pool (or None with one job).
+          - The |output| argument.
+          - An iterator for the results.
+      output: An output manager.  May be progress.Progress or color.Coloring.
+      ordered: Whether the jobs should be processed in order.
+
+    Returns:
+      The |callback| function's results are returned.
+    """
+    try:
+      # NB: Multiprocessing is heavy, so don't spin it up for one job.
+      if len(inputs) == 1 or jobs == 1:
+        return callback(None, output, (func(x) for x in inputs))
+      else:
+        with multiprocessing.Pool(jobs) as pool:
+          submit = pool.imap if ordered else pool.imap_unordered
+          return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE))
+    finally:
+      if isinstance(output, progress.Progress):
+        output.end()
+
   def _ResetPathToProjectMap(self, projects):
     self._by_path = dict((p.worktree, p) for p in projects)
 
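For orientation, here is a stand-alone sketch of the control flow the new helper implements: a one-job shortcut that skips multiprocessing, an imap/imap_unordered choice driven by ordered, and cleanup in a finally block. The worker, constant, and callback below are illustrative stand-ins, not repo code.

    import functools
    import multiprocessing

    WORKER_BATCH_SIZE = 32  # stand-in for the command.py constant

    def _square(scale, value):
      # Worker: runs in a child process, so it must be picklable (module level).
      return scale * value * value

    def execute_in_parallel(jobs, func, inputs, callback, output=None, ordered=False):
      try:
        # One job (or one input): run inline, no pool overhead.
        if len(inputs) == 1 or jobs == 1:
          return callback(None, output, (func(x) for x in inputs))
        with multiprocessing.Pool(jobs) as pool:
          submit = pool.imap if ordered else pool.imap_unordered
          return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE))
      finally:
        # The real helper calls output.end() when output is a progress.Progress.
        if hasattr(output, 'end'):
          output.end()

    if __name__ == '__main__':
      def _collect(_pool, _output, results):
        # Runs in the main process; its return value is passed straight back.
        return sum(results)

      print(execute_in_parallel(4, functools.partial(_square, 2),
                                list(range(10)), callback=_collect))
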
diff --git a/subcmds/abandon.py b/subcmds/abandon.py
index 1d22917e..c7c127d6 100644
--- a/subcmds/abandon.py
+++ b/subcmds/abandon.py
@@ -15,10 +15,9 @@
 from collections import defaultdict
 import functools
 import itertools
-import multiprocessing
 import sys
 
-from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
+from command import Command, DEFAULT_LOCAL_JOBS
 from git_command import git
 from progress import Progress
 
@@ -52,9 +51,9 @@ It is equivalent to "git branch -D <branchname>".
     else:
       args.insert(0, "'All local branches'")
 
-  def _ExecuteOne(self, opt, nb, project):
+  def _ExecuteOne(self, all_branches, nb, project):
     """Abandon one project."""
-    if opt.all:
+    if all_branches:
       branches = project.GetBranches()
     else:
       branches = [nb]
@@ -72,7 +71,7 @@ It is equivalent to "git branch -D <branchname>".
     success = defaultdict(list)
     all_projects = self.GetProjects(args[1:])
 
-    def _ProcessResults(states):
+    def _ProcessResults(_pool, pm, states):
       for (results, project) in states:
         for branch, status in results.items():
           if status:
@@ -81,17 +80,12 @@ It is equivalent to "git branch -D <branchname>".
             err[branch].append(project)
         pm.update()
 
-    pm = Progress('Abandon %s' % nb, len(all_projects), quiet=opt.quiet)
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or opt.jobs == 1:
-      _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects)
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        states = pool.imap_unordered(
-            functools.partial(self._ExecuteOne, opt, nb), all_projects,
-            chunksize=WORKER_BATCH_SIZE)
-        _ProcessResults(states)
-    pm.end()
+    self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, opt.all, nb),
+        all_projects,
+        callback=_ProcessResults,
+        output=Progress('Abandon %s' % (nb,), len(all_projects), quiet=opt.quiet))
 
     width = max(itertools.chain(
         [25], (len(x) for x in itertools.chain(success, err))))
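Note the pattern the conversions share: instead of binding the whole optparse `opt` object into the partial, only the plain values a worker needs (here `opt.all` and `nb`) are bound, because every task's arguments are pickled and shipped to a child process. A small stand-alone illustration; the class and values are hypothetical.

    import functools
    import multiprocessing
    import pickle

    class Cmd:
      # Bound methods of a module-level class pickle fine, matching the
      # "Methods on the subcommand Command class should work" note above.
      def _ExecuteOne(self, all_branches, nb, project):
        return (project, all_branches, nb)

    if __name__ == '__main__':
      func = functools.partial(Cmd()._ExecuteOne, True, 'topic')
      pickle.dumps(func)  # primitives plus a bound method: picklable and small
      with multiprocessing.Pool(2) as pool:
        print(sorted(pool.imap_unordered(func, ['proj/a', 'proj/b'])))
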
diff --git a/subcmds/branches.py b/subcmds/branches.py
index d5ea580c..2dc102bb 100644
--- a/subcmds/branches.py
+++ b/subcmds/branches.py
@@ -13,10 +13,10 @@
 # limitations under the License.
 
 import itertools
-import multiprocessing
 import sys
+
 from color import Coloring
-from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
+from command import Command, DEFAULT_LOCAL_JOBS
 
 
 class BranchColoring(Coloring):
@@ -102,15 +102,19 @@ is shown, then the branch appears in all projects.
     out = BranchColoring(self.manifest.manifestProject.config)
     all_branches = {}
     project_cnt = len(projects)
-    with multiprocessing.Pool(processes=opt.jobs) as pool:
-      project_branches = pool.imap_unordered(
-          expand_project_to_branches, projects, chunksize=WORKER_BATCH_SIZE)
 
-      for name, b in itertools.chain.from_iterable(project_branches):
+    def _ProcessResults(_pool, _output, results):
+      for name, b in itertools.chain.from_iterable(results):
         if name not in all_branches:
           all_branches[name] = BranchInfo(name)
         all_branches[name].add(b)
 
+    self.ExecuteInParallel(
+        opt.jobs,
+        expand_project_to_branches,
+        projects,
+        callback=_ProcessResults)
+
     names = sorted(all_branches)
 
     if not names:
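Because the callback runs in the parent process, it can simply mutate enclosing-scope state such as the `all_branches` dict; no locking or shared memory is involved. A stand-alone sketch of that shape, with a worker that just fakes per-project branch data:

    import itertools
    import multiprocessing

    def expand_project_to_branches(project):
      # Worker stand-in: return (branch_name, payload) pairs for one project.
      return [('main', project), ('stable', project)]

    if __name__ == '__main__':
      all_branches = {}

      def _process_results(results):
        # Main process: free to touch local dicts the workers never see.
        for name, payload in itertools.chain.from_iterable(results):
          all_branches.setdefault(name, []).append(payload)

      with multiprocessing.Pool(2) as pool:
        _process_results(pool.imap_unordered(expand_project_to_branches,
                                             ['proj/a', 'proj/b']))
      print(all_branches)
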
diff --git a/subcmds/checkout.py b/subcmds/checkout.py
index 6b71a8fa..4d8009b1 100644
--- a/subcmds/checkout.py
+++ b/subcmds/checkout.py
@@ -13,10 +13,9 @@
 # limitations under the License.
 
 import functools
-import multiprocessing
 import sys
 
-from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
+from command import Command, DEFAULT_LOCAL_JOBS
 from progress import Progress
 
 
@@ -50,7 +49,7 @@ The command is equivalent to:
     success = []
     all_projects = self.GetProjects(args[1:])
 
-    def _ProcessResults(results):
+    def _ProcessResults(_pool, pm, results):
       for status, project in results:
         if status is not None:
           if status:
@@ -59,17 +58,12 @@ The command is equivalent to:
             err.append(project)
         pm.update()
 
-    pm = Progress('Checkout %s' % nb, len(all_projects), quiet=opt.quiet)
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or opt.jobs == 1:
-      _ProcessResults(self._ExecuteOne(nb, x) for x in all_projects)
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        results = pool.imap_unordered(
-            functools.partial(self._ExecuteOne, nb), all_projects,
-            chunksize=WORKER_BATCH_SIZE)
-        _ProcessResults(results)
-    pm.end()
+    self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, nb),
+        all_projects,
+        callback=_ProcessResults,
+        output=Progress('Checkout %s' % (nb,), len(all_projects), quiet=opt.quiet))
 
     if err:
       for p in err:
diff --git a/subcmds/diff.py b/subcmds/diff.py
index cdc262e6..4966bb1a 100644
--- a/subcmds/diff.py
+++ b/subcmds/diff.py
@@ -14,9 +14,8 @@
 
 import functools
 import io
-import multiprocessing
 
-from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
+from command import DEFAULT_LOCAL_JOBS, PagedCommand
 
 
 class Diff(PagedCommand):
@@ -36,7 +35,7 @@ to the Unix 'patch' command.
                  dest='absolute', action='store_true',
                  help='Paths are relative to the repository root')
 
-  def _DiffHelper(self, absolute, project):
+  def _ExecuteOne(self, absolute, project):
     """Obtains the diff for a specific project.
 
     Args:
@@ -51,22 +50,20 @@ to the Unix 'patch' command.
     return (ret, buf.getvalue())
 
   def Execute(self, opt, args):
-    ret = 0
     all_projects = self.GetProjects(args)
 
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or opt.jobs == 1:
-      for project in all_projects:
-        if not project.PrintWorkTreeDiff(opt.absolute):
+    def _ProcessResults(_pool, _output, results):
+      ret = 0
+      for (state, output) in results:
+        if output:
+          print(output, end='')
+        if not state:
           ret = 1
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        states = pool.imap(functools.partial(self._DiffHelper, opt.absolute),
-                           all_projects, WORKER_BATCH_SIZE)
-        for (state, output) in states:
-          if output:
-            print(output, end='')
-          if not state:
-            ret = 1
+      return ret
 
-    return ret
+    return self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, opt.absolute),
+        all_projects,
+        callback=_ProcessResults,
+        ordered=True)
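diff (and a few other read-only subcommands) passes ordered=True, which makes the helper use pool.imap instead of pool.imap_unordered so per-project output is printed in a stable order. A minimal stand-alone illustration of that difference; the worker and names are made up.

    import multiprocessing

    def render(name):
      # Worker stand-in for "produce this project's diff text".
      return (True, 'diff for %s\n' % name)

    if __name__ == '__main__':
      names = ['art', 'bionic', 'build', 'dalvik']
      with multiprocessing.Pool(4) as pool:
        # imap yields results in input order; imap_unordered yields as completed.
        for _ok, text in pool.imap(render, names):
          print(text, end='')
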
diff --git a/subcmds/grep.py b/subcmds/grep.py
index 9a4a8a36..6cb1445a 100644
--- a/subcmds/grep.py
+++ b/subcmds/grep.py
@@ -13,11 +13,10 @@
 # limitations under the License.
 
 import functools
-import multiprocessing
 import sys
 
 from color import Coloring
-from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
+from command import DEFAULT_LOCAL_JOBS, PagedCommand
 from error import GitError
 from git_command import GitCommand
 
@@ -173,7 +172,7 @@ contain a line that matches both expressions:
     return (project, p.Wait(), p.stdout, p.stderr)
 
   @staticmethod
-  def _ProcessResults(out, full_name, have_rev, results):
+  def _ProcessResults(full_name, have_rev, _pool, out, results):
     git_failed = False
     bad_rev = False
     have_match = False
@@ -256,18 +255,13 @@ contain a line that matches both expressions:
       cmd_argv.extend(opt.revision)
     cmd_argv.append('--')
 
-    process_results = functools.partial(
-        self._ProcessResults, out, full_name, have_rev)
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(projects) == 1 or opt.jobs == 1:
-      git_failed, bad_rev, have_match = process_results(
-          self._ExecuteOne(cmd_argv, x) for x in projects)
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        results = pool.imap(
-            functools.partial(self._ExecuteOne, cmd_argv), projects,
-            chunksize=WORKER_BATCH_SIZE)
-        git_failed, bad_rev, have_match = process_results(results)
+    git_failed, bad_rev, have_match = self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, cmd_argv),
+        projects,
+        callback=functools.partial(self._ProcessResults, full_name, have_rev),
+        output=out,
+        ordered=True)
 
     if git_failed:
       sys.exit(1)
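grep shows that the callback itself can be a functools.partial: the bound arguments (full_name, have_rev) come first, and ExecuteInParallel still appends its (pool, output, results) triple, with output=out delivering the Coloring object as the out parameter. A small sketch of just that argument plumbing; the names and data are illustrative.

    import functools

    def process_results(full_name, have_rev, _pool, out, results):
      # Same parameter order as the grep callback above: bound args first,
      # then the (pool, output, results) triple the helper supplies.
      return [(full_name, have_rev, out, r) for r in results]

    callback = functools.partial(process_results, True, False)
    # ExecuteInParallel would effectively call:
    print(callback(None, '<coloring object>', iter(['hit1', 'hit2'])))
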
diff --git a/subcmds/prune.py b/subcmds/prune.py
index 4084c8b6..236b647f 100644
--- a/subcmds/prune.py
+++ b/subcmds/prune.py
@@ -13,10 +13,9 @@
 # limitations under the License.
 
 import itertools
-import multiprocessing
 
 from color import Coloring
-from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
+from command import DEFAULT_LOCAL_JOBS, PagedCommand
 
 
 class Prune(PagedCommand):
@@ -36,18 +35,15 @@ class Prune(PagedCommand):
 
     # NB: Should be able to refactor this module to display summary as results
     # come back from children.
-    def _ProcessResults(results):
+    def _ProcessResults(_pool, _output, results):
       return list(itertools.chain.from_iterable(results))
 
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(projects) == 1 or opt.jobs == 1:
-      all_branches = _ProcessResults(self._ExecuteOne(x) for x in projects)
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        results = pool.imap(
-            self._ExecuteOne, projects,
-            chunksize=WORKER_BATCH_SIZE)
-        all_branches = _ProcessResults(results)
+    all_branches = self.ExecuteInParallel(
+        opt.jobs,
+        self._ExecuteOne,
+        projects,
+        callback=_ProcessResults,
+        ordered=True)
 
     if not all_branches:
       return
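Here each worker returns a list of branches for one project and the callback only has to concatenate them; the flattened list then flows back out of ExecuteInParallel as its return value. The flattening step in isolation, with made-up data:

    import itertools

    # Per-project results as a worker might return them (illustrative only).
    per_project = [['repo-1 topic'], [], ['repo-3 topic', 'repo-3 stale']]
    all_branches = list(itertools.chain.from_iterable(per_project))
    print(all_branches)  # ['repo-1 topic', 'repo-3 topic', 'repo-3 stale']
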
diff --git a/subcmds/start.py b/subcmds/start.py
index aa2f915a..ff2bae56 100644
--- a/subcmds/start.py
+++ b/subcmds/start.py
@@ -13,11 +13,10 @@
 # limitations under the License.
 
 import functools
-import multiprocessing
 import os
 import sys
 
-from command import Command, DEFAULT_LOCAL_JOBS, WORKER_BATCH_SIZE
+from command import Command, DEFAULT_LOCAL_JOBS
 from git_config import IsImmutable
 from git_command import git
 import gitc_utils
@@ -55,7 +54,7 @@ revision specified in the manifest.
     if not git.check_ref_format('heads/%s' % nb):
       self.OptionParser.error("'%s' is not a valid name" % nb)
 
-  def _ExecuteOne(self, opt, nb, project):
+  def _ExecuteOne(self, revision, nb, project):
     """Start one project."""
     # If the current revision is immutable, such as a SHA1, a tag or
     # a change, then we can't push back to it.  Substitute with
@@ -69,7 +68,7 @@ revision specified in the manifest.
 
     try:
       ret = project.StartBranch(
-          nb, branch_merge=branch_merge, revision=opt.revision)
+          nb, branch_merge=branch_merge, revision=revision)
     except Exception as e:
       print('error: unable to checkout %s: %s' % (project.name, e), file=sys.stderr)
       ret = False
@@ -123,23 +122,18 @@ revision specified in the manifest.
       pm.update()
     pm.end()
 
-    def _ProcessResults(results):
+    def _ProcessResults(_pool, pm, results):
       for (result, project) in results:
         if not result:
           err.append(project)
         pm.update()
 
-    pm = Progress('Starting %s' % nb, len(all_projects), quiet=opt.quiet)
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or opt.jobs == 1:
-      _ProcessResults(self._ExecuteOne(opt, nb, x) for x in all_projects)
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        results = pool.imap_unordered(
-            functools.partial(self._ExecuteOne, opt, nb), all_projects,
-            chunksize=WORKER_BATCH_SIZE)
-        _ProcessResults(results)
-    pm.end()
+    self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, opt.revision, nb),
+        all_projects,
+        callback=_ProcessResults,
+        output=Progress('Starting %s' % (nb,), len(all_projects), quiet=opt.quiet))
 
     if err:
       for p in err:
diff --git a/subcmds/status.py b/subcmds/status.py
index dc223a00..1b48dcea 100644
--- a/subcmds/status.py
+++ b/subcmds/status.py
@@ -15,10 +15,9 @@
 import functools
 import glob
 import io
-import multiprocessing
 import os
 
-from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
+from command import DEFAULT_LOCAL_JOBS, PagedCommand
 
 from color import Coloring
 import platform_utils
@@ -119,22 +118,23 @@ the following meanings:
 
   def Execute(self, opt, args):
     all_projects = self.GetProjects(args)
-    counter = 0
 
-    if opt.jobs == 1:
-      for project in all_projects:
-        state = project.PrintWorkTreeStatus(quiet=opt.quiet)
+    def _ProcessResults(_pool, _output, results):
+      ret = 0
+      for (state, output) in results:
+        if output:
+          print(output, end='')
         if state == 'CLEAN':
-          counter += 1
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        states = pool.imap(functools.partial(self._StatusHelper, opt.quiet),
-                           all_projects, chunksize=WORKER_BATCH_SIZE)
-        for (state, output) in states:
-          if output:
-            print(output, end='')
-          if state == 'CLEAN':
-            counter += 1
+          ret += 1
+      return ret
+
+    counter = self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._StatusHelper, opt.quiet),
+        all_projects,
+        callback=_ProcessResults,
+        ordered=True)
+
     if not opt.quiet and len(all_projects) == counter:
       print('nothing to commit (working directory clean)')
 
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 21166af5..4763fadc 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -51,7 +51,7 @@ import git_superproject
 import gitc_utils
 from project import Project
 from project import RemoteSpec
-from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE
+from command import Command, MirrorSafeCommand
 from error import RepoChangedException, GitError, ManifestParseError
 import platform_utils
 from project import SyncBuffer
@@ -428,11 +428,12 @@ later is required to fix a server side protocol bug.
 
     return (ret, fetched)
 
-  def _CheckoutOne(self, opt, project):
+  def _CheckoutOne(self, detach_head, force_sync, project):
     """Checkout work tree for one project
 
     Args:
-      opt: Program options returned from optparse.  See _Options().
+      detach_head: Whether to leave a detached HEAD.
+      force_sync: Force checking out of the repo.
       project: Project object for the project to checkout.
 
     Returns:
@@ -440,10 +441,10 @@
     """
     start = time.time()
     syncbuf = SyncBuffer(self.manifest.manifestProject.config,
-                         detach_head=opt.detach_head)
+                         detach_head=detach_head)
     success = False
     try:
-      project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
+      project.Sync_LocalHalf(syncbuf, force_sync=force_sync)
       success = syncbuf.Finish()
     except Exception as e:
       print('error: Cannot checkout %s: %s: %s' %
@@ -464,44 +465,32 @@ later is required to fix a server side protocol bug.
       opt: Program options returned from optparse.  See _Options().
       err_results: A list of strings, paths to git repos where checkout failed.
     """
-    ret = True
-    jobs = opt.jobs_checkout if opt.jobs_checkout else self.jobs
-
     # Only checkout projects with worktrees.
     all_projects = [x for x in all_projects if x.worktree]
 
-    pm = Progress('Checking out', len(all_projects), quiet=opt.quiet)
-
-    def _ProcessResults(results):
+    def _ProcessResults(pool, pm, results):
+      ret = True
       for (success, project, start, finish) in results:
         self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
                                start, finish, success)
         # Check for any errors before running any more tasks.
         # ...we'll let existing jobs finish, though.
         if not success:
+          ret = False
          err_results.append(project.relpath)
          if opt.fail_fast:
-            return False
+            if pool:
+              pool.close()
+            return ret
        pm.update(msg=project.name)
-      return True
-
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or jobs == 1:
-      if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects):
-        ret = False
-    else:
-      with multiprocessing.Pool(jobs) as pool:
-        results = pool.imap_unordered(
-            functools.partial(self._CheckoutOne, opt),
-            all_projects,
-            chunksize=WORKER_BATCH_SIZE)
-        if not _ProcessResults(results):
-          ret = False
-        pool.close()
-
-    pm.end()
-
-    return ret and not err_results
+      return ret
+
+    return self.ExecuteInParallel(
+        opt.jobs_checkout if opt.jobs_checkout else self.jobs,
+        functools.partial(self._CheckoutOne, opt.detach_head, opt.force_sync),
+        all_projects,
+        callback=_ProcessResults,
+        output=Progress('Checking out', len(all_projects), quiet=opt.quiet)) and not err_results
 
   def _GCProjects(self, projects, opt, err_event):
     pm = Progress('Garbage collecting', len(projects), delay=False, quiet=opt.quiet)
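Sync's fail-fast path is worth spelling out: on an error the callback records the failure, and with --fail-fast it closes the pool so no further checkouts are dispatched and returns early; leaving the helper's `with` block then tears the pool down. A stand-alone sketch of that flow with a fake checkout worker, not the actual sync code:

    import multiprocessing
    import time

    def checkout(n):
      # Worker stand-in: pretend project 2 fails to check out.
      time.sleep(0.1)
      return (n != 2, n)

    if __name__ == '__main__':
      ok = True
      with multiprocessing.Pool(2) as pool:
        for success, n in pool.imap_unordered(checkout, range(8)):
          if not success:
            ok = False
            pool.close()  # stop dispatching the remaining queued work
            break
      # Leaving the `with` block terminates any still-running workers.
      print('clean exit' if ok else 'failed fast at project %d' % n)
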
