-rw-r--r--  subcmds/sync.py             311
-rw-r--r--  tests/test_subcmds_sync.py    3
2 files changed, 167 insertions, 147 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd057..f9500314 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
             sync_event.set()
             sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
-            success = result.success
-            new_fetched = result.projects
-            if not success:
-                err_event.set()
-            fetched.update(new_fetched)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
+            success = result.success
+            fetched = result.projects
+            if not success:
+                err_event.set()
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
-
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
-
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
-                                )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
-                                err_event.set()
-
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
+
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
+
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
+                                err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                            if err_event.is_set() and opt.fail_fast:
+                                raise SyncFailFastError(
+                                    aggregate_errors=errors
+                                )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
@@ -2695,17 +2710,19 @@ class _FetchTimes:
                 self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
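For context, the _FetchTimes.Save() hunk above now blends each new fetch duration into self._saved with an exponential moving average instead of mutating self._seen. A minimal standalone sketch of that update rule follows; the _ALPHA value of 0.5 is an assumption for illustration, since the actual constant is defined elsewhere in sync.py and is not shown in this diff:

    # Sketch of the moving-average update applied by _FetchTimes.Save().
    # _ALPHA = 0.5 is assumed here for illustration only.
    _ALPHA = 0.5

    def update_fetch_time(saved, name, t):
        """Blend the latest fetch duration t into the stored average."""
        old = saved.get(name, t)  # First sighting: seed the average with t.
        saved[name] = (_ALPHA * t) + ((1 - _ALPHA) * old)

    times = {}
    update_fetch_time(times, "platform/build", 12.0)  # {"platform/build": 12.0}
    update_fetch_time(times, "platform/build", 4.0)   # {"platform/build": 8.0}

With equal weights, one unusually slow or fast fetch only nudges the stored time (12.0 and 4.0 average to 8.0 above), which keeps the fetch-ordering heuristic that sorts to_fetch by self._fetch_times.Get stable across runs.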
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9ab..6eb8a5a7 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
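The structural theme of this commit is moving self._fetch_times.Save() and self._local_sync_state.Save() out of the straight-line path and into finally blocks, so timing and sync-state data are persisted even when a fetch or checkout raises. A minimal sketch of that pattern, with illustrative names rather than the actual repo classes:

    # Sketch of the try/finally persistence pattern this commit applies.
    class StateFile:
        """Stand-in for _FetchTimes / LocalSyncState; names are hypothetical."""

        def __init__(self):
            self.data = {}

        def Save(self):
            print("saved:", self.data)

    def sync(state):
        try:
            state.data["fetched"] = ["project/a"]
            raise RuntimeError("fetch failed")  # Simulate a mid-sync error.
        finally:
            state.Save()  # Runs on success and on failure alike.

    try:
        sync(StateFile())
    except RuntimeError:
        pass  # "saved: ..." was printed before the exception propagated.

The matching test change stubs both attributes with mock.Mock() so the interleaved-sync tests exercise the new finally blocks without touching real state files on disk.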