summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py289
1 files changed, 120 insertions, 169 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 131ba676..7c1dea9e 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -1,5 +1,3 @@
1# -*- coding:utf-8 -*-
2#
3# Copyright (C) 2008 The Android Open Source Project 1# Copyright (C) 2008 The Android Open Source Project
4# 2#
5# Licensed under the Apache License, Version 2.0 (the "License"); 3# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,8 +12,9 @@
14# See the License for the specific language governing permissions and 12# See the License for the specific language governing permissions and
15# limitations under the License. 13# limitations under the License.
16 14
17from __future__ import print_function
18import errno 15import errno
16import functools
17import io
19import multiprocessing 18import multiprocessing
20import re 19import re
21import os 20import os
@@ -24,14 +23,14 @@ import sys
24import subprocess 23import subprocess
25 24
26from color import Coloring 25from color import Coloring
27from command import Command, MirrorSafeCommand 26from command import DEFAULT_LOCAL_JOBS, Command, MirrorSafeCommand, WORKER_BATCH_SIZE
28import platform_utils 27from error import ManifestInvalidRevisionError
29 28
30_CAN_COLOR = [ 29_CAN_COLOR = [
31 'branch', 30 'branch',
32 'diff', 31 'diff',
33 'grep', 32 'grep',
34 'log', 33 'log',
35] 34]
36 35
37 36
@@ -42,11 +41,11 @@ class ForallColoring(Coloring):
42 41
43 42
44class Forall(Command, MirrorSafeCommand): 43class Forall(Command, MirrorSafeCommand):
45 common = False 44 COMMON = False
46 helpSummary = "Run a shell command in each project" 45 helpSummary = "Run a shell command in each project"
47 helpUsage = """ 46 helpUsage = """
48%prog [<project>...] -c <command> [<arg>...] 47%prog [<project>...] -c <command> [<arg>...]
49%prog -r str1 [str2] ... -c <command> [<arg>...]" 48%prog -r str1 [str2] ... -c <command> [<arg>...]
50""" 49"""
51 helpDescription = """ 50 helpDescription = """
52Executes the same shell command in each project. 51Executes the same shell command in each project.
@@ -54,6 +53,11 @@ Executes the same shell command in each project.
54The -r option allows running the command only on projects matching 53The -r option allows running the command only on projects matching
55regex or wildcard expression. 54regex or wildcard expression.
56 55
56By default, projects are processed non-interactively in parallel. If you want
57to run interactive commands, make sure to pass --interactive to force --jobs 1.
58While the processing order of projects is not guaranteed, the order of project
59output is stable.
60
57# Output Formatting 61# Output Formatting
58 62
59The -p option causes '%prog' to bind pipes to the command's stdin, 63The -p option causes '%prog' to bind pipes to the command's stdin,
@@ -116,70 +120,48 @@ terminal and are not redirected.
116If -e is used, when a command exits unsuccessfully, '%prog' will abort 120If -e is used, when a command exits unsuccessfully, '%prog' will abort
117without iterating through the remaining projects. 121without iterating through the remaining projects.
118""" 122"""
123 PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
124
125 @staticmethod
126 def _cmd_option(option, _opt_str, _value, parser):
127 setattr(parser.values, option.dest, list(parser.rargs))
128 while parser.rargs:
129 del parser.rargs[0]
119 130
120 def _Options(self, p): 131 def _Options(self, p):
121 def cmd(option, opt_str, value, parser):
122 setattr(parser.values, option.dest, list(parser.rargs))
123 while parser.rargs:
124 del parser.rargs[0]
125 p.add_option('-r', '--regex', 132 p.add_option('-r', '--regex',
126 dest='regex', action='store_true', 133 dest='regex', action='store_true',
127 help="Execute the command only on projects matching regex or wildcard expression") 134 help='execute the command only on projects matching regex or wildcard expression')
128 p.add_option('-i', '--inverse-regex', 135 p.add_option('-i', '--inverse-regex',
129 dest='inverse_regex', action='store_true', 136 dest='inverse_regex', action='store_true',
130 help="Execute the command only on projects not matching regex or wildcard expression") 137 help='execute the command only on projects not matching regex or '
138 'wildcard expression')
131 p.add_option('-g', '--groups', 139 p.add_option('-g', '--groups',
132 dest='groups', 140 dest='groups',
133 help="Execute the command only on projects matching the specified groups") 141 help='execute the command only on projects matching the specified groups')
134 p.add_option('-c', '--command', 142 p.add_option('-c', '--command',
135 help='Command (and arguments) to execute', 143 help='command (and arguments) to execute',
136 dest='command', 144 dest='command',
137 action='callback', 145 action='callback',
138 callback=cmd) 146 callback=self._cmd_option)
139 p.add_option('-e', '--abort-on-errors', 147 p.add_option('-e', '--abort-on-errors',
140 dest='abort_on_errors', action='store_true', 148 dest='abort_on_errors', action='store_true',
141 help='Abort if a command exits unsuccessfully') 149 help='abort if a command exits unsuccessfully')
142 p.add_option('--ignore-missing', action='store_true', 150 p.add_option('--ignore-missing', action='store_true',
143 help='Silently skip & do not exit non-zero due missing ' 151 help='silently skip & do not exit non-zero due missing '
144 'checkouts') 152 'checkouts')
145 153
146 g = p.add_option_group('Output') 154 g = p.get_option_group('--quiet')
147 g.add_option('-p', 155 g.add_option('-p',
148 dest='project_header', action='store_true', 156 dest='project_header', action='store_true',
149 help='Show project headers before output') 157 help='show project headers before output')
150 g.add_option('-v', '--verbose', 158 p.add_option('--interactive',
151 dest='verbose', action='store_true', 159 action='store_true',
152 help='Show command error messages') 160 help='force interactive usage')
153 g.add_option('-j', '--jobs',
154 dest='jobs', action='store', type='int', default=1,
155 help='number of commands to execute simultaneously')
156 161
157 def WantPager(self, opt): 162 def WantPager(self, opt):
158 return opt.project_header and opt.jobs == 1 163 return opt.project_header and opt.jobs == 1
159 164
160 def _SerializeProject(self, project):
161 """ Serialize a project._GitGetByExec instance.
162
163 project._GitGetByExec is not pickle-able. Instead of trying to pass it
164 around between processes, make a dict ourselves containing only the
165 attributes that we need.
166
167 """
168 if not self.manifest.IsMirror:
169 lrev = project.GetRevisionId()
170 else:
171 lrev = None
172 return {
173 'name': project.name,
174 'relpath': project.relpath,
175 'remote_name': project.remote.name,
176 'lrev': lrev,
177 'rrev': project.revisionExpr,
178 'annotations': dict((a.name, a.value) for a in project.annotations),
179 'gitdir': project.gitdir,
180 'worktree': project.worktree,
181 }
182
183 def ValidateOptions(self, opt, args): 165 def ValidateOptions(self, opt, args):
184 if not opt.command: 166 if not opt.command:
185 self.Usage() 167 self.Usage()
@@ -195,9 +177,14 @@ without iterating through the remaining projects.
195 cmd.append(cmd[0]) 177 cmd.append(cmd[0])
196 cmd.extend(opt.command[1:]) 178 cmd.extend(opt.command[1:])
197 179
198 if opt.project_header \ 180 # Historically, forall operated interactively, and in serial. If the user
199 and not shell \ 181 # has selected 1 job, then default to interacive mode.
200 and cmd[0] == 'git': 182 if opt.jobs == 1:
183 opt.interactive = True
184
185 if opt.project_header \
186 and not shell \
187 and cmd[0] == 'git':
201 # If this is a direct git command that can enable colorized 188 # If this is a direct git command that can enable colorized
202 # output and the user prefers coloring, add --color into the 189 # output and the user prefers coloring, add --color into the
203 # command line because we are going to wrap the command into 190 # command line because we are going to wrap the command into
@@ -220,7 +207,7 @@ without iterating through the remaining projects.
220 207
221 smart_sync_manifest_name = "smart_sync_override.xml" 208 smart_sync_manifest_name = "smart_sync_override.xml"
222 smart_sync_manifest_path = os.path.join( 209 smart_sync_manifest_path = os.path.join(
223 self.manifest.manifestProject.worktree, smart_sync_manifest_name) 210 self.manifest.manifestProject.worktree, smart_sync_manifest_name)
224 211
225 if os.path.isfile(smart_sync_manifest_path): 212 if os.path.isfile(smart_sync_manifest_path):
226 self.manifest.Override(smart_sync_manifest_path) 213 self.manifest.Override(smart_sync_manifest_path)
@@ -234,58 +221,50 @@ without iterating through the remaining projects.
234 221
235 os.environ['REPO_COUNT'] = str(len(projects)) 222 os.environ['REPO_COUNT'] = str(len(projects))
236 223
237 pool = multiprocessing.Pool(opt.jobs, InitWorker)
238 try: 224 try:
239 config = self.manifest.manifestProject.config 225 config = self.manifest.manifestProject.config
240 results_it = pool.imap( 226 with multiprocessing.Pool(opt.jobs, InitWorker) as pool:
241 DoWorkWrapper, 227 results_it = pool.imap(
242 self.ProjectArgs(projects, mirror, opt, cmd, shell, config)) 228 functools.partial(DoWorkWrapper, mirror, opt, cmd, shell, config),
243 pool.close() 229 enumerate(projects),
244 for r in results_it: 230 chunksize=WORKER_BATCH_SIZE)
245 rc = rc or r 231 first = True
246 if r != 0 and opt.abort_on_errors: 232 for (r, output) in results_it:
247 raise Exception('Aborting due to previous error') 233 if output:
234 if first:
235 first = False
236 elif opt.project_header:
237 print()
238 # To simplify the DoWorkWrapper, take care of automatic newlines.
239 end = '\n'
240 if output[-1] == '\n':
241 end = ''
242 print(output, end=end)
243 rc = rc or r
244 if r != 0 and opt.abort_on_errors:
245 raise Exception('Aborting due to previous error')
248 except (KeyboardInterrupt, WorkerKeyboardInterrupt): 246 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
249 # Catch KeyboardInterrupt raised inside and outside of workers 247 # Catch KeyboardInterrupt raised inside and outside of workers
250 print('Interrupted - terminating the pool')
251 pool.terminate()
252 rc = rc or errno.EINTR 248 rc = rc or errno.EINTR
253 except Exception as e: 249 except Exception as e:
254 # Catch any other exceptions raised 250 # Catch any other exceptions raised
255 print('Got an error, terminating the pool: %s: %s' % 251 print('forall: unhandled error, terminating the pool: %s: %s' %
256 (type(e).__name__, e), 252 (type(e).__name__, e),
257 file=sys.stderr) 253 file=sys.stderr)
258 pool.terminate()
259 rc = rc or getattr(e, 'errno', 1) 254 rc = rc or getattr(e, 'errno', 1)
260 finally:
261 pool.join()
262 if rc != 0: 255 if rc != 0:
263 sys.exit(rc) 256 sys.exit(rc)
264 257
265 def ProjectArgs(self, projects, mirror, opt, cmd, shell, config):
266 for cnt, p in enumerate(projects):
267 try:
268 project = self._SerializeProject(p)
269 except Exception as e:
270 print('Project list error on project %s: %s: %s' %
271 (p.name, type(e).__name__, e),
272 file=sys.stderr)
273 return
274 except KeyboardInterrupt:
275 print('Project list interrupted',
276 file=sys.stderr)
277 return
278 yield [mirror, opt, cmd, shell, cnt, config, project]
279 258
280class WorkerKeyboardInterrupt(Exception): 259class WorkerKeyboardInterrupt(Exception):
281 """ Keyboard interrupt exception for worker processes. """ 260 """ Keyboard interrupt exception for worker processes. """
282 pass
283 261
284 262
285def InitWorker(): 263def InitWorker():
286 signal.signal(signal.SIGINT, signal.SIG_IGN) 264 signal.signal(signal.SIGINT, signal.SIG_IGN)
287 265
288def DoWorkWrapper(args): 266
267def DoWorkWrapper(mirror, opt, cmd, shell, config, args):
289 """ A wrapper around the DoWork() method. 268 """ A wrapper around the DoWork() method.
290 269
291 Catch the KeyboardInterrupt exceptions here and re-raise them as a different, 270 Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
@@ -293,109 +272,81 @@ def DoWorkWrapper(args):
293 and making the parent hang indefinitely. 272 and making the parent hang indefinitely.
294 273
295 """ 274 """
296 project = args.pop() 275 cnt, project = args
297 try: 276 try:
298 return DoWork(project, *args) 277 return DoWork(project, mirror, opt, cmd, shell, cnt, config)
299 except KeyboardInterrupt: 278 except KeyboardInterrupt:
300 print('%s: Worker interrupted' % project['name']) 279 print('%s: Worker interrupted' % project.name)
301 raise WorkerKeyboardInterrupt() 280 raise WorkerKeyboardInterrupt()
302 281
303 282
304def DoWork(project, mirror, opt, cmd, shell, cnt, config): 283def DoWork(project, mirror, opt, cmd, shell, cnt, config):
305 env = os.environ.copy() 284 env = os.environ.copy()
285
306 def setenv(name, val): 286 def setenv(name, val):
307 if val is None: 287 if val is None:
308 val = '' 288 val = ''
309 if hasattr(val, 'encode'):
310 val = val.encode()
311 env[name] = val 289 env[name] = val
312 290
313 setenv('REPO_PROJECT', project['name']) 291 setenv('REPO_PROJECT', project.name)
314 setenv('REPO_PATH', project['relpath']) 292 setenv('REPO_PATH', project.relpath)
315 setenv('REPO_REMOTE', project['remote_name']) 293 setenv('REPO_REMOTE', project.remote.name)
316 setenv('REPO_LREV', project['lrev']) 294 try:
317 setenv('REPO_RREV', project['rrev']) 295 # If we aren't in a fully synced state and we don't have the ref the manifest
296 # wants, then this will fail. Ignore it for the purposes of this code.
297 lrev = '' if mirror else project.GetRevisionId()
298 except ManifestInvalidRevisionError:
299 lrev = ''
300 setenv('REPO_LREV', lrev)
301 setenv('REPO_RREV', project.revisionExpr)
302 setenv('REPO_UPSTREAM', project.upstream)
303 setenv('REPO_DEST_BRANCH', project.dest_branch)
318 setenv('REPO_I', str(cnt + 1)) 304 setenv('REPO_I', str(cnt + 1))
319 for name in project['annotations']: 305 for annotation in project.annotations:
320 setenv("REPO__%s" % (name), project['annotations'][name]) 306 setenv("REPO__%s" % (annotation.name), annotation.value)
321 307
322 if mirror: 308 if mirror:
323 setenv('GIT_DIR', project['gitdir']) 309 setenv('GIT_DIR', project.gitdir)
324 cwd = project['gitdir'] 310 cwd = project.gitdir
325 else: 311 else:
326 cwd = project['worktree'] 312 cwd = project.worktree
327 313
328 if not os.path.exists(cwd): 314 if not os.path.exists(cwd):
329 # Allow the user to silently ignore missing checkouts so they can run on 315 # Allow the user to silently ignore missing checkouts so they can run on
330 # partial checkouts (good for infra recovery tools). 316 # partial checkouts (good for infra recovery tools).
331 if opt.ignore_missing: 317 if opt.ignore_missing:
332 return 0 318 return (0, '')
319
320 output = ''
333 if ((opt.project_header and opt.verbose) 321 if ((opt.project_header and opt.verbose)
334 or not opt.project_header): 322 or not opt.project_header):
335 print('skipping %s/' % project['relpath'], file=sys.stderr) 323 output = 'skipping %s/' % project.relpath
336 return 1 324 return (1, output)
337 325
338 if opt.project_header: 326 if opt.verbose:
339 stdin = subprocess.PIPE 327 stderr = subprocess.STDOUT
340 stdout = subprocess.PIPE
341 stderr = subprocess.PIPE
342 else: 328 else:
343 stdin = None 329 stderr = subprocess.DEVNULL
344 stdout = None 330
345 stderr = None 331 stdin = None if opt.interactive else subprocess.DEVNULL
346
347 p = subprocess.Popen(cmd,
348 cwd=cwd,
349 shell=shell,
350 env=env,
351 stdin=stdin,
352 stdout=stdout,
353 stderr=stderr)
354 332
333 result = subprocess.run(
334 cmd, cwd=cwd, shell=shell, env=env, check=False,
335 encoding='utf-8', errors='replace',
336 stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
337
338 output = result.stdout
355 if opt.project_header: 339 if opt.project_header:
356 out = ForallColoring(config) 340 if output:
357 out.redirect(sys.stdout) 341 buf = io.StringIO()
358 empty = True 342 out = ForallColoring(config)
359 errbuf = '' 343 out.redirect(buf)
360 344 if mirror:
361 p.stdin.close() 345 project_header_path = project.name
362 s_in = platform_utils.FileDescriptorStreams.create() 346 else:
363 s_in.add(p.stdout, sys.stdout, 'stdout') 347 project_header_path = project.relpath
364 s_in.add(p.stderr, sys.stderr, 'stderr') 348 out.project('project %s/' % project_header_path)
365 349 out.nl()
366 while not s_in.is_done: 350 buf.write(output)
367 in_ready = s_in.select() 351 output = buf.getvalue()
368 for s in in_ready: 352 return (result.returncode, output)
369 buf = s.read().decode()
370 if not buf:
371 s.close()
372 s_in.remove(s)
373 continue
374
375 if not opt.verbose:
376 if s.std_name == 'stderr':
377 errbuf += buf
378 continue
379
380 if empty and out:
381 if not cnt == 0:
382 out.nl()
383
384 if mirror:
385 project_header_path = project['name']
386 else:
387 project_header_path = project['relpath']
388 out.project('project %s/', project_header_path)
389 out.nl()
390 out.flush()
391 if errbuf:
392 sys.stderr.write(errbuf)
393 sys.stderr.flush()
394 errbuf = ''
395 empty = False
396
397 s.dest.write(buf)
398 s.dest.flush()
399
400 r = p.wait()
401 return r