 subcmds/sync.py            | 311 +++++++++++++++++++++++++-----------------
 tests/test_subcmds_sync.py |   3 +
 2 files changed, 167 insertions(+), 147 deletions(-)
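In short: this change moves the `self._fetch_times.Save()` and `self._local_sync_state.Save()` calls into `finally` blocks, so fetch timing data and local sync state are persisted even when a sync aborts partway, and it fixes `_FetchTimes.Save()` to merge this run's timings into the on-disk history instead of overwriting it with only the projects seen this run.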
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd057..f9500314 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
         sync_event.set()
         sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
             to_fetch.extend(all_projects)
             to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
-            success = result.success
-            new_fetched = result.projects
-            if not success:
-                err_event.set()
-            fetched.update(new_fetched)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
+            success = result.success
+            fetched = result.projects
+            if not success:
+                err_event.set()
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
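With this hunk, `_FetchMain` persists its bookkeeping in a `finally` clause, so the `SyncError` raised on a failed `--network-only` run (or any other exception out of `_Fetch`) no longer skips the `Save()` calls. A minimal sketch of the pattern, using a hypothetical `FetchTimes` stand-in rather than the real `_FetchTimes` class:

```python
import json


class FetchTimes:
    """Hypothetical stand-in for sync._FetchTimes."""

    def __init__(self, path):
        self._path = path
        self._seen = {}

    def Set(self, name, duration):
        self._seen[name] = duration

    def Save(self):
        with open(self._path, "w") as f:
            json.dump(self._seen, f, indent=2)


def fetch_main(fetch_times, projects, do_fetch):
    # Mirror of the try/finally above: timings recorded before a
    # failure are persisted whether or not a fetch raises.
    try:
        for name in projects:
            fetch_times.Set(name, do_fetch(name))
    finally:
        fetch_times.Save()
```

The same shape appears again in the interleaved-sync hunk below: the saves sit at the outermost level so every exit path, normal or exceptional, flushes state to disk.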
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
 
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
 
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
-                                )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
-                                err_event.set()
-
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
+                                err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                            if err_event.is_set() and opt.fail_fast:
+                                raise SyncFailFastError(
+                                    aggregate_errors=errors
+                                )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
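The outer loop shown above (unchanged apart from the new `try`/`finally` and the resulting re-indentation) guards against livelock: if a full pass leaves the set of pending project paths identical, the sync is stalled and stops rather than retrying the same failures forever. A self-contained sketch of that guard, with hypothetical `pending` and `sync_one` stand-ins for the real project list and worker:

```python
def drain_with_stall_guard(pending, sync_one):
    """Sync pending items in passes, stopping once a pass makes no progress.

    Hypothetical stand-ins: `pending` is a set of project relpaths and
    `sync_one` returns True on success. Mirrors the stall check in
    _SyncInterleaved, which compares the pending set across passes.
    """
    previously_pending = None
    while pending:
        if pending == previously_pending:
            # Nothing succeeded last pass; fail loudly instead of
            # spinning forever on the same projects.
            raise RuntimeError("stalled on: " + ", ".join(sorted(pending)))
        previously_pending = set(pending)
        for path in list(pending):
            if sync_one(path):
                pending.discard(path)
```

In sync.py the same comparison drives `err_event.set()` plus a logged list of stalled projects rather than an exception, so the `finally` blocks still get to save state on the way out.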
@@ -2695,17 +2710,19 @@ class _FetchTimes:
                 self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
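The `Save()` fix is twofold. First, it now loads the on-disk history explicitly and merges the moving average into `self._saved`, where the old code averaged into `self._seen` and dumped that dict, silently dropping the saved timings of every project not fetched this run. Second, the early-return guard now checks for new data (`not self._seen`) instead of whether `_Load()` had happened to run already. A standalone sketch of the merge, using the same exponential moving average (`_ALPHA` is 0.5 in sync.py):

```python
ALPHA = 0.5  # matches _FetchTimes._ALPHA


def merge_fetch_times(saved, seen, alpha=ALPHA):
    """Blend one run's fetch times into the persisted history.

    Standalone sketch of the fixed Save(): `saved` holds the prior
    averages loaded from disk, `seen` holds this run's raw timings.
    Entries absent from `seen` survive unchanged.
    """
    merged = dict(saved)
    for name, t in seen.items():
        old = saved.get(name, t)  # a first sighting counts at full weight
        merged[name] = alpha * t + (1 - alpha) * old
    return merged


# A project averaging 100s historically that took 50s this run:
# 0.5 * 50 + 0.5 * 100 == 75.0, and untouched projects are preserved.
print(merge_fetch_times({"a": 100.0, "b": 40.0}, {"a": 50.0}))
# {'a': 75.0, 'b': 40.0}
```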
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9ab..6eb8a5a7 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
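Because the interleaved path now calls `Save()` unconditionally from its `finally` block, the test fixture must provide these attributes; plain `mock.Mock()` stubs also make the new behavior easy to assert. A hypothetical, self-contained illustration of the pattern the stubs enable (not part of the diff):

```python
import unittest
from unittest import mock


class SaveOnFailureExample(unittest.TestCase):
    """Hypothetical test showing why the Mock stubs are useful."""

    def test_saves_even_when_sync_raises(self):
        fetch_times = mock.Mock()

        def run_sync():
            try:
                raise RuntimeError("fetch failed")  # stand-in for _Fetch
            finally:
                fetch_times.Save()

        with self.assertRaises(RuntimeError):
            run_sync()
        # The Mock records the call made on the exceptional path.
        fetch_times.Save.assert_called_once_with()


if __name__ == "__main__":
    unittest.main()
```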