summaryrefslogtreecommitdiffstats
path: root/subcmds/sync.py
diff options
context:
space:
mode:
authorMike Frysinger <vapier@google.com>2021-05-06 00:44:42 -0400
committerMike Frysinger <vapier@google.com>2021-05-10 21:16:06 +0000
commit339f2df1ddd741070e340ec01d6882dd1eee617c (patch)
treed16fe7c87ba966a400d545bef5a49c460b75dc57 /subcmds/sync.py
parent19e409c81863878d5d313fdc40b3975b98602454 (diff)
downloadgit-repo-339f2df1ddd741070e340ec01d6882dd1eee617c.tar.gz
ssh: rewrite proxy management for multiprocessing usagev2.15
We changed sync to use multiprocessing for parallel work. This broke the ssh proxy code as it's all based on threads. Rewrite the logic to be multiprocessing safe. Now instead of the module acting as a stateful object, callers have to instantiate a new ProxyManager class that holds all the state, an pass that down to any users. Bug: https://crbug.com/gerrit/12389 Change-Id: I4b1af116f7306b91e825d3c56fb4274c9b033562 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/305486 Tested-by: Mike Frysinger <vapier@google.com> Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Diffstat (limited to 'subcmds/sync.py')
-rw-r--r--subcmds/sync.py44
1 files changed, 32 insertions, 12 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 28568062..fb25c221 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -358,7 +358,7 @@ later is required to fix a server side protocol bug.
358 optimized_fetch=opt.optimized_fetch, 358 optimized_fetch=opt.optimized_fetch,
359 retry_fetches=opt.retry_fetches, 359 retry_fetches=opt.retry_fetches,
360 prune=opt.prune, 360 prune=opt.prune,
361 ssh_proxy=True, 361 ssh_proxy=self.ssh_proxy,
362 clone_filter=self.manifest.CloneFilter, 362 clone_filter=self.manifest.CloneFilter,
363 partial_clone_exclude=self.manifest.PartialCloneExclude) 363 partial_clone_exclude=self.manifest.PartialCloneExclude)
364 364
@@ -380,7 +380,11 @@ later is required to fix a server side protocol bug.
380 finish = time.time() 380 finish = time.time()
381 return (success, project, start, finish) 381 return (success, project, start, finish)
382 382
383 def _Fetch(self, projects, opt, err_event): 383 @classmethod
384 def _FetchInitChild(cls, ssh_proxy):
385 cls.ssh_proxy = ssh_proxy
386
387 def _Fetch(self, projects, opt, err_event, ssh_proxy):
384 ret = True 388 ret = True
385 389
386 jobs = opt.jobs_network if opt.jobs_network else self.jobs 390 jobs = opt.jobs_network if opt.jobs_network else self.jobs
@@ -410,8 +414,14 @@ later is required to fix a server side protocol bug.
410 break 414 break
411 return ret 415 return ret
412 416
417 # We pass the ssh proxy settings via the class. This allows multiprocessing
418 # to pickle it up when spawning children. We can't pass it as an argument
419 # to _FetchProjectList below as multiprocessing is unable to pickle those.
420 Sync.ssh_proxy = None
421
413 # NB: Multiprocessing is heavy, so don't spin it up for one job. 422 # NB: Multiprocessing is heavy, so don't spin it up for one job.
414 if len(projects_list) == 1 or jobs == 1: 423 if len(projects_list) == 1 or jobs == 1:
424 self._FetchInitChild(ssh_proxy)
415 if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): 425 if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list):
416 ret = False 426 ret = False
417 else: 427 else:
@@ -429,7 +439,8 @@ later is required to fix a server side protocol bug.
429 else: 439 else:
430 pm.update(inc=0, msg='warming up') 440 pm.update(inc=0, msg='warming up')
431 chunksize = 4 441 chunksize = 4
432 with multiprocessing.Pool(jobs) as pool: 442 with multiprocessing.Pool(
443 jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)) as pool:
433 results = pool.imap_unordered( 444 results = pool.imap_unordered(
434 functools.partial(self._FetchProjectList, opt), 445 functools.partial(self._FetchProjectList, opt),
435 projects_list, 446 projects_list,
@@ -438,6 +449,11 @@ later is required to fix a server side protocol bug.
438 ret = False 449 ret = False
439 pool.close() 450 pool.close()
440 451
452 # Cleanup the reference now that we're done with it, and we're going to
453 # release any resources it points to. If we don't, later multiprocessing
454 # usage (e.g. checkouts) will try to pickle and then crash.
455 del Sync.ssh_proxy
456
441 pm.end() 457 pm.end()
442 self._fetch_times.Save() 458 self._fetch_times.Save()
443 459
@@ -447,7 +463,7 @@ later is required to fix a server side protocol bug.
447 return (ret, fetched) 463 return (ret, fetched)
448 464
449 def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, 465 def _FetchMain(self, opt, args, all_projects, err_event, manifest_name,
450 load_local_manifests): 466 load_local_manifests, ssh_proxy):
451 """The main network fetch loop. 467 """The main network fetch loop.
452 468
453 Args: 469 Args:
@@ -457,6 +473,7 @@ later is required to fix a server side protocol bug.
457 err_event: Whether an error was hit while processing. 473 err_event: Whether an error was hit while processing.
458 manifest_name: Manifest file to be reloaded. 474 manifest_name: Manifest file to be reloaded.
459 load_local_manifests: Whether to load local manifests. 475 load_local_manifests: Whether to load local manifests.
476 ssh_proxy: SSH manager for clients & masters.
460 """ 477 """
461 rp = self.manifest.repoProject 478 rp = self.manifest.repoProject
462 479
@@ -467,7 +484,7 @@ later is required to fix a server side protocol bug.
467 to_fetch.extend(all_projects) 484 to_fetch.extend(all_projects)
468 to_fetch.sort(key=self._fetch_times.Get, reverse=True) 485 to_fetch.sort(key=self._fetch_times.Get, reverse=True)
469 486
470 success, fetched = self._Fetch(to_fetch, opt, err_event) 487 success, fetched = self._Fetch(to_fetch, opt, err_event, ssh_proxy)
471 if not success: 488 if not success:
472 err_event.set() 489 err_event.set()
473 490
@@ -498,7 +515,7 @@ later is required to fix a server side protocol bug.
498 if previously_missing_set == missing_set: 515 if previously_missing_set == missing_set:
499 break 516 break
500 previously_missing_set = missing_set 517 previously_missing_set = missing_set
501 success, new_fetched = self._Fetch(missing, opt, err_event) 518 success, new_fetched = self._Fetch(missing, opt, err_event, ssh_proxy)
502 if not success: 519 if not success:
503 err_event.set() 520 err_event.set()
504 fetched.update(new_fetched) 521 fetched.update(new_fetched)
@@ -985,12 +1002,15 @@ later is required to fix a server side protocol bug.
985 1002
986 self._fetch_times = _FetchTimes(self.manifest) 1003 self._fetch_times = _FetchTimes(self.manifest)
987 if not opt.local_only: 1004 if not opt.local_only:
988 try: 1005 with multiprocessing.Manager() as manager:
989 ssh.init() 1006 with ssh.ProxyManager(manager) as ssh_proxy:
990 self._FetchMain(opt, args, all_projects, err_event, manifest_name, 1007 # Initialize the socket dir once in the parent.
991 load_local_manifests) 1008 ssh_proxy.sock()
992 finally: 1009 self._FetchMain(opt, args, all_projects, err_event, manifest_name,
993 ssh.close() 1010 load_local_manifests, ssh_proxy)
1011
1012 if opt.network_only:
1013 return
994 1014
995 # If we saw an error, exit with code 1 so that other scripts can check. 1015 # If we saw an error, exit with code 1 so that other scripts can check.
996 if err_event.is_set(): 1016 if err_event.is_set():