author     Kuang-che Wu <kcwu@google.com>  2024-10-18 23:32:08 +0800
committer  LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>  2024-10-23 02:58:45 +0000
commit     39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (patch)
tree       4b2f5f39ce1a20ef06c336cfb87a4269cedfd8d7 /subcmds
parent     584863fb5e3e17ab364de40f80b10ac030b47788 (diff)
download   git-repo-39ffd9977e2f6cb1ca1757e59173fc93e0eab72c.tar.gz
sync: reduce multiprocessing serialization overhead
Background:
- The Manifest object is large (for projects like Android) in terms of
  serialization cost and size (more than 1MB).
- Many Project objects usually share only a few Manifest objects.

Before this CL, Project objects were passed to workers via function
parameters. Function parameters are pickled separately (in chunks); in
other words, manifests were serialized again and again. The major
serialization overhead of "repo sync" was
O(manifest_size * projects / chunksize).

This CL uses the following tricks to reduce serialization overhead.
- All projects are pickled in one invocation. Because Project objects
  share manifests, the pickle library remembers which objects it has
  already seen and avoids the repeated serialization cost.
- Pass the Project objects to workers at worker initialization time,
  and pass a project index as the function parameter instead. The
  number of workers is much smaller than the number of projects.
- Worker init state is shared on Linux (fork based), so it requires
  zero serialization for Project objects.

On Linux (fork based), the serialization overhead is O(projects) ---
one int per project. On Windows (spawn based), the serialization
overhead is O(manifest_size * min(workers, projects)).

Moreover, use chunksize=1 to avoid the chance that some workers are
idle while other workers still have more than one job in their chunk
queue.

Using 2.7k projects as the baseline, a no-op "repo sync" originally
took 31s for fetch and 25s for checkout on my Linux workstation. With
this CL, it takes 12s for fetch and 1s for checkout.

Bug: b/371638995
Change-Id: Ifa22072ea54eacb4a5c525c050d84de371e87caa
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439921
Tested-by: Kuang-che Wu <kcwu@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
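For readers who want the shape of the trick without repo's own helpers
(ParallelContext/ExecuteInParallel), here is a minimal self-contained
sketch of the pattern the commit message describes: pay the pickling
cost for the big shared objects once per worker (at pool init), send
only small integer indices through the task queue, and use chunksize=1.
The names _init_worker and _fetch_one and the dict-based stand-in
objects are illustrative only, not repo's API.

    import multiprocessing

    # Hypothetical stand-ins for repo's Project/Manifest objects: many
    # small "projects" sharing one large "manifest".
    _worker_projects = None  # populated once per worker process


    def _init_worker(projects):
        # Pool initializer: the big shared list crosses the process
        # boundary once per worker. Under fork (Linux) it is inherited
        # with zero serialization; under spawn (Windows) it is pickled
        # once per worker, and pickle's memo table means the shared
        # manifest inside it is serialized only once.
        global _worker_projects
        _worker_projects = projects


    def _fetch_one(project_idx):
        # Task function: only a small int is pickled per task.
        project = _worker_projects[project_idx]
        return project_idx, len(project["manifest"]["xml"])


    if __name__ == "__main__":
        shared_manifest = {"xml": "x" * 1_000_000}  # ~1MB, shared by all
        projects = [
            {"name": f"project{i}", "manifest": shared_manifest}
            for i in range(2700)
        ]
        with multiprocessing.Pool(
            4, initializer=_init_worker, initargs=(projects,)
        ) as pool:
            # chunksize=1: a slow task never strands queued jobs behind
            # a busy worker.
            for idx, size in pool.imap_unordered(
                _fetch_one, range(len(projects)), chunksize=1
            ):
                pass

Under spawn, each worker deserializes the projects list exactly once,
giving the O(manifest_size * min(workers, projects)) cost quoted above;
passing the objects as task arguments would instead re-pickle the
manifest for every chunk.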
Diffstat (limited to 'subcmds')
-rw-r--r--  subcmds/sync.py  171
1 file changed, 88 insertions, 83 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index bebe18b9..00fee776 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The fetched project.
+        project_idx (int): The fetched project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
         remote_fetched (bool): True if the remote was actually queried.
@@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple):
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
     start: float
     finish: float
     remote_fetched: bool
@@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple):
 
     Attributes:
         success (bool): True if successful.
-        project (Project): The project.
+        project_idx (int): The project index.
         start (float): The starting time.time().
         finish (float): The ending time.time().
     """
 
     success: bool
     errors: List[Exception]
-    project: Project
+    project_idx: int
     start: float
     finish: float
 
@@ -592,7 +592,8 @@ later is required to fix a server side protocol bug.
             branch = branch[len(R_HEADS) :]
         return branch
 
-    def _GetCurrentBranchOnly(self, opt, manifest):
+    @classmethod
+    def _GetCurrentBranchOnly(cls, opt, manifest):
         """Returns whether current-branch or use-superproject options are
         enabled.
 
@@ -710,7 +711,8 @@ later is required to fix a server side protocol bug.
         if need_unload:
             m.outer_client.manifest.Unload()
 
-    def _FetchProjectList(self, opt, projects):
+    @classmethod
+    def _FetchProjectList(cls, opt, projects):
         """Main function of the fetch worker.
 
         The projects we're given share the same underlying git object store, so
@@ -722,21 +724,23 @@ later is required to fix a server side protocol bug.
             opt: Program options returned from optparse. See _Options().
             projects: Projects to fetch.
         """
-        return [self._FetchOne(opt, x) for x in projects]
+        return [cls._FetchOne(opt, x) for x in projects]
 
-    def _FetchOne(self, opt, project):
+    @classmethod
+    def _FetchOne(cls, opt, project_idx):
         """Fetch git objects for a single project.
 
         Args:
             opt: Program options returned from optparse. See _Options().
-            project: Project object for the project to fetch.
+            project_idx: Project index for the project to fetch.
 
         Returns:
             Whether the fetch was successful.
         """
+        project = cls.get_parallel_context()["projects"][project_idx]
         start = time.time()
         k = f"{project.name} @ {project.relpath}"
-        self._sync_dict[k] = start
+        cls.get_parallel_context()["sync_dict"][k] = start
         success = False
         remote_fetched = False
         errors = []
@@ -746,7 +750,7 @@ later is required to fix a server side protocol bug.
                 quiet=opt.quiet,
                 verbose=opt.verbose,
                 output_redir=buf,
-                current_branch_only=self._GetCurrentBranchOnly(
+                current_branch_only=cls._GetCurrentBranchOnly(
                     opt, project.manifest
                 ),
                 force_sync=opt.force_sync,
@@ -756,7 +760,7 @@ later is required to fix a server side protocol bug.
                 optimized_fetch=opt.optimized_fetch,
                 retry_fetches=opt.retry_fetches,
                 prune=opt.prune,
-                ssh_proxy=self.ssh_proxy,
+                ssh_proxy=cls.get_parallel_context()["ssh_proxy"],
                 clone_filter=project.manifest.CloneFilter,
                 partial_clone_exclude=project.manifest.PartialCloneExclude,
                 clone_filter_for_depth=project.manifest.CloneFilterForDepth,
@@ -788,24 +792,20 @@ later is required to fix a server side protocol bug.
                     type(e).__name__,
                     e,
                 )
-                del self._sync_dict[k]
                 errors.append(e)
                 raise
+            finally:
+                del cls.get_parallel_context()["sync_dict"][k]
 
         finish = time.time()
-        del self._sync_dict[k]
         return _FetchOneResult(
-            success, errors, project, start, finish, remote_fetched
+            success, errors, project_idx, start, finish, remote_fetched
         )
 
-    @classmethod
-    def _FetchInitChild(cls, ssh_proxy):
-        cls.ssh_proxy = ssh_proxy
-
     def _GetSyncProgressMessage(self):
         earliest_time = float("inf")
         earliest_proj = None
-        items = self._sync_dict.items()
+        items = self.get_parallel_context()["sync_dict"].items()
         for project, t in items:
             if t < earliest_time:
                 earliest_time = t
@@ -813,7 +813,7 @@ later is required to fix a server side protocol bug.
 
         if not earliest_proj:
             # This function is called when sync is still running but in some
-            # cases (by chance), _sync_dict can contain no entries. Return some
+            # cases (by chance), sync_dict can contain no entries. Return some
             # text to indicate that sync is still working.
             return "..working.."
 
@@ -835,7 +835,6 @@ later is required to fix a server side protocol bug.
             elide=True,
         )
 
-        self._sync_dict = multiprocessing.Manager().dict()
         sync_event = _threading.Event()
 
         def _MonitorSyncLoop():
@@ -846,21 +845,13 @@ later is required to fix a server side protocol bug.
 
         sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
         sync_progress_thread.daemon = True
-        sync_progress_thread.start()
-
-        objdir_project_map = dict()
-        for project in projects:
-            objdir_project_map.setdefault(project.objdir, []).append(project)
-        projects_list = list(objdir_project_map.values())
 
-        jobs = min(opt.jobs_network, len(projects_list))
-
-        def _ProcessResults(results_sets):
+        def _ProcessResults(pool, pm, results_sets):
             ret = True
             for results in results_sets:
                 for result in results:
                     success = result.success
-                    project = result.project
+                    project = projects[result.project_idx]
                     start = result.start
                     finish = result.finish
                     self._fetch_times.Set(project, finish - start)
@@ -884,45 +875,49 @@ later is required to fix a server side protocol bug.
                     fetched.add(project.gitdir)
                     pm.update()
                 if not ret and opt.fail_fast:
+                    if pool:
+                        pool.close()
                     break
             return ret
 
-        # We pass the ssh proxy settings via the class. This allows
-        # multiprocessing to pickle it up when spawning children. We can't pass
-        # it as an argument to _FetchProjectList below as multiprocessing is
-        # unable to pickle those.
-        Sync.ssh_proxy = None
-
-        # NB: Multiprocessing is heavy, so don't spin it up for one job.
-        if jobs == 1:
-            self._FetchInitChild(ssh_proxy)
-            if not _ProcessResults(
-                self._FetchProjectList(opt, x) for x in projects_list
-            ):
-                ret = False
-        else:
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            self.get_parallel_context()[
+                "sync_dict"
+            ] = multiprocessing.Manager().dict()
+
+            objdir_project_map = dict()
+            for index, project in enumerate(projects):
+                objdir_project_map.setdefault(project.objdir, []).append(index)
+            projects_list = list(objdir_project_map.values())
+
+            jobs = min(opt.jobs_network, len(projects_list))
+
+            # We pass the ssh proxy settings via the class. This allows
+            # multiprocessing to pickle it up when spawning children. We can't
+            # pass it as an argument to _FetchProjectList below as
+            # multiprocessing is unable to pickle those.
+            self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+
+            sync_progress_thread.start()
             if not opt.quiet:
                 pm.update(inc=0, msg="warming up")
-            with multiprocessing.Pool(
-                jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
-            ) as pool:
-                results = pool.imap_unordered(
+            try:
+                ret = self.ExecuteInParallel(
+                    jobs,
                     functools.partial(self._FetchProjectList, opt),
                     projects_list,
-                    chunksize=_chunksize(len(projects_list), jobs),
+                    callback=_ProcessResults,
+                    output=pm,
+                    # Use chunksize=1 to avoid the chance that some workers are
+                    # idle while other workers still have more than one job in
+                    # their chunk queue.
+                    chunksize=1,
                 )
-                if not _ProcessResults(results):
-                    ret = False
-                pool.close()
-
-        # Cleanup the reference now that we're done with it, and we're going to
-        # release any resources it points to. If we don't, later
-        # multiprocessing usage (e.g. checkouts) will try to pickle and then
-        # crash.
-        del Sync.ssh_proxy
+            finally:
+                sync_event.set()
+                sync_progress_thread.join()
 
-        sync_event.set()
-        pm.end()
         self._fetch_times.Save()
         self._local_sync_state.Save()
 
@@ -1008,14 +1003,15 @@ later is required to fix a server side protocol bug.
 
         return _FetchMainResult(all_projects)
 
+    @classmethod
     def _CheckoutOne(
-        self,
+        cls,
         detach_head,
         force_sync,
         force_checkout,
         force_rebase,
         verbose,
-        project,
+        project_idx,
     ):
         """Checkout work tree for one project
 
1021 1017
@@ -1027,11 +1023,12 @@ later is required to fix a server side protocol bug.
1027 force_checkout: Force checking out of the repo content. 1023 force_checkout: Force checking out of the repo content.
1028 force_rebase: Force rebase. 1024 force_rebase: Force rebase.
1029 verbose: Whether to show verbose messages. 1025 verbose: Whether to show verbose messages.
1030 project: Project object for the project to checkout. 1026 project_idx: Project index for the project to checkout.
1031 1027
1032 Returns: 1028 Returns:
1033 Whether the fetch was successful. 1029 Whether the fetch was successful.
1034 """ 1030 """
1031 project = cls.get_parallel_context()["projects"][project_idx]
1035 start = time.time() 1032 start = time.time()
1036 syncbuf = SyncBuffer( 1033 syncbuf = SyncBuffer(
1037 project.manifest.manifestProject.config, detach_head=detach_head 1034 project.manifest.manifestProject.config, detach_head=detach_head
@@ -1065,7 +1062,7 @@ later is required to fix a server side protocol bug.
         if not success:
             logger.error("error: Cannot checkout %s", project.name)
         finish = time.time()
-        return _CheckoutOneResult(success, errors, project, start, finish)
+        return _CheckoutOneResult(success, errors, project_idx, start, finish)
 
     def _Checkout(self, all_projects, opt, err_results, checkout_errors):
         """Checkout projects listed in all_projects
@@ -1083,7 +1080,9 @@ later is required to fix a server side protocol bug.
             ret = True
             for result in results:
                 success = result.success
-                project = result.project
+                project = self.get_parallel_context()["projects"][
+                    result.project_idx
+                ]
                 start = result.start
                 finish = result.finish
                 self.event_log.AddSync(
@@ -1110,22 +1109,28 @@ later is required to fix a server side protocol bug.
             return ret
 
         for projects in _SafeCheckoutOrder(all_projects):
-            proc_res = self.ExecuteInParallel(
-                opt.jobs_checkout,
-                functools.partial(
-                    self._CheckoutOne,
-                    opt.detach_head,
-                    opt.force_sync,
-                    opt.force_checkout,
-                    opt.rebase,
-                    opt.verbose,
-                ),
-                projects,
-                callback=_ProcessResults,
-                output=Progress(
-                    "Checking out", len(all_projects), quiet=opt.quiet
-                ),
-            )
+            with self.ParallelContext():
+                self.get_parallel_context()["projects"] = projects
+                proc_res = self.ExecuteInParallel(
+                    opt.jobs_checkout,
+                    functools.partial(
+                        self._CheckoutOne,
+                        opt.detach_head,
+                        opt.force_sync,
+                        opt.force_checkout,
+                        opt.rebase,
+                        opt.verbose,
+                    ),
+                    range(len(projects)),
+                    callback=_ProcessResults,
+                    output=Progress(
+                        "Checking out", len(all_projects), quiet=opt.quiet
+                    ),
+                    # Use chunksize=1 to avoid the chance that some workers
+                    # are idle while other workers still have more than one
+                    # job in their chunk queue.
+                    chunksize=1,
+                )
 
         self._local_sync_state.Save()
         return proc_res and not err_results