diff options
| -rw-r--r-- | subcmds/sync.py | 176 | 
1 files changed, 52 insertions, 124 deletions
| diff --git a/subcmds/sync.py b/subcmds/sync.py index d1b631ae..0db96b54 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -12,9 +12,11 @@ | |||
| 12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and | 
| 13 | # limitations under the License. | 13 | # limitations under the License. | 
| 14 | 14 | ||
| 15 | import functools | ||
| 15 | import http.cookiejar as cookielib | 16 | import http.cookiejar as cookielib | 
| 16 | import io | 17 | import io | 
| 17 | import json | 18 | import json | 
| 19 | import multiprocessing | ||
| 18 | import netrc | 20 | import netrc | 
| 19 | from optparse import SUPPRESS_HELP | 21 | from optparse import SUPPRESS_HELP | 
| 20 | import os | 22 | import os | 
| @@ -56,7 +58,7 @@ import git_superproject | |||
| 56 | import gitc_utils | 58 | import gitc_utils | 
| 57 | from project import Project | 59 | from project import Project | 
| 58 | from project import RemoteSpec | 60 | from project import RemoteSpec | 
| 59 | from command import Command, MirrorSafeCommand | 61 | from command import Command, MirrorSafeCommand, WORKER_BATCH_SIZE | 
| 60 | from error import RepoChangedException, GitError, ManifestParseError | 62 | from error import RepoChangedException, GitError, ManifestParseError | 
| 61 | import platform_utils | 63 | import platform_utils | 
| 62 | from project import SyncBuffer | 64 | from project import SyncBuffer | 
| @@ -71,10 +73,6 @@ class _FetchError(Exception): | |||
| 71 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" | 73 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" | 
| 72 | 74 | ||
| 73 | 75 | ||
| 74 | class _CheckoutError(Exception): | ||
| 75 | """Internal error thrown in _CheckoutOne() when we don't want stack trace.""" | ||
| 76 | |||
| 77 | |||
| 78 | class Sync(Command, MirrorSafeCommand): | 76 | class Sync(Command, MirrorSafeCommand): | 
| 79 | jobs = 1 | 77 | jobs = 1 | 
| 80 | common = True | 78 | common = True | 
| @@ -457,149 +455,80 @@ later is required to fix a server side protocol bug. | |||
| 457 | 455 | ||
| 458 | return fetched | 456 | return fetched | 
| 459 | 457 | ||
| 460 | def _CheckoutWorker(self, opt, sem, project, *args, **kwargs): | 458 | def _CheckoutOne(self, opt, project): | 
| 461 | """Main function of the fetch threads. | ||
| 462 | |||
| 463 | Delegates most of the work to _CheckoutOne. | ||
| 464 | |||
| 465 | Args: | ||
| 466 | opt: Program options returned from optparse. See _Options(). | ||
| 467 | projects: Projects to fetch. | ||
| 468 | sem: We'll release() this semaphore when we exit so that another thread | ||
| 469 | can be started up. | ||
| 470 | *args, **kwargs: Remaining arguments to pass to _CheckoutOne. See the | ||
| 471 | _CheckoutOne docstring for details. | ||
| 472 | """ | ||
| 473 | try: | ||
| 474 | return self._CheckoutOne(opt, project, *args, **kwargs) | ||
| 475 | finally: | ||
| 476 | sem.release() | ||
| 477 | |||
| 478 | def _CheckoutOne(self, opt, project, lock, pm, err_event, err_results): | ||
| 479 | """Checkout work tree for one project | 459 | """Checkout work tree for one project | 
| 480 | 460 | ||
| 481 | Args: | 461 | Args: | 
| 482 | opt: Program options returned from optparse. See _Options(). | 462 | opt: Program options returned from optparse. See _Options(). | 
| 483 | project: Project object for the project to checkout. | 463 | project: Project object for the project to checkout. | 
| 484 | lock: Lock for accessing objects that are shared amongst multiple | ||
| 485 | _CheckoutWorker() threads. | ||
| 486 | pm: Instance of a Project object. We will call pm.update() (with our | ||
| 487 | lock held). | ||
| 488 | err_event: We'll set this event in the case of an error (after printing | ||
| 489 | out info about the error). | ||
| 490 | err_results: A list of strings, paths to git repos where checkout | ||
| 491 | failed. | ||
| 492 | 464 | ||
| 493 | Returns: | 465 | Returns: | 
| 494 | Whether the fetch was successful. | 466 | Whether the fetch was successful. | 
| 495 | """ | 467 | """ | 
| 496 | # We'll set to true once we've locked the lock. | ||
| 497 | did_lock = False | ||
| 498 | |||
| 499 | # Encapsulate everything in a try/except/finally so that: | ||
| 500 | # - We always set err_event in the case of an exception. | ||
| 501 | # - We always make sure we unlock the lock if we locked it. | ||
| 502 | start = time.time() | 468 | start = time.time() | 
| 503 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, | 469 | syncbuf = SyncBuffer(self.manifest.manifestProject.config, | 
| 504 | detach_head=opt.detach_head) | 470 | detach_head=opt.detach_head) | 
| 505 | success = False | 471 | success = False | 
| 506 | with lock: | ||
| 507 | pm.start(project.name) | ||
| 508 | try: | 472 | try: | 
| 509 | try: | 473 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) | 
| 510 | project.Sync_LocalHalf(syncbuf, force_sync=opt.force_sync) | 474 | success = syncbuf.Finish() | 
| 511 | 475 | except Exception as e: | |
| 512 | # Lock around all the rest of the code, since printing, updating a set | 476 | print('error: Cannot checkout %s: %s: %s' % | 
| 513 | # and Progress.update() are not thread safe. | 477 | (project.name, type(e).__name__, str(e)), | 
| 514 | lock.acquire() | 478 | file=sys.stderr) | 
| 515 | success = syncbuf.Finish() | 479 | raise | 
| 516 | did_lock = True | ||
| 517 | |||
| 518 | if not success: | ||
| 519 | err_event.set() | ||
| 520 | print('error: Cannot checkout %s' % (project.name), | ||
| 521 | file=sys.stderr) | ||
| 522 | raise _CheckoutError() | ||
| 523 | except _CheckoutError: | ||
| 524 | pass | ||
| 525 | except Exception as e: | ||
| 526 | print('error: Cannot checkout %s: %s: %s' % | ||
| 527 | (project.name, type(e).__name__, str(e)), | ||
| 528 | file=sys.stderr) | ||
| 529 | err_event.set() | ||
| 530 | raise | ||
| 531 | finally: | ||
| 532 | if not did_lock: | ||
| 533 | lock.acquire() | ||
| 534 | if not success: | ||
| 535 | err_results.append(project.relpath) | ||
| 536 | pm.finish(project.name) | ||
| 537 | lock.release() | ||
| 538 | finish = time.time() | ||
| 539 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | ||
| 540 | start, finish, success) | ||
| 541 | 480 | ||
| 542 | return success | 481 | if not success: | 
| 482 | print('error: Cannot checkout %s' % (project.name), file=sys.stderr) | ||
| 483 | finish = time.time() | ||
| 484 | return (success, project, start, finish) | ||
| 543 | 485 | ||
| 544 | def _Checkout(self, all_projects, opt, err_event, err_results): | 486 | def _Checkout(self, all_projects, opt, err_results): | 
| 545 | """Checkout projects listed in all_projects | 487 | """Checkout projects listed in all_projects | 
| 546 | 488 | ||
| 547 | Args: | 489 | Args: | 
| 548 | all_projects: List of all projects that should be checked out. | 490 | all_projects: List of all projects that should be checked out. | 
| 549 | opt: Program options returned from optparse. See _Options(). | 491 | opt: Program options returned from optparse. See _Options(). | 
| 550 | err_event: We'll set this event in the case of an error (after printing | 492 | err_results: A list of strings, paths to git repos where checkout failed. | 
| 551 | out info about the error). | ||
| 552 | err_results: A list of strings, paths to git repos where checkout | ||
| 553 | failed. | ||
| 554 | """ | 493 | """ | 
| 494 | ret = True | ||
| 555 | 495 | ||
| 556 | # Perform checkouts in multiple threads when we are using partial clone. | 496 | # Only checkout projects with worktrees. | 
| 557 | # Without partial clone, all needed git objects are already downloaded, | 497 | all_projects = [x for x in all_projects if x.worktree] | 
| 558 | # in this situation it's better to use only one process because the checkout | ||
| 559 | # would be mostly disk I/O; with partial clone, the objects are only | ||
| 560 | # downloaded when demanded (at checkout time), which is similar to the | ||
| 561 | # Sync_NetworkHalf case and parallelism would be helpful. | ||
| 562 | if self.manifest.CloneFilter: | ||
| 563 | syncjobs = self.jobs | ||
| 564 | else: | ||
| 565 | syncjobs = 1 | ||
| 566 | 498 | ||
| 567 | lock = _threading.Lock() | ||
| 568 | pm = Progress('Checking out', len(all_projects)) | 499 | pm = Progress('Checking out', len(all_projects)) | 
| 569 | 500 | ||
| 570 | threads = set() | 501 | def _ProcessResults(results): | 
| 571 | sem = _threading.Semaphore(syncjobs) | 502 | for (success, project, start, finish) in results: | 
| 572 | 503 | self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL, | |
| 573 | for project in all_projects: | 504 | start, finish, success) | 
| 574 | # Check for any errors before running any more tasks. | 505 | # Check for any errors before running any more tasks. | 
| 575 | # ...we'll let existing threads finish, though. | 506 | # ...we'll let existing threads finish, though. | 
| 576 | if err_event.is_set() and opt.fail_fast: | 507 | if not success: | 
| 577 | break | 508 | err_results.append(project.relpath) | 
| 578 | 509 | if opt.fail_fast: | |
| 579 | sem.acquire() | 510 | return False | 
| 580 | if project.worktree: | 511 | pm.update(msg=project.name) | 
| 581 | kwargs = dict(opt=opt, | 512 | return True | 
| 582 | sem=sem, | 513 | |
| 583 | project=project, | 514 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 
| 584 | lock=lock, | 515 | if len(all_projects) == 1 or opt.jobs == 1: | 
| 585 | pm=pm, | 516 | if not _ProcessResults(self._CheckoutOne(opt, x) for x in all_projects): | 
| 586 | err_event=err_event, | 517 | ret = False | 
| 587 | err_results=err_results) | 518 | else: | 
| 588 | if syncjobs > 1: | 519 | with multiprocessing.Pool(opt.jobs) as pool: | 
| 589 | t = _threading.Thread(target=self._CheckoutWorker, | 520 | results = pool.imap_unordered( | 
| 590 | kwargs=kwargs) | 521 | functools.partial(self._CheckoutOne, opt), | 
| 591 | # Ensure that Ctrl-C will not freeze the repo process. | 522 | all_projects, | 
| 592 | t.daemon = True | 523 | chunksize=WORKER_BATCH_SIZE) | 
| 593 | threads.add(t) | 524 | if not _ProcessResults(results): | 
| 594 | t.start() | 525 | ret = False | 
| 595 | else: | 526 | pool.close() | 
| 596 | self._CheckoutWorker(**kwargs) | ||
| 597 | |||
| 598 | for t in threads: | ||
| 599 | t.join() | ||
| 600 | 527 | ||
| 601 | pm.end() | 528 | pm.end() | 
| 602 | 529 | ||
| 530 | return ret | ||
| 531 | |||
| 603 | def _GCProjects(self, projects, opt, err_event): | 532 | def _GCProjects(self, projects, opt, err_event): | 
| 604 | gc_gitdirs = {} | 533 | gc_gitdirs = {} | 
| 605 | for project in projects: | 534 | for project in projects: | 
| @@ -946,7 +875,6 @@ later is required to fix a server side protocol bug. | |||
| 946 | 875 | ||
| 947 | err_network_sync = False | 876 | err_network_sync = False | 
| 948 | err_update_projects = False | 877 | err_update_projects = False | 
| 949 | err_checkout = False | ||
| 950 | 878 | ||
| 951 | self._fetch_times = _FetchTimes(self.manifest) | 879 | self._fetch_times = _FetchTimes(self.manifest) | 
| 952 | if not opt.local_only: | 880 | if not opt.local_only: | 
| @@ -1011,10 +939,10 @@ later is required to fix a server side protocol bug. | |||
| 1011 | sys.exit(1) | 939 | sys.exit(1) | 
| 1012 | 940 | ||
| 1013 | err_results = [] | 941 | err_results = [] | 
| 1014 | self._Checkout(all_projects, opt, err_event, err_results) | 942 | # NB: We don't exit here because this is the last step. | 
| 1015 | if err_event.is_set(): | 943 | err_checkout = not self._Checkout(all_projects, opt, err_results) | 
| 1016 | err_checkout = True | 944 | if err_checkout: | 
| 1017 | # NB: We don't exit here because this is the last step. | 945 | err_event.set() | 
| 1018 | 946 | ||
| 1019 | # If there's a notice that's supposed to print at the end of the sync, print | 947 | # If there's a notice that's supposed to print at the end of the sync, print | 
| 1020 | # it now... | 948 | # it now... | 
