summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGavin Mak <gavinmak@google.com>2025-06-17 10:54:41 -0700
committerLUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>2025-06-17 16:13:36 -0700
commitb4b323a8bd02d52d060f7f6fa15ba045df5af5b2 (patch)
treec8a5d836db5ca1b4d3532c3ae995005081ff2cb1
parentf91f4462e6365b5545b39be597dab23619b8d291 (diff)
downloadgit-repo-b4b323a8bd02d52d060f7f6fa15ba045df5af5b2.tar.gz
sync: Add orchestration logic for --interleaved
Introduce the parallel orchestration framework for `repo sync --interleaved`. The new logic respects project dependencies by processing them in hierarchical levels. Projects sharing a git object directory are grouped and processed serially. Also reuse the familiar fetch progress bar UX. Bug: 421935613 Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281 Commit-Queue: Gavin Mak <gavinmak@google.com> Tested-by: Gavin Mak <gavinmak@google.com> Reviewed-by: Scott Lee <ddoman@google.com>
-rw-r--r--subcmds/sync.py295
-rw-r--r--tests/test_subcmds_sync.py124
2 files changed, 398 insertions, 21 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 6e369a10..711baca2 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -25,7 +25,7 @@ from pathlib import Path
25import sys 25import sys
26import tempfile 26import tempfile
27import time 27import time
28from typing import List, NamedTuple, Set, Union 28from typing import List, NamedTuple, Optional, Set, Union
29import urllib.error 29import urllib.error
30import urllib.parse 30import urllib.parse
31import urllib.request 31import urllib.request
@@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple):
194 finish: float 194 finish: float
195 195
196 196
197class _SyncResult(NamedTuple):
198 """Individual project sync result for interleaved mode.
199
200 Attributes:
201 relpath (str): The project's relative path from the repo client top.
202 fetch_success (bool): True if the fetch operation was successful.
203 checkout_success (bool): True if the checkout operation was
204 successful.
205 fetch_error (Optional[Exception]): The Exception from a failed fetch,
206 or None.
207 checkout_error (Optional[Exception]): The Exception from a failed
208 checkout, or None.
209 fetch_start (Optional[float]): The time.time() when fetch started.
210 fetch_finish (Optional[float]): The time.time() when fetch finished.
211 checkout_start (Optional[float]): The time.time() when checkout
212 started.
213 checkout_finish (Optional[float]): The time.time() when checkout
214 finished.
215 """
216
217 relpath: str
218 fetch_success: bool
219 checkout_success: bool
220 fetch_error: Optional[Exception]
221 checkout_error: Optional[Exception]
222
223 fetch_start: Optional[float]
224 fetch_finish: Optional[float]
225 checkout_start: Optional[float]
226 checkout_finish: Optional[float]
227
228
229class _InterleavedSyncResult(NamedTuple):
230 """Result of an interleaved sync.
231
232 Attributes:
233 results (List[_SyncResult]): A list of results, one for each project
234 processed. Empty if the worker failed before creating results.
235 """
236
237 results: List[_SyncResult]
238
239
197class SuperprojectError(SyncError): 240class SuperprojectError(SyncError):
198 """Superproject sync repo.""" 241 """Superproject sync repo."""
199 242
@@ -837,15 +880,7 @@ later is required to fix a server side protocol bug.
837 ) 880 )
838 881
839 sync_event = _threading.Event() 882 sync_event = _threading.Event()
840 883 sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
841 def _MonitorSyncLoop():
842 while True:
843 pm.update(inc=0, msg=self._GetSyncProgressMessage())
844 if sync_event.wait(timeout=1):
845 return
846
847 sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
848 sync_progress_thread.daemon = True
849 884
850 def _ProcessResults(pool, pm, results_sets): 885 def _ProcessResults(pool, pm, results_sets):
851 ret = True 886 ret = True
@@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug.
1828 all_manifests=not opt.this_manifest_only, 1863 all_manifests=not opt.this_manifest_only,
1829 ) 1864 )
1830 1865
1866 # Log the repo projects by existing and new.
1867 existing = [x for x in all_projects if x.Exists]
1868 mp.config.SetString("repo.existingprojectcount", str(len(existing)))
1869 mp.config.SetString(
1870 "repo.newprojectcount", str(len(all_projects) - len(existing))
1871 )
1872
1873 self._fetch_times = _FetchTimes(manifest)
1874 self._local_sync_state = LocalSyncState(manifest)
1875
1831 if opt.interleaved: 1876 if opt.interleaved:
1832 sync_method = self._SyncInterleaved 1877 sync_method = self._SyncInterleaved
1833 else: 1878 else:
@@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug.
1864 if not opt.quiet: 1909 if not opt.quiet:
1865 print("repo sync has finished successfully.") 1910 print("repo sync has finished successfully.")
1866 1911
1912 def _CreateSyncProgressThread(
1913 self, pm: Progress, stop_event: _threading.Event
1914 ) -> _threading.Thread:
1915 """Creates and returns a daemon thread to update a Progress object.
1916
1917 The returned thread is not yet started. The thread will periodically
1918 update the progress bar with information from _GetSyncProgressMessage
1919 until the stop_event is set.
1920
1921 Args:
1922 pm: The Progress object to update.
1923 stop_event: The threading.Event to signal the monitor to stop.
1924
1925 Returns:
1926 The configured _threading.Thread object.
1927 """
1928
1929 def _monitor_loop():
1930 """The target function for the monitor thread."""
1931 while True:
1932 # Update the progress bar with the current status message.
1933 pm.update(inc=0, msg=self._GetSyncProgressMessage())
1934 # Wait for 1 second or until the stop_event is set.
1935 if stop_event.wait(timeout=1):
1936 return
1937
1938 return _threading.Thread(target=_monitor_loop, daemon=True)
1939
1867 def _SyncPhased( 1940 def _SyncPhased(
1868 self, 1941 self,
1869 opt, 1942 opt,
@@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug.
1890 err_update_projects = False 1963 err_update_projects = False
1891 err_update_linkfiles = False 1964 err_update_linkfiles = False
1892 1965
1893 # Log the repo projects by existing and new.
1894 existing = [x for x in all_projects if x.Exists]
1895 mp.config.SetString("repo.existingprojectcount", str(len(existing)))
1896 mp.config.SetString(
1897 "repo.newprojectcount", str(len(all_projects) - len(existing))
1898 )
1899
1900 self._fetch_times = _FetchTimes(manifest)
1901 self._local_sync_state = LocalSyncState(manifest)
1902 if not opt.local_only: 1966 if not opt.local_only:
1903 with multiprocessing.Manager() as manager: 1967 with multiprocessing.Manager() as manager:
1904 with ssh.ProxyManager(manager) as ssh_proxy: 1968 with ssh.ProxyManager(manager) as ssh_proxy:
@@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug.
2003 ) 2067 )
2004 raise SyncError(aggregate_errors=errors) 2068 raise SyncError(aggregate_errors=errors)
2005 2069
2070 @classmethod
2071 def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
2072 """Worker for interleaved sync.
2073
2074 This function is responsible for syncing a group of projects that share
2075 a git object directory.
2076
2077 Args:
2078 opt: Program options returned from optparse. See _Options().
2079 project_indices: A list of indices into the projects list stored in
2080 the parallel context.
2081
2082 Returns:
2083 An `_InterleavedSyncResult` containing the results for each project.
2084 """
2085 results = []
2086 context = cls.get_parallel_context()
2087 projects = context["projects"]
2088 sync_dict = context["sync_dict"]
2089
2090 assert project_indices, "_SyncProjectList called with no indices."
2091
2092 # Use the first project as the representative for the progress bar.
2093 first_project = projects[project_indices[0]]
2094 key = f"{first_project.name} @ {first_project.relpath}"
2095 start_time = time.time()
2096 sync_dict[key] = start_time
2097
2098 try:
2099 for idx in project_indices:
2100 project = projects[idx]
2101 # For now, simulate a successful sync.
2102 # TODO(b/421935613): Perform the actual git fetch and checkout.
2103 results.append(
2104 _SyncResult(
2105 relpath=project.relpath,
2106 fetch_success=True,
2107 checkout_success=True,
2108 fetch_error=None,
2109 checkout_error=None,
2110 fetch_start=None,
2111 fetch_finish=None,
2112 checkout_start=None,
2113 checkout_finish=None,
2114 )
2115 )
2116 finally:
2117 del sync_dict[key]
2118
2119 return _InterleavedSyncResult(results=results)
2120
2121 def _ProcessSyncInterleavedResults(
2122 self,
2123 synced_relpaths: Set[str],
2124 err_event: _threading.Event,
2125 errors: List[Exception],
2126 opt: optparse.Values,
2127 pool: Optional[multiprocessing.Pool],
2128 pm: Progress,
2129 results_sets: List[_InterleavedSyncResult],
2130 ):
2131 """Callback to process results from interleaved sync workers."""
2132 ret = True
2133 for result_group in results_sets:
2134 for result in result_group.results:
2135 pm.update()
2136 if result.fetch_success and result.checkout_success:
2137 synced_relpaths.add(result.relpath)
2138 else:
2139 ret = False
2140 err_event.set()
2141 if result.fetch_error:
2142 errors.append(result.fetch_error)
2143 if result.checkout_error:
2144 errors.append(result.checkout_error)
2145
2146 if not ret and opt.fail_fast:
2147 if pool:
2148 pool.close()
2149 break
2150 return ret
2151
2006 def _SyncInterleaved( 2152 def _SyncInterleaved(
2007 self, 2153 self,
2008 opt, 2154 opt,
@@ -2026,7 +2172,116 @@ later is required to fix a server side protocol bug.
2026 2. Projects that share git objects are processed serially to prevent 2172 2. Projects that share git objects are processed serially to prevent
2027 race conditions. 2173 race conditions.
2028 """ 2174 """
2029 raise NotImplementedError("Interleaved sync is not implemented yet.") 2175 err_event = multiprocessing.Event()
2176 synced_relpaths = set()
2177 project_list = list(all_projects)
2178 pm = Progress(
2179 "Syncing",
2180 len(project_list),
2181 delay=False,
2182 quiet=opt.quiet,
2183 show_elapsed=True,
2184 elide=True,
2185 )
2186 previously_pending_relpaths = set()
2187
2188 sync_event = _threading.Event()
2189 sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
2190
2191 with self.ParallelContext():
2192 # TODO(gavinmak): Use multiprocessing.Queue instead of dict.
2193 self.get_parallel_context()[
2194 "sync_dict"
2195 ] = multiprocessing.Manager().dict()
2196 sync_progress_thread.start()
2197
2198 try:
2199 # Outer loop for dynamic project discovery (e.g., submodules).
2200 # It continues until no unsynced projects remain.
2201 while True:
2202 projects_to_sync = [
2203 p
2204 for p in project_list
2205 if p.relpath not in synced_relpaths
2206 ]
2207 if not projects_to_sync:
2208 break
2209
2210 pending_relpaths = {p.relpath for p in projects_to_sync}
2211 if previously_pending_relpaths == pending_relpaths:
2212 logger.error(
2213 "Stall detected in interleaved sync, not all "
2214 "projects could be synced."
2215 )
2216 err_event.set()
2217 break
2218 previously_pending_relpaths = pending_relpaths
2219
2220 # Update the projects list for workers in the current pass.
2221 self.get_parallel_context()["projects"] = projects_to_sync
2222 project_index_map = {
2223 p: i for i, p in enumerate(projects_to_sync)
2224 }
2225
2226 # Inner loop to process projects in a hierarchical order.
2227 # This iterates through levels of project dependencies (e.g.
2228 # 'foo' then 'foo/bar'). All projects in one level can be
2229 # processed in parallel, but we must wait for a level to
2230 # complete before starting the next.
2231 for level_projects in _SafeCheckoutOrder(projects_to_sync):
2232 if not level_projects:
2233 continue
2234
2235 objdir_project_map = collections.defaultdict(list)
2236 for p in level_projects:
2237 objdir_project_map[p.objdir].append(
2238 project_index_map[p]
2239 )
2240
2241 work_items = list(objdir_project_map.values())
2242 if not work_items:
2243 continue
2244
2245 jobs = max(1, min(opt.jobs, len(work_items)))
2246 callback = functools.partial(
2247 self._ProcessSyncInterleavedResults,
2248 synced_relpaths,
2249 err_event,
2250 errors,
2251 opt,
2252 )
2253 if not self.ExecuteInParallel(
2254 jobs,
2255 functools.partial(self._SyncProjectList, opt),
2256 work_items,
2257 callback=callback,
2258 output=pm,
2259 chunksize=1,
2260 ):
2261 err_event.set()
2262
2263 if err_event.is_set() and opt.fail_fast:
2264 raise SyncFailFastError(aggregate_errors=errors)
2265
2266 self._ReloadManifest(None, manifest)
2267 project_list = self.GetProjects(
2268 args,
2269 missing_ok=True,
2270 submodules_ok=opt.fetch_submodules,
2271 manifest=manifest,
2272 all_manifests=not opt.this_manifest_only,
2273 )
2274 finally:
2275 sync_event.set()
2276 sync_progress_thread.join()
2277
2278 pm.end()
2279
2280 if err_event.is_set():
2281 logger.error(
2282 "error: Unable to fully sync the tree in interleaved mode."
2283 )
2284 raise SyncError(aggregate_errors=errors)
2030 2285
2031 2286
2032def _PostRepoUpgrade(manifest, quiet=False): 2287def _PostRepoUpgrade(manifest, quiet=False):
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index b871317c..60f283af 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase):
305 305
306 306
307class FakeProject: 307class FakeProject:
308 def __init__(self, relpath): 308 def __init__(self, relpath, name=None, objdir=None):
309 self.relpath = relpath 309 self.relpath = relpath
310 self.name = name or relpath
311 self.objdir = objdir or relpath
310 312
311 def __str__(self): 313 def __str__(self):
312 return f"project: {self.relpath}" 314 return f"project: {self.relpath}"
@@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase):
513 self.cmd.Execute(self.opt, []) 515 self.cmd.Execute(self.opt, [])
514 self.assertIn(self.sync_local_half_error, e.aggregate_errors) 516 self.assertIn(self.sync_local_half_error, e.aggregate_errors)
515 self.assertIn(self.sync_network_half_error, e.aggregate_errors) 517 self.assertIn(self.sync_network_half_error, e.aggregate_errors)
518
519
520class InterleavedSyncTest(unittest.TestCase):
521 """Tests for interleaved sync."""
522
523 def setUp(self):
524 """Set up a sync command with mocks."""
525 self.repodir = tempfile.mkdtemp(".repo")
526 self.manifest = mock.MagicMock(repodir=self.repodir)
527 self.manifest.repoProject.LastFetch = time.time()
528 self.manifest.repoProject.worktree = self.repodir
529 self.manifest.manifestProject.worktree = self.repodir
530 self.manifest.IsArchive = False
531 self.manifest.CloneBundle = False
532 self.manifest.default.sync_j = 1
533
534 self.cmd = sync.Sync(manifest=self.manifest)
535 self.cmd.outer_manifest = self.manifest
536
537 # Mock projects.
538 self.projA = FakeProject("projA", objdir="objA")
539 self.projB = FakeProject("projB", objdir="objB")
540 self.projA_sub = FakeProject(
541 "projA/sub", name="projA_sub", objdir="objA_sub"
542 )
543 self.projC = FakeProject("projC", objdir="objC")
544
545 # Mock methods that are not part of the core interleaved sync logic.
546 mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start()
547 mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start()
548 mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start()
549 mock.patch.object(sync, "_PostRepoUpgrade").start()
550 mock.patch.object(sync, "_PostRepoFetch").start()
551
552 def tearDown(self):
553 """Clean up resources."""
554 shutil.rmtree(self.repodir)
555 mock.patch.stopall()
556
557 def test_interleaved_fail_fast(self):
558 """Test that --fail-fast is respected in interleaved mode."""
559 opt, args = self.cmd.OptionParser.parse_args(
560 ["--interleaved", "--fail-fast", "-j2"]
561 )
562 opt.quiet = True
563
564 # With projA/sub, _SafeCheckoutOrder creates two batches:
565 # 1. [projA, projB]
566 # 2. [projA/sub]
567 # We want to fail on the first batch and ensure the second isn't run.
568 all_projects = [self.projA, self.projB, self.projA_sub]
569 mock.patch.object(
570 self.cmd, "GetProjects", return_value=all_projects
571 ).start()
572
573 # Mock ExecuteInParallel to simulate a failed run on the first batch of
574 # projects.
575 execute_mock = mock.patch.object(
576 self.cmd, "ExecuteInParallel", return_value=False
577 ).start()
578
579 with self.assertRaises(sync.SyncFailFastError):
580 self.cmd._SyncInterleaved(
581 opt,
582 args,
583 [],
584 self.manifest,
585 self.manifest.manifestProject,
586 all_projects,
587 {},
588 )
589
590 execute_mock.assert_called_once()
591
592 def test_interleaved_shared_objdir_serial(self):
593 """Test that projects with shared objdir are processed serially."""
594 opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"])
595 opt.quiet = True
596
597 # Setup projects with a shared objdir.
598 self.projA.objdir = "common_objdir"
599 self.projC.objdir = "common_objdir"
600
601 all_projects = [self.projA, self.projB, self.projC]
602 mock.patch.object(
603 self.cmd, "GetProjects", return_value=all_projects
604 ).start()
605
606 def execute_side_effect(jobs, target, work_items, **kwargs):
607 # The callback is a partial object. The first arg is the set we
608 # need to update to avoid the stall detection.
609 synced_relpaths_set = kwargs["callback"].args[0]
610 projects_in_pass = self.cmd.get_parallel_context()["projects"]
611 for item in work_items:
612 for project_idx in item:
613 synced_relpaths_set.add(
614 projects_in_pass[project_idx].relpath
615 )
616 return True
617
618 execute_mock = mock.patch.object(
619 self.cmd, "ExecuteInParallel", side_effect=execute_side_effect
620 ).start()
621
622 self.cmd._SyncInterleaved(
623 opt,
624 args,
625 [],
626 self.manifest,
627 self.manifest.manifestProject,
628 all_projects,
629 {},
630 )
631
632 execute_mock.assert_called_once()
633 jobs_arg, _, work_items = execute_mock.call_args.args
634 self.assertEqual(jobs_arg, 2)
635 work_items_sets = {frozenset(item) for item in work_items}
636 expected_sets = {frozenset([0, 2]), frozenset([1])}
637 self.assertEqual(work_items_sets, expected_sets)