diff options
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r-- | subcmds/forall.py | 290 |
1 files changed, 177 insertions, 113 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py index 03ebcb21..7771ec16 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
@@ -14,7 +14,9 @@ | |||
14 | # limitations under the License. | 14 | # limitations under the License. |
15 | 15 | ||
16 | from __future__ import print_function | 16 | from __future__ import print_function |
17 | import errno | ||
17 | import fcntl | 18 | import fcntl |
19 | import multiprocessing | ||
18 | import re | 20 | import re |
19 | import os | 21 | import os |
20 | import select | 22 | import select |
@@ -31,6 +33,7 @@ _CAN_COLOR = [ | |||
31 | 'log', | 33 | 'log', |
32 | ] | 34 | ] |
33 | 35 | ||
36 | |||
34 | class ForallColoring(Coloring): | 37 | class ForallColoring(Coloring): |
35 | def __init__(self, config): | 38 | def __init__(self, config): |
36 | Coloring.__init__(self, config, 'forall') | 39 | Coloring.__init__(self, config, 'forall') |
@@ -132,9 +135,31 @@ without iterating through the remaining projects. | |||
132 | g.add_option('-v', '--verbose', | 135 | g.add_option('-v', '--verbose', |
133 | dest='verbose', action='store_true', | 136 | dest='verbose', action='store_true', |
134 | help='Show command error messages') | 137 | help='Show command error messages') |
138 | g.add_option('-j', '--jobs', | ||
139 | dest='jobs', action='store', type='int', default=1, | ||
140 | help='number of commands to execute simultaneously') | ||
135 | 141 | ||
136 | def WantPager(self, opt): | 142 | def WantPager(self, opt): |
137 | return opt.project_header | 143 | return opt.project_header and opt.jobs == 1 |
144 | |||
145 | def _SerializeProject(self, project): | ||
146 | """ Serialize a project._GitGetByExec instance. | ||
147 | |||
148 | project._GitGetByExec is not pickle-able. Instead of trying to pass it | ||
149 | around between processes, make a dict ourselves containing only the | ||
150 | attributes that we need. | ||
151 | |||
152 | """ | ||
153 | return { | ||
154 | 'name': project.name, | ||
155 | 'relpath': project.relpath, | ||
156 | 'remote_name': project.remote.name, | ||
157 | 'lrev': project.GetRevisionId(), | ||
158 | 'rrev': project.revisionExpr, | ||
159 | 'annotations': dict((a.name, a.value) for a in project.annotations), | ||
160 | 'gitdir': project.gitdir, | ||
161 | 'worktree': project.worktree, | ||
162 | } | ||
138 | 163 | ||
139 | def Execute(self, opt, args): | 164 | def Execute(self, opt, args): |
140 | if not opt.command: | 165 | if not opt.command: |
@@ -173,11 +198,7 @@ without iterating through the remaining projects. | |||
173 | # pylint: enable=W0631 | 198 | # pylint: enable=W0631 |
174 | 199 | ||
175 | mirror = self.manifest.IsMirror | 200 | mirror = self.manifest.IsMirror |
176 | out = ForallColoring(self.manifest.manifestProject.config) | ||
177 | out.redirect(sys.stdout) | ||
178 | |||
179 | rc = 0 | 201 | rc = 0 |
180 | first = True | ||
181 | 202 | ||
182 | if not opt.regex: | 203 | if not opt.regex: |
183 | projects = self.GetProjects(args) | 204 | projects = self.GetProjects(args) |
@@ -186,113 +207,156 @@ without iterating through the remaining projects. | |||
186 | 207 | ||
187 | os.environ['REPO_COUNT'] = str(len(projects)) | 208 | os.environ['REPO_COUNT'] = str(len(projects)) |
188 | 209 | ||
189 | for (cnt, project) in enumerate(projects): | 210 | pool = multiprocessing.Pool(opt.jobs) |
190 | env = os.environ.copy() | 211 | try: |
191 | def setenv(name, val): | 212 | config = self.manifest.manifestProject.config |
192 | if val is None: | 213 | results_it = pool.imap( |
193 | val = '' | 214 | DoWorkWrapper, |
194 | env[name] = val.encode() | 215 | [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)] |
195 | 216 | for cnt, p in enumerate(projects)] | |
196 | setenv('REPO_PROJECT', project.name) | 217 | ) |
197 | setenv('REPO_PATH', project.relpath) | 218 | pool.close() |
198 | setenv('REPO_REMOTE', project.remote.name) | 219 | for r in results_it: |
199 | setenv('REPO_LREV', project.GetRevisionId()) | 220 | rc = rc or r |
200 | setenv('REPO_RREV', project.revisionExpr) | 221 | if r != 0 and opt.abort_on_errors: |
201 | setenv('REPO_I', str(cnt + 1)) | 222 | raise Exception('Aborting due to previous error') |
202 | for a in project.annotations: | 223 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): |
203 | setenv("REPO__%s" % (a.name), a.value) | 224 | # Catch KeyboardInterrupt raised inside and outside of workers |
204 | 225 | print('Interrupted - terminating the pool') | |
205 | if mirror: | 226 | pool.terminate() |
206 | setenv('GIT_DIR', project.gitdir) | 227 | rc = rc or errno.EINTR |
207 | cwd = project.gitdir | 228 | except Exception as e: |
208 | else: | 229 | # Catch any other exceptions raised |
209 | cwd = project.worktree | 230 | print('Got an error, terminating the pool: %r' % e, |
210 | 231 | file=sys.stderr) | |
211 | if not os.path.exists(cwd): | 232 | pool.terminate() |
212 | if (opt.project_header and opt.verbose) \ | 233 | rc = rc or getattr(e, 'errno', 1) |
213 | or not opt.project_header: | 234 | finally: |
214 | print('skipping %s/' % project.relpath, file=sys.stderr) | 235 | pool.join() |
215 | continue | ||
216 | |||
217 | if opt.project_header: | ||
218 | stdin = subprocess.PIPE | ||
219 | stdout = subprocess.PIPE | ||
220 | stderr = subprocess.PIPE | ||
221 | else: | ||
222 | stdin = None | ||
223 | stdout = None | ||
224 | stderr = None | ||
225 | |||
226 | p = subprocess.Popen(cmd, | ||
227 | cwd = cwd, | ||
228 | shell = shell, | ||
229 | env = env, | ||
230 | stdin = stdin, | ||
231 | stdout = stdout, | ||
232 | stderr = stderr) | ||
233 | |||
234 | if opt.project_header: | ||
235 | class sfd(object): | ||
236 | def __init__(self, fd, dest): | ||
237 | self.fd = fd | ||
238 | self.dest = dest | ||
239 | def fileno(self): | ||
240 | return self.fd.fileno() | ||
241 | |||
242 | empty = True | ||
243 | errbuf = '' | ||
244 | |||
245 | p.stdin.close() | ||
246 | s_in = [sfd(p.stdout, sys.stdout), | ||
247 | sfd(p.stderr, sys.stderr)] | ||
248 | |||
249 | for s in s_in: | ||
250 | flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) | ||
251 | fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) | ||
252 | |||
253 | while s_in: | ||
254 | in_ready, _out_ready, _err_ready = select.select(s_in, [], []) | ||
255 | for s in in_ready: | ||
256 | buf = s.fd.read(4096) | ||
257 | if not buf: | ||
258 | s.fd.close() | ||
259 | s_in.remove(s) | ||
260 | continue | ||
261 | |||
262 | if not opt.verbose: | ||
263 | if s.fd != p.stdout: | ||
264 | errbuf += buf | ||
265 | continue | ||
266 | |||
267 | if empty: | ||
268 | if first: | ||
269 | first = False | ||
270 | else: | ||
271 | out.nl() | ||
272 | |||
273 | if mirror: | ||
274 | project_header_path = project.name | ||
275 | else: | ||
276 | project_header_path = project.relpath | ||
277 | out.project('project %s/', project_header_path) | ||
278 | out.nl() | ||
279 | out.flush() | ||
280 | if errbuf: | ||
281 | sys.stderr.write(errbuf) | ||
282 | sys.stderr.flush() | ||
283 | errbuf = '' | ||
284 | empty = False | ||
285 | |||
286 | s.dest.write(buf) | ||
287 | s.dest.flush() | ||
288 | |||
289 | r = p.wait() | ||
290 | if r != 0: | ||
291 | if r != rc: | ||
292 | rc = r | ||
293 | if opt.abort_on_errors: | ||
294 | print("error: %s: Aborting due to previous error" % project.relpath, | ||
295 | file=sys.stderr) | ||
296 | sys.exit(r) | ||
297 | if rc != 0: | 236 | if rc != 0: |
298 | sys.exit(rc) | 237 | sys.exit(rc) |
238 | |||
239 | |||
240 | class WorkerKeyboardInterrupt(Exception): | ||
241 | """ Keyboard interrupt exception for worker processes. """ | ||
242 | pass | ||
243 | |||
244 | |||
245 | def DoWorkWrapper(args): | ||
246 | """ A wrapper around the DoWork() method. | ||
247 | |||
248 | Catch the KeyboardInterrupt exceptions here and re-raise them as a different, | ||
249 | ``Exception``-based exception to stop it flooding the console with stacktraces | ||
250 | and making the parent hang indefinitely. | ||
251 | |||
252 | """ | ||
253 | project = args.pop() | ||
254 | try: | ||
255 | return DoWork(project, *args) | ||
256 | except KeyboardInterrupt: | ||
257 | print('%s: Worker interrupted' % project['name']) | ||
258 | raise WorkerKeyboardInterrupt() | ||
259 | |||
260 | |||
261 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | ||
262 | env = os.environ.copy() | ||
263 | def setenv(name, val): | ||
264 | if val is None: | ||
265 | val = '' | ||
266 | env[name] = val.encode() | ||
267 | |||
268 | setenv('REPO_PROJECT', project['name']) | ||
269 | setenv('REPO_PATH', project['relpath']) | ||
270 | setenv('REPO_REMOTE', project['remote_name']) | ||
271 | setenv('REPO_LREV', project['lrev']) | ||
272 | setenv('REPO_RREV', project['rrev']) | ||
273 | setenv('REPO_I', str(cnt + 1)) | ||
274 | for name in project['annotations']: | ||
275 | setenv("REPO__%s" % (name), project['annotations'][name]) | ||
276 | |||
277 | if mirror: | ||
278 | setenv('GIT_DIR', project['gitdir']) | ||
279 | cwd = project['gitdir'] | ||
280 | else: | ||
281 | cwd = project['worktree'] | ||
282 | |||
283 | if not os.path.exists(cwd): | ||
284 | if (opt.project_header and opt.verbose) \ | ||
285 | or not opt.project_header: | ||
286 | print('skipping %s/' % project['relpath'], file=sys.stderr) | ||
287 | return | ||
288 | |||
289 | if opt.project_header: | ||
290 | stdin = subprocess.PIPE | ||
291 | stdout = subprocess.PIPE | ||
292 | stderr = subprocess.PIPE | ||
293 | else: | ||
294 | stdin = None | ||
295 | stdout = None | ||
296 | stderr = None | ||
297 | |||
298 | p = subprocess.Popen(cmd, | ||
299 | cwd=cwd, | ||
300 | shell=shell, | ||
301 | env=env, | ||
302 | stdin=stdin, | ||
303 | stdout=stdout, | ||
304 | stderr=stderr) | ||
305 | |||
306 | if opt.project_header: | ||
307 | out = ForallColoring(config) | ||
308 | out.redirect(sys.stdout) | ||
309 | class sfd(object): | ||
310 | def __init__(self, fd, dest): | ||
311 | self.fd = fd | ||
312 | self.dest = dest | ||
313 | def fileno(self): | ||
314 | return self.fd.fileno() | ||
315 | |||
316 | empty = True | ||
317 | errbuf = '' | ||
318 | |||
319 | p.stdin.close() | ||
320 | s_in = [sfd(p.stdout, sys.stdout), | ||
321 | sfd(p.stderr, sys.stderr)] | ||
322 | |||
323 | for s in s_in: | ||
324 | flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) | ||
325 | fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) | ||
326 | |||
327 | while s_in: | ||
328 | in_ready, _out_ready, _err_ready = select.select(s_in, [], []) | ||
329 | for s in in_ready: | ||
330 | buf = s.fd.read(4096) | ||
331 | if not buf: | ||
332 | s.fd.close() | ||
333 | s_in.remove(s) | ||
334 | continue | ||
335 | |||
336 | if not opt.verbose: | ||
337 | if s.fd != p.stdout: | ||
338 | errbuf += buf | ||
339 | continue | ||
340 | |||
341 | if empty and out: | ||
342 | if not cnt == 0: | ||
343 | out.nl() | ||
344 | |||
345 | if mirror: | ||
346 | project_header_path = project['name'] | ||
347 | else: | ||
348 | project_header_path = project['relpath'] | ||
349 | out.project('project %s/', project_header_path) | ||
350 | out.nl() | ||
351 | out.flush() | ||
352 | if errbuf: | ||
353 | sys.stderr.write(errbuf) | ||
354 | sys.stderr.flush() | ||
355 | errbuf = '' | ||
356 | empty = False | ||
357 | |||
358 | s.dest.write(buf) | ||
359 | s.dest.flush() | ||
360 | |||
361 | r = p.wait() | ||
362 | return r | ||