diff options
-rw-r--r-- | project.py | 19 | ||||
-rw-r--r-- | subcmds/sync.py | 27 | ||||
-rw-r--r-- | tests/test_subcmds_sync.py | 24 |
3 files changed, 43 insertions, 27 deletions
@@ -2396,26 +2396,25 @@ class Project: | |||
2396 | try: | 2396 | try: |
2397 | # if revision (sha or tag) is not present then following function | 2397 | # if revision (sha or tag) is not present then following function |
2398 | # throws an error. | 2398 | # throws an error. |
2399 | revs = [f"{self.revisionExpr}^0"] | ||
2400 | upstream_rev = None | ||
2401 | if self.upstream: | ||
2402 | upstream_rev = self.GetRemote().ToLocal(self.upstream) | ||
2403 | revs.append(upstream_rev) | ||
2404 | |||
2399 | self.bare_git.rev_list( | 2405 | self.bare_git.rev_list( |
2400 | "-1", | 2406 | "-1", |
2401 | "--missing=allow-any", | 2407 | "--missing=allow-any", |
2402 | "%s^0" % self.revisionExpr, | 2408 | *revs, |
2403 | "--", | 2409 | "--", |
2404 | log_as_error=False, | 2410 | log_as_error=False, |
2405 | ) | 2411 | ) |
2412 | |||
2406 | if self.upstream: | 2413 | if self.upstream: |
2407 | rev = self.GetRemote().ToLocal(self.upstream) | ||
2408 | self.bare_git.rev_list( | ||
2409 | "-1", | ||
2410 | "--missing=allow-any", | ||
2411 | "%s^0" % rev, | ||
2412 | "--", | ||
2413 | log_as_error=False, | ||
2414 | ) | ||
2415 | self.bare_git.merge_base( | 2414 | self.bare_git.merge_base( |
2416 | "--is-ancestor", | 2415 | "--is-ancestor", |
2417 | self.revisionExpr, | 2416 | self.revisionExpr, |
2418 | rev, | 2417 | upstream_rev, |
2419 | log_as_error=False, | 2418 | log_as_error=False, |
2420 | ) | 2419 | ) |
2421 | return True | 2420 | return True |
diff --git a/subcmds/sync.py b/subcmds/sync.py index 0ae59f55..bebe18b9 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -131,6 +131,11 @@ def _SafeCheckoutOrder(checkouts: List[Project]) -> List[List[Project]]: | |||
131 | return res | 131 | return res |
132 | 132 | ||
133 | 133 | ||
134 | def _chunksize(projects: int, jobs: int) -> int: | ||
135 | """Calculate chunk size for the given number of projects and jobs.""" | ||
136 | return min(max(1, projects // jobs), WORKER_BATCH_SIZE) | ||
137 | |||
138 | |||
134 | class _FetchOneResult(NamedTuple): | 139 | class _FetchOneResult(NamedTuple): |
135 | """_FetchOne return value. | 140 | """_FetchOne return value. |
136 | 141 | ||
@@ -819,7 +824,6 @@ later is required to fix a server side protocol bug. | |||
819 | def _Fetch(self, projects, opt, err_event, ssh_proxy, errors): | 824 | def _Fetch(self, projects, opt, err_event, ssh_proxy, errors): |
820 | ret = True | 825 | ret = True |
821 | 826 | ||
822 | jobs = opt.jobs_network | ||
823 | fetched = set() | 827 | fetched = set() |
824 | remote_fetched = set() | 828 | remote_fetched = set() |
825 | pm = Progress( | 829 | pm = Progress( |
@@ -849,6 +853,8 @@ later is required to fix a server side protocol bug. | |||
849 | objdir_project_map.setdefault(project.objdir, []).append(project) | 853 | objdir_project_map.setdefault(project.objdir, []).append(project) |
850 | projects_list = list(objdir_project_map.values()) | 854 | projects_list = list(objdir_project_map.values()) |
851 | 855 | ||
856 | jobs = min(opt.jobs_network, len(projects_list)) | ||
857 | |||
852 | def _ProcessResults(results_sets): | 858 | def _ProcessResults(results_sets): |
853 | ret = True | 859 | ret = True |
854 | for results in results_sets: | 860 | for results in results_sets: |
@@ -888,35 +894,22 @@ later is required to fix a server side protocol bug. | |||
888 | Sync.ssh_proxy = None | 894 | Sync.ssh_proxy = None |
889 | 895 | ||
890 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 896 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
891 | if len(projects_list) == 1 or jobs == 1: | 897 | if jobs == 1: |
892 | self._FetchInitChild(ssh_proxy) | 898 | self._FetchInitChild(ssh_proxy) |
893 | if not _ProcessResults( | 899 | if not _ProcessResults( |
894 | self._FetchProjectList(opt, x) for x in projects_list | 900 | self._FetchProjectList(opt, x) for x in projects_list |
895 | ): | 901 | ): |
896 | ret = False | 902 | ret = False |
897 | else: | 903 | else: |
898 | # Favor throughput over responsiveness when quiet. It seems that | 904 | if not opt.quiet: |
899 | # imap() will yield results in batches relative to chunksize, so | ||
900 | # even as the children finish a sync, we won't see the result until | ||
901 | # one child finishes ~chunksize jobs. When using a large --jobs | ||
902 | # with large chunksize, this can be jarring as there will be a large | ||
903 | # initial delay where repo looks like it isn't doing anything and | ||
904 | # sits at 0%, but then suddenly completes a lot of jobs all at once. | ||
905 | # Since this code is more network bound, we can accept a bit more | ||
906 | # CPU overhead with a smaller chunksize so that the user sees more | ||
907 | # immediate & continuous feedback. | ||
908 | if opt.quiet: | ||
909 | chunksize = WORKER_BATCH_SIZE | ||
910 | else: | ||
911 | pm.update(inc=0, msg="warming up") | 905 | pm.update(inc=0, msg="warming up") |
912 | chunksize = 4 | ||
913 | with multiprocessing.Pool( | 906 | with multiprocessing.Pool( |
914 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) | 907 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) |
915 | ) as pool: | 908 | ) as pool: |
916 | results = pool.imap_unordered( | 909 | results = pool.imap_unordered( |
917 | functools.partial(self._FetchProjectList, opt), | 910 | functools.partial(self._FetchProjectList, opt), |
918 | projects_list, | 911 | projects_list, |
919 | chunksize=chunksize, | 912 | chunksize=_chunksize(len(projects_list), jobs), |
920 | ) | 913 | ) |
921 | if not _ProcessResults(results): | 914 | if not _ProcessResults(results): |
922 | ret = False | 915 | ret = False |
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 8dde687c..b871317c 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
@@ -355,6 +355,30 @@ class SafeCheckoutOrder(unittest.TestCase): | |||
355 | ) | 355 | ) |
356 | 356 | ||
357 | 357 | ||
358 | class Chunksize(unittest.TestCase): | ||
359 | """Tests for _chunksize.""" | ||
360 | |||
361 | def test_single_project(self): | ||
362 | """Single project.""" | ||
363 | self.assertEqual(sync._chunksize(1, 1), 1) | ||
364 | |||
365 | def test_low_project_count(self): | ||
366 | """Multiple projects, low number of projects to sync.""" | ||
367 | self.assertEqual(sync._chunksize(10, 1), 10) | ||
368 | self.assertEqual(sync._chunksize(10, 2), 5) | ||
369 | self.assertEqual(sync._chunksize(10, 4), 2) | ||
370 | self.assertEqual(sync._chunksize(10, 8), 1) | ||
371 | self.assertEqual(sync._chunksize(10, 16), 1) | ||
372 | |||
373 | def test_high_project_count(self): | ||
374 | """Multiple projects, high number of projects to sync.""" | ||
375 | self.assertEqual(sync._chunksize(2800, 1), 32) | ||
376 | self.assertEqual(sync._chunksize(2800, 16), 32) | ||
377 | self.assertEqual(sync._chunksize(2800, 32), 32) | ||
378 | self.assertEqual(sync._chunksize(2800, 64), 32) | ||
379 | self.assertEqual(sync._chunksize(2800, 128), 21) | ||
380 | |||
381 | |||
358 | class GetPreciousObjectsState(unittest.TestCase): | 382 | class GetPreciousObjectsState(unittest.TestCase): |
359 | """Tests for _GetPreciousObjectsState.""" | 383 | """Tests for _GetPreciousObjectsState.""" |
360 | 384 | ||