summaryrefslogtreecommitdiffstats
path: root/subcmds
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds')
-rw-r--r--subcmds/sync.py44
1 files changed, 32 insertions, 12 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 28568062..fb25c221 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -358,7 +358,7 @@ later is required to fix a server side protocol bug.
358 optimized_fetch=opt.optimized_fetch, 358 optimized_fetch=opt.optimized_fetch,
359 retry_fetches=opt.retry_fetches, 359 retry_fetches=opt.retry_fetches,
360 prune=opt.prune, 360 prune=opt.prune,
361 ssh_proxy=True, 361 ssh_proxy=self.ssh_proxy,
362 clone_filter=self.manifest.CloneFilter, 362 clone_filter=self.manifest.CloneFilter,
363 partial_clone_exclude=self.manifest.PartialCloneExclude) 363 partial_clone_exclude=self.manifest.PartialCloneExclude)
364 364
@@ -380,7 +380,11 @@ later is required to fix a server side protocol bug.
380 finish = time.time() 380 finish = time.time()
381 return (success, project, start, finish) 381 return (success, project, start, finish)
382 382
383 def _Fetch(self, projects, opt, err_event): 383 @classmethod
384 def _FetchInitChild(cls, ssh_proxy):
385 cls.ssh_proxy = ssh_proxy
386
387 def _Fetch(self, projects, opt, err_event, ssh_proxy):
384 ret = True 388 ret = True
385 389
386 jobs = opt.jobs_network if opt.jobs_network else self.jobs 390 jobs = opt.jobs_network if opt.jobs_network else self.jobs
@@ -410,8 +414,14 @@ later is required to fix a server side protocol bug.
410 break 414 break
411 return ret 415 return ret
412 416
417 # We pass the ssh proxy settings via the class. This allows multiprocessing
418 # to pickle it up when spawning children. We can't pass it as an argument
419 # to _FetchProjectList below as multiprocessing is unable to pickle those.
420 Sync.ssh_proxy = None
421
413 # NB: Multiprocessing is heavy, so don't spin it up for one job. 422 # NB: Multiprocessing is heavy, so don't spin it up for one job.
414 if len(projects_list) == 1 or jobs == 1: 423 if len(projects_list) == 1 or jobs == 1:
424 self._FetchInitChild(ssh_proxy)
415 if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): 425 if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list):
416 ret = False 426 ret = False
417 else: 427 else:
@@ -429,7 +439,8 @@ later is required to fix a server side protocol bug.
429 else: 439 else:
430 pm.update(inc=0, msg='warming up') 440 pm.update(inc=0, msg='warming up')
431 chunksize = 4 441 chunksize = 4
432 with multiprocessing.Pool(jobs) as pool: 442 with multiprocessing.Pool(
443 jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)) as pool:
433 results = pool.imap_unordered( 444 results = pool.imap_unordered(
434 functools.partial(self._FetchProjectList, opt), 445 functools.partial(self._FetchProjectList, opt),
435 projects_list, 446 projects_list,
@@ -438,6 +449,11 @@ later is required to fix a server side protocol bug.
438 ret = False 449 ret = False
439 pool.close() 450 pool.close()
440 451
452 # Cleanup the reference now that we're done with it, and we're going to
453 # release any resources it points to. If we don't, later multiprocessing
454 # usage (e.g. checkouts) will try to pickle and then crash.
455 del Sync.ssh_proxy
456
441 pm.end() 457 pm.end()
442 self._fetch_times.Save() 458 self._fetch_times.Save()
443 459
@@ -447,7 +463,7 @@ later is required to fix a server side protocol bug.
447 return (ret, fetched) 463 return (ret, fetched)
448 464
449 def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, 465 def _FetchMain(self, opt, args, all_projects, err_event, manifest_name,
450 load_local_manifests): 466 load_local_manifests, ssh_proxy):
451 """The main network fetch loop. 467 """The main network fetch loop.
452 468
453 Args: 469 Args:
@@ -457,6 +473,7 @@ later is required to fix a server side protocol bug.
457 err_event: Whether an error was hit while processing. 473 err_event: Whether an error was hit while processing.
458 manifest_name: Manifest file to be reloaded. 474 manifest_name: Manifest file to be reloaded.
459 load_local_manifests: Whether to load local manifests. 475 load_local_manifests: Whether to load local manifests.
476 ssh_proxy: SSH manager for clients & masters.
460 """ 477 """
461 rp = self.manifest.repoProject 478 rp = self.manifest.repoProject
462 479
@@ -467,7 +484,7 @@ later is required to fix a server side protocol bug.
467 to_fetch.extend(all_projects) 484 to_fetch.extend(all_projects)
468 to_fetch.sort(key=self._fetch_times.Get, reverse=True) 485 to_fetch.sort(key=self._fetch_times.Get, reverse=True)
469 486
470 success, fetched = self._Fetch(to_fetch, opt, err_event) 487 success, fetched = self._Fetch(to_fetch, opt, err_event, ssh_proxy)
471 if not success: 488 if not success:
472 err_event.set() 489 err_event.set()
473 490
@@ -498,7 +515,7 @@ later is required to fix a server side protocol bug.
498 if previously_missing_set == missing_set: 515 if previously_missing_set == missing_set:
499 break 516 break
500 previously_missing_set = missing_set 517 previously_missing_set = missing_set
501 success, new_fetched = self._Fetch(missing, opt, err_event) 518 success, new_fetched = self._Fetch(missing, opt, err_event, ssh_proxy)
502 if not success: 519 if not success:
503 err_event.set() 520 err_event.set()
504 fetched.update(new_fetched) 521 fetched.update(new_fetched)
@@ -985,12 +1002,15 @@ later is required to fix a server side protocol bug.
985 1002
986 self._fetch_times = _FetchTimes(self.manifest) 1003 self._fetch_times = _FetchTimes(self.manifest)
987 if not opt.local_only: 1004 if not opt.local_only:
988 try: 1005 with multiprocessing.Manager() as manager:
989 ssh.init() 1006 with ssh.ProxyManager(manager) as ssh_proxy:
990 self._FetchMain(opt, args, all_projects, err_event, manifest_name, 1007 # Initialize the socket dir once in the parent.
991 load_local_manifests) 1008 ssh_proxy.sock()
992 finally: 1009 self._FetchMain(opt, args, all_projects, err_event, manifest_name,
993 ssh.close() 1010 load_local_manifests, ssh_proxy)
1011
1012 if opt.network_only:
1013 return
994 1014
995 # If we saw an error, exit with code 1 so that other scripts can check. 1015 # If we saw an error, exit with code 1 so that other scripts can check.
996 if err_event.is_set(): 1016 if err_event.is_set():