diff options
Diffstat (limited to 'subcmds')
-rw-r--r-- | subcmds/sync.py | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index 28568062..fb25c221 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -358,7 +358,7 @@ later is required to fix a server side protocol bug. | |||
358 | optimized_fetch=opt.optimized_fetch, | 358 | optimized_fetch=opt.optimized_fetch, |
359 | retry_fetches=opt.retry_fetches, | 359 | retry_fetches=opt.retry_fetches, |
360 | prune=opt.prune, | 360 | prune=opt.prune, |
361 | ssh_proxy=True, | 361 | ssh_proxy=self.ssh_proxy, |
362 | clone_filter=self.manifest.CloneFilter, | 362 | clone_filter=self.manifest.CloneFilter, |
363 | partial_clone_exclude=self.manifest.PartialCloneExclude) | 363 | partial_clone_exclude=self.manifest.PartialCloneExclude) |
364 | 364 | ||
@@ -380,7 +380,11 @@ later is required to fix a server side protocol bug. | |||
380 | finish = time.time() | 380 | finish = time.time() |
381 | return (success, project, start, finish) | 381 | return (success, project, start, finish) |
382 | 382 | ||
383 | def _Fetch(self, projects, opt, err_event): | 383 | @classmethod |
384 | def _FetchInitChild(cls, ssh_proxy): | ||
385 | cls.ssh_proxy = ssh_proxy | ||
386 | |||
387 | def _Fetch(self, projects, opt, err_event, ssh_proxy): | ||
384 | ret = True | 388 | ret = True |
385 | 389 | ||
386 | jobs = opt.jobs_network if opt.jobs_network else self.jobs | 390 | jobs = opt.jobs_network if opt.jobs_network else self.jobs |
@@ -410,8 +414,14 @@ later is required to fix a server side protocol bug. | |||
410 | break | 414 | break |
411 | return ret | 415 | return ret |
412 | 416 | ||
417 | # We pass the ssh proxy settings via the class. This allows multiprocessing | ||
418 | # to pickle it up when spawning children. We can't pass it as an argument | ||
419 | # to _FetchProjectList below as multiprocessing is unable to pickle those. | ||
420 | Sync.ssh_proxy = None | ||
421 | |||
413 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 422 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
414 | if len(projects_list) == 1 or jobs == 1: | 423 | if len(projects_list) == 1 or jobs == 1: |
424 | self._FetchInitChild(ssh_proxy) | ||
415 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): | 425 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): |
416 | ret = False | 426 | ret = False |
417 | else: | 427 | else: |
@@ -429,7 +439,8 @@ later is required to fix a server side protocol bug. | |||
429 | else: | 439 | else: |
430 | pm.update(inc=0, msg='warming up') | 440 | pm.update(inc=0, msg='warming up') |
431 | chunksize = 4 | 441 | chunksize = 4 |
432 | with multiprocessing.Pool(jobs) as pool: | 442 | with multiprocessing.Pool( |
443 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)) as pool: | ||
433 | results = pool.imap_unordered( | 444 | results = pool.imap_unordered( |
434 | functools.partial(self._FetchProjectList, opt), | 445 | functools.partial(self._FetchProjectList, opt), |
435 | projects_list, | 446 | projects_list, |
@@ -438,6 +449,11 @@ later is required to fix a server side protocol bug. | |||
438 | ret = False | 449 | ret = False |
439 | pool.close() | 450 | pool.close() |
440 | 451 | ||
452 | # Cleanup the reference now that we're done with it, and we're going to | ||
453 | # release any resources it points to. If we don't, later multiprocessing | ||
454 | # usage (e.g. checkouts) will try to pickle and then crash. | ||
455 | del Sync.ssh_proxy | ||
456 | |||
441 | pm.end() | 457 | pm.end() |
442 | self._fetch_times.Save() | 458 | self._fetch_times.Save() |
443 | 459 | ||
@@ -447,7 +463,7 @@ later is required to fix a server side protocol bug. | |||
447 | return (ret, fetched) | 463 | return (ret, fetched) |
448 | 464 | ||
449 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, | 465 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, |
450 | load_local_manifests): | 466 | load_local_manifests, ssh_proxy): |
451 | """The main network fetch loop. | 467 | """The main network fetch loop. |
452 | 468 | ||
453 | Args: | 469 | Args: |
@@ -457,6 +473,7 @@ later is required to fix a server side protocol bug. | |||
457 | err_event: Whether an error was hit while processing. | 473 | err_event: Whether an error was hit while processing. |
458 | manifest_name: Manifest file to be reloaded. | 474 | manifest_name: Manifest file to be reloaded. |
459 | load_local_manifests: Whether to load local manifests. | 475 | load_local_manifests: Whether to load local manifests. |
476 | ssh_proxy: SSH manager for clients & masters. | ||
460 | """ | 477 | """ |
461 | rp = self.manifest.repoProject | 478 | rp = self.manifest.repoProject |
462 | 479 | ||
@@ -467,7 +484,7 @@ later is required to fix a server side protocol bug. | |||
467 | to_fetch.extend(all_projects) | 484 | to_fetch.extend(all_projects) |
468 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) | 485 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) |
469 | 486 | ||
470 | success, fetched = self._Fetch(to_fetch, opt, err_event) | 487 | success, fetched = self._Fetch(to_fetch, opt, err_event, ssh_proxy) |
471 | if not success: | 488 | if not success: |
472 | err_event.set() | 489 | err_event.set() |
473 | 490 | ||
@@ -498,7 +515,7 @@ later is required to fix a server side protocol bug. | |||
498 | if previously_missing_set == missing_set: | 515 | if previously_missing_set == missing_set: |
499 | break | 516 | break |
500 | previously_missing_set = missing_set | 517 | previously_missing_set = missing_set |
501 | success, new_fetched = self._Fetch(missing, opt, err_event) | 518 | success, new_fetched = self._Fetch(missing, opt, err_event, ssh_proxy) |
502 | if not success: | 519 | if not success: |
503 | err_event.set() | 520 | err_event.set() |
504 | fetched.update(new_fetched) | 521 | fetched.update(new_fetched) |
@@ -985,12 +1002,15 @@ later is required to fix a server side protocol bug. | |||
985 | 1002 | ||
986 | self._fetch_times = _FetchTimes(self.manifest) | 1003 | self._fetch_times = _FetchTimes(self.manifest) |
987 | if not opt.local_only: | 1004 | if not opt.local_only: |
988 | try: | 1005 | with multiprocessing.Manager() as manager: |
989 | ssh.init() | 1006 | with ssh.ProxyManager(manager) as ssh_proxy: |
990 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, | 1007 | # Initialize the socket dir once in the parent. |
991 | load_local_manifests) | 1008 | ssh_proxy.sock() |
992 | finally: | 1009 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, |
993 | ssh.close() | 1010 | load_local_manifests, ssh_proxy) |
1011 | |||
1012 | if opt.network_only: | ||
1013 | return | ||
994 | 1014 | ||
995 | # If we saw an error, exit with code 1 so that other scripts can check. | 1015 | # If we saw an error, exit with code 1 so that other scripts can check. |
996 | if err_event.is_set(): | 1016 | if err_event.is_set(): |