From b4b323a8bd02d52d060f7f6fa15ba045df5af5b2 Mon Sep 17 00:00:00 2001 From: Gavin Mak Date: Tue, 17 Jun 2025 10:54:41 -0700 Subject: sync: Add orchestration logic for --interleaved Introduce the parallel orchestration framework for `repo sync --interleaved`. The new logic respects project dependencies by processing them in hierarchical levels. Projects sharing a git object directory are grouped and processed serially. Also reuse the familiar fetch progress bar UX. Bug: 421935613 Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281 Commit-Queue: Gavin Mak Tested-by: Gavin Mak Reviewed-by: Scott Lee --- subcmds/sync.py | 295 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 275 insertions(+), 20 deletions(-) (limited to 'subcmds') diff --git a/subcmds/sync.py b/subcmds/sync.py index 6e369a10..711baca2 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py @@ -25,7 +25,7 @@ from pathlib import Path import sys import tempfile import time -from typing import List, NamedTuple, Set, Union +from typing import List, NamedTuple, Optional, Set, Union import urllib.error import urllib.parse import urllib.request @@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple): finish: float +class _SyncResult(NamedTuple): + """Individual project sync result for interleaved mode. + + Attributes: + relpath (str): The project's relative path from the repo client top. + fetch_success (bool): True if the fetch operation was successful. + checkout_success (bool): True if the checkout operation was + successful. + fetch_error (Optional[Exception]): The Exception from a failed fetch, + or None. + checkout_error (Optional[Exception]): The Exception from a failed + checkout, or None. + fetch_start (Optional[float]): The time.time() when fetch started. + fetch_finish (Optional[float]): The time.time() when fetch finished. + checkout_start (Optional[float]): The time.time() when checkout + started. 
+ checkout_finish (Optional[float]): The time.time() when checkout + finished. + """ + + relpath: str + fetch_success: bool + checkout_success: bool + fetch_error: Optional[Exception] + checkout_error: Optional[Exception] + + fetch_start: Optional[float] + fetch_finish: Optional[float] + checkout_start: Optional[float] + checkout_finish: Optional[float] + + +class _InterleavedSyncResult(NamedTuple): + """Result of an interleaved sync. + + Attributes: + results (List[_SyncResult]): A list of results, one for each project + processed. Empty if the worker failed before creating results. + """ + + results: List[_SyncResult] + + class SuperprojectError(SyncError): """Superproject sync repo.""" @@ -837,15 +880,7 @@ later is required to fix a server side protocol bug. ) sync_event = _threading.Event() - - def _MonitorSyncLoop(): - while True: - pm.update(inc=0, msg=self._GetSyncProgressMessage()) - if sync_event.wait(timeout=1): - return - - sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) - sync_progress_thread.daemon = True + sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) def _ProcessResults(pool, pm, results_sets): ret = True @@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug. all_manifests=not opt.this_manifest_only, ) + # Log the repo projects by existing and new. + existing = [x for x in all_projects if x.Exists] + mp.config.SetString("repo.existingprojectcount", str(len(existing))) + mp.config.SetString( + "repo.newprojectcount", str(len(all_projects) - len(existing)) + ) + + self._fetch_times = _FetchTimes(manifest) + self._local_sync_state = LocalSyncState(manifest) + if opt.interleaved: sync_method = self._SyncInterleaved else: @@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug. 
if not opt.quiet: print("repo sync has finished successfully.") + def _CreateSyncProgressThread( + self, pm: Progress, stop_event: _threading.Event + ) -> _threading.Thread: + """Creates and returns a daemon thread to update a Progress object. + + The returned thread is not yet started. The thread will periodically + update the progress bar with information from _GetSyncProgressMessage + until the stop_event is set. + + Args: + pm: The Progress object to update. + stop_event: The threading.Event to signal the monitor to stop. + + Returns: + The configured _threading.Thread object. + """ + + def _monitor_loop(): + """The target function for the monitor thread.""" + while True: + # Update the progress bar with the current status message. + pm.update(inc=0, msg=self._GetSyncProgressMessage()) + # Wait for 1 second or until the stop_event is set. + if stop_event.wait(timeout=1): + return + + return _threading.Thread(target=_monitor_loop, daemon=True) + def _SyncPhased( self, opt, @@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug. err_update_projects = False err_update_linkfiles = False - # Log the repo projects by existing and new. - existing = [x for x in all_projects if x.Exists] - mp.config.SetString("repo.existingprojectcount", str(len(existing))) - mp.config.SetString( - "repo.newprojectcount", str(len(all_projects) - len(existing)) - ) - - self._fetch_times = _FetchTimes(manifest) - self._local_sync_state = LocalSyncState(manifest) if not opt.local_only: with multiprocessing.Manager() as manager: with ssh.ProxyManager(manager) as ssh_proxy: @@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug. ) raise SyncError(aggregate_errors=errors) + @classmethod + def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: + """Worker for interleaved sync. + + This function is responsible for syncing a group of projects that share + a git object directory. + + Args: + opt: Program options returned from optparse. 
See _Options(). + project_indices: A list of indices into the projects list stored in + the parallel context. + + Returns: + An `_InterleavedSyncResult` containing the results for each project. + """ + results = [] + context = cls.get_parallel_context() + projects = context["projects"] + sync_dict = context["sync_dict"] + + assert project_indices, "_SyncProjectList called with no indices." + + # Use the first project as the representative for the progress bar. + first_project = projects[project_indices[0]] + key = f"{first_project.name} @ {first_project.relpath}" + start_time = time.time() + sync_dict[key] = start_time + + try: + for idx in project_indices: + project = projects[idx] + # For now, simulate a successful sync. + # TODO(b/421935613): Perform the actual git fetch and checkout. + results.append( + _SyncResult( + relpath=project.relpath, + fetch_success=True, + checkout_success=True, + fetch_error=None, + checkout_error=None, + fetch_start=None, + fetch_finish=None, + checkout_start=None, + checkout_finish=None, + ) + ) + finally: + del sync_dict[key] + + return _InterleavedSyncResult(results=results) + + def _ProcessSyncInterleavedResults( + self, + synced_relpaths: Set[str], + err_event: _threading.Event, + errors: List[Exception], + opt: optparse.Values, + pool: Optional[multiprocessing.Pool], + pm: Progress, + results_sets: List[_InterleavedSyncResult], + ): + """Callback to process results from interleaved sync workers.""" + ret = True + for result_group in results_sets: + for result in result_group.results: + pm.update() + if result.fetch_success and result.checkout_success: + synced_relpaths.add(result.relpath) + else: + ret = False + err_event.set() + if result.fetch_error: + errors.append(result.fetch_error) + if result.checkout_error: + errors.append(result.checkout_error) + + if not ret and opt.fail_fast: + if pool: + pool.close() + break + return ret + def _SyncInterleaved( self, opt, @@ -2026,7 +2172,116 @@ later is required to fix a server 
side protocol bug. 2. Projects that share git objects are processed serially to prevent race conditions. """ - raise NotImplementedError("Interleaved sync is not implemented yet.") + err_event = multiprocessing.Event() + synced_relpaths = set() + project_list = list(all_projects) + pm = Progress( + "Syncing", + len(project_list), + delay=False, + quiet=opt.quiet, + show_elapsed=True, + elide=True, + ) + previously_pending_relpaths = set() + + sync_event = _threading.Event() + sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) + + with self.ParallelContext(): + # TODO(gavinmak): Use multiprocessing.Queue instead of dict. + self.get_parallel_context()[ + "sync_dict" + ] = multiprocessing.Manager().dict() + sync_progress_thread.start() + + try: + # Outer loop for dynamic project discovery (e.g., submodules). + # It continues until no unsynced projects remain. + while True: + projects_to_sync = [ + p + for p in project_list + if p.relpath not in synced_relpaths + ] + if not projects_to_sync: + break + + pending_relpaths = {p.relpath for p in projects_to_sync} + if previously_pending_relpaths == pending_relpaths: + logger.error( + "Stall detected in interleaved sync, not all " + "projects could be synced." + ) + err_event.set() + break + previously_pending_relpaths = pending_relpaths + + # Update the projects list for workers in the current pass. + self.get_parallel_context()["projects"] = projects_to_sync + project_index_map = { + p: i for i, p in enumerate(projects_to_sync) + } + + # Inner loop to process projects in a hierarchical order. + # This iterates through levels of project dependencies (e.g. + # 'foo' then 'foo/bar'). All projects in one level can be + # processed in parallel, but we must wait for a level to + # complete before starting the next. 
+ for level_projects in _SafeCheckoutOrder(projects_to_sync): + if not level_projects: + continue + + objdir_project_map = collections.defaultdict(list) + for p in level_projects: + objdir_project_map[p.objdir].append( + project_index_map[p] + ) + + work_items = list(objdir_project_map.values()) + if not work_items: + continue + + jobs = max(1, min(opt.jobs, len(work_items))) + callback = functools.partial( + self._ProcessSyncInterleavedResults, + synced_relpaths, + err_event, + errors, + opt, + ) + if not self.ExecuteInParallel( + jobs, + functools.partial(self._SyncProjectList, opt), + work_items, + callback=callback, + output=pm, + chunksize=1, + ): + err_event.set() + + if err_event.is_set() and opt.fail_fast: + raise SyncFailFastError(aggregate_errors=errors) + + self._ReloadManifest(None, manifest) + project_list = self.GetProjects( + args, + missing_ok=True, + submodules_ok=opt.fetch_submodules, + manifest=manifest, + all_manifests=not opt.this_manifest_only, + ) + finally: + sync_event.set() + sync_progress_thread.join() + + pm.end() + + if err_event.is_set(): + logger.error( + "error: Unable to fully sync the tree in interleaved mode." + ) + raise SyncError(aggregate_errors=errors) def _PostRepoUpgrade(manifest, quiet=False): -- cgit v1.2.3-54-g00ecf