summaryrefslogtreecommitdiffstats
path: root/subcmds/forall.py
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r--subcmds/forall.py297
1 files changed, 185 insertions, 112 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index e2a420a9..7771ec16 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,7 +14,9 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import errno
17import fcntl 18import fcntl
19import multiprocessing
18import re 20import re
19import os 21import os
20import select 22import select
@@ -31,6 +33,7 @@ _CAN_COLOR = [
31 'log', 33 'log',
32] 34]
33 35
36
34class ForallColoring(Coloring): 37class ForallColoring(Coloring):
35 def __init__(self, config): 38 def __init__(self, config):
36 Coloring.__init__(self, config, 'forall') 39 Coloring.__init__(self, config, 'forall')
@@ -87,6 +90,12 @@ revision to a locally executed git command, use REPO_LREV.
87REPO_RREV is the name of the revision from the manifest, exactly 90REPO_RREV is the name of the revision from the manifest, exactly
88as written in the manifest. 91as written in the manifest.
89 92
93REPO_COUNT is the total number of projects being iterated.
94
95REPO_I is the current (1-based) iteration count. Can be used in
96conjunction with REPO_COUNT to add a simple progress indicator to your
97command.
98
90REPO__* are any extra environment variables, specified by the 99REPO__* are any extra environment variables, specified by the
91"annotation" element under any project element. This can be useful 100"annotation" element under any project element. This can be useful
92for differentiating trees based on user-specific criteria, or simply 101for differentiating trees based on user-specific criteria, or simply
@@ -126,9 +135,31 @@ without iterating through the remaining projects.
126 g.add_option('-v', '--verbose', 135 g.add_option('-v', '--verbose',
127 dest='verbose', action='store_true', 136 dest='verbose', action='store_true',
128 help='Show command error messages') 137 help='Show command error messages')
138 g.add_option('-j', '--jobs',
139 dest='jobs', action='store', type='int', default=1,
140 help='number of commands to execute simultaneously')
129 141
130 def WantPager(self, opt): 142 def WantPager(self, opt):
131 return opt.project_header 143 return opt.project_header and opt.jobs == 1
144
145 def _SerializeProject(self, project):
146 """ Serialize a project._GitGetByExec instance.
147
148 project._GitGetByExec is not pickle-able. Instead of trying to pass it
149 around between processes, make a dict ourselves containing only the
150 attributes that we need.
151
152 """
153 return {
154 'name': project.name,
155 'relpath': project.relpath,
156 'remote_name': project.remote.name,
157 'lrev': project.GetRevisionId(),
158 'rrev': project.revisionExpr,
159 'annotations': dict((a.name, a.value) for a in project.annotations),
160 'gitdir': project.gitdir,
161 'worktree': project.worktree,
162 }
132 163
133 def Execute(self, opt, args): 164 def Execute(self, opt, args):
134 if not opt.command: 165 if not opt.command:
@@ -167,123 +198,165 @@ without iterating through the remaining projects.
167 # pylint: enable=W0631 198 # pylint: enable=W0631
168 199
169 mirror = self.manifest.IsMirror 200 mirror = self.manifest.IsMirror
170 out = ForallColoring(self.manifest.manifestProject.config)
171 out.redirect(sys.stdout)
172
173 rc = 0 201 rc = 0
174 first = True
175 202
176 if not opt.regex: 203 if not opt.regex:
177 projects = self.GetProjects(args) 204 projects = self.GetProjects(args)
178 else: 205 else:
179 projects = self.FindProjects(args) 206 projects = self.FindProjects(args)
180 207
181 for project in projects: 208 os.environ['REPO_COUNT'] = str(len(projects))
182 env = os.environ.copy() 209
183 def setenv(name, val): 210 pool = multiprocessing.Pool(opt.jobs)
184 if val is None: 211 try:
185 val = '' 212 config = self.manifest.manifestProject.config
186 env[name] = val.encode() 213 results_it = pool.imap(
187 214 DoWorkWrapper,
188 setenv('REPO_PROJECT', project.name) 215 [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)]
189 setenv('REPO_PATH', project.relpath) 216 for cnt, p in enumerate(projects)]
190 setenv('REPO_REMOTE', project.remote.name) 217 )
191 setenv('REPO_LREV', project.GetRevisionId()) 218 pool.close()
192 setenv('REPO_RREV', project.revisionExpr) 219 for r in results_it:
193 for a in project.annotations: 220 rc = rc or r
194 setenv("REPO__%s" % (a.name), a.value) 221 if r != 0 and opt.abort_on_errors:
195 222 raise Exception('Aborting due to previous error')
196 if mirror: 223 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
197 setenv('GIT_DIR', project.gitdir) 224 # Catch KeyboardInterrupt raised inside and outside of workers
198 cwd = project.gitdir 225 print('Interrupted - terminating the pool')
199 else: 226 pool.terminate()
200 cwd = project.worktree 227 rc = rc or errno.EINTR
201 228 except Exception as e:
202 if not os.path.exists(cwd): 229 # Catch any other exceptions raised
203 if (opt.project_header and opt.verbose) \ 230 print('Got an error, terminating the pool: %r' % e,
204 or not opt.project_header: 231 file=sys.stderr)
205 print('skipping %s/' % project.relpath, file=sys.stderr) 232 pool.terminate()
206 continue 233 rc = rc or getattr(e, 'errno', 1)
207 234 finally:
208 if opt.project_header: 235 pool.join()
209 stdin = subprocess.PIPE
210 stdout = subprocess.PIPE
211 stderr = subprocess.PIPE
212 else:
213 stdin = None
214 stdout = None
215 stderr = None
216
217 p = subprocess.Popen(cmd,
218 cwd = cwd,
219 shell = shell,
220 env = env,
221 stdin = stdin,
222 stdout = stdout,
223 stderr = stderr)
224
225 if opt.project_header:
226 class sfd(object):
227 def __init__(self, fd, dest):
228 self.fd = fd
229 self.dest = dest
230 def fileno(self):
231 return self.fd.fileno()
232
233 empty = True
234 errbuf = ''
235
236 p.stdin.close()
237 s_in = [sfd(p.stdout, sys.stdout),
238 sfd(p.stderr, sys.stderr)]
239
240 for s in s_in:
241 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
242 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
243
244 while s_in:
245 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
246 for s in in_ready:
247 buf = s.fd.read(4096)
248 if not buf:
249 s.fd.close()
250 s_in.remove(s)
251 continue
252
253 if not opt.verbose:
254 if s.fd != p.stdout:
255 errbuf += buf
256 continue
257
258 if empty:
259 if first:
260 first = False
261 else:
262 out.nl()
263
264 if mirror:
265 project_header_path = project.name
266 else:
267 project_header_path = project.relpath
268 out.project('project %s/', project_header_path)
269 out.nl()
270 out.flush()
271 if errbuf:
272 sys.stderr.write(errbuf)
273 sys.stderr.flush()
274 errbuf = ''
275 empty = False
276
277 s.dest.write(buf)
278 s.dest.flush()
279
280 r = p.wait()
281 if r != 0:
282 if r != rc:
283 rc = r
284 if opt.abort_on_errors:
285 print("error: %s: Aborting due to previous error" % project.relpath,
286 file=sys.stderr)
287 sys.exit(r)
288 if rc != 0: 236 if rc != 0:
289 sys.exit(rc) 237 sys.exit(rc)
238
239
240class WorkerKeyboardInterrupt(Exception):
241 """ Keyboard interrupt exception for worker processes. """
242 pass
243
244
245def DoWorkWrapper(args):
246 """ A wrapper around the DoWork() method.
247
248 Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
249 ``Exception``-based exception to stop it flooding the console with stacktraces
250 and making the parent hang indefinitely.
251
252 """
253 project = args.pop()
254 try:
255 return DoWork(project, *args)
256 except KeyboardInterrupt:
257 print('%s: Worker interrupted' % project['name'])
258 raise WorkerKeyboardInterrupt()
259
260
261def DoWork(project, mirror, opt, cmd, shell, cnt, config):
262 env = os.environ.copy()
263 def setenv(name, val):
264 if val is None:
265 val = ''
266 env[name] = val.encode()
267
268 setenv('REPO_PROJECT', project['name'])
269 setenv('REPO_PATH', project['relpath'])
270 setenv('REPO_REMOTE', project['remote_name'])
271 setenv('REPO_LREV', project['lrev'])
272 setenv('REPO_RREV', project['rrev'])
273 setenv('REPO_I', str(cnt + 1))
274 for name in project['annotations']:
275 setenv("REPO__%s" % (name), project['annotations'][name])
276
277 if mirror:
278 setenv('GIT_DIR', project['gitdir'])
279 cwd = project['gitdir']
280 else:
281 cwd = project['worktree']
282
283 if not os.path.exists(cwd):
284 if (opt.project_header and opt.verbose) \
285 or not opt.project_header:
286 print('skipping %s/' % project['relpath'], file=sys.stderr)
287 return
288
289 if opt.project_header:
290 stdin = subprocess.PIPE
291 stdout = subprocess.PIPE
292 stderr = subprocess.PIPE
293 else:
294 stdin = None
295 stdout = None
296 stderr = None
297
298 p = subprocess.Popen(cmd,
299 cwd=cwd,
300 shell=shell,
301 env=env,
302 stdin=stdin,
303 stdout=stdout,
304 stderr=stderr)
305
306 if opt.project_header:
307 out = ForallColoring(config)
308 out.redirect(sys.stdout)
309 class sfd(object):
310 def __init__(self, fd, dest):
311 self.fd = fd
312 self.dest = dest
313 def fileno(self):
314 return self.fd.fileno()
315
316 empty = True
317 errbuf = ''
318
319 p.stdin.close()
320 s_in = [sfd(p.stdout, sys.stdout),
321 sfd(p.stderr, sys.stderr)]
322
323 for s in s_in:
324 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
325 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
326
327 while s_in:
328 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
329 for s in in_ready:
330 buf = s.fd.read(4096)
331 if not buf:
332 s.fd.close()
333 s_in.remove(s)
334 continue
335
336 if not opt.verbose:
337 if s.fd != p.stdout:
338 errbuf += buf
339 continue
340
341 if empty and out:
342 if not cnt == 0:
343 out.nl()
344
345 if mirror:
346 project_header_path = project['name']
347 else:
348 project_header_path = project['relpath']
349 out.project('project %s/', project_header_path)
350 out.nl()
351 out.flush()
352 if errbuf:
353 sys.stderr.write(errbuf)
354 sys.stderr.flush()
355 errbuf = ''
356 empty = False
357
358 s.dest.write(buf)
359 s.dest.flush()
360
361 r = p.wait()
362 return r