diff options
author | Mike Frysinger <vapier@google.com> | 2021-02-23 20:48:04 -0500 |
---|---|---|
committer | Mike Frysinger <vapier@google.com> | 2021-02-27 19:55:14 +0000 |
commit | ebf04a4404e93c6d6b167775c90264dc24ed8c98 (patch) | |
tree | b22de7eb3060f913395d2e655e996ce6fd64fd60 | |
parent | 8dbc07aced638b0d625870e283307e348046c82f (diff) | |
download | git-repo-ebf04a4404e93c6d6b167775c90264dc24ed8c98.tar.gz |
sync: switch local checkout to multiprocessing
This avoids GIL limitations with using threads for parallel processing.
In a CrOS checkout with ~1000 repos, the nop case goes from ~6 sec down
to ~4 sec with -j8. Not a big deal, but shows that this actually works
to speed things up unlike the threading model.
This reworks the checkout logic to return results for processing in the
main thread instead of leaving every thread to do its own processing.
Bug: https://crbug.com/gerrit/12389
Change-Id: I143e5e3f7158e83ea67e2d14e5552153a874248a
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/298063
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
-rw-r--r-- | subcmds/sync.py | 176 |
1 files changed, 52 insertions, 124 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index d1b631ae..0db96b54 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -12,9 +12,11 @@ | |||
12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and |
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import functools | ||
15 | import http.cookiejar as cookielib | 16 | import http.cookiejar as cookielib |
16 | import io | 17 | import io |
17 | import json | 18 | import json |
19 | import multiprocessing | ||
18 | import netrc | 20 | import netrc |
19 | from optparse import SUPPRESS_HELP | 21 | from optparse import SUPPRESS_HELP |
20 | import os | 22 | import os |
@@ -56,7 +58,7 @@ import git_superproject | |||
56 | import gitc_utils | 58 | import gitc_utils |
57 | from project import Project | 59 | from project import Project |
58 | from project import RemoteSpec | 60 | from project import RemoteSpec |
59 | from command import Command, MirrorSafeCommand | 61 | from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE |
60 | from error import RepoChangedException, GitError, ManifestParseError | 62 | from error import RepoChangedException, GitError, ManifestParseError |
61 | import platform_utils | 63 | import platform_utils |
62 | from project import SyncBuffer | 64 | from project import SyncBuffer |
@@ -71,10 +73,6 @@ class _FetchError(Exception): | |||
71 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" | 73 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" |
72 | 74 | ||
73 | 75 | ||
74 | class _CheckoutError(Exception): | ||
75 | """Internal error thrown in _CheckoutOne() when we don't want stack trace.""" | ||
76 | |||
77 | |||
78 | class Sync(Command, MirrorSafeCommand): | 76 | class Sync(Command, MirrorSafeCommand): |
79 | jobs = 1 | 77 | jobs = 1 |
80 | common = True | 78 | common = True |
@@ -457,149 +455,80 @@ later is required to fix a server side protocol bug. | |||
457 | 455 | ||
458 | return fetched | 456 | return fetched |
459 | 457 | ||
460 | def _CheckoutWorker(self, opt, sem, project, *args, **kwargs): | 458 | def _CheckoutOne(self, opt, project): |
461 | """Main function of the fetch threads. | ||
462 | |||
463 | Delegates most of the work to _CheckoutOne. | ||
464 | |||
465 | Args: | ||
466 | opt: Program options returned from optparse. See _Options(). | ||
467 | projects: Projects to fetch. | ||
468 | sem: We'll release() this semaphore when we exit so that another thread | ||
469 | can be started up. | ||
470 | *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the | ||
471 | _CheckoutOne docstring for details. | ||
472 | """ | ||
473 | try: | ||
474 | return self._CheckoutOne(opt, project, *args, **kwargs) | ||
475 | finally: | ||
476 | sem.release() | ||
477 | |||
478 | def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results): | ||
479 | """Checkout work tree for one project | 459 | """Checkout work tree for one project |
480 | 460 | ||
481 | Args: | 461 | Args: |
482 | opt: Program options returned from optparse. See _Options(). | 462 | opt: Program options returned from optparse. See _Options(). |
483 | project: Project object for the project to checkout. | 463 | project: Project object for the project to checkout. |
484 | lock: Lock for accessing objects that are shared amongst multiple | ||
485 | _CheckoutWorker() threads. | ||
486 | pm: Instance of a Project object. We will call pm.update() (with our | ||
487 | lock held). | ||
488 | err_event: We'll set this event in the case of an error (after printing | ||
489 | out info about the error). | ||
490 | err_results: A list of strings, paths to git repos where checkout | ||
491 | failed. | ||
492 | 464 | ||
493 | Returns: | 465 | Returns: |
494 | Whether the fetch was successful. | 466 | Whether the fetch was successful. |
495 | """ | 467 | """ |
496 | # We'll set to true once we've locked the lock. | ||
497 | did_lock = False | ||
498 | |||
499 | # Encapsulate everything in a try/except/finally so that: | ||
500 | # - We always set err_event in the case of an exception. | ||
501 | # - We always make sure we unlock the lock if we locked it. | ||
502 | start = time.time() | 468 | start = time.time() |
503 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, | 469 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, |
504 | detach_head=opt.detach_head) | 470 | detach_head=opt.detach_head) |
505 | success = False | 471 | success = False |
506 | with lock: | ||
507 | pm.start(project.name) | ||
508 | try: | 472 | try: |
509 | try: | 473 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) |
510 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) | 474 | success = syncbuf.Finish() |
511 | 475 | except Exception as e: | |
512 | # Lock around all the rest of the code, since printing, updating a set | 476 | print('error: Cannot checkout %s: %s: %s' % |
513 | # and Progress.update() are not thread safe. | 477 | (project.name, type(e).__name__, str(e)), |
514 | lock.acquire() | 478 | file=sys.stderr) |
515 | success = syncbuf.Finish() | 479 | raise |
516 | did_lock = True | ||
517 | |||
518 | if not success: | ||
519 | err_event.set() | ||
520 | print('error: Cannot checkout %s' % (project.name), | ||
521 | file=sys.stderr) | ||
522 | raise _CheckoutError() | ||
523 | except _CheckoutError: | ||
524 | pass | ||
525 | except Exception as e: | ||
526 | print('error: Cannot checkout %s: %s: %s' % | ||
527 | (project.name, type(e).__name__, str(e)), | ||
528 | file=sys.stderr) | ||
529 | err_event.set() | ||
530 | raise | ||
531 | finally: | ||
532 | if not did_lock: | ||
533 | lock.acquire() | ||
534 | if not success: | ||
535 | err_results.append(project.relpath) | ||
536 | pm.finish(project.name) | ||
537 | lock.release() | ||
538 | finish = time.time() | ||
539 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | ||
540 | start, finish, success) | ||
541 | 480 | ||
542 | return success | 481 | if not success: |
482 | print('error: Cannot checkout %s' % (project.name), file=sys.stderr) | ||
483 | finish = time.time() | ||
484 | return (success, project, start, finish) | ||
543 | 485 | ||
544 | def _Checkout(self, all_projects, opt, err_event, err_results): | 486 | def _Checkout(self, all_projects, opt, err_results): |
545 | """Checkout projects listed in all_projects | 487 | """Checkout projects listed in all_projects |
546 | 488 | ||
547 | Args: | 489 | Args: |
548 | all_projects: List of all projects that should be checked out. | 490 | all_projects: List of all projects that should be checked out. |
549 | opt: Program options returned from optparse. See _Options(). | 491 | opt: Program options returned from optparse. See _Options(). |
550 | err_event: We'll set this event in the case of an error (after printing | 492 | err_results: A list of strings, paths to git repos where checkout failed. |
551 | out info about the error). | ||
552 | err_results: A list of strings, paths to git repos where checkout | ||
553 | failed. | ||
554 | """ | 493 | """ |
494 | ret = True | ||
555 | 495 | ||
556 | # Perform checkouts in multiple threads when we are using partial clone. | 496 | # Only checkout projects with worktrees. |
557 | # Without partial clone, all needed git objects are already downloaded, | 497 | all_projects = [x for x in all_projects if x.worktree] |
558 | # in this situation it's better to use only one process because the checkout | ||
559 | # would be mostly disk I/O; with partial clone, the objects are only | ||
560 | # downloaded when demanded (at checkout time), which is similar to the | ||
561 | # Sync_NetworkHalf case and parallelism would be helpful. | ||
562 | if self.manifest.CloneFilter: | ||
563 | syncjobs = self.jobs | ||
564 | else: | ||
565 | syncjobs = 1 | ||
566 | 498 | ||
567 | lock = _threading.Lock() | ||
568 | pm = Progress('Checking out', len(all_projects)) | 499 | pm = Progress('Checking out', len(all_projects)) |
569 | 500 | ||
570 | threads = set() | 501 | def _ProcessResults(results): |
571 | sem = _threading.Semaphore(syncjobs) | 502 | for (success, project, start, finish) in results: |
572 | 503 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | |
573 | for project in all_projects: | 504 | start, finish, success) |
574 | # Check for any errors before running any more tasks. | 505 | # Check for any errors before running any more tasks. |
575 | # ...we'll let existing threads finish, though. | 506 | # ...we'll let existing threads finish, though. |
576 | if err_event.is_set() and opt.fail_fast: | 507 | if not success: |
577 | break | 508 | err_results.append(project.relpath) |
578 | 509 | if opt.fail_fast: | |
579 | sem.acquire() | 510 | return False |
580 | if project.worktree: | 511 | pm.update(msg=project.name) |
581 | kwargs = dict(opt=opt, | 512 | return True |
582 | sem=sem, | 513 | |
583 | project=project, | 514 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
584 | lock=lock, | 515 | if len(all_projects) == 1 or opt.jobs == 1: |
585 | pm=pm, | 516 | if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects): |
586 | err_event=err_event, | 517 | ret = False |
587 | err_results=err_results) | 518 | else: |
588 | if syncjobs > 1: | 519 | with multiprocessing.Pool(opt.jobs) as pool: |
589 | t = _threading.Thread(target=self._CheckoutWorker, | 520 | results = pool.imap_unordered( |
590 | kwargs=kwargs) | 521 | functools.partial(self._CheckoutOne, opt), |
591 | # Ensure that Ctrl-C will not freeze the repo process. | 522 | all_projects, |
592 | t.daemon = True | 523 | chunksize=WORKER_BATCH_SIZE) |
593 | threads.add(t) | 524 | if not _ProcessResults(results): |
594 | t.start() | 525 | ret = False |
595 | else: | 526 | pool.close() |
596 | self._CheckoutWorker(**kwargs) | ||
597 | |||
598 | for t in threads: | ||
599 | t.join() | ||
600 | 527 | ||
601 | pm.end() | 528 | pm.end() |
602 | 529 | ||
530 | return ret | ||
531 | |||
603 | def _GCProjects(self, projects, opt, err_event): | 532 | def _GCProjects(self, projects, opt, err_event): |
604 | gc_gitdirs = {} | 533 | gc_gitdirs = {} |
605 | for project in projects: | 534 | for project in projects: |
@@ -946,7 +875,6 @@ later is required to fix a server side protocol bug. | |||
946 | 875 | ||
947 | err_network_sync = False | 876 | err_network_sync = False |
948 | err_update_projects = False | 877 | err_update_projects = False |
949 | err_checkout = False | ||
950 | 878 | ||
951 | self._fetch_times = _FetchTimes(self.manifest) | 879 | self._fetch_times = _FetchTimes(self.manifest) |
952 | if not opt.local_only: | 880 | if not opt.local_only: |
@@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug. | |||
1011 | sys.exit(1) | 939 | sys.exit(1) |
1012 | 940 | ||
1013 | err_results = [] | 941 | err_results = [] |
1014 | self._Checkout(all_projects, opt, err_event, err_results) | 942 | # NB: We don't exit here because this is the last step. |
1015 | if err_event.is_set(): | 943 | err_checkout = not self._Checkout(all_projects, opt, err_results) |
1016 | err_checkout = True | 944 | if err_checkout: |
1017 | # NB: We don't exit here because this is the last step. | 945 | err_event.set() |
1018 | 946 | ||
1019 | # If there's a notice that's supposed to print at the end of the sync, print | 947 | # If there's a notice that's supposed to print at the end of the sync, print |
1020 | # it now... | 948 | # it now... |