author    Mike Frysinger <vapier@google.com>  2021-02-24 00:15:32 -0500
committer Mike Frysinger <vapier@google.com>  2021-04-01 14:52:57 +0000
commit    b2fa30a2b891b22c173c960a67bf38ccbba8de1b (patch)
tree      ae532bceade00e3509c8fb59026c056e0f49b7c9 /subcmds/sync.py
parent    d246d1fee7f42f2526a20a96597c8f01eda31433 (diff)
sync: switch network fetch to multiprocessing

This avoids GIL limitations with using threads for parallel processing.
This reworks the fetch logic to return results for processing in the
main thread instead of leaving every thread to do its own processing.

We have to tweak the chunking logic a little here because
multiprocessing favors batching over returning immediate results when
using a larger value for chunksize. When a single job can be quite
slow, this tradeoff is not good UX.

Bug: https://crbug.com/gerrit/12389
Change-Id: I0f0512d15ad7332d1eb28aff52c29d378acc9e1d
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298642
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
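The shape of the change is easier to see outside the diff: workers only
fetch and return a plain result tuple, and the parent process consumes
the results. Below is a minimal standalone sketch of that pattern, not
code from this commit; fetch_one, opt, and the project names are all
hypothetical placeholders:

import functools
import multiprocessing
import time

def fetch_one(opt, name):
  # Worker: do the (fake) network work and return a plain result tuple.
  # No locks, no printing -- all bookkeeping happens in the parent.
  start = time.time()
  success = True  # a real worker would run the network fetch here
  return (success, name, start, time.time())

if __name__ == '__main__':
  opt = {'quiet': False}  # stand-in for the parsed program options
  names = ['projA', 'projB', 'projC', 'projD']
  with multiprocessing.Pool(4) as pool:
    # chunksize=1 keeps results streaming back as each job finishes,
    # trading a little IPC overhead for continuous progress feedback.
    results = pool.imap_unordered(
        functools.partial(fetch_one, opt), names, chunksize=1)
    for (success, name, start, finish) in results:
      # All result processing happens in the main process.
      print('%s: ok=%s in %.1fs' % (name, success, finish - start))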
Diffstat (limited to 'subcmds/sync.py')
-rw-r--r--  subcmds/sync.py | 215
1 file changed, 91 insertions(+), 124 deletions(-)
diff --git a/subcmds/sync.py b/subcmds/sync.py
index b14ad246..bf1369c0 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -45,11 +45,6 @@ except ImportError:
   def _rlimit_nofile():
     return (256, 256)
 
-try:
-  import multiprocessing
-except ImportError:
-  multiprocessing = None
-
 import event_log
 from git_command import GIT, git_require
 from git_config import GetUrlCookieFile
@@ -69,10 +64,6 @@ from manifest_xml import GitcManifest
 _ONE_DAY_S = 24 * 60 * 60
 
 
-class _FetchError(Exception):
-  """Internal error thrown in _FetchHelper() when we don't want stack trace."""
-
-
 class Sync(Command, MirrorSafeCommand):
   jobs = 1
   common = True
@@ -315,148 +306,119 @@ later is required to fix a server side protocol bug.
     self._ReloadManifest(manifest_path)
     return manifest_path
 
-  def _FetchProjectList(self, opt, projects, sem, *args, **kwargs):
-    """Main function of the fetch threads.
+  def _FetchProjectList(self, opt, projects):
+    """Main function of the fetch worker.
+
+    The projects we're given share the same underlying git object store, so we
+    have to fetch them in serial.
 
     Delegates most of the work to _FetchHelper.
 
     Args:
       opt: Program options returned from optparse. See _Options().
       projects: Projects to fetch.
-      sem: We'll release() this semaphore when we exit so that another thread
-        can be started up.
-      *args, **kwargs: Remaining arguments to pass to _FetchHelper. See the
-        _FetchHelper docstring for details.
     """
-    try:
-      for project in projects:
-        success = self._FetchHelper(opt, project, *args, **kwargs)
-        if not success and opt.fail_fast:
-          break
-    finally:
-      sem.release()
+    return [self._FetchOne(opt, x) for x in projects]
 
-  def _FetchHelper(self, opt, project, lock, fetched, pm, err_event,
-                   clone_filter):
+  def _FetchOne(self, opt, project):
     """Fetch git objects for a single project.
 
     Args:
       opt: Program options returned from optparse. See _Options().
       project: Project object for the project to fetch.
-      lock: Lock for accessing objects that are shared amongst multiple
-          _FetchHelper() threads.
-      fetched: set object that we will add project.gitdir to when we're done
-          (with our lock held).
-      pm: Instance of a Project object. We will call pm.update() (with our
-          lock held).
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      clone_filter: Filter for use in a partial clone.
 
     Returns:
       Whether the fetch was successful.
     """
-    # We'll set to true once we've locked the lock.
-    did_lock = False
-
-    # Encapsulate everything in a try/except/finally so that:
-    # - We always set err_event in the case of an exception.
-    # - We always make sure we unlock the lock if we locked it.
     start = time.time()
     success = False
     buf = io.StringIO()
-    with lock:
-      pm.start(project.name)
     try:
-      try:
-        success = project.Sync_NetworkHalf(
-            quiet=opt.quiet,
-            verbose=opt.verbose,
-            output_redir=buf,
-            current_branch_only=self._GetCurrentBranchOnly(opt),
-            force_sync=opt.force_sync,
-            clone_bundle=opt.clone_bundle,
-            tags=opt.tags, archive=self.manifest.IsArchive,
-            optimized_fetch=opt.optimized_fetch,
-            retry_fetches=opt.retry_fetches,
-            prune=opt.prune,
-            clone_filter=clone_filter)
-        self._fetch_times.Set(project, time.time() - start)
-
-        # Lock around all the rest of the code, since printing, updating a set
-        # and Progress.update() are not thread safe.
-        lock.acquire()
-        did_lock = True
-
-        output = buf.getvalue()
-        if opt.verbose and output:
-          pm.update(inc=0, msg=output.rstrip())
-
-        if not success:
-          err_event.set()
-          print('error: Cannot fetch %s from %s'
-                % (project.name, project.remote.url),
-                file=sys.stderr)
-          if opt.fail_fast:
-            raise _FetchError()
-
-        fetched.add(project.gitdir)
-    except _FetchError:
-      pass
+      success = project.Sync_NetworkHalf(
+          quiet=opt.quiet,
+          verbose=opt.verbose,
+          output_redir=buf,
+          current_branch_only=self._GetCurrentBranchOnly(opt),
+          force_sync=opt.force_sync,
+          clone_bundle=opt.clone_bundle,
+          tags=opt.tags, archive=self.manifest.IsArchive,
+          optimized_fetch=opt.optimized_fetch,
+          retry_fetches=opt.retry_fetches,
+          prune=opt.prune,
+          clone_filter=self.manifest.CloneFilter)
+
+      output = buf.getvalue()
+      if opt.verbose and output:
+        print('\n' + output.rstrip())
+
+      if not success:
+        print('error: Cannot fetch %s from %s'
+              % (project.name, project.remote.url),
+              file=sys.stderr)
     except Exception as e:
       print('error: Cannot fetch %s (%s: %s)'
             % (project.name, type(e).__name__, str(e)), file=sys.stderr)
-      err_event.set()
       raise
-    finally:
-      if not did_lock:
-        lock.acquire()
-      pm.finish(project.name)
-      lock.release()
-      finish = time.time()
-      self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
-                             start, finish, success)
 
-    return success
+    finish = time.time()
+    return (success, project, start, finish)
 
   def _Fetch(self, projects, opt, err_event):
+    ret = True
+
     fetched = set()
-    lock = _threading.Lock()
-    pm = Progress('Fetching', len(projects))
+    pm = Progress('Fetching', len(projects), delay=False)
 
     objdir_project_map = dict()
     for project in projects:
       objdir_project_map.setdefault(project.objdir, []).append(project)
+    projects_list = list(objdir_project_map.values())
+
+    def _ProcessResults(results_sets):
+      ret = True
+      for results in results_sets:
+        for (success, project, start, finish) in results:
+          self._fetch_times.Set(project, finish - start)
+          self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
+                                 start, finish, success)
+          # Check for any errors before running any more tasks.
+          # ...we'll let existing jobs finish, though.
+          if not success:
+            ret = False
+          else:
+            fetched.add(project.gitdir)
+          pm.update(msg=project.name)
+        if not ret and opt.fail_fast:
+          break
+      return ret
 
-    threads = set()
-    sem = _threading.Semaphore(self.jobs)
-    for project_list in objdir_project_map.values():
-      # Check for any errors before running any more tasks.
-      # ...we'll let existing threads finish, though.
-      if err_event.is_set() and opt.fail_fast:
-        break
-
-      sem.acquire()
-      kwargs = dict(opt=opt,
-                    projects=project_list,
-                    sem=sem,
-                    lock=lock,
-                    fetched=fetched,
-                    pm=pm,
-                    err_event=err_event,
-                    clone_filter=self.manifest.CloneFilter)
-      if self.jobs > 1:
-        t = _threading.Thread(target=self._FetchProjectList,
-                              kwargs=kwargs)
-        # Ensure that Ctrl-C will not freeze the repo process.
-        t.daemon = True
-        threads.add(t)
-        t.start()
+    # NB: Multiprocessing is heavy, so don't spin it up for one job.
+    if len(projects_list) == 1 or opt.jobs == 1:
+      if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list):
+        ret = False
+    else:
+      # Favor throughput over responsiveness when quiet. It seems that imap()
+      # will yield results in batches relative to chunksize, so even as the
+      # children finish a sync, we won't see the result until one child finishes
+      # ~chunksize jobs. When using a large --jobs with large chunksize, this
+      # can be jarring as there will be a large initial delay where repo looks
+      # like it isn't doing anything and sits at 0%, but then suddenly completes
+      # a lot of jobs all at once. Since this code is more network bound, we
+      # can accept a bit more CPU overhead with a smaller chunksize so that the
+      # user sees more immediate & continuous feedback.
+      if opt.quiet:
+        chunksize = WORKER_BATCH_SIZE
       else:
-        self._FetchProjectList(**kwargs)
-
-    for t in threads:
-      t.join()
+        pm.update(inc=0, msg='warming up')
+        chunksize = 4
+      with multiprocessing.Pool(opt.jobs) as pool:
+        results = pool.imap_unordered(
+            functools.partial(self._FetchProjectList, opt),
+            projects_list,
+            chunksize=chunksize)
+        if not _ProcessResults(results):
+          ret = False
+          pool.close()
 
     pm.end()
     self._fetch_times.Save()
@@ -464,7 +426,7 @@ later is required to fix a server side protocol bug.
     if not self.manifest.IsArchive:
       self._GCProjects(projects, opt, err_event)
 
-    return fetched
+    return (ret, fetched)
 
   def _CheckoutOne(self, opt, project):
     """Checkout work tree for one project
@@ -514,7 +476,7 @@ later is required to fix a server side protocol bug.
         self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
                                start, finish, success)
         # Check for any errors before running any more tasks.
-        # ...we'll let existing threads finish, though.
+        # ...we'll let existing jobs finish, though.
         if not success:
           err_results.append(project.relpath)
           if opt.fail_fast:
@@ -894,7 +856,9 @@ later is required to fix a server side protocol bug.
       to_fetch.extend(all_projects)
       to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-      fetched = self._Fetch(to_fetch, opt, err_event)
+      success, fetched = self._Fetch(to_fetch, opt, err_event)
+      if not success:
+        err_event.set()
 
       _PostRepoFetch(rp, opt.repo_verify)
       if opt.network_only:
@@ -923,7 +887,10 @@ later is required to fix a server side protocol bug.
         if previously_missing_set == missing_set:
           break
         previously_missing_set = missing_set
-        fetched.update(self._Fetch(missing, opt, err_event))
+        success, new_fetched = self._Fetch(to_fetch, opt, err_event)
+        if not success:
+          err_event.set()
+        fetched.update(new_fetched)
 
     # If we saw an error, exit with code 1 so that other scripts can check.
     if err_event.is_set():
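The chunksize tradeoff described in the commit message and in the new
comment in _Fetch is easy to observe directly. A tiny standalone
experiment (hypothetical demo code, not part of this change) that
stands in one-second sleeps for slow network fetches:

import multiprocessing
import time

def slow_job(i):
  time.sleep(1)  # stand-in for one slow network fetch
  return i

if __name__ == '__main__':
  t0 = time.time()
  with multiprocessing.Pool(2) as pool:
    # With chunksize=1, results stream in as jobs finish, roughly one
    # second apart. Rerun with chunksize=4 and each child reports only
    # after finishing its whole batch, so the output sits silent for
    # ~4 seconds and then bursts -- the "stuck at 0%" effect the commit
    # message describes.
    for i in pool.imap_unordered(slow_job, range(8), chunksize=1):
      print('job %d done at t=%.1fs' % (i, time.time() - t0))

This is why the new code only uses the larger WORKER_BATCH_SIZE when
--quiet suppresses progress output, and drops to a small chunksize when
the user is watching the progress bar.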