author     Kuang-che Wu <kcwu@google.com>  2024-10-18 23:32:08 +0800
committer  LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>  2024-10-23 02:58:45 +0000
commit     39ffd9977e2f6cb1ca1757e59173fc93e0eab72c (patch)
tree       4b2f5f39ce1a20ef06c336cfb87a4269cedfd8d7 /command.py
parent     584863fb5e3e17ab364de40f80b10ac030b47788 (diff)
download   git-repo-39ffd9977e2f6cb1ca1757e59173fc93e0eab72c.tar.gz
sync: reduce multiprocessing serialization overhead
Background:
- The Manifest object is large (for projects like Android) in terms of
  serialization cost and size (more than 1 MB).
- Many Project objects usually share only a few Manifest objects.

Before this CL, Project objects were passed to workers via function
parameters. Function parameters are pickled separately, chunk by chunk,
so the shared manifests were serialized over and over. The major
serialization overhead of "repo sync" was
  O(manifest_size * projects / chunksize)
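To make the overhead concrete, here is a minimal standalone sketch (not
code from this CL; the Manifest and Project classes below are stand-ins)
showing that pickling projects one task at a time re-serializes the shared
manifest for every project, while a single pickle invocation serializes it
once:

```python
import pickle


class Manifest:
    """Stand-in for a parsed manifest carrying ~1 MB of state."""

    def __init__(self):
        self.data = "x" * 1_000_000


class Project:
    """Stand-in for a Project holding a reference to a shared manifest."""

    def __init__(self, name, manifest):
        self.name = name
        self.manifest = manifest


manifest = Manifest()
projects = [Project(f"project-{i}", manifest) for i in range(100)]

# Pickled per task (what happens when each Project is a function argument),
# the shared manifest is serialized once per project.
per_task = sum(len(pickle.dumps(p)) for p in projects)

# Pickled in one invocation, pickle's memo table notices the shared manifest
# and serializes it only once.
one_shot = len(pickle.dumps(projects))

print(f"per-task: {per_task:,} bytes, one-shot: {one_shot:,} bytes")
```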
This CL uses the following tricks to reduce the serialization overhead
(see the sketch after this list):
- All projects are pickled in one invocation. Because Project objects
  share manifests, the pickle library remembers which objects it has
  already seen and avoids serializing them again.
- Project objects are passed to the workers at worker initialization
  time, and only a project index is passed as the function parameter.
  The number of workers is much smaller than the number of projects.
- Worker init state is shared on Linux (fork based), so it requires
  zero serialization of Project objects.

On Linux (fork based), the serialization overhead is
  O(projects) --- one int per project
On Windows (spawn based), the serialization overhead is
  O(manifest_size * min(workers, projects))

Moreover, chunksize=1 is used so that no worker sits idle while another
worker still has more than one job queued in its chunk.
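The sketch below shows the initializer/index pattern in isolation,
independent of the repo codebase (the names _init_worker and _work_on are
made up for the example): the heavy shared list reaches each worker once
via initializer/initargs, and each task carries only an integer.

```python
import multiprocessing

_projects = None  # per-worker copy, installed once at worker start-up


def _init_worker(projects):
    """Receive the shared project list once per worker process."""
    global _projects
    _projects = projects


def _work_on(index):
    """Tasks carry only an int; the heavy object is looked up locally."""
    project = _projects[index]
    return project["name"]


if __name__ == "__main__":
    projects = [{"name": f"project-{i}"} for i in range(8)]
    with multiprocessing.Pool(
        2, initializer=_init_worker, initargs=(projects,)
    ) as pool:
        # chunksize=1 hands out work one job at a time, so no worker sits
        # idle while another still has queued jobs in its chunk.
        for name in pool.imap_unordered(_work_on, range(len(projects)), chunksize=1):
            print(name)
```

Under fork (Linux) the initargs are inherited in memory, so the project
list is never pickled; under spawn (Windows) it is pickled once per worker.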
Using 2.7k projects as the baseline, a no-op "repo sync" originally took
31s for fetch and 25s for checkout on my Linux workstation. With this
CL, it takes 12s for fetch and 1s for checkout.
Bug: b/371638995
Change-Id: Ifa22072ea54eacb4a5c525c050d84de371e87caa
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439921
Tested-by: Kuang-che Wu <kcwu@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
Diffstat (limited to 'command.py')
 -rw-r--r--  command.py  |  50
 1 file changed, 46 insertions(+), 4 deletions(-)
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import multiprocessing
 import optparse
 import os
@@ -70,6 +71,14 @@ class Command:
     # migrated subcommands can set it to False.
     MULTI_MANIFEST_SUPPORT = True
 
+    # Shared data across parallel execution workers.
+    _parallel_context = None
+
+    @classmethod
+    def get_parallel_context(cls):
+        assert cls._parallel_context is not None
+        return cls._parallel_context
+
     def __init__(
         self,
         repodir=None,
@@ -242,9 +251,36 @@ class Command:
         """Perform the action, after option parsing is complete."""
         raise NotImplementedError
 
-    @staticmethod
+    @classmethod
+    @contextlib.contextmanager
+    def ParallelContext(cls):
+        """Obtains the context, which is shared to ExecuteInParallel workers.
+
+        Callers can store data in the context dict before invocation of
+        ExecuteInParallel. The dict will then be shared to child workers of
+        ExecuteInParallel.
+        """
+        assert cls._parallel_context is None
+        cls._parallel_context = {}
+        try:
+            yield
+        finally:
+            cls._parallel_context = None
+
+    @classmethod
+    def _SetParallelContext(cls, context):
+        cls._parallel_context = context
+
+    @classmethod
     def ExecuteInParallel(
-        jobs, func, inputs, callback, output=None, ordered=False
+        cls,
+        jobs,
+        func,
+        inputs,
+        callback,
+        output=None,
+        ordered=False,
+        chunksize=WORKER_BATCH_SIZE,
     ):
         """Helper for managing parallel execution boiler plate.
 
@@ -269,6 +305,8 @@ class Command:
             output: An output manager. May be progress.Progess or
                 color.Coloring.
             ordered: Whether the jobs should be processed in order.
+            chunksize: The number of jobs processed in batch by parallel
+                workers.
 
         Returns:
             The |callback| function's results are returned.
@@ -278,12 +316,16 @@
             if len(inputs) == 1 or jobs == 1:
                 return callback(None, output, (func(x) for x in inputs))
             else:
-                with multiprocessing.Pool(jobs) as pool:
+                with multiprocessing.Pool(
+                    jobs,
+                    initializer=cls._SetParallelContext,
+                    initargs=(cls._parallel_context,),
+                ) as pool:
                     submit = pool.imap if ordered else pool.imap_unordered
                     return callback(
                         pool,
                         output,
-                        submit(func, inputs, chunksize=WORKER_BATCH_SIZE),
+                        submit(func, inputs, chunksize=chunksize),
                     )
         finally:
             if isinstance(output, progress.Progress):
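For context, this is roughly how a caller could wire the new pieces
together, going by the docstrings above. It is a hypothetical sketch:
GetProjects, _ProcessOne, and the Fetch class are illustrative names, not
part of this diff.

```python
class Fetch(Command):
    @classmethod
    def _ProcessOne(cls, idx):
        # Workers resolve the index against the context installed by the
        # pool initializer instead of unpickling a Project per task.
        project = cls.get_parallel_context()["projects"][idx]
        return project.name

    def Execute(self, opt, args):
        projects = self.GetProjects(args)  # illustrative helper
        with self.ParallelContext():
            # Stash the heavy objects once, before spinning up workers.
            self.get_parallel_context()["projects"] = projects
            return self.ExecuteInParallel(
                opt.jobs,
                self._ProcessOne,
                range(len(projects)),
                callback=lambda pool, output, results: list(results),
                chunksize=1,
            )
```

As the commit message notes, under fork-based start (Linux) the stored
context reaches the workers without any pickling; under spawn (Windows) it
is pickled once per worker by the pool initializer.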