-rw-r--r--  command.py      |  50
-rw-r--r--  subcmds/sync.py | 171
2 files changed, 134 insertions(+), 87 deletions(-)
diff --git a/command.py b/command.py
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import multiprocessing
 import optparse
 import os
@@ -70,6 +71,14 @@ class Command:
     # migrated subcommands can set it to False.
     MULTI_MANIFEST_SUPPORT = True
 
+    # Shared data across parallel execution workers.
+    _parallel_context = None
+
+    @classmethod
+    def get_parallel_context(cls):
+        assert cls._parallel_context is not None
+        return cls._parallel_context
+
     def __init__(
         self,
         repodir=None,
@@ -242,9 +251,36 @@ class Command:
         """Perform the action, after option parsing is complete."""
         raise NotImplementedError
 
-    @staticmethod
+    @classmethod
+    @contextlib.contextmanager
+    def ParallelContext(cls):
+        """Obtains the context, which is shared to ExecuteInParallel workers.
+
+        Callers can store data in the context dict before invocation of
+        ExecuteInParallel. The dict will then be shared to child workers of
+        ExecuteInParallel.
+        """
+        assert cls._parallel_context is None
+        cls._parallel_context = {}
+        try:
+            yield
+        finally:
+            cls._parallel_context = None
+
+    @classmethod
+    def _SetParallelContext(cls, context):
+        cls._parallel_context = context
+
+    @classmethod
     def ExecuteInParallel(
-        jobs, func, inputs, callback, output=None, ordered=False
+        cls,
+        jobs,
+        func,
+        inputs,
+        callback,
+        output=None,
+        ordered=False,
+        chunksize=WORKER_BATCH_SIZE,
     ):
         """Helper for managing parallel execution boiler plate.
 
@@ -269,6 +305,8 @@ class Command:
             output: An output manager. May be progress.Progess or
                 color.Coloring.
             ordered: Whether the jobs should be processed in order.
+            chunksize: The number of jobs processed in batch by parallel
+                workers.
 
         Returns:
             The |callback| function's results are returned.
@@ -278,12 +316,16 @@ class Command:
             if len(inputs) == 1 or jobs == 1:
                 return callback(None, output, (func(x) for x in inputs))
             else:
-                with multiprocessing.Pool(jobs) as pool:
+                with multiprocessing.Pool(
+                    jobs,
+                    initializer=cls._SetParallelContext,
+                    initargs=(cls._parallel_context,),
+                ) as pool:
                     submit = pool.imap if ordered else pool.imap_unordered
                     return callback(
                         pool,
                         output,
-                        submit(func, inputs, chunksize=WORKER_BATCH_SIZE),
+                        submit(func, inputs, chunksize=chunksize),
                     )
         finally:
             if isinstance(output, progress.Progress):
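
The command.py half adds a small API: callers populate a class-level context dict inside ParallelContext(), and ExecuteInParallel()'s Pool initializer (_SetParallelContext) re-installs that dict in every worker, so get_parallel_context() works on both sides of the fork. A minimal sketch of the intended calling pattern, assuming repo's source tree is importable; the Example subcommand, "items", and _WorkOne are hypothetical, and only the three Command methods come from this change:

    from command import Command


    class Example(Command):
        """Hypothetical subcommand illustrating the new context plumbing."""

        @classmethod
        def _WorkOne(cls, idx):
            # Runs in a worker process. The Pool initializer has already
            # installed the shared dict, so heavy objects never ride along
            # with each submitted task; only the small index is pickled.
            item = cls.get_parallel_context()["items"][idx]
            return f"{item}: ok"

        def Execute(self, opt, args):
            items = ["alpha", "beta", "gamma"]

            def _ProcessResults(pool, output, results):
                return list(results)

            with self.ParallelContext():
                # Populate the context before calling ExecuteInParallel;
                # it is handed to workers once, via the Pool initargs.
                self.get_parallel_context()["items"] = items
                return self.ExecuteInParallel(
                    2,
                    self._WorkOne,
                    range(len(items)),
                    callback=_ProcessResults,
                )
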
diff --git a/subcmds/sync.py b/subcmds/sync.py
index bebe18b9..00fee776 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The fetched project.
+        project_idx (int): The fetched project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
         remote_fetched (bool): True if the remote was actually queried.
@@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple):
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
    start: float
    finish: float
    remote_fetched: bool
@@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The project.
+        project_idx (int): The project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
     """
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
     start: float
     finish: float
 
@@ -592,7 +592,8 @@ later is required to fix a server side protocol bug.
             branch = branch[len(R_HEADS) :]
         return branch
 
-    def _GetCurrentBranchOnly(self, opt, manifest):
+    @classmethod
+    def _GetCurrentBranchOnly(cls, opt, manifest):
         """Returns whether current-branch or use-superproject options are
         enabled.
 
@@ -710,7 +711,8 @@ later is required to fix a server side protocol bug.
         if need_unload:
             m.outer_client.manifest.Unload()
 
-    def _FetchProjectList(self, opt, projects):
+    @classmethod
+    def _FetchProjectList(cls, opt, projects):
         """Main function of the fetch worker.
 
         The projects we're given share the same underlying git object store, so
@@ -722,21 +724,23 @@ later is required to fix a server side protocol bug.
             opt: Program options returned from optparse. See _Options().
             projects: Projects to fetch.
         """
-        return [self._FetchOne(opt, x) for x in projects]
+        return [cls._FetchOne(opt, x) for x in projects]
 
-    def _FetchOne(self, opt, project):
+    @classmethod
+    def _FetchOne(cls, opt, project_idx):
         """Fetch git objects for a single project.
 
         Args:
             opt: Program options returned from optparse. See _Options().
-            project: Project object for the project to fetch.
+            project_idx: Project index for the project to fetch.
 
         Returns:
             Whether the fetch was successful.
         """
+        project = cls.get_parallel_context()["projects"][project_idx]
         start = time.time()
         k = f"{project.name} @ {project.relpath}"
-        self._sync_dict[k] = start
+        cls.get_parallel_context()["sync_dict"][k] = start
         success = False
         remote_fetched = False
         errors = []
@@ -746,7 +750,7 @@ later is required to fix a server side protocol bug.
                 quiet=opt.quiet,
                 verbose=opt.verbose,
                 output_redir=buf,
-                current_branch_only=self._GetCurrentBranchOnly(
+                current_branch_only=cls._GetCurrentBranchOnly(
                     opt, project.manifest
                 ),
                 force_sync=opt.force_sync,
@@ -756,7 +760,7 @@ later is required to fix a server side protocol bug.
                 optimized_fetch=opt.optimized_fetch,
                 retry_fetches=opt.retry_fetches,
                 prune=opt.prune,
-                ssh_proxy=self.ssh_proxy,
+                ssh_proxy=cls.get_parallel_context()["ssh_proxy"],
                 clone_filter=project.manifest.CloneFilter,
                 partial_clone_exclude=project.manifest.PartialCloneExclude,
                 clone_filter_for_depth=project.manifest.CloneFilterForDepth,
@@ -788,24 +792,20 @@ later is required to fix a server side protocol bug.
                 type(e).__name__,
                 e,
             )
-            del self._sync_dict[k]
             errors.append(e)
             raise
+        finally:
+            del cls.get_parallel_context()["sync_dict"][k]
 
         finish = time.time()
-        del self._sync_dict[k]
         return _FetchOneResult(
-            success, errors, project, start, finish, remote_fetched
+            success, errors, project_idx, start, finish, remote_fetched
         )
 
-    @classmethod
-    def _FetchInitChild(cls, ssh_proxy):
-        cls.ssh_proxy = ssh_proxy
-
     def _GetSyncProgressMessage(self):
         earliest_time = float("inf")
         earliest_proj = None
-        items = self._sync_dict.items()
+        items = self.get_parallel_context()["sync_dict"].items()
         for project, t in items:
             if t < earliest_time:
                 earliest_time = t
@@ -813,7 +813,7 @@ later is required to fix a server side protocol bug.
 
         if not earliest_proj:
             # This function is called when sync is still running but in some
-            # cases (by chance), _sync_dict can contain no entries. Return some
+            # cases (by chance), sync_dict can contain no entries. Return some
             # text to indicate that sync is still working.
             return "..working.."
 
@@ -835,7 +835,6 @@ later is required to fix a server side protocol bug.
             elide=True,
         )
 
-        self._sync_dict = multiprocessing.Manager().dict()
         sync_event = _threading.Event()
 
         def _MonitorSyncLoop():
@@ -846,21 +845,13 @@ later is required to fix a server side protocol bug.
 
         sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
         sync_progress_thread.daemon = True
-        sync_progress_thread.start()
-
-        objdir_project_map = dict()
-        for project in projects:
-            objdir_project_map.setdefault(project.objdir, []).append(project)
-        projects_list = list(objdir_project_map.values())
 
-        jobs = min(opt.jobs_network, len(projects_list))
-
-        def _ProcessResults(results_sets):
+        def _ProcessResults(pool, pm, results_sets):
             ret = True
             for results in results_sets:
                 for result in results:
                     success = result.success
-                    project = result.project
+                    project = projects[result.project_idx]
                     start = result.start
                     finish = result.finish
                     self._fetch_times.Set(project, finish - start)
@@ -884,45 +875,49 @@ later is required to fix a server side protocol bug.
                         fetched.add(project.gitdir)
                     pm.update()
                 if not ret and opt.fail_fast:
+                    if pool:
+                        pool.close()
                     break
             return ret
 
-        # We pass the ssh proxy settings via the class. This allows
-        # multiprocessing to pickle it up when spawning children. We can't pass
-        # it as an argument to _FetchProjectList below as multiprocessing is
-        # unable to pickle those.
-        Sync.ssh_proxy = None
-
-        # NB: Multiprocessing is heavy, so don't spin it up for one job.
-        if jobs == 1:
-            self._FetchInitChild(ssh_proxy)
-            if not _ProcessResults(
-                self._FetchProjectList(opt, x) for x in projects_list
-            ):
-                ret = False
-        else:
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+
+            objdir_project_map = dict()
+            for index, project in enumerate(projects):
+                objdir_project_map.setdefault(project.objdir, []).append(index)
+            projects_list = list(objdir_project_map.values())
+
+            jobs = min(opt.jobs_network, len(projects_list))
+
+            # We pass the ssh proxy settings via the class. This allows
+            # multiprocessing to pickle it up when spawning children. We can't
+            # pass it as an argument to _FetchProjectList below as
+            # multiprocessing is unable to pickle those.
+            self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+
+            sync_progress_thread.start()
             if not opt.quiet:
                 pm.update(inc=0, msg="warming up")
-            with multiprocessing.Pool(
-                jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
-            ) as pool:
-                results = pool.imap_unordered(
+            try:
+                ret = self.ExecuteInParallel(
+                    jobs,
                     functools.partial(self._FetchProjectList, opt),
                     projects_list,
-                    chunksize=_chunksize(len(projects_list), jobs),
+                    callback=_ProcessResults,
+                    output=pm,
+                    # Use chunksize=1 to avoid the chance that some workers are
+                    # idle while other workers still have more than one job in
+                    # their chunk queue.
+                    chunksize=1,
                 )
-                if not _ProcessResults(results):
-                    ret = False
-                pool.close()
-
-        # Cleanup the reference now that we're done with it, and we're going to
-        # release any resources it points to. If we don't, later
-        # multiprocessing usage (e.g. checkouts) will try to pickle and then
-        # crash.
-        del Sync.ssh_proxy
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
 
-        sync_event.set()
-        pm.end()
         self._fetch_times.Save()
         self._local_sync_state.Save()
 
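One subtlety in the fetch hunk above: "sync_dict" is a multiprocessing.Manager().dict() stored in the parallel context, replacing the old self._sync_dict. Because it is a manager proxy, writes made by _FetchOne in worker processes are visible to _GetSyncProgressMessage in the parent's monitor thread. A standalone sketch of that mechanism under the same Pool-initializer pattern; all names here are illustrative, not repo's own code:

    import multiprocessing
    import time

    _context = None


    def _init(context):
        # Pool initializer: runs once per worker, like _SetParallelContext.
        global _context
        _context = context


    def _work(name):
        _context["active"][name] = time.time()  # parent sees this write
        try:
            time.sleep(0.1)  # stand-in for the real network fetch
        finally:
            del _context["active"][name]
        return name


    def main():
        context = {"active": multiprocessing.Manager().dict()}
        with multiprocessing.Pool(
            2, initializer=_init, initargs=(context,)
        ) as pool:
            # chunksize=1, as in the diff, so no worker hoards queued jobs
            # while others sit idle.
            for done in pool.imap_unordered(_work, ["p1", "p2"], chunksize=1):
                print(done, "done; in flight:", dict(context["active"]))


    if __name__ == "__main__":
        main()
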
@@ -1008,14 +1003,15 @@ later is required to fix a server side protocol bug.
 
         return _FetchMainResult(all_projects)
 
+    @classmethod
     def _CheckoutOne(
-        self,
+        cls,
         detach_head,
         force_sync,
         force_checkout,
         force_rebase,
         verbose,
-        project,
+        project_idx,
     ):
         """Checkout work tree for one project
 
@@ -1027,11 +1023,12 @@ later is required to fix a server side protocol bug.
             force_checkout: Force checking out of the repo content.
             force_rebase: Force rebase.
             verbose: Whether to show verbose messages.
-            project: Project object for the project to checkout.
+            project_idx: Project index for the project to checkout.
 
         Returns:
             Whether the fetch was successful.
         """
+        project = cls.get_parallel_context()["projects"][project_idx]
         start = time.time()
         syncbuf = SyncBuffer(
             project.manifest.manifestProject.config, detach_head=detach_head
@@ -1065,7 +1062,7 @@ later is required to fix a server side protocol bug.
         if not success:
             logger.error("error: Cannot checkout %s", project.name)
         finish = time.time()
-        return _CheckoutOneResult(success, errors, project, start, finish)
+        return _CheckoutOneResult(success, errors, project_idx, start, finish)
 
     def _Checkout(self, all_projects, opt, err_results, checkout_errors):
         """Checkout projects listed in all_projects
@@ -1083,7 +1080,9 @@ later is required to fix a server side protocol bug.
             ret = True
             for result in results:
                 success = result.success
-                project = result.project
+                project = self.get_parallel_context()["projects"][
+                    result.project_idx
+                ]
                 start = result.start
                 finish = result.finish
                 self.event_log.AddSync(
@@ -1110,22 +1109,28 @@ later is required to fix a server side protocol bug.
             return ret
 
         for projects in _SafeCheckoutOrder(all_projects):
-            proc_res = self.ExecuteInParallel(
-                opt.jobs_checkout,
-                functools.partial(
-                    self._CheckoutOne,
-                    opt.detach_head,
-                    opt.force_sync,
-                    opt.force_checkout,
-                    opt.rebase,
-                    opt.verbose,
-                ),
-                projects,
-                callback=_ProcessResults,
-                output=Progress(
-                    "Checking out", len(all_projects), quiet=opt.quiet
-                ),
-            )
+            with self.ParallelContext():
+                self.get_parallel_context()["projects"] = projects
+                proc_res = self.ExecuteInParallel(
+                    opt.jobs_checkout,
+                    functools.partial(
+                        self._CheckoutOne,
+                        opt.detach_head,
+                        opt.force_sync,
+                        opt.force_checkout,
+                        opt.rebase,
+                        opt.verbose,
+                    ),
+                    range(len(projects)),
+                    callback=_ProcessResults,
+                    output=Progress(
+                        "Checking out", len(all_projects), quiet=opt.quiet
+                    ),
+                    # Use chunksize=1 to avoid the chance that some workers are
+                    # idle while other workers still have more than one job in
+                    # their chunk queue.
+                    chunksize=1,
+                )
 
         self._local_sync_state.Save()
         return proc_res and not err_results
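
Both the fetch and checkout halves now hand workers range(len(projects)) and resolve the real object through the shared context, because a Pool pickles every submitted task's arguments and Project objects drag in large object graphs, while the context is pickled only once per worker. A rough illustration of the cost difference; HeavyProject is a stand-in, not repo's Project class:

    import pickle


    class HeavyProject:
        """Stand-in for a large per-project object graph."""

        def __init__(self, name):
            self.name = name
            self.config = {f"key{i}": "x" * 50 for i in range(100)}


    projects = [HeavyProject(f"p{i}") for i in range(3)]

    # Per-task cost if each task ships the whole object:
    obj_bytes = sum(len(pickle.dumps(p)) for p in projects)
    # Per-task cost if each task ships only an index:
    idx_bytes = sum(len(pickle.dumps(i)) for i in range(len(projects)))

    print(f"objects: {obj_bytes} bytes, indices: {idx_bytes} bytes")
    # With the shared context, the project list crosses the process
    # boundary once per worker (in the Pool initializer) instead of
    # once per submitted task.
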