summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py290
1 files changed, 177 insertions, 113 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 03ebcb21..7771ec16 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,7 +14,9 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import errno
17import fcntl 18import fcntl
19import multiprocessing
18import re 20import re
19import os 21import os
20import select 22import select
@@ -31,6 +33,7 @@ _CAN_COLOR = [
31 'log', 33 'log',
32] 34]
33 35
36
34class ForallColoring(Coloring): 37class ForallColoring(Coloring):
35 def __init__(self, config): 38 def __init__(self, config):
36 Coloring.__init__(self, config, 'forall') 39 Coloring.__init__(self, config, 'forall')
@@ -132,9 +135,31 @@ without iterating through the remaining projects.
132 g.add_option('-v', '--verbose', 135 g.add_option('-v', '--verbose',
133 dest='verbose', action='store_true', 136 dest='verbose', action='store_true',
134 help='Show command error messages') 137 help='Show command error messages')
138 g.add_option('-j', '--jobs',
139 dest='jobs', action='store', type='int', default=1,
140 help='number of commands to execute simultaneously')
135 141
136 def WantPager(self, opt): 142 def WantPager(self, opt):
137 return opt.project_header 143 return opt.project_header and opt.jobs == 1
144
145 def _SerializeProject(self, project):
146 """ Serialize a project._GitGetByExec instance.
147
148 project._GitGetByExec is not pickle-able. Instead of trying to pass it
149 around between processes, make a dict ourselves containing only the
150 attributes that we need.
151
152 """
153 return {
154 'name': project.name,
155 'relpath': project.relpath,
156 'remote_name': project.remote.name,
157 'lrev': project.GetRevisionId(),
158 'rrev': project.revisionExpr,
159 'annotations': dict((a.name, a.value) for a in project.annotations),
160 'gitdir': project.gitdir,
161 'worktree': project.worktree,
162 }
138 163
139 def Execute(self, opt, args): 164 def Execute(self, opt, args):
140 if not opt.command: 165 if not opt.command:
@@ -173,11 +198,7 @@ without iterating through the remaining projects.
173 # pylint: enable=W0631 198 # pylint: enable=W0631
174 199
175 mirror = self.manifest.IsMirror 200 mirror = self.manifest.IsMirror
176 out = ForallColoring(self.manifest.manifestProject.config)
177 out.redirect(sys.stdout)
178
179 rc = 0 201 rc = 0
180 first = True
181 202
182 if not opt.regex: 203 if not opt.regex:
183 projects = self.GetProjects(args) 204 projects = self.GetProjects(args)
@@ -186,113 +207,156 @@ without iterating through the remaining projects.
186 207
187 os.environ['REPO_COUNT'] = str(len(projects)) 208 os.environ['REPO_COUNT'] = str(len(projects))
188 209
189 for (cnt, project) in enumerate(projects): 210 pool = multiprocessing.Pool(opt.jobs)
190 env = os.environ.copy() 211 try:
191 def setenv(name, val): 212 config = self.manifest.manifestProject.config
192 if val is None: 213 results_it = pool.imap(
193 val = '' 214 DoWorkWrapper,
194 env[name] = val.encode() 215 [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)]
195 216 for cnt, p in enumerate(projects)]
196 setenv('REPO_PROJECT', project.name) 217 )
197 setenv('REPO_PATH', project.relpath) 218 pool.close()
198 setenv('REPO_REMOTE', project.remote.name) 219 for r in results_it:
199 setenv('REPO_LREV', project.GetRevisionId()) 220 rc = rc or r
200 setenv('REPO_RREV', project.revisionExpr) 221 if r != 0 and opt.abort_on_errors:
201 setenv('REPO_I', str(cnt + 1)) 222 raise Exception('Aborting due to previous error')
202 for a in project.annotations: 223 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
203 setenv("REPO__%s" % (a.name), a.value) 224 # Catch KeyboardInterrupt raised inside and outside of workers
204 225 print('Interrupted - terminating the pool')
205 if mirror: 226 pool.terminate()
206 setenv('GIT_DIR', project.gitdir) 227 rc = rc or errno.EINTR
207 cwd = project.gitdir 228 except Exception as e:
208 else: 229 # Catch any other exceptions raised
209 cwd = project.worktree 230 print('Got an error, terminating the pool: %r' % e,
210 231 file=sys.stderr)
211 if not os.path.exists(cwd): 232 pool.terminate()
212 if (opt.project_header and opt.verbose) \ 233 rc = rc or getattr(e, 'errno', 1)
213 or not opt.project_header: 234 finally:
214 print('skipping %s/' % project.relpath, file=sys.stderr) 235 pool.join()
215 continue
216
217 if opt.project_header:
218 stdin = subprocess.PIPE
219 stdout = subprocess.PIPE
220 stderr = subprocess.PIPE
221 else:
222 stdin = None
223 stdout = None
224 stderr = None
225
226 p = subprocess.Popen(cmd,
227 cwd = cwd,
228 shell = shell,
229 env = env,
230 stdin = stdin,
231 stdout = stdout,
232 stderr = stderr)
233
234 if opt.project_header:
235 class sfd(object):
236 def __init__(self, fd, dest):
237 self.fd = fd
238 self.dest = dest
239 def fileno(self):
240 return self.fd.fileno()
241
242 empty = True
243 errbuf = ''
244
245 p.stdin.close()
246 s_in = [sfd(p.stdout, sys.stdout),
247 sfd(p.stderr, sys.stderr)]
248
249 for s in s_in:
250 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
251 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
252
253 while s_in:
254 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
255 for s in in_ready:
256 buf = s.fd.read(4096)
257 if not buf:
258 s.fd.close()
259 s_in.remove(s)
260 continue
261
262 if not opt.verbose:
263 if s.fd != p.stdout:
264 errbuf += buf
265 continue
266
267 if empty:
268 if first:
269 first = False
270 else:
271 out.nl()
272
273 if mirror:
274 project_header_path = project.name
275 else:
276 project_header_path = project.relpath
277 out.project('project %s/', project_header_path)
278 out.nl()
279 out.flush()
280 if errbuf:
281 sys.stderr.write(errbuf)
282 sys.stderr.flush()
283 errbuf = ''
284 empty = False
285
286 s.dest.write(buf)
287 s.dest.flush()
288
289 r = p.wait()
290 if r != 0:
291 if r != rc:
292 rc = r
293 if opt.abort_on_errors:
294 print("error: %s: Aborting due to previous error" % project.relpath,
295 file=sys.stderr)
296 sys.exit(r)
297 if rc != 0: 236 if rc != 0:
298 sys.exit(rc) 237 sys.exit(rc)
238
239
240class WorkerKeyboardInterrupt(Exception):
241 """ Keyboard interrupt exception for worker processes. """
242 pass
243
244
245def DoWorkWrapper(args):
246 """ A wrapper around the DoWork() method.
247
248 Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
249 ``Exception``-based exception to stop it flooding the console with stacktraces
250 and making the parent hang indefinitely.
251
252 """
253 project = args.pop()
254 try:
255 return DoWork(project, *args)
256 except KeyboardInterrupt:
257 print('%s: Worker interrupted' % project['name'])
258 raise WorkerKeyboardInterrupt()
259
260
261def DoWork(project, mirror, opt, cmd, shell, cnt, config):
262 env = os.environ.copy()
263 def setenv(name, val):
264 if val is None:
265 val = ''
266 env[name] = val.encode()
267
268 setenv('REPO_PROJECT', project['name'])
269 setenv('REPO_PATH', project['relpath'])
270 setenv('REPO_REMOTE', project['remote_name'])
271 setenv('REPO_LREV', project['lrev'])
272 setenv('REPO_RREV', project['rrev'])
273 setenv('REPO_I', str(cnt + 1))
274 for name in project['annotations']:
275 setenv("REPO__%s" % (name), project['annotations'][name])
276
277 if mirror:
278 setenv('GIT_DIR', project['gitdir'])
279 cwd = project['gitdir']
280 else:
281 cwd = project['worktree']
282
283 if not os.path.exists(cwd):
284 if (opt.project_header and opt.verbose) \
285 or not opt.project_header:
286 print('skipping %s/' % project['relpath'], file=sys.stderr)
287 return
288
289 if opt.project_header:
290 stdin = subprocess.PIPE
291 stdout = subprocess.PIPE
292 stderr = subprocess.PIPE
293 else:
294 stdin = None
295 stdout = None
296 stderr = None
297
298 p = subprocess.Popen(cmd,
299 cwd=cwd,
300 shell=shell,
301 env=env,
302 stdin=stdin,
303 stdout=stdout,
304 stderr=stderr)
305
306 if opt.project_header:
307 out = ForallColoring(config)
308 out.redirect(sys.stdout)
309 class sfd(object):
310 def __init__(self, fd, dest):
311 self.fd = fd
312 self.dest = dest
313 def fileno(self):
314 return self.fd.fileno()
315
316 empty = True
317 errbuf = ''
318
319 p.stdin.close()
320 s_in = [sfd(p.stdout, sys.stdout),
321 sfd(p.stderr, sys.stderr)]
322
323 for s in s_in:
324 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
325 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
326
327 while s_in:
328 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
329 for s in in_ready:
330 buf = s.fd.read(4096)
331 if not buf:
332 s.fd.close()
333 s_in.remove(s)
334 continue
335
336 if not opt.verbose:
337 if s.fd != p.stdout:
338 errbuf += buf
339 continue
340
341 if empty and out:
342 if not cnt == 0:
343 out.nl()
344
345 if mirror:
346 project_header_path = project['name']
347 else:
348 project_header_path = project['relpath']
349 out.project('project %s/', project_header_path)
350 out.nl()
351 out.flush()
352 if errbuf:
353 sys.stderr.write(errbuf)
354 sys.stderr.flush()
355 errbuf = ''
356 empty = False
357
358 s.dest.write(buf)
359 s.dest.flush()
360
361 r = p.wait()
362 return r