summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Mike Frysinger <vapier@google.com> 2021-02-23 20:48:04 -0500
committer Mike Frysinger <vapier@google.com> 2021-02-27 19:55:14 +0000
commit ebf04a4404e93c6d6b167775c90264dc24ed8c98 (patch)
tree b22de7eb3060f913395d2e655e996ce6fd64fd60
parent 8dbc07aced638b0d625870e283307e348046c82f (diff)
download git-repo-ebf04a4404e93c6d6b167775c90264dc24ed8c98.tar.gz
sync: switch local checkout to multiprocessing
This avoids GIL limitations with using threads for parallel processing.
In a CrOS checkout with ~1000 repos, the nop case goes from ~6 sec down
to ~4 sec with -j8. Not a big deal, but shows that this actually works
to speed things up unlike the threading model.

This reworks the checkout logic to return results for processing in the
main thread instead of leaving every thread to do its own processing.

Bug: https://crbug.com/gerrit/12389
Change-Id: I143e5e3f7158e83ea67e2d14e5552153a874248a
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298063
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
-rw-r--r-- subcmds/sync.py 176
1 files changed, 52 insertions, 124 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index d1b631ae..0db96b54 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -12,9 +12,11 @@
12# See the License for the specific language governing permissions and 12# See the License for the specific language governing permissions and
13# limitations under the License. 13# limitations under the License.
14 14
15import functools
15import http.cookiejar as cookielib 16import http.cookiejar as cookielib
16import io 17import io
17import json 18import json
19import multiprocessing
18import netrc 20import netrc
19from optparse import SUPPRESS_HELP 21from optparse import SUPPRESS_HELP
20import os 22import os
@@ -56,7 +58,7 @@ import git_superproject
56import gitc_utils 58import gitc_utils
57from project import Project 59from project import Project
58from project import RemoteSpec 60from project import RemoteSpec
59from command import Command, MirrorSafeCommand 61from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE
60from error import RepoChangedException, GitError, ManifestParseError 62from error import RepoChangedException, GitError, ManifestParseError
61import platform_utils 63import platform_utils
62from project import SyncBuffer 64from project import SyncBuffer
@@ -71,10 +73,6 @@ class _FetchError(Exception):
71 """Internal error thrown in _FetchHelper() when we don't want stack trace.""" 73 """Internal error thrown in _FetchHelper() when we don't want stack trace."""
72 74
73 75
74class _CheckoutError(Exception):
75 """Internal error thrown in _CheckoutOne() when we don't want stack trace."""
76
77
78class Sync(Command, MirrorSafeCommand): 76class Sync(Command, MirrorSafeCommand):
79 jobs = 1 77 jobs = 1
80 common = True 78 common = True
@@ -457,149 +455,80 @@ later is required to fix a server side protocol bug.
457 455
458 return fetched 456 return fetched
459 457
460 def _CheckoutWorker(self, opt, sem, project, *args, **kwargs): 458 def _CheckoutOne(self, opt, project):
461 """Main function of the fetch threads.
462
463 Delegates most of the work to _CheckoutOne.
464
465 Args:
466 opt: Program options returned from optparse. See _Options().
467 projects: Projects to fetch.
468 sem: We'll release() this semaphore when we exit so that another thread
469 can be started up.
470 *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the
471 _CheckoutOne docstring for details.
472 """
473 try:
474 return self._CheckoutOne(opt, project, *args, **kwargs)
475 finally:
476 sem.release()
477
478 def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results):
479 """Checkout work tree for one project 459 """Checkout work tree for one project
480 460
481 Args: 461 Args:
482 opt: Program options returned from optparse. See _Options(). 462 opt: Program options returned from optparse. See _Options().
483 project: Project object for the project to checkout. 463 project: Project object for the project to checkout.
484 lock: Lock for accessing objects that are shared amongst multiple
485 _CheckoutWorker() threads.
486 pm: Instance of a Project object. We will call pm.update() (with our
487 lock held).
488 err_event: We'll set this event in the case of an error (after printing
489 out info about the error).
490 err_results: A list of strings, paths to git repos where checkout
491 failed.
492 464
493 Returns: 465 Returns:
494 Whether the fetch was successful. 466 Whether the fetch was successful.
495 """ 467 """
496 # We'll set to true once we've locked the lock.
497 did_lock = False
498
499 # Encapsulate everything in a try/except/finally so that:
500 # - We always set err_event in the case of an exception.
501 # - We always make sure we unlock the lock if we locked it.
502 start = time.time() 468 start = time.time()
503 syncbuf = SyncBuffer(self.manifest.manifestProject.config, 469 syncbuf = SyncBuffer(self.manifest.manifestProject.config,
504 detach_head=opt.detach_head) 470 detach_head=opt.detach_head)
505 success = False 471 success = False
506 with lock:
507 pm.start(project.name)
508 try: 472 try:
509 try: 473 project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync)
510 project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) 474 success = syncbuf.Finish()
511 475 except Exception as e:
512 # Lock around all the rest of the code, since printing, updating a set 476 print('error: Cannot checkout %s: %s: %s' %
513 # and Progress.update() are not thread safe. 477 (project.name, type(e).__name__, str(e)),
514 lock.acquire() 478 file=sys.stderr)
515 success = syncbuf.Finish() 479 raise
516 did_lock = True
517
518 if not success:
519 err_event.set()
520 print('error: Cannot checkout %s' % (project.name),
521 file=sys.stderr)
522 raise _CheckoutError()
523 except _CheckoutError:
524 pass
525 except Exception as e:
526 print('error: Cannot checkout %s: %s: %s' %
527 (project.name, type(e).__name__, str(e)),
528 file=sys.stderr)
529 err_event.set()
530 raise
531 finally:
532 if not did_lock:
533 lock.acquire()
534 if not success:
535 err_results.append(project.relpath)
536 pm.finish(project.name)
537 lock.release()
538 finish = time.time()
539 self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
540 start, finish, success)
541 480
542 return success 481 if not success:
482 print('error: Cannot checkout %s' % (project.name), file=sys.stderr)
483 finish = time.time()
484 return (success, project, start, finish)
543 485
544 def _Checkout(self, all_projects, opt, err_event, err_results): 486 def _Checkout(self, all_projects, opt, err_results):
545 """Checkout projects listed in all_projects 487 """Checkout projects listed in all_projects
546 488
547 Args: 489 Args:
548 all_projects: List of all projects that should be checked out. 490 all_projects: List of all projects that should be checked out.
549 opt: Program options returned from optparse. See _Options(). 491 opt: Program options returned from optparse. See _Options().
550 err_event: We'll set this event in the case of an error (after printing 492 err_results: A list of strings, paths to git repos where checkout failed.
551 out info about the error).
552 err_results: A list of strings, paths to git repos where checkout
553 failed.
554 """ 493 """
494 ret = True
555 495
556 # Perform checkouts in multiple threads when we are using partial clone. 496 # Only checkout projects with worktrees.
557 # Without partial clone, all needed git objects are already downloaded, 497 all_projects = [x for x in all_projects if x.worktree]
558 # in this situation it's better to use only one process because the checkout
559 # would be mostly disk I/O; with partial clone, the objects are only
560 # downloaded when demanded (at checkout time), which is similar to the
561 # Sync_NetworkHalf case and parallelism would be helpful.
562 if self.manifest.CloneFilter:
563 syncjobs = self.jobs
564 else:
565 syncjobs = 1
566 498
567 lock = _threading.Lock()
568 pm = Progress('Checking out', len(all_projects)) 499 pm = Progress('Checking out', len(all_projects))
569 500
570 threads = set() 501 def _ProcessResults(results):
571 sem = _threading.Semaphore(syncjobs) 502 for (success, project, start, finish) in results:
572 503 self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
573 for project in all_projects: 504 start, finish, success)
574 # Check for any errors before running any more tasks. 505 # Check for any errors before running any more tasks.
575 # ...we'll let existing threads finish, though. 506 # ...we'll let existing threads finish, though.
576 if err_event.is_set() and opt.fail_fast: 507 if not success:
577 break 508 err_results.append(project.relpath)
578 509 if opt.fail_fast:
579 sem.acquire() 510 return False
580 if project.worktree: 511 pm.update(msg=project.name)
581 kwargs = dict(opt=opt, 512 return True
582 sem=sem, 513
583 project=project, 514 # NB: Multiprocessing is heavy, so don't spin it up for one job.
584 lock=lock, 515 if len(all_projects) == 1 or opt.jobs == 1:
585 pm=pm, 516 if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects):
586 err_event=err_event, 517 ret = False
587 err_results=err_results) 518 else:
588 if syncjobs > 1: 519 with multiprocessing.Pool(opt.jobs) as pool:
589 t = _threading.Thread(target=self._CheckoutWorker, 520 results = pool.imap_unordered(
590 kwargs=kwargs) 521 functools.partial(self._CheckoutOne, opt),
591 # Ensure that Ctrl-C will not freeze the repo process. 522 all_projects,
592 t.daemon = True 523 chunksize=WORKER_BATCH_SIZE)
593 threads.add(t) 524 if not _ProcessResults(results):
594 t.start() 525 ret = False
595 else: 526 pool.close()
596 self._CheckoutWorker(**kwargs)
597
598 for t in threads:
599 t.join()
600 527
601 pm.end() 528 pm.end()
602 529
530 return ret
531
603 def _GCProjects(self, projects, opt, err_event): 532 def _GCProjects(self, projects, opt, err_event):
604 gc_gitdirs = {} 533 gc_gitdirs = {}
605 for project in projects: 534 for project in projects:
@@ -946,7 +875,6 @@ later is required to fix a server side protocol bug.
946 875
947 err_network_sync = False 876 err_network_sync = False
948 err_update_projects = False 877 err_update_projects = False
949 err_checkout = False
950 878
951 self._fetch_times = _FetchTimes(self.manifest) 879 self._fetch_times = _FetchTimes(self.manifest)
952 if not opt.local_only: 880 if not opt.local_only:
@@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug.
1011 sys.exit(1) 939 sys.exit(1)
1012 940
1013 err_results = [] 941 err_results = []
1014 self._Checkout(all_projects, opt, err_event, err_results) 942 # NB: We don't exit here because this is the last step.
1015 if err_event.is_set(): 943 err_checkout = not self._Checkout(all_projects, opt, err_results)
1016 err_checkout = True 944 if err_checkout:
1017 # NB: We don't exit here because this is the last step. 945 err_event.set()
1018 946
1019 # If there's a notice that's supposed to print at the end of the sync, print 947 # If there's a notice that's supposed to print at the end of the sync, print
1020 # it now... 948 # it now...