diff options
| -rw-r--r-- | project.py | 19 | ||||
| -rw-r--r-- | subcmds/sync.py | 27 | ||||
| -rw-r--r-- | tests/test_subcmds_sync.py | 24 |
3 files changed, 43 insertions, 27 deletions
| @@ -2396,26 +2396,25 @@ class Project: | |||
| 2396 | try: | 2396 | try: |
| 2397 | # if revision (sha or tag) is not present then following function | 2397 | # if revision (sha or tag) is not present then following function |
| 2398 | # throws an error. | 2398 | # throws an error. |
| 2399 | revs = [f"{self.revisionExpr}^0"] | ||
| 2400 | upstream_rev = None | ||
| 2401 | if self.upstream: | ||
| 2402 | upstream_rev = self.GetRemote().ToLocal(self.upstream) | ||
| 2403 | revs.append(upstream_rev) | ||
| 2404 | |||
| 2399 | self.bare_git.rev_list( | 2405 | self.bare_git.rev_list( |
| 2400 | "-1", | 2406 | "-1", |
| 2401 | "--missing=allow-any", | 2407 | "--missing=allow-any", |
| 2402 | "%s^0" % self.revisionExpr, | 2408 | *revs, |
| 2403 | "--", | 2409 | "--", |
| 2404 | log_as_error=False, | 2410 | log_as_error=False, |
| 2405 | ) | 2411 | ) |
| 2412 | |||
| 2406 | if self.upstream: | 2413 | if self.upstream: |
| 2407 | rev = self.GetRemote().ToLocal(self.upstream) | ||
| 2408 | self.bare_git.rev_list( | ||
| 2409 | "-1", | ||
| 2410 | "--missing=allow-any", | ||
| 2411 | "%s^0" % rev, | ||
| 2412 | "--", | ||
| 2413 | log_as_error=False, | ||
| 2414 | ) | ||
| 2415 | self.bare_git.merge_base( | 2414 | self.bare_git.merge_base( |
| 2416 | "--is-ancestor", | 2415 | "--is-ancestor", |
| 2417 | self.revisionExpr, | 2416 | self.revisionExpr, |
| 2418 | rev, | 2417 | upstream_rev, |
| 2419 | log_as_error=False, | 2418 | log_as_error=False, |
| 2420 | ) | 2419 | ) |
| 2421 | return True | 2420 | return True |
diff --git a/subcmds/sync.py b/subcmds/sync.py index 0ae59f55..bebe18b9 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -131,6 +131,11 @@ def _SafeCheckoutOrder(checkouts: List[Project]) -> List[List[Project]]: | |||
| 131 | return res | 131 | return res |
| 132 | 132 | ||
| 133 | 133 | ||
| 134 | def _chunksize(projects: int, jobs: int) -> int: | ||
| 135 | """Calculate chunk size for the given number of projects and jobs.""" | ||
| 136 | return min(max(1, projects // jobs), WORKER_BATCH_SIZE) | ||
| 137 | |||
| 138 | |||
| 134 | class _FetchOneResult(NamedTuple): | 139 | class _FetchOneResult(NamedTuple): |
| 135 | """_FetchOne return value. | 140 | """_FetchOne return value. |
| 136 | 141 | ||
| @@ -819,7 +824,6 @@ later is required to fix a server side protocol bug. | |||
| 819 | def _Fetch(self, projects, opt, err_event, ssh_proxy, errors): | 824 | def _Fetch(self, projects, opt, err_event, ssh_proxy, errors): |
| 820 | ret = True | 825 | ret = True |
| 821 | 826 | ||
| 822 | jobs = opt.jobs_network | ||
| 823 | fetched = set() | 827 | fetched = set() |
| 824 | remote_fetched = set() | 828 | remote_fetched = set() |
| 825 | pm = Progress( | 829 | pm = Progress( |
| @@ -849,6 +853,8 @@ later is required to fix a server side protocol bug. | |||
| 849 | objdir_project_map.setdefault(project.objdir, []).append(project) | 853 | objdir_project_map.setdefault(project.objdir, []).append(project) |
| 850 | projects_list = list(objdir_project_map.values()) | 854 | projects_list = list(objdir_project_map.values()) |
| 851 | 855 | ||
| 856 | jobs = min(opt.jobs_network, len(projects_list)) | ||
| 857 | |||
| 852 | def _ProcessResults(results_sets): | 858 | def _ProcessResults(results_sets): |
| 853 | ret = True | 859 | ret = True |
| 854 | for results in results_sets: | 860 | for results in results_sets: |
| @@ -888,35 +894,22 @@ later is required to fix a server side protocol bug. | |||
| 888 | Sync.ssh_proxy = None | 894 | Sync.ssh_proxy = None |
| 889 | 895 | ||
| 890 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 896 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
| 891 | if len(projects_list) == 1 or jobs == 1: | 897 | if jobs == 1: |
| 892 | self._FetchInitChild(ssh_proxy) | 898 | self._FetchInitChild(ssh_proxy) |
| 893 | if not _ProcessResults( | 899 | if not _ProcessResults( |
| 894 | self._FetchProjectList(opt, x) for x in projects_list | 900 | self._FetchProjectList(opt, x) for x in projects_list |
| 895 | ): | 901 | ): |
| 896 | ret = False | 902 | ret = False |
| 897 | else: | 903 | else: |
| 898 | # Favor throughput over responsiveness when quiet. It seems that | 904 | if not opt.quiet: |
| 899 | # imap() will yield results in batches relative to chunksize, so | ||
| 900 | # even as the children finish a sync, we won't see the result until | ||
| 901 | # one child finishes ~chunksize jobs. When using a large --jobs | ||
| 902 | # with large chunksize, this can be jarring as there will be a large | ||
| 903 | # initial delay where repo looks like it isn't doing anything and | ||
| 904 | # sits at 0%, but then suddenly completes a lot of jobs all at once. | ||
| 905 | # Since this code is more network bound, we can accept a bit more | ||
| 906 | # CPU overhead with a smaller chunksize so that the user sees more | ||
| 907 | # immediate & continuous feedback. | ||
| 908 | if opt.quiet: | ||
| 909 | chunksize = WORKER_BATCH_SIZE | ||
| 910 | else: | ||
| 911 | pm.update(inc=0, msg="warming up") | 905 | pm.update(inc=0, msg="warming up") |
| 912 | chunksize = 4 | ||
| 913 | with multiprocessing.Pool( | 906 | with multiprocessing.Pool( |
| 914 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) | 907 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) |
| 915 | ) as pool: | 908 | ) as pool: |
| 916 | results = pool.imap_unordered( | 909 | results = pool.imap_unordered( |
| 917 | functools.partial(self._FetchProjectList, opt), | 910 | functools.partial(self._FetchProjectList, opt), |
| 918 | projects_list, | 911 | projects_list, |
| 919 | chunksize=chunksize, | 912 | chunksize=_chunksize(len(projects_list), jobs), |
| 920 | ) | 913 | ) |
| 921 | if not _ProcessResults(results): | 914 | if not _ProcessResults(results): |
| 922 | ret = False | 915 | ret = False |
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 8dde687c..b871317c 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
| @@ -355,6 +355,30 @@ class SafeCheckoutOrder(unittest.TestCase): | |||
| 355 | ) | 355 | ) |
| 356 | 356 | ||
| 357 | 357 | ||
| 358 | class Chunksize(unittest.TestCase): | ||
| 359 | """Tests for _chunksize.""" | ||
| 360 | |||
| 361 | def test_single_project(self): | ||
| 362 | """Single project.""" | ||
| 363 | self.assertEqual(sync._chunksize(1, 1), 1) | ||
| 364 | |||
| 365 | def test_low_project_count(self): | ||
| 366 | """Multiple projects, low number of projects to sync.""" | ||
| 367 | self.assertEqual(sync._chunksize(10, 1), 10) | ||
| 368 | self.assertEqual(sync._chunksize(10, 2), 5) | ||
| 369 | self.assertEqual(sync._chunksize(10, 4), 2) | ||
| 370 | self.assertEqual(sync._chunksize(10, 8), 1) | ||
| 371 | self.assertEqual(sync._chunksize(10, 16), 1) | ||
| 372 | |||
| 373 | def test_high_project_count(self): | ||
| 374 | """Multiple projects, high number of projects to sync.""" | ||
| 375 | self.assertEqual(sync._chunksize(2800, 1), 32) | ||
| 376 | self.assertEqual(sync._chunksize(2800, 16), 32) | ||
| 377 | self.assertEqual(sync._chunksize(2800, 32), 32) | ||
| 378 | self.assertEqual(sync._chunksize(2800, 64), 32) | ||
| 379 | self.assertEqual(sync._chunksize(2800, 128), 21) | ||
| 380 | |||
| 381 | |||
| 358 | class GetPreciousObjectsState(unittest.TestCase): | 382 | class GetPreciousObjectsState(unittest.TestCase): |
| 359 | """Tests for _GetPreciousObjectsState.""" | 383 | """Tests for _GetPreciousObjectsState.""" |
| 360 | 384 | ||
