summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py325
1 files changed, 213 insertions, 112 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index e2a420a9..b93cd6d0 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,10 +14,13 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import errno
17import fcntl 18import fcntl
19import multiprocessing
18import re 20import re
19import os 21import os
20import select 22import select
23import signal
21import sys 24import sys
22import subprocess 25import subprocess
23 26
@@ -31,6 +34,7 @@ _CAN_COLOR = [
31 'log', 34 'log',
32] 35]
33 36
37
34class ForallColoring(Coloring): 38class ForallColoring(Coloring):
35 def __init__(self, config): 39 def __init__(self, config):
36 Coloring.__init__(self, config, 'forall') 40 Coloring.__init__(self, config, 'forall')
@@ -87,6 +91,12 @@ revision to a locally executed git command, use REPO_LREV.
87REPO_RREV is the name of the revision from the manifest, exactly 91REPO_RREV is the name of the revision from the manifest, exactly
88as written in the manifest. 92as written in the manifest.
89 93
94REPO_COUNT is the total number of projects being iterated.
95
96REPO_I is the current (1-based) iteration count. Can be used in
97conjunction with REPO_COUNT to add a simple progress indicator to your
98command.
99
90REPO__* are any extra environment variables, specified by the 100REPO__* are any extra environment variables, specified by the
91"annotation" element under any project element. This can be useful 101"annotation" element under any project element. This can be useful
92for differentiating trees based on user-specific criteria, or simply 102for differentiating trees based on user-specific criteria, or simply
@@ -126,9 +136,35 @@ without iterating through the remaining projects.
126 g.add_option('-v', '--verbose', 136 g.add_option('-v', '--verbose',
127 dest='verbose', action='store_true', 137 dest='verbose', action='store_true',
128 help='Show command error messages') 138 help='Show command error messages')
139 g.add_option('-j', '--jobs',
140 dest='jobs', action='store', type='int', default=1,
141 help='number of commands to execute simultaneously')
129 142
130 def WantPager(self, opt): 143 def WantPager(self, opt):
131 return opt.project_header 144 return opt.project_header and opt.jobs == 1
145
146 def _SerializeProject(self, project):
147 """ Serialize a project._GitGetByExec instance.
148
149 project._GitGetByExec is not pickle-able. Instead of trying to pass it
150 around between processes, make a dict ourselves containing only the
151 attributes that we need.
152
153 """
154 if not self.manifest.IsMirror:
155 lrev = project.GetRevisionId()
156 else:
157 lrev = None
158 return {
159 'name': project.name,
160 'relpath': project.relpath,
161 'remote_name': project.remote.name,
162 'lrev': lrev,
163 'rrev': project.revisionExpr,
164 'annotations': dict((a.name, a.value) for a in project.annotations),
165 'gitdir': project.gitdir,
166 'worktree': project.worktree,
167 }
132 168
133 def Execute(self, opt, args): 169 def Execute(self, opt, args):
134 if not opt.command: 170 if not opt.command:
@@ -167,123 +203,188 @@ without iterating through the remaining projects.
167 # pylint: enable=W0631 203 # pylint: enable=W0631
168 204
169 mirror = self.manifest.IsMirror 205 mirror = self.manifest.IsMirror
170 out = ForallColoring(self.manifest.manifestProject.config)
171 out.redirect(sys.stdout)
172
173 rc = 0 206 rc = 0
174 first = True 207
208 smart_sync_manifest_name = "smart_sync_override.xml"
209 smart_sync_manifest_path = os.path.join(
210 self.manifest.manifestProject.worktree, smart_sync_manifest_name)
211
212 if os.path.isfile(smart_sync_manifest_path):
213 self.manifest.Override(smart_sync_manifest_path)
175 214
176 if not opt.regex: 215 if not opt.regex:
177 projects = self.GetProjects(args) 216 projects = self.GetProjects(args)
178 else: 217 else:
179 projects = self.FindProjects(args) 218 projects = self.FindProjects(args)
180 219
181 for project in projects: 220 os.environ['REPO_COUNT'] = str(len(projects))
182 env = os.environ.copy() 221
183 def setenv(name, val): 222 pool = multiprocessing.Pool(opt.jobs, InitWorker)
184 if val is None: 223 try:
185 val = '' 224 config = self.manifest.manifestProject.config
186 env[name] = val.encode() 225 results_it = pool.imap(
187 226 DoWorkWrapper,
188 setenv('REPO_PROJECT', project.name) 227 self.ProjectArgs(projects, mirror, opt, cmd, shell, config))
189 setenv('REPO_PATH', project.relpath) 228 pool.close()
190 setenv('REPO_REMOTE', project.remote.name) 229 for r in results_it:
191 setenv('REPO_LREV', project.GetRevisionId()) 230 rc = rc or r
192 setenv('REPO_RREV', project.revisionExpr) 231 if r != 0 and opt.abort_on_errors:
193 for a in project.annotations: 232 raise Exception('Aborting due to previous error')
194 setenv("REPO__%s" % (a.name), a.value) 233 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
195 234 # Catch KeyboardInterrupt raised inside and outside of workers
196 if mirror: 235 print('Interrupted - terminating the pool')
197 setenv('GIT_DIR', project.gitdir) 236 pool.terminate()
198 cwd = project.gitdir 237 rc = rc or errno.EINTR
199 else: 238 except Exception as e:
200 cwd = project.worktree 239 # Catch any other exceptions raised
201 240 print('Got an error, terminating the pool: %r' % e,
202 if not os.path.exists(cwd): 241 file=sys.stderr)
203 if (opt.project_header and opt.verbose) \ 242 pool.terminate()
204 or not opt.project_header: 243 rc = rc or getattr(e, 'errno', 1)
205 print('skipping %s/' % project.relpath, file=sys.stderr) 244 finally:
206 continue 245 pool.join()
207
208 if opt.project_header:
209 stdin = subprocess.PIPE
210 stdout = subprocess.PIPE
211 stderr = subprocess.PIPE
212 else:
213 stdin = None
214 stdout = None
215 stderr = None
216
217 p = subprocess.Popen(cmd,
218 cwd = cwd,
219 shell = shell,
220 env = env,
221 stdin = stdin,
222 stdout = stdout,
223 stderr = stderr)
224
225 if opt.project_header:
226 class sfd(object):
227 def __init__(self, fd, dest):
228 self.fd = fd
229 self.dest = dest
230 def fileno(self):
231 return self.fd.fileno()
232
233 empty = True
234 errbuf = ''
235
236 p.stdin.close()
237 s_in = [sfd(p.stdout, sys.stdout),
238 sfd(p.stderr, sys.stderr)]
239
240 for s in s_in:
241 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
242 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
243
244 while s_in:
245 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
246 for s in in_ready:
247 buf = s.fd.read(4096)
248 if not buf:
249 s.fd.close()
250 s_in.remove(s)
251 continue
252
253 if not opt.verbose:
254 if s.fd != p.stdout:
255 errbuf += buf
256 continue
257
258 if empty:
259 if first:
260 first = False
261 else:
262 out.nl()
263
264 if mirror:
265 project_header_path = project.name
266 else:
267 project_header_path = project.relpath
268 out.project('project %s/', project_header_path)
269 out.nl()
270 out.flush()
271 if errbuf:
272 sys.stderr.write(errbuf)
273 sys.stderr.flush()
274 errbuf = ''
275 empty = False
276
277 s.dest.write(buf)
278 s.dest.flush()
279
280 r = p.wait()
281 if r != 0:
282 if r != rc:
283 rc = r
284 if opt.abort_on_errors:
285 print("error: %s: Aborting due to previous error" % project.relpath,
286 file=sys.stderr)
287 sys.exit(r)
288 if rc != 0: 246 if rc != 0:
289 sys.exit(rc) 247 sys.exit(rc)
248
249 def ProjectArgs(self, projects, mirror, opt, cmd, shell, config):
250 for cnt, p in enumerate(projects):
251 try:
252 project = self._SerializeProject(p)
253 except Exception as e:
254 print('Project list error: %r' % e,
255 file=sys.stderr)
256 return
257 except KeyboardInterrupt:
258 print('Project list interrupted',
259 file=sys.stderr)
260 return
261 yield [mirror, opt, cmd, shell, cnt, config, project]
262
263class WorkerKeyboardInterrupt(Exception):
264 """ Keyboard interrupt exception for worker processes. """
265 pass
266
267
268def InitWorker():
269 signal.signal(signal.SIGINT, signal.SIG_IGN)
270
271def DoWorkWrapper(args):
272 """ A wrapper around the DoWork() method.
273
274 Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
275 ``Exception``-based exception to stop it flooding the console with stacktraces
276 and making the parent hang indefinitely.
277
278 """
279 project = args.pop()
280 try:
281 return DoWork(project, *args)
282 except KeyboardInterrupt:
283 print('%s: Worker interrupted' % project['name'])
284 raise WorkerKeyboardInterrupt()
285
286
287def DoWork(project, mirror, opt, cmd, shell, cnt, config):
288 env = os.environ.copy()
289 def setenv(name, val):
290 if val is None:
291 val = ''
292 if hasattr(val, 'encode'):
293 val = val.encode()
294 env[name] = val
295
296 setenv('REPO_PROJECT', project['name'])
297 setenv('REPO_PATH', project['relpath'])
298 setenv('REPO_REMOTE', project['remote_name'])
299 setenv('REPO_LREV', project['lrev'])
300 setenv('REPO_RREV', project['rrev'])
301 setenv('REPO_I', str(cnt + 1))
302 for name in project['annotations']:
303 setenv("REPO__%s" % (name), project['annotations'][name])
304
305 if mirror:
306 setenv('GIT_DIR', project['gitdir'])
307 cwd = project['gitdir']
308 else:
309 cwd = project['worktree']
310
311 if not os.path.exists(cwd):
312 if (opt.project_header and opt.verbose) \
313 or not opt.project_header:
314 print('skipping %s/' % project['relpath'], file=sys.stderr)
315 return
316
317 if opt.project_header:
318 stdin = subprocess.PIPE
319 stdout = subprocess.PIPE
320 stderr = subprocess.PIPE
321 else:
322 stdin = None
323 stdout = None
324 stderr = None
325
326 p = subprocess.Popen(cmd,
327 cwd=cwd,
328 shell=shell,
329 env=env,
330 stdin=stdin,
331 stdout=stdout,
332 stderr=stderr)
333
334 if opt.project_header:
335 out = ForallColoring(config)
336 out.redirect(sys.stdout)
337 class sfd(object):
338 def __init__(self, fd, dest):
339 self.fd = fd
340 self.dest = dest
341 def fileno(self):
342 return self.fd.fileno()
343
344 empty = True
345 errbuf = ''
346
347 p.stdin.close()
348 s_in = [sfd(p.stdout, sys.stdout),
349 sfd(p.stderr, sys.stderr)]
350
351 for s in s_in:
352 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
353 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
354
355 while s_in:
356 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
357 for s in in_ready:
358 buf = s.fd.read(4096)
359 if not buf:
360 s.fd.close()
361 s_in.remove(s)
362 continue
363
364 if not opt.verbose:
365 if s.fd != p.stdout:
366 errbuf += buf
367 continue
368
369 if empty and out:
370 if not cnt == 0:
371 out.nl()
372
373 if mirror:
374 project_header_path = project['name']
375 else:
376 project_header_path = project['relpath']
377 out.project('project %s/', project_header_path)
378 out.nl()
379 out.flush()
380 if errbuf:
381 sys.stderr.write(errbuf)
382 sys.stderr.flush()
383 errbuf = ''
384 empty = False
385
386 s.dest.write(buf)
387 s.dest.flush()
388
389 r = p.wait()
390 return r