summary refs log tree commit diff stats
path: root/subcmds/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/sync.py')
-rw-r--r--  subcmds/sync.py  401
1 file changed, 288 insertions, 113 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 711baca2..f0c398a3 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -13,6 +13,7 @@
13# limitations under the License. 13# limitations under the License.
14 14
15import collections 15import collections
16import contextlib
16import functools 17import functools
17import http.cookiejar as cookielib 18import http.cookiejar as cookielib
18import io 19import io
@@ -198,33 +199,41 @@ class _SyncResult(NamedTuple):
198 """Individual project sync result for interleaved mode. 199 """Individual project sync result for interleaved mode.
199 200
200 Attributes: 201 Attributes:
202 project_index (int): The index of the project in the shared list.
201 relpath (str): The project's relative path from the repo client top. 203 relpath (str): The project's relative path from the repo client top.
204 remote_fetched (bool): True if the remote was actually queried.
202 fetch_success (bool): True if the fetch operation was successful. 205 fetch_success (bool): True if the fetch operation was successful.
203 checkout_success (bool): True if the checkout operation was
204 successful.
205 fetch_error (Optional[Exception]): The Exception from a failed fetch, 206 fetch_error (Optional[Exception]): The Exception from a failed fetch,
206 or None. 207 or None.
207 checkout_error (Optional[Exception]): The Exception from a failed
208 checkout, or None.
209 fetch_start (Optional[float]): The time.time() when fetch started. 208 fetch_start (Optional[float]): The time.time() when fetch started.
210 fetch_finish (Optional[float]): The time.time() when fetch finished. 209 fetch_finish (Optional[float]): The time.time() when fetch finished.
210 checkout_success (bool): True if the checkout operation was
211 successful.
212 checkout_error (Optional[Exception]): The Exception from a failed
213 checkout, or None.
211 checkout_start (Optional[float]): The time.time() when checkout 214 checkout_start (Optional[float]): The time.time() when checkout
212 started. 215 started.
213 checkout_finish (Optional[float]): The time.time() when checkout 216 checkout_finish (Optional[float]): The time.time() when checkout
214 finished. 217 finished.
218 stderr_text (str): The combined output from both fetch and checkout.
215 """ 219 """
216 220
221 project_index: int
217 relpath: str 222 relpath: str
223
224 remote_fetched: bool
218 fetch_success: bool 225 fetch_success: bool
219 checkout_success: bool
220 fetch_error: Optional[Exception] 226 fetch_error: Optional[Exception]
221 checkout_error: Optional[Exception]
222
223 fetch_start: Optional[float] 227 fetch_start: Optional[float]
224 fetch_finish: Optional[float] 228 fetch_finish: Optional[float]
229
230 checkout_success: bool
231 checkout_error: Optional[Exception]
225 checkout_start: Optional[float] 232 checkout_start: Optional[float]
226 checkout_finish: Optional[float] 233 checkout_finish: Optional[float]
227 234
235 stderr_text: str
236
228 237
229class _InterleavedSyncResult(NamedTuple): 238class _InterleavedSyncResult(NamedTuple):
230 """Result of an interleaved sync. 239 """Result of an interleaved sync.
@@ -996,6 +1005,7 @@ later is required to fix a server side protocol bug.
996 err_event.set() 1005 err_event.set()
997 1006
998 # Call self update, unless requested not to 1007 # Call self update, unless requested not to
1008 # TODO(b/42193561): Extract repo update logic to ExecuteHelper.
999 if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0": 1009 if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0":
1000 _PostRepoFetch(rp, opt.repo_verify) 1010 _PostRepoFetch(rp, opt.repo_verify)
1001 if opt.network_only: 1011 if opt.network_only:
@@ -1176,6 +1186,16 @@ later is required to fix a server side protocol bug.
1176 self._local_sync_state.Save() 1186 self._local_sync_state.Save()
1177 return proc_res and not err_results 1187 return proc_res and not err_results
1178 1188
1189 def _PrintManifestNotices(self, opt):
1190 """Print all manifest notices, but only once."""
1191 printed_notices = set()
1192 # Print all manifest notices, but only once.
1193 # Sort by path_prefix to ensure consistent ordering.
1194 for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
1195 if m.notice and m.notice not in printed_notices:
1196 print(m.notice)
1197 printed_notices.add(m.notice)
1198
1179 @staticmethod 1199 @staticmethod
1180 def _GetPreciousObjectsState(project: Project, opt): 1200 def _GetPreciousObjectsState(project: Project, opt):
1181 """Get the preciousObjects state for the project. 1201 """Get the preciousObjects state for the project.
@@ -2032,14 +2052,7 @@ later is required to fix a server side protocol bug.
2032 if err_checkout: 2052 if err_checkout:
2033 err_event.set() 2053 err_event.set()
2034 2054
2035 printed_notices = set() 2055 self._PrintManifestNotices(opt)
2036 # If there's a notice that's supposed to print at the end of the sync,
2037 # print it now... But avoid printing duplicate messages, and preserve
2038 # order.
2039 for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix):
2040 if m.notice and m.notice not in printed_notices:
2041 print(m.notice)
2042 printed_notices.add(m.notice)
2043 2056
2044 # If we saw an error, exit with code 1 so that other scripts can check. 2057 # If we saw an error, exit with code 1 so that other scripts can check.
2045 if err_event.is_set(): 2058 if err_event.is_set():
@@ -2068,6 +2081,139 @@ later is required to fix a server side protocol bug.
2068 raise SyncError(aggregate_errors=errors) 2081 raise SyncError(aggregate_errors=errors)
2069 2082
2070 @classmethod 2083 @classmethod
2084 def _SyncOneProject(cls, opt, project_index, project) -> _SyncResult:
2085 """Syncs a single project for interleaved sync."""
2086 fetch_success = False
2087 remote_fetched = False
2088 fetch_error = None
2089 fetch_start = None
2090 fetch_finish = None
2091 network_output = ""
2092
2093 if opt.local_only:
2094 fetch_success = True
2095 else:
2096 fetch_start = time.time()
2097 network_output_capture = io.StringIO()
2098 try:
2099 ssh_proxy = cls.get_parallel_context().get("ssh_proxy")
2100 sync_result = project.Sync_NetworkHalf(
2101 quiet=opt.quiet,
2102 verbose=opt.verbose,
2103 output_redir=network_output_capture,
2104 current_branch_only=cls._GetCurrentBranchOnly(
2105 opt, project.manifest
2106 ),
2107 force_sync=opt.force_sync,
2108 clone_bundle=opt.clone_bundle,
2109 tags=opt.tags,
2110 archive=project.manifest.IsArchive,
2111 optimized_fetch=opt.optimized_fetch,
2112 retry_fetches=opt.retry_fetches,
2113 prune=opt.prune,
2114 ssh_proxy=ssh_proxy,
2115 clone_filter=project.manifest.CloneFilter,
2116 partial_clone_exclude=project.manifest.PartialCloneExclude,
2117 clone_filter_for_depth=project.manifest.CloneFilterForDepth,
2118 )
2119 fetch_success = sync_result.success
2120 remote_fetched = sync_result.remote_fetched
2121 fetch_error = sync_result.error
2122 except KeyboardInterrupt:
2123 logger.error(
2124 "Keyboard interrupt while processing %s", project.name
2125 )
2126 except GitError as e:
2127 fetch_error = e
2128 logger.error("error.GitError: Cannot fetch %s", e)
2129 except Exception as e:
2130 fetch_error = e
2131 logger.error(
2132 "error: Cannot fetch %s (%s: %s)",
2133 project.name,
2134 type(e).__name__,
2135 e,
2136 )
2137 finally:
2138 fetch_finish = time.time()
2139 network_output = network_output_capture.getvalue()
2140
2141 checkout_success = False
2142 checkout_error = None
2143 checkout_start = None
2144 checkout_finish = None
2145 checkout_stderr = ""
2146
2147 if fetch_success and not opt.network_only:
2148 checkout_start = time.time()
2149 stderr_capture = io.StringIO()
2150 try:
2151 with contextlib.redirect_stderr(stderr_capture):
2152 syncbuf = SyncBuffer(
2153 project.manifest.manifestProject.config,
2154 detach_head=opt.detach_head,
2155 )
2156 local_half_errors = []
2157 project.Sync_LocalHalf(
2158 syncbuf,
2159 force_sync=opt.force_sync,
2160 force_checkout=opt.force_checkout,
2161 force_rebase=opt.rebase,
2162 errors=local_half_errors,
2163 verbose=opt.verbose,
2164 )
2165 checkout_success = syncbuf.Finish()
2166 if local_half_errors:
2167 checkout_error = SyncError(
2168 aggregate_errors=local_half_errors
2169 )
2170 except KeyboardInterrupt:
2171 logger.error(
2172 "Keyboard interrupt while processing %s", project.name
2173 )
2174 except GitError as e:
2175 checkout_error = e
2176 logger.error(
2177 "error.GitError: Cannot checkout %s: %s", project.name, e
2178 )
2179 except Exception as e:
2180 checkout_error = e
2181 logger.error(
2182 "error: Cannot checkout %s: %s: %s",
2183 project.name,
2184 type(e).__name__,
2185 e,
2186 )
2187 finally:
2188 checkout_finish = time.time()
2189 checkout_stderr = stderr_capture.getvalue()
2190 elif fetch_success:
2191 checkout_success = True
2192
2193 # Consolidate all captured output.
2194 captured_parts = []
2195 if network_output:
2196 captured_parts.append(network_output)
2197 if checkout_stderr:
2198 captured_parts.append(checkout_stderr)
2199 stderr_text = "\n".join(captured_parts)
2200
2201 return _SyncResult(
2202 project_index=project_index,
2203 relpath=project.relpath,
2204 fetch_success=fetch_success,
2205 remote_fetched=remote_fetched,
2206 checkout_success=checkout_success,
2207 fetch_error=fetch_error,
2208 checkout_error=checkout_error,
2209 stderr_text=stderr_text.strip(),
2210 fetch_start=fetch_start,
2211 fetch_finish=fetch_finish,
2212 checkout_start=checkout_start,
2213 checkout_finish=checkout_finish,
2214 )
2215
2216 @classmethod
2071 def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: 2217 def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult:
2072 """Worker for interleaved sync. 2218 """Worker for interleaved sync.
2073 2219
@@ -2092,27 +2238,12 @@ later is required to fix a server side protocol bug.
2092 # Use the first project as the representative for the progress bar. 2238 # Use the first project as the representative for the progress bar.
2093 first_project = projects[project_indices[0]] 2239 first_project = projects[project_indices[0]]
2094 key = f"{first_project.name} @ {first_project.relpath}" 2240 key = f"{first_project.name} @ {first_project.relpath}"
2095 start_time = time.time() 2241 sync_dict[key] = time.time()
2096 sync_dict[key] = start_time
2097 2242
2098 try: 2243 try:
2099 for idx in project_indices: 2244 for idx in project_indices:
2100 project = projects[idx] 2245 project = projects[idx]
2101 # For now, simulate a successful sync. 2246 results.append(cls._SyncOneProject(opt, idx, project))
2102 # TODO(b/421935613): Perform the actual git fetch and checkout.
2103 results.append(
2104 _SyncResult(
2105 relpath=project.relpath,
2106 fetch_success=True,
2107 checkout_success=True,
2108 fetch_error=None,
2109 checkout_error=None,
2110 fetch_start=None,
2111 fetch_finish=None,
2112 checkout_start=None,
2113 checkout_finish=None,
2114 )
2115 )
2116 finally: 2247 finally:
2117 del sync_dict[key] 2248 del sync_dict[key]
2118 2249
@@ -2130,9 +2261,39 @@ later is required to fix a server side protocol bug.
2130 ): 2261 ):
2131 """Callback to process results from interleaved sync workers.""" 2262 """Callback to process results from interleaved sync workers."""
2132 ret = True 2263 ret = True
2264 projects = self.get_parallel_context()["projects"]
2133 for result_group in results_sets: 2265 for result_group in results_sets:
2134 for result in result_group.results: 2266 for result in result_group.results:
2135 pm.update() 2267 pm.update()
2268 project = projects[result.project_index]
2269
2270 if opt.verbose and result.stderr_text:
2271 pm.display_message(result.stderr_text)
2272
2273 if result.fetch_start:
2274 self._fetch_times.Set(
2275 project,
2276 result.fetch_finish - result.fetch_start,
2277 )
2278 self._local_sync_state.SetFetchTime(project)
2279 self.event_log.AddSync(
2280 project,
2281 event_log.TASK_SYNC_NETWORK,
2282 result.fetch_start,
2283 result.fetch_finish,
2284 result.fetch_success,
2285 )
2286 if result.checkout_start:
2287 if result.checkout_success:
2288 self._local_sync_state.SetCheckoutTime(project)
2289 self.event_log.AddSync(
2290 project,
2291 event_log.TASK_SYNC_LOCAL,
2292 result.checkout_start,
2293 result.checkout_finish,
2294 result.checkout_success,
2295 )
2296
2136 if result.fetch_success and result.checkout_success: 2297 if result.fetch_success and result.checkout_success:
2137 synced_relpaths.add(result.relpath) 2298 synced_relpaths.add(result.relpath)
2138 else: 2299 else:
@@ -2188,96 +2349,110 @@ later is required to fix a server side protocol bug.
2188 sync_event = _threading.Event() 2349 sync_event = _threading.Event()
2189 sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) 2350 sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
2190 2351
2191 with self.ParallelContext(): 2352 with multiprocessing.Manager() as manager, ssh.ProxyManager(
2192 # TODO(gavinmak): Use multprocessing.Queue instead of dict. 2353 manager
2193 self.get_parallel_context()[ 2354 ) as ssh_proxy:
2194 "sync_dict" 2355 ssh_proxy.sock()
2195 ] = multiprocessing.Manager().dict() 2356 with self.ParallelContext():
2196 sync_progress_thread.start() 2357 self.get_parallel_context()["ssh_proxy"] = ssh_proxy
2358 # TODO(gavinmak): Use multprocessing.Queue instead of dict.
2359 self.get_parallel_context()[
2360 "sync_dict"
2361 ] = multiprocessing.Manager().dict()
2362 sync_progress_thread.start()
2197 2363
2198 try: 2364 try:
2199 # Outer loop for dynamic project discovery (e.g., submodules). 2365 # Outer loop for dynamic project discovery. This continues
2200 # It continues until no unsynced projects remain. 2366 # until no unsynced projects remain.
2201 while True: 2367 while True:
2202 projects_to_sync = [ 2368 projects_to_sync = [
2203 p 2369 p
2204 for p in project_list 2370 for p in project_list
2205 if p.relpath not in synced_relpaths 2371 if p.relpath not in synced_relpaths
2206 ] 2372 ]
2207 if not projects_to_sync: 2373 if not projects_to_sync:
2208 break 2374 break
2209 2375
2210 pending_relpaths = {p.relpath for p in projects_to_sync} 2376 pending_relpaths = {p.relpath for p in projects_to_sync}
2211 if previously_pending_relpaths == pending_relpaths: 2377 if previously_pending_relpaths == pending_relpaths:
2212 logger.error( 2378 logger.error(
2213 "Stall detected in interleaved sync, not all " 2379 "Stall detected in interleaved sync, not all "
2214 "projects could be synced." 2380 "projects could be synced."
2215 )
2216 err_event.set()
2217 break
2218 previously_pending_relpaths = pending_relpaths
2219
2220 # Update the projects list for workers in the current pass.
2221 self.get_parallel_context()["projects"] = projects_to_sync
2222 project_index_map = {
2223 p: i for i, p in enumerate(projects_to_sync)
2224 }
2225
2226 # Inner loop to process projects in a hierarchical order.
2227 # This iterates through levels of project dependencies (e.g.
2228 # 'foo' then 'foo/bar'). All projects in one level can be
2229 # processed in parallel, but we must wait for a level to
2230 # complete before starting the next.
2231 for level_projects in _SafeCheckoutOrder(projects_to_sync):
2232 if not level_projects:
2233 continue
2234
2235 objdir_project_map = collections.defaultdict(list)
2236 for p in level_projects:
2237 objdir_project_map[p.objdir].append(
2238 project_index_map[p]
2239 ) 2381 )
2240
2241 work_items = list(objdir_project_map.values())
2242 if not work_items:
2243 continue
2244
2245 jobs = max(1, min(opt.jobs, len(work_items)))
2246 callback = functools.partial(
2247 self._ProcessSyncInterleavedResults,
2248 synced_relpaths,
2249 err_event,
2250 errors,
2251 opt,
2252 )
2253 if not self.ExecuteInParallel(
2254 jobs,
2255 functools.partial(self._SyncProjectList, opt),
2256 work_items,
2257 callback=callback,
2258 output=pm,
2259 chunksize=1,
2260 ):
2261 err_event.set() 2382 err_event.set()
2383 break
2384 previously_pending_relpaths = pending_relpaths
2385
2386 self.get_parallel_context()[
2387 "projects"
2388 ] = projects_to_sync
2389 project_index_map = {
2390 p: i for i, p in enumerate(projects_to_sync)
2391 }
2392
2393 # Inner loop to process projects in a hierarchical
2394 # order. This iterates through levels of project
2395 # dependencies (e.g. 'foo' then 'foo/bar'). All projects
2396 # in one level can be processed in parallel, but we must
2397 # wait for a level to complete before starting the next.
2398 for level_projects in _SafeCheckoutOrder(
2399 projects_to_sync
2400 ):
2401 if not level_projects:
2402 continue
2262 2403
2263 if err_event.is_set() and opt.fail_fast: 2404 objdir_project_map = collections.defaultdict(list)
2264 raise SyncFailFastError(aggregate_errors=errors) 2405 for p in level_projects:
2406 objdir_project_map[p.objdir].append(
2407 project_index_map[p]
2408 )
2265 2409
2266 self._ReloadManifest(None, manifest) 2410 work_items = list(objdir_project_map.values())
2267 project_list = self.GetProjects( 2411 if not work_items:
2268 args, 2412 continue
2269 missing_ok=True, 2413
2270 submodules_ok=opt.fetch_submodules, 2414 jobs = max(1, min(opt.jobs, len(work_items)))
2271 manifest=manifest, 2415 callback = functools.partial(
2272 all_manifests=not opt.this_manifest_only, 2416 self._ProcessSyncInterleavedResults,
2273 ) 2417 synced_relpaths,
2274 finally: 2418 err_event,
2275 sync_event.set() 2419 errors,
2276 sync_progress_thread.join() 2420 opt,
2421 )
2422 if not self.ExecuteInParallel(
2423 jobs,
2424 functools.partial(self._SyncProjectList, opt),
2425 work_items,
2426 callback=callback,
2427 output=pm,
2428 chunksize=1,
2429 ):
2430 err_event.set()
2431
2432 if err_event.is_set() and opt.fail_fast:
2433 raise SyncFailFastError(aggregate_errors=errors)
2434
2435 self._ReloadManifest(None, manifest)
2436 project_list = self.GetProjects(
2437 args,
2438 missing_ok=True,
2439 submodules_ok=opt.fetch_submodules,
2440 manifest=manifest,
2441 all_manifests=not opt.this_manifest_only,
2442 )
2443 finally:
2444 sync_event.set()
2445 sync_progress_thread.join()
2277 2446
2278 pm.end() 2447 pm.end()
2279 2448
2449 # TODO(b/421935613): Add the manifest loop block from PhasedSync.
2450 if not self.outer_client.manifest.IsArchive:
2451 self._GCProjects(project_list, opt, err_event)
2452
2453 self._PrintManifestNotices(opt)
2280 if err_event.is_set(): 2454 if err_event.is_set():
2455 # TODO(b/421935613): Log errors better like SyncPhased.
2281 logger.error( 2456 logger.error(
2282 "error: Unable to fully sync the tree in interleaved mode." 2457 "error: Unable to fully sync the tree in interleaved mode."
2283 ) 2458 )