diff options
author | Mike Frysinger <vapier@google.com> | 2021-05-06 00:44:42 -0400 |
---|---|---|
committer | Mike Frysinger <vapier@google.com> | 2021-05-10 21:16:06 +0000 |
commit | 339f2df1ddd741070e340ec01d6882dd1eee617c (patch) | |
tree | d16fe7c87ba966a400d545bef5a49c460b75dc57 /subcmds/sync.py | |
parent | 19e409c81863878d5d313fdc40b3975b98602454 (diff) | |
download | git-repo-339f2df1ddd741070e340ec01d6882dd1eee617c.tar.gz |
ssh: rewrite proxy management for multiprocessing usagev2.15
We changed sync to use multiprocessing for parallel work. This broke
the ssh proxy code as it's all based on threads. Rewrite the logic to
be multiprocessing safe.
Now instead of the module acting as a stateful object, callers have to
instantiate a new ProxyManager class that holds all the state, an pass
that down to any users.
Bug: https://crbug.com/gerrit/12389
Change-Id: I4b1af116f7306b91e825d3c56fb4274c9b033562
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/305486
Tested-by: Mike Frysinger <vapier@google.com>
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Diffstat (limited to 'subcmds/sync.py')
-rw-r--r-- | subcmds/sync.py | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index 28568062..fb25c221 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -358,7 +358,7 @@ later is required to fix a server side protocol bug. | |||
358 | optimized_fetch=opt.optimized_fetch, | 358 | optimized_fetch=opt.optimized_fetch, |
359 | retry_fetches=opt.retry_fetches, | 359 | retry_fetches=opt.retry_fetches, |
360 | prune=opt.prune, | 360 | prune=opt.prune, |
361 | ssh_proxy=True, | 361 | ssh_proxy=self.ssh_proxy, |
362 | clone_filter=self.manifest.CloneFilter, | 362 | clone_filter=self.manifest.CloneFilter, |
363 | partial_clone_exclude=self.manifest.PartialCloneExclude) | 363 | partial_clone_exclude=self.manifest.PartialCloneExclude) |
364 | 364 | ||
@@ -380,7 +380,11 @@ later is required to fix a server side protocol bug. | |||
380 | finish = time.time() | 380 | finish = time.time() |
381 | return (success, project, start, finish) | 381 | return (success, project, start, finish) |
382 | 382 | ||
383 | def _Fetch(self, projects, opt, err_event): | 383 | @classmethod |
384 | def _FetchInitChild(cls, ssh_proxy): | ||
385 | cls.ssh_proxy = ssh_proxy | ||
386 | |||
387 | def _Fetch(self, projects, opt, err_event, ssh_proxy): | ||
384 | ret = True | 388 | ret = True |
385 | 389 | ||
386 | jobs = opt.jobs_network if opt.jobs_network else self.jobs | 390 | jobs = opt.jobs_network if opt.jobs_network else self.jobs |
@@ -410,8 +414,14 @@ later is required to fix a server side protocol bug. | |||
410 | break | 414 | break |
411 | return ret | 415 | return ret |
412 | 416 | ||
417 | # We pass the ssh proxy settings via the class. This allows multiprocessing | ||
418 | # to pickle it up when spawning children. We can't pass it as an argument | ||
419 | # to _FetchProjectList below as multiprocessing is unable to pickle those. | ||
420 | Sync.ssh_proxy = None | ||
421 | |||
413 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 422 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
414 | if len(projects_list) == 1 or jobs == 1: | 423 | if len(projects_list) == 1 or jobs == 1: |
424 | self._FetchInitChild(ssh_proxy) | ||
415 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): | 425 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): |
416 | ret = False | 426 | ret = False |
417 | else: | 427 | else: |
@@ -429,7 +439,8 @@ later is required to fix a server side protocol bug. | |||
429 | else: | 439 | else: |
430 | pm.update(inc=0, msg='warming up') | 440 | pm.update(inc=0, msg='warming up') |
431 | chunksize = 4 | 441 | chunksize = 4 |
432 | with multiprocessing.Pool(jobs) as pool: | 442 | with multiprocessing.Pool( |
443 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)) as pool: | ||
433 | results = pool.imap_unordered( | 444 | results = pool.imap_unordered( |
434 | functools.partial(self._FetchProjectList, opt), | 445 | functools.partial(self._FetchProjectList, opt), |
435 | projects_list, | 446 | projects_list, |
@@ -438,6 +449,11 @@ later is required to fix a server side protocol bug. | |||
438 | ret = False | 449 | ret = False |
439 | pool.close() | 450 | pool.close() |
440 | 451 | ||
452 | # Cleanup the reference now that we're done with it, and we're going to | ||
453 | # release any resources it points to. If we don't, later multiprocessing | ||
454 | # usage (e.g. checkouts) will try to pickle and then crash. | ||
455 | del Sync.ssh_proxy | ||
456 | |||
441 | pm.end() | 457 | pm.end() |
442 | self._fetch_times.Save() | 458 | self._fetch_times.Save() |
443 | 459 | ||
@@ -447,7 +463,7 @@ later is required to fix a server side protocol bug. | |||
447 | return (ret, fetched) | 463 | return (ret, fetched) |
448 | 464 | ||
449 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, | 465 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, |
450 | load_local_manifests): | 466 | load_local_manifests, ssh_proxy): |
451 | """The main network fetch loop. | 467 | """The main network fetch loop. |
452 | 468 | ||
453 | Args: | 469 | Args: |
@@ -457,6 +473,7 @@ later is required to fix a server side protocol bug. | |||
457 | err_event: Whether an error was hit while processing. | 473 | err_event: Whether an error was hit while processing. |
458 | manifest_name: Manifest file to be reloaded. | 474 | manifest_name: Manifest file to be reloaded. |
459 | load_local_manifests: Whether to load local manifests. | 475 | load_local_manifests: Whether to load local manifests. |
476 | ssh_proxy: SSH manager for clients & masters. | ||
460 | """ | 477 | """ |
461 | rp = self.manifest.repoProject | 478 | rp = self.manifest.repoProject |
462 | 479 | ||
@@ -467,7 +484,7 @@ later is required to fix a server side protocol bug. | |||
467 | to_fetch.extend(all_projects) | 484 | to_fetch.extend(all_projects) |
468 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) | 485 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) |
469 | 486 | ||
470 | success, fetched = self._Fetch(to_fetch, opt, err_event) | 487 | success, fetched = self._Fetch(to_fetch, opt, err_event, ssh_proxy) |
471 | if not success: | 488 | if not success: |
472 | err_event.set() | 489 | err_event.set() |
473 | 490 | ||
@@ -498,7 +515,7 @@ later is required to fix a server side protocol bug. | |||
498 | if previously_missing_set == missing_set: | 515 | if previously_missing_set == missing_set: |
499 | break | 516 | break |
500 | previously_missing_set = missing_set | 517 | previously_missing_set = missing_set |
501 | success, new_fetched = self._Fetch(missing, opt, err_event) | 518 | success, new_fetched = self._Fetch(missing, opt, err_event, ssh_proxy) |
502 | if not success: | 519 | if not success: |
503 | err_event.set() | 520 | err_event.set() |
504 | fetched.update(new_fetched) | 521 | fetched.update(new_fetched) |
@@ -985,12 +1002,15 @@ later is required to fix a server side protocol bug. | |||
985 | 1002 | ||
986 | self._fetch_times = _FetchTimes(self.manifest) | 1003 | self._fetch_times = _FetchTimes(self.manifest) |
987 | if not opt.local_only: | 1004 | if not opt.local_only: |
988 | try: | 1005 | with multiprocessing.Manager() as manager: |
989 | ssh.init() | 1006 | with ssh.ProxyManager(manager) as ssh_proxy: |
990 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, | 1007 | # Initialize the socket dir once in the parent. |
991 | load_local_manifests) | 1008 | ssh_proxy.sock() |
992 | finally: | 1009 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, |
993 | ssh.close() | 1010 | load_local_manifests, ssh_proxy) |
1011 | |||
1012 | if opt.network_only: | ||
1013 | return | ||
994 | 1014 | ||
995 | # If we saw an error, exit with code 1 so that other scripts can check. | 1015 | # If we saw an error, exit with code 1 so that other scripts can check. |
996 | if err_event.is_set(): | 1016 | if err_event.is_set(): |