author     Gavin Mak <gavinmak@google.com>  2025-06-17 10:54:41 -0700
committer  LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>  2025-06-17 16:13:36 -0700
commit     b4b323a8bd02d52d060f7f6fa15ba045df5af5b2
tree       c8a5d836db5ca1b4d3532c3ae995005081ff2cb1 /subcmds/sync.py
parent     f91f4462e6365b5545b39be597dab23619b8d291
download   git-repo-b4b323a8bd02d52d060f7f6fa15ba045df5af5b2.tar.gz
sync: Add orchestration logic for --interleaved
Introduce the parallel orchestration framework for `repo sync
--interleaved`.
The new logic respects project dependencies by processing them in
hierarchical levels. Projects sharing a git object directory are grouped
and processed serially. Also reuse the familiar fetch progress bar UX.
Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281
Commit-Queue: Gavin Mak <gavinmak@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
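
The orchestration described in the commit message can be pictured with a small standalone sketch: projects are split into dependency levels (a parent path such as "foo" is handled before a nested path such as "foo/bar"), and within each level the work items are buckets of projects sharing a git object directory, so each bucket is synced serially while separate buckets can run in parallel. This is only an illustration under stated assumptions: `FakeProject`, `group_into_levels`, and `group_by_objdir` are hypothetical stand-ins, and the depth-based leveling is a simplification of repo's `_SafeCheckoutOrder`, not the actual implementation.

    # Illustrative sketch only: FakeProject and the helpers below are
    # hypothetical stand-ins for repo's Project objects, _SafeCheckoutOrder,
    # and the objdir grouping done in _SyncInterleaved.
    import collections
    from typing import Dict, List, NamedTuple


    class FakeProject(NamedTuple):
        relpath: str  # Path relative to the client top, e.g. "foo/bar".
        objdir: str  # Git object directory; shared objdirs must not race.


    def group_into_levels(projects: List[FakeProject]) -> List[List[FakeProject]]:
        """Group projects so a parent path is handled before its children."""
        levels: Dict[int, List[FakeProject]] = collections.defaultdict(list)
        for p in projects:
            # Depth 0 for "foo", 1 for "foo/bar", etc.; processing level by
            # level approximates the hierarchical ordering described above.
            levels[p.relpath.count("/")].append(p)
        return [levels[d] for d in sorted(levels)]


    def group_by_objdir(level: List[FakeProject]) -> List[List[FakeProject]]:
        """Within a level, bucket projects sharing a git object directory."""
        buckets: Dict[str, List[FakeProject]] = collections.defaultdict(list)
        for p in level:
            buckets[p.objdir].append(p)
        # Each bucket is one work item: its projects are synced serially,
        # while separate buckets can be dispatched to parallel workers.
        return list(buckets.values())


    if __name__ == "__main__":
        projects = [
            FakeProject("foo", "objs/foo"),
            FakeProject("foo/bar", "objs/foo"),  # Shares objects with "foo".
            FakeProject("baz", "objs/baz"),
        ]
        for level in group_into_levels(projects):
            for bucket in group_by_objdir(level):
                print([p.relpath for p in bucket])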
Diffstat (limited to 'subcmds/sync.py')
 -rw-r--r--  subcmds/sync.py | 295
 1 file changed, 275 insertions(+), 20 deletions(-)
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 6e369a10..711baca2 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -25,7 +25,7 @@ from pathlib import Path
 import sys
 import tempfile
 import time
-from typing import List, NamedTuple, Set, Union
+from typing import List, NamedTuple, Optional, Set, Union
 import urllib.error
 import urllib.parse
 import urllib.request
@@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple):
     finish: float
 
 
+class _SyncResult(NamedTuple):
+    """Individual project sync result for interleaved mode.
+
+    Attributes:
+        relpath (str): The project's relative path from the repo client top.
+        fetch_success (bool): True if the fetch operation was successful.
+        checkout_success (bool): True if the checkout operation was
+            successful.
+        fetch_error (Optional[Exception]): The Exception from a failed fetch,
+            or None.
+        checkout_error (Optional[Exception]): The Exception from a failed
+            checkout, or None.
+        fetch_start (Optional[float]): The time.time() when fetch started.
+        fetch_finish (Optional[float]): The time.time() when fetch finished.
+        checkout_start (Optional[float]): The time.time() when checkout
+            started.
+        checkout_finish (Optional[float]): The time.time() when checkout
+            finished.
+    """
+
+    relpath: str
+    fetch_success: bool
+    checkout_success: bool
+    fetch_error: Optional[Exception]
+    checkout_error: Optional[Exception]
+
+    fetch_start: Optional[float]
+    fetch_finish: Optional[float]
+    checkout_start: Optional[float]
+    checkout_finish: Optional[float]
+
+
+class _InterleavedSyncResult(NamedTuple):
+    """Result of an interleaved sync.
+
+    Attributes:
+        results (List[_SyncResult]): A list of results, one for each project
+            processed. Empty if the worker failed before creating results.
+    """
+
+    results: List[_SyncResult]
+
+
 class SuperprojectError(SyncError):
     """Superproject sync repo."""
 
@@ -837,15 +880,7 @@ later is required to fix a server side protocol bug.
         )
 
         sync_event = _threading.Event()
-
-        def _MonitorSyncLoop():
-            while True:
-                pm.update(inc=0, msg=self._GetSyncProgressMessage())
-                if sync_event.wait(timeout=1):
-                    return
-
-        sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
-        sync_progress_thread.daemon = True
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
         def _ProcessResults(pool, pm, results_sets):
             ret = True
@@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug.
             all_manifests=not opt.this_manifest_only,
         )
 
+        # Log the repo projects by existing and new.
+        existing = [x for x in all_projects if x.Exists]
+        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
+        mp.config.SetString(
+            "repo.newprojectcount", str(len(all_projects) - len(existing))
+        )
+
+        self._fetch_times = _FetchTimes(manifest)
+        self._local_sync_state = LocalSyncState(manifest)
+
         if opt.interleaved:
             sync_method = self._SyncInterleaved
         else:
@@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug.
         if not opt.quiet:
             print("repo sync has finished successfully.")
 
+    def _CreateSyncProgressThread(
+        self, pm: Progress, stop_event: _threading.Event
+    ) -> _threading.Thread:
+        """Creates and returns a daemon thread to update a Progress object.
+
+        The returned thread is not yet started. The thread will periodically
+        update the progress bar with information from _GetSyncProgressMessage
+        until the stop_event is set.
+
+        Args:
+            pm: The Progress object to update.
+            stop_event: The threading.Event to signal the monitor to stop.
+
+        Returns:
+            The configured _threading.Thread object.
+        """
+
+        def _monitor_loop():
+            """The target function for the monitor thread."""
+            while True:
+                # Update the progress bar with the current status message.
+                pm.update(inc=0, msg=self._GetSyncProgressMessage())
+                # Wait for 1 second or until the stop_event is set.
+                if stop_event.wait(timeout=1):
+                    return
+
+        return _threading.Thread(target=_monitor_loop, daemon=True)
+
     def _SyncPhased(
         self,
         opt,
@@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug.
         err_update_projects = False
         err_update_linkfiles = False
 
-        # Log the repo projects by existing and new.
-        existing = [x for x in all_projects if x.Exists]
-        mp.config.SetString("repo.existingprojectcount", str(len(existing)))
-        mp.config.SetString(
-            "repo.newprojectcount", str(len(all_projects) - len(existing))
-        )
-
-        self._fetch_times = _FetchTimes(manifest)
-        self._local_sync_state = LocalSyncState(manifest)
         if not opt.local_only:
             with multiprocessing.Manager() as manager:
                 with ssh.ProxyManager(manager) as ssh_proxy:
@@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug.
             )
             raise SyncError(aggregate_errors=errors)
 
+    @classmethod
+    def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
+        """Worker for interleaved sync.
+
+        This function is responsible for syncing a group of projects that share
+        a git object directory.
+
+        Args:
+            opt: Program options returned from optparse. See _Options().
+            project_indices: A list of indices into the projects list stored in
+                the parallel context.
+
+        Returns:
+            An `_InterleavedSyncResult` containing the results for each project.
+        """
+        results = []
+        context = cls.get_parallel_context()
+        projects = context["projects"]
+        sync_dict = context["sync_dict"]
+
+        assert project_indices, "_SyncProjectList called with no indices."
+
+        # Use the first project as the representative for the progress bar.
+        first_project = projects[project_indices[0]]
+        key = f"{first_project.name} @ {first_project.relpath}"
+        start_time = time.time()
+        sync_dict[key] = start_time
+
+        try:
+            for idx in project_indices:
+                project = projects[idx]
+                # For now, simulate a successful sync.
+                # TODO(b/421935613): Perform the actual git fetch and checkout.
+                results.append(
+                    _SyncResult(
+                        relpath=project.relpath,
+                        fetch_success=True,
+                        checkout_success=True,
+                        fetch_error=None,
+                        checkout_error=None,
+                        fetch_start=None,
+                        fetch_finish=None,
+                        checkout_start=None,
+                        checkout_finish=None,
+                    )
+                )
+        finally:
+            del sync_dict[key]
+
+        return _InterleavedSyncResult(results=results)
+
+    def _ProcessSyncInterleavedResults(
+        self,
+        synced_relpaths: Set[str],
+        err_event: _threading.Event,
+        errors: List[Exception],
+        opt: optparse.Values,
+        pool: Optional[multiprocessing.Pool],
+        pm: Progress,
+        results_sets: List[_InterleavedSyncResult],
+    ):
+        """Callback to process results from interleaved sync workers."""
+        ret = True
+        for result_group in results_sets:
+            for result in result_group.results:
+                pm.update()
+                if result.fetch_success and result.checkout_success:
+                    synced_relpaths.add(result.relpath)
+                else:
+                    ret = False
+                    err_event.set()
+                    if result.fetch_error:
+                        errors.append(result.fetch_error)
+                    if result.checkout_error:
+                        errors.append(result.checkout_error)
+
+            if not ret and opt.fail_fast:
+                if pool:
+                    pool.close()
+                break
+        return ret
+
     def _SyncInterleaved(
         self,
         opt,
@@ -2026,7 +2172,116 @@ later is required to fix a server side protocol bug.
         2. Projects that share git objects are processed serially to prevent
            race conditions.
         """
-        raise NotImplementedError("Interleaved sync is not implemented yet.")
+        err_event = multiprocessing.Event()
+        synced_relpaths = set()
+        project_list = list(all_projects)
+        pm = Progress(
+            "Syncing",
+            len(project_list),
+            delay=False,
+            quiet=opt.quiet,
+            show_elapsed=True,
+            elide=True,
+        )
+        previously_pending_relpaths = set()
+
+        sync_event = _threading.Event()
+        sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
+
+        with self.ParallelContext():
+            # TODO(gavinmak): Use multiprocessing.Queue instead of dict.
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+            sync_progress_thread.start()
+
+            try:
+                # Outer loop for dynamic project discovery (e.g., submodules).
+                # It continues until no unsynced projects remain.
+                while True:
+                    projects_to_sync = [
+                        p
+                        for p in project_list
+                        if p.relpath not in synced_relpaths
+                    ]
+                    if not projects_to_sync:
+                        break
+
+                    pending_relpaths = {p.relpath for p in projects_to_sync}
+                    if previously_pending_relpaths == pending_relpaths:
+                        logger.error(
+                            "Stall detected in interleaved sync, not all "
+                            "projects could be synced."
+                        )
+                        err_event.set()
+                        break
+                    previously_pending_relpaths = pending_relpaths
+
+                    # Update the projects list for workers in the current pass.
+                    self.get_parallel_context()["projects"] = projects_to_sync
+                    project_index_map = {
+                        p: i for i, p in enumerate(projects_to_sync)
+                    }
+
+                    # Inner loop to process projects in a hierarchical order.
+                    # This iterates through levels of project dependencies (e.g.
+                    # 'foo' then 'foo/bar'). All projects in one level can be
+                    # processed in parallel, but we must wait for a level to
+                    # complete before starting the next.
+                    for level_projects in _SafeCheckoutOrder(projects_to_sync):
+                        if not level_projects:
+                            continue
+
+                        objdir_project_map = collections.defaultdict(list)
+                        for p in level_projects:
+                            objdir_project_map[p.objdir].append(
+                                project_index_map[p]
+                            )
+
+                        work_items = list(objdir_project_map.values())
+                        if not work_items:
+                            continue
+
+                        jobs = max(1, min(opt.jobs, len(work_items)))
+                        callback = functools.partial(
+                            self._ProcessSyncInterleavedResults,
+                            synced_relpaths,
+                            err_event,
+                            errors,
+                            opt,
+                        )
+                        if not self.ExecuteInParallel(
+                            jobs,
+                            functools.partial(self._SyncProjectList, opt),
+                            work_items,
+                            callback=callback,
+                            output=pm,
+                            chunksize=1,
+                        ):
+                            err_event.set()
+
+                    if err_event.is_set() and opt.fail_fast:
+                        raise SyncFailFastError(aggregate_errors=errors)
+
+                    self._ReloadManifest(None, manifest)
+                    project_list = self.GetProjects(
+                        args,
+                        missing_ok=True,
+                        submodules_ok=opt.fetch_submodules,
+                        manifest=manifest,
+                        all_manifests=not opt.this_manifest_only,
+                    )
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
+
+        pm.end()
+
+        if err_event.is_set():
+            logger.error(
+                "error: Unable to fully sync the tree in interleaved mode."
+            )
+            raise SyncError(aggregate_errors=errors)
 
 
 def _PostRepoUpgrade(manifest, quiet=False):
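
For reference, the progress-monitor refactor in this change (`_CreateSyncProgressThread`, reused by both the fetch path and `_SyncInterleaved`) follows a common pattern: a daemon thread redraws a status line until an `Event` is set, using `Event.wait(timeout=...)` both as the sleep and as the stop signal. A minimal, self-contained sketch of that pattern is below; `create_monitor_thread`, `status`, and the `print` call are illustrative stand-ins for repo's helper, `_GetSyncProgressMessage`, and `Progress.update`, not the actual implementation.

    # Minimal sketch of the monitor-thread pattern; names here are
    # placeholders, not repo's real API.
    import threading
    import time


    def create_monitor_thread(status, stop_event: threading.Event) -> threading.Thread:
        """Return an unstarted daemon thread that prints status() once a second."""

        def _loop():
            while True:
                print(status())
                # Sleep for up to 1 second, but wake immediately when stopped.
                if stop_event.wait(timeout=1):
                    return

        return threading.Thread(target=_loop, daemon=True)


    if __name__ == "__main__":
        stop = threading.Event()
        t = create_monitor_thread(lambda: f"working... {time.strftime('%X')}", stop)
        t.start()
        time.sleep(3)  # Simulate the work being monitored.
        stop.set()  # Signal the monitor to exit, then wait for it.
        t.join()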