From b4b323a8bd02d52d060f7f6fa15ba045df5af5b2 Mon Sep 17 00:00:00 2001
From: Gavin Mak
Date: Tue, 17 Jun 2025 10:54:41 -0700
Subject: sync: Add orchestration logic for --interleaved

Introduce the parallel orchestration framework for `repo sync
--interleaved`. The new logic respects project dependencies by
processing them in hierarchical levels. Projects sharing a git object
directory are grouped and processed serially. Also reuse the familiar
fetch progress bar UX.
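
For illustration, a simplified sketch of how one sync pass is planned.
checkout_levels() and plan_sync_pass() are hypothetical stand-ins for
_SafeCheckoutOrder() and the dispatch loop in _SyncInterleaved(), and
the depth-based level split is a simplification of the real ordering:

    from collections import defaultdict

    def checkout_levels(projects):
        # Parents ('foo') must finish before nested projects
        # ('foo/bar'), so bucket projects by relpath nesting depth
        # (a simplification of _SafeCheckoutOrder).
        levels = defaultdict(list)
        for project in projects:
            levels[project.relpath.count("/")].append(project)
        return [levels[depth] for depth in sorted(levels)]

    def plan_sync_pass(projects):
        # Yield one list of work items per dependency level. A work
        # item holds all projects sharing a git object directory;
        # items run in parallel, projects within an item run serially.
        for level in checkout_levels(projects):
            by_objdir = defaultdict(list)
            for project in level:
                by_objdir[project.objdir].append(project)
            yield list(by_objdir.values())

Grouping by objdir makes the shared-object-directory constraint a
property of the work items themselves, so the parallel executor needs
no extra locking.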

Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281
Commit-Queue: Gavin Mak
Tested-by: Gavin Mak
Reviewed-by: Scott Lee
---
 subcmds/sync.py            | 295 ++++++++++++++++++++++++++++++++++++++++++---
 tests/test_subcmds_sync.py | 124 ++++++++++++++++++-
 2 files changed, 398 insertions(+), 21 deletions(-)

diff --git a/subcmds/sync.py b/subcmds/sync.py
index 6e369a10..711baca2 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -25,7 +25,7 @@ from pathlib import Path
 import sys
 import tempfile
 import time
-from typing import List, NamedTuple, Set, Union
+from typing import List, NamedTuple, Optional, Set, Union
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple):
     finish: float
 
 
+class _SyncResult(NamedTuple):
+    """Individual project sync result for interleaved mode.
+
+    Attributes:
+        relpath (str): The project's relative path from the repo client top.
+        fetch_success (bool): True if the fetch operation was successful.
+        checkout_success (bool): True if the checkout operation was
+            successful.
+        fetch_error (Optional[Exception]): The Exception from a failed fetch,
+            or None.
+        checkout_error (Optional[Exception]): The Exception from a failed
+            checkout, or None.
+        fetch_start (Optional[float]): The time.time() when fetch started.
+        fetch_finish (Optional[float]): The time.time() when fetch finished.
+        checkout_start (Optional[float]): The time.time() when checkout
+            started.
+        checkout_finish (Optional[float]): The time.time() when checkout
+            finished.
+    """
+
+    relpath: str
+    fetch_success: bool
+    checkout_success: bool
+    fetch_error: Optional[Exception]
+    checkout_error: Optional[Exception]
+
+    fetch_start: Optional[float]
+    fetch_finish: Optional[float]
+    checkout_start: Optional[float]
+    checkout_finish: Optional[float]
+
+
+class _InterleavedSyncResult(NamedTuple):
+    """Result of an interleaved sync.
+
+    Attributes:
+        results (List[_SyncResult]): A list of results, one for each project
+            processed. Empty if the worker failed before creating results.
+    """
+
+    results: List[_SyncResult]
+
+
 class SuperprojectError(SyncError):
     """Superproject sync repo."""
 
@@ -837,15 +880,7 @@ later is required to fix a server side protocol bug.
         )
 
         sync_event = _threading.Event()
-
-        def _MonitorSyncLoop():
-            while True:
-                pm.update(inc=0, msg=self._GetSyncProgressMessage())
-                if sync_event.wait(timeout=1):
-                    return
-
-        sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
-        sync_progress_thread.daemon = True
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
         def _ProcessResults(pool, pm, results_sets):
             ret = True
@@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug.
             all_manifests=not opt.this_manifest_only,
         )
 
+        # Log the repo projects by existing and new.
+        existing = [x for x in all_projects if x.Exists]
+        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
+        mp.config.SetString(
+            "repo.newprojectcount", str(len(all_projects) - len(existing))
+        )
+
+        self._fetch_times = _FetchTimes(manifest)
+        self._local_sync_state = LocalSyncState(manifest)
+
         if opt.interleaved:
             sync_method = self._SyncInterleaved
         else:
@@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug.
         if not opt.quiet:
             print("repo sync has finished successfully.")
 
+    def _CreateSyncProgressThread(
+        self, pm: Progress, stop_event: _threading.Event
+    ) -> _threading.Thread:
+        """Creates and returns a daemon thread to update a Progress object.
+
+        The returned thread is not yet started. The thread will periodically
+        update the progress bar with information from _GetSyncProgressMessage
+        until the stop_event is set.
+
+        Args:
+            pm: The Progress object to update.
+            stop_event: The threading.Event to signal the monitor to stop.
+
+        Returns:
+            The configured _threading.Thread object.
+        """
+
+        def _monitor_loop():
+            """The target function for the monitor thread."""
+            while True:
+                # Update the progress bar with the current status message.
+                pm.update(inc=0, msg=self._GetSyncProgressMessage())
+                # Wait for 1 second or until the stop_event is set.
+                if stop_event.wait(timeout=1):
+                    return
+
+        return _threading.Thread(target=_monitor_loop, daemon=True)
+
     def _SyncPhased(
         self,
         opt,
@@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug.
         err_update_projects = False
         err_update_linkfiles = False
 
-        # Log the repo projects by existing and new.
-        existing = [x for x in all_projects if x.Exists]
-        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
-        mp.config.SetString(
-            "repo.newprojectcount", str(len(all_projects) - len(existing))
-        )
-
-        self._fetch_times = _FetchTimes(manifest)
-        self._local_sync_state = LocalSyncState(manifest)
         if not opt.local_only:
             with multiprocessing.Manager() as manager:
                 with ssh.ProxyManager(manager) as ssh_proxy:
@@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug.
             )
             raise SyncError(aggregate_errors=errors)
 
+    @classmethod
+    def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
+        """Worker for interleaved sync.
+
+        This function is responsible for syncing a group of projects that
+        share a git object directory.
+
+        Args:
+            opt: Program options returned from optparse. See _Options().
+            project_indices: A list of indices into the projects list stored
+                in the parallel context.
+
+        Returns:
+            An `_InterleavedSyncResult` containing the results for each
+            project.
+        """
+        results = []
+        context = cls.get_parallel_context()
+        projects = context["projects"]
+        sync_dict = context["sync_dict"]
+
+        assert project_indices, "_SyncProjectList called with no indices."
+
+        # Use the first project as the representative for the progress bar.
+        first_project = projects[project_indices[0]]
+        key = f"{first_project.name} @ {first_project.relpath}"
+        start_time = time.time()
+        sync_dict[key] = start_time
+
+        try:
+            for idx in project_indices:
+                project = projects[idx]
+                # For now, simulate a successful sync.
+                # TODO(b/421935613): Perform the actual git fetch and checkout.
+                results.append(
+                    _SyncResult(
+                        relpath=project.relpath,
+                        fetch_success=True,
+                        checkout_success=True,
+                        fetch_error=None,
+                        checkout_error=None,
+                        fetch_start=None,
+                        fetch_finish=None,
+                        checkout_start=None,
+                        checkout_finish=None,
+                    )
+                )
+        finally:
+            del sync_dict[key]
+
+        return _InterleavedSyncResult(results=results)
+
+    def _ProcessSyncInterleavedResults(
+        self,
+        synced_relpaths: Set[str],
+        err_event: _threading.Event,
+        errors: List[Exception],
+        opt: optparse.Values,
+        pool: Optional[multiprocessing.Pool],
+        pm: Progress,
+        results_sets: List[_InterleavedSyncResult],
+    ):
+        """Callback to process results from interleaved sync workers."""
+        ret = True
+        for result_group in results_sets:
+            for result in result_group.results:
+                pm.update()
+                if result.fetch_success and result.checkout_success:
+                    synced_relpaths.add(result.relpath)
+                else:
+                    ret = False
+                    err_event.set()
+                    if result.fetch_error:
+                        errors.append(result.fetch_error)
+                    if result.checkout_error:
+                        errors.append(result.checkout_error)
+
+            if not ret and opt.fail_fast:
+                if pool:
+                    pool.close()
+                break
+        return ret
+
     def _SyncInterleaved(
         self,
         opt,
@@ -2026,7 +2172,116 @@ later is required to fix a server side protocol bug.
         2. Projects that share git objects are processed serially to prevent
            race conditions.
         """
-        raise NotImplementedError("Interleaved sync is not implemented yet.")
+        err_event = multiprocessing.Event()
+        synced_relpaths = set()
+        project_list = list(all_projects)
+        pm = Progress(
+            "Syncing",
+            len(project_list),
+            delay=False,
+            quiet=opt.quiet,
+            show_elapsed=True,
+            elide=True,
+        )
+        previously_pending_relpaths = set()
+
+        sync_event = _threading.Event()
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
+
+        with self.ParallelContext():
+            # TODO(gavinmak): Use multiprocessing.Queue instead of dict.
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+            sync_progress_thread.start()
+
+            try:
+                # Outer loop for dynamic project discovery (e.g., submodules).
+                # It continues until no unsynced projects remain.
+                while True:
+                    projects_to_sync = [
+                        p
+                        for p in project_list
+                        if p.relpath not in synced_relpaths
+                    ]
+                    if not projects_to_sync:
+                        break
+
+                    pending_relpaths = {p.relpath for p in projects_to_sync}
+                    if previously_pending_relpaths == pending_relpaths:
+                        logger.error(
+                            "Stall detected in interleaved sync, not all "
+                            "projects could be synced."
+                        )
+                        err_event.set()
+                        break
+                    previously_pending_relpaths = pending_relpaths
+
+                    # Update the projects list for workers in the current pass.
+                    self.get_parallel_context()["projects"] = projects_to_sync
+                    project_index_map = {
+                        p: i for i, p in enumerate(projects_to_sync)
+                    }
+
+                    # Inner loop to process projects in a hierarchical order.
+                    # This iterates through levels of project dependencies (e.g.
+                    # 'foo' then 'foo/bar'). All projects in one level can be
+                    # processed in parallel, but we must wait for a level to
+                    # complete before starting the next.
+                    for level_projects in _SafeCheckoutOrder(projects_to_sync):
+                        if not level_projects:
+                            continue
+
+                        objdir_project_map = collections.defaultdict(list)
+                        for p in level_projects:
+                            objdir_project_map[p.objdir].append(
+                                project_index_map[p]
+                            )
+
+                        work_items = list(objdir_project_map.values())
+                        if not work_items:
+                            continue
+
+                        jobs = max(1, min(opt.jobs, len(work_items)))
+                        callback = functools.partial(
+                            self._ProcessSyncInterleavedResults,
+                            synced_relpaths,
+                            err_event,
+                            errors,
+                            opt,
+                        )
+                        if not self.ExecuteInParallel(
+                            jobs,
+                            functools.partial(self._SyncProjectList, opt),
+                            work_items,
+                            callback=callback,
+                            output=pm,
+                            chunksize=1,
+                        ):
+                            err_event.set()
+
+                        if err_event.is_set() and opt.fail_fast:
+                            raise SyncFailFastError(aggregate_errors=errors)
+
+                    self._ReloadManifest(None, manifest)
+                    project_list = self.GetProjects(
+                        args,
+                        missing_ok=True,
+                        submodules_ok=opt.fetch_submodules,
+                        manifest=manifest,
+                        all_manifests=not opt.this_manifest_only,
+                    )
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
+
+        pm.end()
+
+        if err_event.is_set():
+            logger.error(
+                "error: Unable to fully sync the tree in interleaved mode."
+            )
+            raise SyncError(aggregate_errors=errors)
 
 
 def _PostRepoUpgrade(manifest, quiet=False):
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index b871317c..60f283af 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase):
 
 
 class FakeProject:
-    def __init__(self, relpath):
+    def __init__(self, relpath, name=None, objdir=None):
         self.relpath = relpath
+        self.name = name or relpath
+        self.objdir = objdir or relpath
 
     def __str__(self):
         return f"project: {self.relpath}"
@@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase):
             self.cmd.Execute(self.opt, [])
         self.assertIn(self.sync_local_half_error, e.aggregate_errors)
         self.assertIn(self.sync_network_half_error, e.aggregate_errors)
+
+
+class InterleavedSyncTest(unittest.TestCase):
+    """Tests for interleaved sync."""
+
+    def setUp(self):
+        """Set up a sync command with mocks."""
+        self.repodir = tempfile.mkdtemp(".repo")
+        self.manifest = mock.MagicMock(repodir=self.repodir)
+        self.manifest.repoProject.LastFetch = time.time()
+        self.manifest.repoProject.worktree = self.repodir
+        self.manifest.manifestProject.worktree = self.repodir
+        self.manifest.IsArchive = False
+        self.manifest.CloneBundle = False
+        self.manifest.default.sync_j = 1
+
+        self.cmd = sync.Sync(manifest=self.manifest)
+        self.cmd.outer_manifest = self.manifest
+
+        # Mock projects.
+        self.projA = FakeProject("projA", objdir="objA")
+        self.projB = FakeProject("projB", objdir="objB")
+        self.projA_sub = FakeProject(
+            "projA/sub", name="projA_sub", objdir="objA_sub"
+        )
+        self.projC = FakeProject("projC", objdir="objC")
+
+        # Mock methods that are not part of the core interleaved sync logic.
+        mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start()
+        mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start()
+        mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start()
+        mock.patch.object(sync, "_PostRepoUpgrade").start()
+        mock.patch.object(sync, "_PostRepoFetch").start()
+
+    def tearDown(self):
+        """Clean up resources."""
+        shutil.rmtree(self.repodir)
+        mock.patch.stopall()
+
+    def test_interleaved_fail_fast(self):
+        """Test that --fail-fast is respected in interleaved mode."""
+        opt, args = self.cmd.OptionParser.parse_args(
+            ["--interleaved", "--fail-fast", "-j2"]
+        )
+        opt.quiet = True
+
+        # With projA/sub, _SafeCheckoutOrder creates two batches:
+        # 1. [projA, projB]
+        # 2. [projA/sub]
+        # We want to fail on the first batch and ensure the second isn't run.
+        all_projects = [self.projA, self.projB, self.projA_sub]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        # Mock ExecuteInParallel to simulate a failed run on the first batch of
+        # projects.
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", return_value=False
+        ).start()
+
+        with self.assertRaises(sync.SyncFailFastError):
+            self.cmd._SyncInterleaved(
+                opt,
+                args,
+                [],
+                self.manifest,
+                self.manifest.manifestProject,
+                all_projects,
+                {},
+            )
+
+        execute_mock.assert_called_once()
+
+    def test_interleaved_shared_objdir_serial(self):
+        """Test that projects with a shared objdir are processed serially."""
+        opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"])
+        opt.quiet = True
+
+        # Set up projects with a shared objdir.
+        self.projA.objdir = "common_objdir"
+        self.projC.objdir = "common_objdir"
+
+        all_projects = [self.projA, self.projB, self.projC]
+        mock.patch.object(
+            self.cmd, "GetProjects", return_value=all_projects
+        ).start()
+
+        def execute_side_effect(jobs, target, work_items, **kwargs):
+            # The callback is a partial object. The first arg is the set we
+            # need to update to avoid the stall detection.
+            synced_relpaths_set = kwargs["callback"].args[0]
+            projects_in_pass = self.cmd.get_parallel_context()["projects"]
+            for item in work_items:
+                for project_idx in item:
+                    synced_relpaths_set.add(
+                        projects_in_pass[project_idx].relpath
+                    )
+            return True
+
+        execute_mock = mock.patch.object(
+            self.cmd, "ExecuteInParallel", side_effect=execute_side_effect
+        ).start()
+
+        self.cmd._SyncInterleaved(
+            opt,
+            args,
+            [],
+            self.manifest,
+            self.manifest.manifestProject,
+            all_projects,
+            {},
+        )
+
+        execute_mock.assert_called_once()
+        jobs_arg, _, work_items = execute_mock.call_args.args
+        self.assertEqual(jobs_arg, 2)
+        work_items_sets = {frozenset(item) for item in work_items}
+        expected_sets = {frozenset([0, 2]), frozenset([1])}
+        self.assertEqual(work_items_sets, expected_sets)
-- 
cgit v1.2.3-54-g00ecf