diff options
-rw-r--r-- | subcmds/sync.py | 176 |
1 files changed, 52 insertions, 124 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index d1b631ae..0db96b54 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -12,9 +12,11 @@ | |||
12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and |
13 | # limitations under the License. | 13 | # limitations under the License. |
14 | 14 | ||
15 | import functools | ||
15 | import http.cookiejar as cookielib | 16 | import http.cookiejar as cookielib |
16 | import io | 17 | import io |
17 | import json | 18 | import json |
19 | import multiprocessing | ||
18 | import netrc | 20 | import netrc |
19 | from optparse import SUPPRESS_HELP | 21 | from optparse import SUPPRESS_HELP |
20 | import os | 22 | import os |
@@ -56,7 +58,7 @@ import git_superproject | |||
56 | import gitc_utils | 58 | import gitc_utils |
57 | from project import Project | 59 | from project import Project |
58 | from project import RemoteSpec | 60 | from project import RemoteSpec |
59 | from command import Command, MirrorSafeCommand | 61 | from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE |
60 | from error import RepoChangedException, GitError, ManifestParseError | 62 | from error import RepoChangedException, GitError, ManifestParseError |
61 | import platform_utils | 63 | import platform_utils |
62 | from project import SyncBuffer | 64 | from project import SyncBuffer |
@@ -71,10 +73,6 @@ class _FetchError(Exception): | |||
71 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" | 73 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" |
72 | 74 | ||
73 | 75 | ||
74 | class _CheckoutError(Exception): | ||
75 | """Internal error thrown in _CheckoutOne() when we don't want stack trace.""" | ||
76 | |||
77 | |||
78 | class Sync(Command, MirrorSafeCommand): | 76 | class Sync(Command, MirrorSafeCommand): |
79 | jobs = 1 | 77 | jobs = 1 |
80 | common = True | 78 | common = True |
@@ -457,149 +455,80 @@ later is required to fix a server side protocol bug. | |||
457 | 455 | ||
458 | return fetched | 456 | return fetched |
459 | 457 | ||
460 | def _CheckoutWorker(self, opt, sem, project, *args, **kwargs): | 458 | def _CheckoutOne(self, opt, project): |
461 | """Main function of the fetch threads. | ||
462 | |||
463 | Delegates most of the work to _CheckoutOne. | ||
464 | |||
465 | Args: | ||
466 | opt: Program options returned from optparse. See _Options(). | ||
467 | projects: Projects to fetch. | ||
468 | sem: We'll release() this semaphore when we exit so that another thread | ||
469 | can be started up. | ||
470 | *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the | ||
471 | _CheckoutOne docstring for details. | ||
472 | """ | ||
473 | try: | ||
474 | return self._CheckoutOne(opt, project, *args, **kwargs) | ||
475 | finally: | ||
476 | sem.release() | ||
477 | |||
478 | def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results): | ||
479 | """Checkout work tree for one project | 459 | """Checkout work tree for one project |
480 | 460 | ||
481 | Args: | 461 | Args: |
482 | opt: Program options returned from optparse. See _Options(). | 462 | opt: Program options returned from optparse. See _Options(). |
483 | project: Project object for the project to checkout. | 463 | project: Project object for the project to checkout. |
484 | lock: Lock for accessing objects that are shared amongst multiple | ||
485 | _CheckoutWorker() threads. | ||
486 | pm: Instance of a Project object. We will call pm.update() (with our | ||
487 | lock held). | ||
488 | err_event: We'll set this event in the case of an error (after printing | ||
489 | out info about the error). | ||
490 | err_results: A list of strings, paths to git repos where checkout | ||
491 | failed. | ||
492 | 464 | ||
493 | Returns: | 465 | Returns: |
494 | Whether the fetch was successful. | 466 | Whether the fetch was successful. |
495 | """ | 467 | """ |
496 | # We'll set to true once we've locked the lock. | ||
497 | did_lock = False | ||
498 | |||
499 | # Encapsulate everything in a try/except/finally so that: | ||
500 | # - We always set err_event in the case of an exception. | ||
501 | # - We always make sure we unlock the lock if we locked it. | ||
502 | start = time.time() | 468 | start = time.time() |
503 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, | 469 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, |
504 | detach_head=opt.detach_head) | 470 | detach_head=opt.detach_head) |
505 | success = False | 471 | success = False |
506 | with lock: | ||
507 | pm.start(project.name) | ||
508 | try: | 472 | try: |
509 | try: | 473 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) |
510 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) | 474 | success = syncbuf.Finish() |
511 | 475 | except Exception as e: | |
512 | # Lock around all the rest of the code, since printing, updating a set | 476 | print('error: Cannot checkout %s: %s: %s' % |
513 | # and Progress.update() are not thread safe. | 477 | (project.name, type(e).__name__, str(e)), |
514 | lock.acquire() | 478 | file=sys.stderr) |
515 | success = syncbuf.Finish() | 479 | raise |
516 | did_lock = True | ||
517 | |||
518 | if not success: | ||
519 | err_event.set() | ||
520 | print('error: Cannot checkout %s' % (project.name), | ||
521 | file=sys.stderr) | ||
522 | raise _CheckoutError() | ||
523 | except _CheckoutError: | ||
524 | pass | ||
525 | except Exception as e: | ||
526 | print('error: Cannot checkout %s: %s: %s' % | ||
527 | (project.name, type(e).__name__, str(e)), | ||
528 | file=sys.stderr) | ||
529 | err_event.set() | ||
530 | raise | ||
531 | finally: | ||
532 | if not did_lock: | ||
533 | lock.acquire() | ||
534 | if not success: | ||
535 | err_results.append(project.relpath) | ||
536 | pm.finish(project.name) | ||
537 | lock.release() | ||
538 | finish = time.time() | ||
539 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | ||
540 | start, finish, success) | ||
541 | 480 | ||
542 | return success | 481 | if not success: |
482 | print('error: Cannot checkout %s' % (project.name), file=sys.stderr) | ||
483 | finish = time.time() | ||
484 | return (success, project, start, finish) | ||
543 | 485 | ||
544 | def _Checkout(self, all_projects, opt, err_event, err_results): | 486 | def _Checkout(self, all_projects, opt, err_results): |
545 | """Checkout projects listed in all_projects | 487 | """Checkout projects listed in all_projects |
546 | 488 | ||
547 | Args: | 489 | Args: |
548 | all_projects: List of all projects that should be checked out. | 490 | all_projects: List of all projects that should be checked out. |
549 | opt: Program options returned from optparse. See _Options(). | 491 | opt: Program options returned from optparse. See _Options(). |
550 | err_event: We'll set this event in the case of an error (after printing | 492 | err_results: A list of strings, paths to git repos where checkout failed. |
551 | out info about the error). | ||
552 | err_results: A list of strings, paths to git repos where checkout | ||
553 | failed. | ||
554 | """ | 493 | """ |
494 | ret = True | ||
555 | 495 | ||
556 | # Perform checkouts in multiple threads when we are using partial clone. | 496 | # Only checkout projects with worktrees. |
557 | # Without partial clone, all needed git objects are already downloaded, | 497 | all_projects = [x for x in all_projects if x.worktree] |
558 | # in this situation it's better to use only one process because the checkout | ||
559 | # would be mostly disk I/O; with partial clone, the objects are only | ||
560 | # downloaded when demanded (at checkout time), which is similar to the | ||
561 | # Sync_NetworkHalf case and parallelism would be helpful. | ||
562 | if self.manifest.CloneFilter: | ||
563 | syncjobs = self.jobs | ||
564 | else: | ||
565 | syncjobs = 1 | ||
566 | 498 | ||
567 | lock = _threading.Lock() | ||
568 | pm = Progress('Checking out', len(all_projects)) | 499 | pm = Progress('Checking out', len(all_projects)) |
569 | 500 | ||
570 | threads = set() | 501 | def _ProcessResults(results): |
571 | sem = _threading.Semaphore(syncjobs) | 502 | for (success, project, start, finish) in results: |
572 | 503 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | |
573 | for project in all_projects: | 504 | start, finish, success) |
574 | # Check for any errors before running any more tasks. | 505 | # Check for any errors before running any more tasks. |
575 | # ...we'll let existing threads finish, though. | 506 | # ...we'll let existing threads finish, though. |
576 | if err_event.is_set() and opt.fail_fast: | 507 | if not success: |
577 | break | 508 | err_results.append(project.relpath) |
578 | 509 | if opt.fail_fast: | |
579 | sem.acquire() | 510 | return False |
580 | if project.worktree: | 511 | pm.update(msg=project.name) |
581 | kwargs = dict(opt=opt, | 512 | return True |
582 | sem=sem, | 513 | |
583 | project=project, | 514 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
584 | lock=lock, | 515 | if len(all_projects) == 1 or opt.jobs == 1: |
585 | pm=pm, | 516 | if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects): |
586 | err_event=err_event, | 517 | ret = False |
587 | err_results=err_results) | 518 | else: |
588 | if syncjobs > 1: | 519 | with multiprocessing.Pool(opt.jobs) as pool: |
589 | t = _threading.Thread(target=self._CheckoutWorker, | 520 | results = pool.imap_unordered( |
590 | kwargs=kwargs) | 521 | functools.partial(self._CheckoutOne, opt), |
591 | # Ensure that Ctrl-C will not freeze the repo process. | 522 | all_projects, |
592 | t.daemon = True | 523 | chunksize=WORKER_BATCH_SIZE) |
593 | threads.add(t) | 524 | if not _ProcessResults(results): |
594 | t.start() | 525 | ret = False |
595 | else: | 526 | pool.close() |
596 | self._CheckoutWorker(**kwargs) | ||
597 | |||
598 | for t in threads: | ||
599 | t.join() | ||
600 | 527 | ||
601 | pm.end() | 528 | pm.end() |
602 | 529 | ||
530 | return ret | ||
531 | |||
603 | def _GCProjects(self, projects, opt, err_event): | 532 | def _GCProjects(self, projects, opt, err_event): |
604 | gc_gitdirs = {} | 533 | gc_gitdirs = {} |
605 | for project in projects: | 534 | for project in projects: |
@@ -946,7 +875,6 @@ later is required to fix a server side protocol bug. | |||
946 | 875 | ||
947 | err_network_sync = False | 876 | err_network_sync = False |
948 | err_update_projects = False | 877 | err_update_projects = False |
949 | err_checkout = False | ||
950 | 878 | ||
951 | self._fetch_times = _FetchTimes(self.manifest) | 879 | self._fetch_times = _FetchTimes(self.manifest) |
952 | if not opt.local_only: | 880 | if not opt.local_only: |
@@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug. | |||
1011 | sys.exit(1) | 939 | sys.exit(1) |
1012 | 940 | ||
1013 | err_results = [] | 941 | err_results = [] |
1014 | self._Checkout(all_projects, opt, err_event, err_results) | 942 | # NB: We don't exit here because this is the last step. |
1015 | if err_event.is_set(): | 943 | err_checkout = not self._Checkout(all_projects, opt, err_results) |
1016 | err_checkout = True | 944 | if err_checkout: |
1017 | # NB: We don't exit here because this is the last step. | 945 | err_event.set() |
1018 | 946 | ||
1019 | # If there's a notice that's supposed to print at the end of the sync, print | 947 | # If there's a notice that's supposed to print at the end of the sync, print |
1020 | # it now... | 948 | # it now... |