From b4b323a8bd02d52d060f7f6fa15ba045df5af5b2 Mon Sep 17 00:00:00 2001 From: Gavin Mak Date: Tue, 17 Jun 2025 10:54:41 -0700 Subject: sync: Add orchestration logic for --interleaved Introduce the parallel orchestration framework for `repo sync --interleaved`. The new logic respects project dependencies by processing them in hierarchical levels. Projects sharing a git object directory are grouped and processed serially. Also reuse the familiar fetch progress bar UX. Bug: 421935613 Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281 Commit-Queue: Gavin Mak Tested-by: Gavin Mak Reviewed-by: Scott Lee --- tests/test_subcmds_sync.py | 124 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) (limited to 'tests') diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index b871317c..60f283af 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py @@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase): class FakeProject: - def __init__(self, relpath): + def __init__(self, relpath, name=None, objdir=None): self.relpath = relpath + self.name = name or relpath + self.objdir = objdir or relpath def __str__(self): return f"project: {self.relpath}" @@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase): self.cmd.Execute(self.opt, []) self.assertIn(self.sync_local_half_error, e.aggregate_errors) self.assertIn(self.sync_network_half_error, e.aggregate_errors) + + +class InterleavedSyncTest(unittest.TestCase): + """Tests for interleaved sync.""" + + def setUp(self): + """Set up a sync command with mocks.""" + self.repodir = tempfile.mkdtemp(".repo") + self.manifest = mock.MagicMock(repodir=self.repodir) + self.manifest.repoProject.LastFetch = time.time() + self.manifest.repoProject.worktree = self.repodir + self.manifest.manifestProject.worktree = self.repodir + self.manifest.IsArchive = False + self.manifest.CloneBundle = False + self.manifest.default.sync_j = 1 + + self.cmd = sync.Sync(manifest=self.manifest) + self.cmd.outer_manifest = self.manifest + + # Mock projects. + self.projA = FakeProject("projA", objdir="objA") + self.projB = FakeProject("projB", objdir="objB") + self.projA_sub = FakeProject( + "projA/sub", name="projA_sub", objdir="objA_sub" + ) + self.projC = FakeProject("projC", objdir="objC") + + # Mock methods that are not part of the core interleaved sync logic. + mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start() + mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start() + mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start() + mock.patch.object(sync, "_PostRepoUpgrade").start() + mock.patch.object(sync, "_PostRepoFetch").start() + + def tearDown(self): + """Clean up resources.""" + shutil.rmtree(self.repodir) + mock.patch.stopall() + + def test_interleaved_fail_fast(self): + """Test that --fail-fast is respected in interleaved mode.""" + opt, args = self.cmd.OptionParser.parse_args( + ["--interleaved", "--fail-fast", "-j2"] + ) + opt.quiet = True + + # With projA/sub, _SafeCheckoutOrder creates two batches: + # 1. [projA, projB] + # 2. [projA/sub] + # We want to fail on the first batch and ensure the second isn't run. + all_projects = [self.projA, self.projB, self.projA_sub] + mock.patch.object( + self.cmd, "GetProjects", return_value=all_projects + ).start() + + # Mock ExecuteInParallel to simulate a failed run on the first batch of + # projects. + execute_mock = mock.patch.object( + self.cmd, "ExecuteInParallel", return_value=False + ).start() + + with self.assertRaises(sync.SyncFailFastError): + self.cmd._SyncInterleaved( + opt, + args, + [], + self.manifest, + self.manifest.manifestProject, + all_projects, + {}, + ) + + execute_mock.assert_called_once() + + def test_interleaved_shared_objdir_serial(self): + """Test that projects with shared objdir are processed serially.""" + opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"]) + opt.quiet = True + + # Setup projects with a shared objdir. + self.projA.objdir = "common_objdir" + self.projC.objdir = "common_objdir" + + all_projects = [self.projA, self.projB, self.projC] + mock.patch.object( + self.cmd, "GetProjects", return_value=all_projects + ).start() + + def execute_side_effect(jobs, target, work_items, **kwargs): + # The callback is a partial object. The first arg is the set we + # need to update to avoid the stall detection. + synced_relpaths_set = kwargs["callback"].args[0] + projects_in_pass = self.cmd.get_parallel_context()["projects"] + for item in work_items: + for project_idx in item: + synced_relpaths_set.add( + projects_in_pass[project_idx].relpath + ) + return True + + execute_mock = mock.patch.object( + self.cmd, "ExecuteInParallel", side_effect=execute_side_effect + ).start() + + self.cmd._SyncInterleaved( + opt, + args, + [], + self.manifest, + self.manifest.manifestProject, + all_projects, + {}, + ) + + execute_mock.assert_called_once() + jobs_arg, _, work_items = execute_mock.call_args.args + self.assertEqual(jobs_arg, 2) + work_items_sets = {frozenset(item) for item in work_items} + expected_sets = {frozenset([0, 2]), frozenset([1])} + self.assertEqual(work_items_sets, expected_sets) -- cgit v1.2.3-54-g00ecf