author     Gavin Mak <gavinmak@google.com>  2025-10-20 11:13:09 -0700
committer  LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>  2025-10-20 11:28:21 -0700
commit     1afe96a7e997ce7748f066b206a85ac648f7a87c (patch)
tree       69f31057506ad29f5e415819b52decf577960469
parent     2719a8e203e43b34a437b510092758870b81cae6 (diff)
sync: fix saving of fetch times and local state  (v2.59, stable, main)

Interleaved sync didn't save _fetch_times and _local_sync_state to
disk. Phased sync saved them, but incorrectly applied moving average
smoothing repeatedly when fetching submodules, and discarded
historical data during partial syncs.

Move .Save() calls to the end of main sync loops to ensure they run
once. Update _FetchTimes.Save() to merge new data with existing
history, preventing data loss.

Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
-rw-r--r--  subcmds/sync.py             311
-rw-r--r--  tests/test_subcmds_sync.py    3
2 files changed, 167 insertions(+), 147 deletions(-)
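
The smoothing problem called out in the commit message is easy to see in
isolation. Below is a minimal sketch with hypothetical project names and
timings (ALPHA stands in for the _FetchTimes._ALPHA factor referenced in
the diff) showing how the old Save() dropped history on partial syncs and
double-smoothed values when invoked more than once per sync:

    ALPHA = 0.5  # stand-in for _FetchTimes._ALPHA

    saved = {"projectA": 10.0, "projectB": 40.0}  # history on disk
    seen = {"projectA": 20.0}  # only projectA was fetched this run

    # Old behavior: blend into `seen`, then write `seen` verbatim.
    for name, t in list(seen.items()):
        old = saved.get(name, t)
        seen[name] = ALPHA * t + (1 - ALPHA) * old
    print(seen)  # {'projectA': 15.0} -- projectB's history would be dropped

    # A second Save() (as happened while fetching submodules) re-blends
    # the already-blended value, skewing the average toward stale data:
    for name, t in list(seen.items()):
        old = saved.get(name, t)
        seen[name] = ALPHA * t + (1 - ALPHA) * old
    print(seen)  # {'projectA': 12.5} -- smoothed twice for a single sync

The change below addresses both halves: Save() now runs exactly once per
sync, and it merges into the on-disk history instead of overwriting it.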
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd057..f9500314 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
             sync_event.set()
             sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
            self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
             success = result.success
-            new_fetched = result.projects
+            fetched = result.projects
             if not success:
                 err_event.set()
-            fetched.update(new_fetched)
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
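The structural change above is a persist-on-exit pattern: the main fetch,
the early --network-only return, and any raised SyncError all now pass
through a finally block, so the state files are written exactly once on
every path out of the loop. A minimal sketch of the idea, using
hypothetical do_fetch_passes/save_state stand-ins rather than the real
repo APIs:

    def fetch_main(projects, do_fetch_passes, save_state):
        """Run all fetch passes; persist state exactly once on any exit."""
        try:
            # Early returns and exceptions raised inside the passes
            # still flow through the finally block below.
            return do_fetch_passes(projects)
        finally:
            save_state()  # runs on success, early return, or error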
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
-
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
-
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
-                                )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
-                                err_event.set()
-
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
+
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
+
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
+                                err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                            if err_event.is_set() and opt.fail_fast:
+                                raise SyncFailFastError(
+                                    aggregate_errors=errors
+                                )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
@@ -2695,17 +2710,19 @@ class _FetchTimes:
         self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
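The reworked Save() now follows read-merge-write semantics: reload the
on-disk history via _Load(), fold this run's _seen times into _saved, and
write _saved back, so projects untouched by a partial sync keep their
entries. A self-contained sketch of the same behavior (hypothetical
path argument and function name; the real code lives in _FetchTimes):

    import json

    ALPHA = 0.5

    def save_fetch_times(path, seen):
        """Merge this run's fetch times into the on-disk history."""
        if not seen:
            return
        try:
            with open(path) as f:
                saved = json.load(f)  # reload history, like _Load()
        except (OSError, ValueError):
            saved = {}
        for name, t in seen.items():
            old = saved.get(name, t)
            saved[name] = ALPHA * t + (1 - ALPHA) * old
        with open(path, "w") as f:
            json.dump(saved, f, indent=2)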
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9ab..6eb8a5a7 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
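
Since setUp now installs mock.Mock() for both attributes, a test that
drives a sync can assert the persistence contract directly. A
hypothetical assertion, not part of this change, might look like:

    # Inside a test that runs an interleaved sync to completion:
    self.cmd._fetch_times.Save.assert_called_once()
    self.cmd._local_sync_state.Save.assert_called_once()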