-rw-r--r--  command.py       50
-rw-r--r--  subcmds/sync.py  171
2 files changed, 134 insertions(+), 87 deletions(-)
diff --git a/command.py b/command.py
index fa48264b..2a2ce138 100644
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import multiprocessing
 import optparse
 import os
@@ -70,6 +71,14 @@ class Command:
     # migrated subcommands can set it to False.
     MULTI_MANIFEST_SUPPORT = True
 
+    # Shared data across parallel execution workers.
+    _parallel_context = None
+
+    @classmethod
+    def get_parallel_context(cls):
+        assert cls._parallel_context is not None
+        return cls._parallel_context
+
     def __init__(
         self,
         repodir=None,
@@ -242,9 +251,36 @@ class Command:
         """Perform the action, after option parsing is complete."""
         raise NotImplementedError
 
-    @staticmethod
+    @classmethod
+    @contextlib.contextmanager
+    def ParallelContext(cls):
+        """Obtains the context, which is shared to ExecuteInParallel workers.
+
+        Callers can store data in the context dict before invocation of
+        ExecuteInParallel. The dict will then be shared to child workers of
+        ExecuteInParallel.
+        """
+        assert cls._parallel_context is None
+        cls._parallel_context = {}
+        try:
+            yield
+        finally:
+            cls._parallel_context = None
+
+    @classmethod
+    def _SetParallelContext(cls, context):
+        cls._parallel_context = context
+
+    @classmethod
     def ExecuteInParallel(
-        jobs, func, inputs, callback, output=None, ordered=False
+        cls,
+        jobs,
+        func,
+        inputs,
+        callback,
+        output=None,
+        ordered=False,
+        chunksize=WORKER_BATCH_SIZE,
     ):
         """Helper for managing parallel execution boiler plate.
 
@@ -269,6 +305,8 @@ class Command:
             output: An output manager. May be progress.Progess or
                 color.Coloring.
             ordered: Whether the jobs should be processed in order.
+            chunksize: The number of jobs processed in batch by parallel
+                workers.
 
         Returns:
             The |callback| function's results are returned.
@@ -278,12 +316,16 @@ class Command:
             if len(inputs) == 1 or jobs == 1:
                 return callback(None, output, (func(x) for x in inputs))
             else:
-                with multiprocessing.Pool(jobs) as pool:
+                with multiprocessing.Pool(
+                    jobs,
+                    initializer=cls._SetParallelContext,
+                    initargs=(cls._parallel_context,),
+                ) as pool:
                     submit = pool.imap if ordered else pool.imap_unordered
                     return callback(
                         pool,
                         output,
-                        submit(func, inputs, chunksize=WORKER_BATCH_SIZE),
+                        submit(func, inputs, chunksize=chunksize),
                     )
         finally:
             if isinstance(output, progress.Progress):
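
Taken together, the command.py hunks let a subcommand stage shared state in a class-level dict once, inside ParallelContext(), and hand that dict to every pool worker through the multiprocessing initializer instead of pickling it into each task's arguments. A minimal standalone sketch of the same pattern, assuming nothing from repo itself (the Demo subclass and its helpers are illustrative, not part of this patch):

    import contextlib
    import multiprocessing


    class Command:
        """Standalone sketch of the ParallelContext pattern above."""

        _parallel_context = None

        @classmethod
        def get_parallel_context(cls):
            assert cls._parallel_context is not None
            return cls._parallel_context

        @classmethod
        @contextlib.contextmanager
        def ParallelContext(cls):
            # Stage shared data here before calling ExecuteInParallel.
            assert cls._parallel_context is None
            cls._parallel_context = {}
            try:
                yield
            finally:
                cls._parallel_context = None

        @classmethod
        def _SetParallelContext(cls, context):
            # Pool initializer: runs once in each worker process.
            cls._parallel_context = context

        @classmethod
        def ExecuteInParallel(cls, jobs, func, inputs, callback, chunksize=4):
            if len(inputs) == 1 or jobs == 1:
                return callback(None, (func(x) for x in inputs))
            with multiprocessing.Pool(
                jobs,
                initializer=cls._SetParallelContext,
                initargs=(cls._parallel_context,),
            ) as pool:
                return callback(
                    pool, pool.imap_unordered(func, inputs, chunksize=chunksize)
                )


    class Demo(Command):
        @classmethod
        def _Work(cls, idx):
            # Workers look the heavy object up by index; only the small
            # integer idx is pickled per task.
            return cls.get_parallel_context()["items"][idx] * 2

        def Run(self):
            items = [1, 2, 3, 4]
            with self.ParallelContext():
                self.get_parallel_context()["items"] = items
                return self.ExecuteInParallel(
                    2, self._Work, range(len(items)), lambda pool, it: sorted(it)
                )


    if __name__ == "__main__":
        print(Demo().Run())  # [2, 4, 6, 8]
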
diff --git a/subcmds/sync.py b/subcmds/sync.py
index bebe18b9..00fee776 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The fetched project.
+        project_idx (int): The fetched project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
         remote_fetched (bool): True if the remote was actually queried.
@@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple):
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
     start: float
     finish: float
     remote_fetched: bool
@@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The project.
+        project_idx (int): The project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
     """
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
     start: float
     finish: float
 
@@ -592,7 +592,8 @@ later is required to fix a server side protocol bug.
             branch = branch[len(R_HEADS) :]
         return branch
 
-    def _GetCurrentBranchOnly(self, opt, manifest):
+    @classmethod
+    def _GetCurrentBranchOnly(cls, opt, manifest):
         """Returns whether current-branch or use-superproject options are
         enabled.
 
@@ -710,7 +711,8 @@ later is required to fix a server side protocol bug.
         if need_unload:
             m.outer_client.manifest.Unload()
 
-    def _FetchProjectList(self, opt, projects):
+    @classmethod
+    def _FetchProjectList(cls, opt, projects):
         """Main function of the fetch worker.
 
         The projects we're given share the same underlying git object store, so
@@ -722,21 +724,23 @@ later is required to fix a server side protocol bug.
             opt: Program options returned from optparse. See _Options().
             projects: Projects to fetch.
         """
-        return [self._FetchOne(opt, x) for x in projects]
+        return [cls._FetchOne(opt, x) for x in projects]
 
-    def _FetchOne(self, opt, project):
+    @classmethod
+    def _FetchOne(cls, opt, project_idx):
         """Fetch git objects for a single project.
 
         Args:
             opt: Program options returned from optparse. See _Options().
-            project: Project object for the project to fetch.
+            project_idx: Project index for the project to fetch.
 
         Returns:
             Whether the fetch was successful.
         """
+        project = cls.get_parallel_context()["projects"][project_idx]
         start = time.time()
         k = f"{project.name} @ {project.relpath}"
-        self._sync_dict[k] = start
+        cls.get_parallel_context()["sync_dict"][k] = start
         success = False
         remote_fetched = False
         errors = []
@@ -746,7 +750,7 @@ later is required to fix a server side protocol bug.
                 quiet=opt.quiet,
                 verbose=opt.verbose,
                 output_redir=buf,
-                current_branch_only=self._GetCurrentBranchOnly(
+                current_branch_only=cls._GetCurrentBranchOnly(
                     opt, project.manifest
                 ),
                 force_sync=opt.force_sync,
@@ -756,7 +760,7 @@ later is required to fix a server side protocol bug.
                 optimized_fetch=opt.optimized_fetch,
                 retry_fetches=opt.retry_fetches,
                 prune=opt.prune,
-                ssh_proxy=self.ssh_proxy,
+                ssh_proxy=cls.get_parallel_context()["ssh_proxy"],
                 clone_filter=project.manifest.CloneFilter,
                 partial_clone_exclude=project.manifest.PartialCloneExclude,
                 clone_filter_for_depth=project.manifest.CloneFilterForDepth,
@@ -788,24 +792,20 @@ later is required to fix a server side protocol bug.
                 type(e).__name__,
                 e,
             )
-            del self._sync_dict[k]
             errors.append(e)
             raise
+        finally:
+            del cls.get_parallel_context()["sync_dict"][k]
 
         finish = time.time()
-        del self._sync_dict[k]
         return _FetchOneResult(
-            success, errors, project, start, finish, remote_fetched
+            success, errors, project_idx, start, finish, remote_fetched
         )
 
-    @classmethod
-    def _FetchInitChild(cls, ssh_proxy):
-        cls.ssh_proxy = ssh_proxy
-
     def _GetSyncProgressMessage(self):
         earliest_time = float("inf")
         earliest_proj = None
-        items = self._sync_dict.items()
+        items = self.get_parallel_context()["sync_dict"].items()
         for project, t in items:
             if t < earliest_time:
                 earliest_time = t
@@ -813,7 +813,7 @@ later is required to fix a server side protocol bug.
 
         if not earliest_proj:
             # This function is called when sync is still running but in some
-            # cases (by chance), _sync_dict can contain no entries. Return some
+            # cases (by chance), sync_dict can contain no entries. Return some
            # text to indicate that sync is still working.
             return "..working.."
 
@@ -835,7 +835,6 @@ later is required to fix a server side protocol bug.
             elide=True,
         )
 
-        self._sync_dict = multiprocessing.Manager().dict()
         sync_event = _threading.Event()
 
         def _MonitorSyncLoop():
@@ -846,21 +845,13 @@ later is required to fix a server side protocol bug.
 
         sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
         sync_progress_thread.daemon = True
-        sync_progress_thread.start()
-
-        objdir_project_map = dict()
-        for project in projects:
-            objdir_project_map.setdefault(project.objdir, []).append(project)
-        projects_list = list(objdir_project_map.values())
 
-        jobs = min(opt.jobs_network, len(projects_list))
-
-        def _ProcessResults(results_sets):
+        def _ProcessResults(pool, pm, results_sets):
             ret = True
             for results in results_sets:
                 for result in results:
                     success = result.success
-                    project = result.project
+                    project = projects[result.project_idx]
                     start = result.start
                     finish = result.finish
                     self._fetch_times.Set(project, finish - start)
@@ -884,45 +875,49 @@ later is required to fix a server side protocol bug.
                         fetched.add(project.gitdir)
                     pm.update()
                 if not ret and opt.fail_fast:
+                    if pool:
+                        pool.close()
                     break
             return ret
 
-        # We pass the ssh proxy settings via the class. This allows
-        # multiprocessing to pickle it up when spawning children. We can't pass
-        # it as an argument to _FetchProjectList below as multiprocessing is
-        # unable to pickle those.
-        Sync.ssh_proxy = None
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
 
-        # NB: Multiprocessing is heavy, so don't spin it up for one job.
-        if jobs == 1:
-            self._FetchInitChild(ssh_proxy)
-            if not _ProcessResults(
-                self._FetchProjectList(opt, x) for x in projects_list
-            ):
-                ret = False
-        else:
+            objdir_project_map = dict()
+            for index, project in enumerate(projects):
+                objdir_project_map.setdefault(project.objdir, []).append(index)
+            projects_list = list(objdir_project_map.values())
+
+            jobs = min(opt.jobs_network, len(projects_list))
+
+            # We pass the ssh proxy settings via the class. This allows
+            # multiprocessing to pickle it up when spawning children. We can't
+            # pass it as an argument to _FetchProjectList below as
+            # multiprocessing is unable to pickle those.
+            self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+
+            sync_progress_thread.start()
             if not opt.quiet:
                 pm.update(inc=0, msg="warming up")
-            with multiprocessing.Pool(
-                jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
-            ) as pool:
-                results = pool.imap_unordered(
+            try:
+                ret = self.ExecuteInParallel(
+                    jobs,
                     functools.partial(self._FetchProjectList, opt),
                     projects_list,
-                    chunksize=_chunksize(len(projects_list), jobs),
+                    callback=_ProcessResults,
+                    output=pm,
+                    # Use chunksize=1 to avoid the chance that some workers are
+                    # idle while other workers still have more than one job in
+                    # their chunk queue.
+                    chunksize=1,
                 )
-                if not _ProcessResults(results):
-                    ret = False
-                pool.close()
-
-        # Cleanup the reference now that we're done with it, and we're going to
-        # release any resources it points to. If we don't, later
-        # multiprocessing usage (e.g. checkouts) will try to pickle and then
-        # crash.
-        del Sync.ssh_proxy
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
 
-        sync_event.set()
-        pm.end()
         self._fetch_times.Save()
         self._local_sync_state.Save()
 
@@ -1008,14 +1003,15 @@ later is required to fix a server side protocol bug.
 
         return _FetchMainResult(all_projects)
 
+    @classmethod
     def _CheckoutOne(
-        self,
+        cls,
         detach_head,
         force_sync,
         force_checkout,
         force_rebase,
         verbose,
-        project,
+        project_idx,
     ):
         """Checkout work tree for one project
 
@@ -1027,11 +1023,12 @@ later is required to fix a server side protocol bug.
             force_checkout: Force checking out of the repo content.
             force_rebase: Force rebase.
             verbose: Whether to show verbose messages.
-            project: Project object for the project to checkout.
+            project_idx: Project index for the project to checkout.
 
         Returns:
             Whether the fetch was successful.
         """
+        project = cls.get_parallel_context()["projects"][project_idx]
         start = time.time()
         syncbuf = SyncBuffer(
             project.manifest.manifestProject.config, detach_head=detach_head
@@ -1065,7 +1062,7 @@ later is required to fix a server side protocol bug.
         if not success:
             logger.error("error: Cannot checkout %s", project.name)
         finish = time.time()
-        return _CheckoutOneResult(success, errors, project, start, finish)
+        return _CheckoutOneResult(success, errors, project_idx, start, finish)
 
     def _Checkout(self, all_projects, opt, err_results, checkout_errors):
         """Checkout projects listed in all_projects
 
@@ -1083,7 +1080,9 @@ later is required to fix a server side protocol bug.
             ret = True
             for result in results:
                 success = result.success
-                project = result.project
+                project = self.get_parallel_context()["projects"][
+                    result.project_idx
+                ]
                 start = result.start
                 finish = result.finish
                 self.event_log.AddSync(
@@ -1110,22 +1109,28 @@ later is required to fix a server side protocol bug.
             return ret
 
         for projects in _SafeCheckoutOrder(all_projects):
-            proc_res = self.ExecuteInParallel(
-                opt.jobs_checkout,
-                functools.partial(
-                    self._CheckoutOne,
-                    opt.detach_head,
-                    opt.force_sync,
-                    opt.force_checkout,
-                    opt.rebase,
-                    opt.verbose,
-                ),
-                projects,
-                callback=_ProcessResults,
-                output=Progress(
-                    "Checking out", len(all_projects), quiet=opt.quiet
-                ),
-            )
+            with self.ParallelContext():
+                self.get_parallel_context()["projects"] = projects
+                proc_res = self.ExecuteInParallel(
+                    opt.jobs_checkout,
+                    functools.partial(
+                        self._CheckoutOne,
+                        opt.detach_head,
+                        opt.force_sync,
+                        opt.force_checkout,
+                        opt.rebase,
+                        opt.verbose,
+                    ),
+                    range(len(projects)),
+                    callback=_ProcessResults,
+                    output=Progress(
+                        "Checking out", len(all_projects), quiet=opt.quiet
+                    ),
+                    # Use chunksize=1 to avoid the chance that some workers are
+                    # idle while other workers still have more than one job in
+                    # their chunk queue.
+                    chunksize=1,
                )
 
        self._local_sync_state.Save()
        return proc_res and not err_results
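
The recurring project → project_idx change in sync.py is the other half of the same idea: once the projects list is staged in the shared context, each submitted task pickles a small integer instead of a full Project object graph, and workers resolve the index against the copy they received at pool startup. A rough, illustrative way to see the per-task serialization saving (FakeProject is a made-up stand-in, not repo's actual Project class):

    import pickle


    class FakeProject:
        """Made-up stand-in for a heavyweight Project object."""

        def __init__(self, name):
            self.name = name
            # Simulate the nested config/manifest state a real Project
            # would drag along when pickled.
            self.config = {f"key{i}": "x" * 50 for i in range(100)}


    projects = [FakeProject(f"p{i}") for i in range(4)]

    # Payload pickled per submitted task: the object itself vs. its index.
    print("object:", len(pickle.dumps(projects[0])), "bytes per task")
    print("index: ", len(pickle.dumps(0)), "bytes per task")
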