diff options
Diffstat (limited to 'subcmds/forall.py')
-rw-r--r-- | subcmds/forall.py | 325 |
1 files changed, 213 insertions, 112 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py index e2a420a9..b93cd6d0 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
@@ -14,10 +14,13 @@ | |||
14 | # limitations under the License. | 14 | # limitations under the License. |
15 | 15 | ||
16 | from __future__ import print_function | 16 | from __future__ import print_function |
17 | import errno | ||
17 | import fcntl | 18 | import fcntl |
19 | import multiprocessing | ||
18 | import re | 20 | import re |
19 | import os | 21 | import os |
20 | import select | 22 | import select |
23 | import signal | ||
21 | import sys | 24 | import sys |
22 | import subprocess | 25 | import subprocess |
23 | 26 | ||
@@ -31,6 +34,7 @@ _CAN_COLOR = [ | |||
31 | 'log', | 34 | 'log', |
32 | ] | 35 | ] |
33 | 36 | ||
37 | |||
34 | class ForallColoring(Coloring): | 38 | class ForallColoring(Coloring): |
35 | def __init__(self, config): | 39 | def __init__(self, config): |
36 | Coloring.__init__(self, config, 'forall') | 40 | Coloring.__init__(self, config, 'forall') |
@@ -87,6 +91,12 @@ revision to a locally executed git command, use REPO_LREV. | |||
87 | REPO_RREV is the name of the revision from the manifest, exactly | 91 | REPO_RREV is the name of the revision from the manifest, exactly |
88 | as written in the manifest. | 92 | as written in the manifest. |
89 | 93 | ||
94 | REPO_COUNT is the total number of projects being iterated. | ||
95 | |||
96 | REPO_I is the current (1-based) iteration count. Can be used in | ||
97 | conjunction with REPO_COUNT to add a simple progress indicator to your | ||
98 | command. | ||
99 | |||
90 | REPO__* are any extra environment variables, specified by the | 100 | REPO__* are any extra environment variables, specified by the |
91 | "annotation" element under any project element. This can be useful | 101 | "annotation" element under any project element. This can be useful |
92 | for differentiating trees based on user-specific criteria, or simply | 102 | for differentiating trees based on user-specific criteria, or simply |
@@ -126,9 +136,35 @@ without iterating through the remaining projects. | |||
126 | g.add_option('-v', '--verbose', | 136 | g.add_option('-v', '--verbose', |
127 | dest='verbose', action='store_true', | 137 | dest='verbose', action='store_true', |
128 | help='Show command error messages') | 138 | help='Show command error messages') |
139 | g.add_option('-j', '--jobs', | ||
140 | dest='jobs', action='store', type='int', default=1, | ||
141 | help='number of commands to execute simultaneously') | ||
129 | 142 | ||
130 | def WantPager(self, opt): | 143 | def WantPager(self, opt): |
131 | return opt.project_header | 144 | return opt.project_header and opt.jobs == 1 |
145 | |||
146 | def _SerializeProject(self, project): | ||
147 | """ Serialize a project._GitGetByExec instance. | ||
148 | |||
149 | project._GitGetByExec is not pickle-able. Instead of trying to pass it | ||
150 | around between processes, make a dict ourselves containing only the | ||
151 | attributes that we need. | ||
152 | |||
153 | """ | ||
154 | if not self.manifest.IsMirror: | ||
155 | lrev = project.GetRevisionId() | ||
156 | else: | ||
157 | lrev = None | ||
158 | return { | ||
159 | 'name': project.name, | ||
160 | 'relpath': project.relpath, | ||
161 | 'remote_name': project.remote.name, | ||
162 | 'lrev': lrev, | ||
163 | 'rrev': project.revisionExpr, | ||
164 | 'annotations': dict((a.name, a.value) for a in project.annotations), | ||
165 | 'gitdir': project.gitdir, | ||
166 | 'worktree': project.worktree, | ||
167 | } | ||
132 | 168 | ||
133 | def Execute(self, opt, args): | 169 | def Execute(self, opt, args): |
134 | if not opt.command: | 170 | if not opt.command: |
@@ -167,123 +203,188 @@ without iterating through the remaining projects. | |||
167 | # pylint: enable=W0631 | 203 | # pylint: enable=W0631 |
168 | 204 | ||
169 | mirror = self.manifest.IsMirror | 205 | mirror = self.manifest.IsMirror |
170 | out = ForallColoring(self.manifest.manifestProject.config) | ||
171 | out.redirect(sys.stdout) | ||
172 | |||
173 | rc = 0 | 206 | rc = 0 |
174 | first = True | 207 | |
208 | smart_sync_manifest_name = "smart_sync_override.xml" | ||
209 | smart_sync_manifest_path = os.path.join( | ||
210 | self.manifest.manifestProject.worktree, smart_sync_manifest_name) | ||
211 | |||
212 | if os.path.isfile(smart_sync_manifest_path): | ||
213 | self.manifest.Override(smart_sync_manifest_path) | ||
175 | 214 | ||
176 | if not opt.regex: | 215 | if not opt.regex: |
177 | projects = self.GetProjects(args) | 216 | projects = self.GetProjects(args) |
178 | else: | 217 | else: |
179 | projects = self.FindProjects(args) | 218 | projects = self.FindProjects(args) |
180 | 219 | ||
181 | for project in projects: | 220 | os.environ['REPO_COUNT'] = str(len(projects)) |
182 | env = os.environ.copy() | 221 | |
183 | def setenv(name, val): | 222 | pool = multiprocessing.Pool(opt.jobs, InitWorker) |
184 | if val is None: | 223 | try: |
185 | val = '' | 224 | config = self.manifest.manifestProject.config |
186 | env[name] = val.encode() | 225 | results_it = pool.imap( |
187 | 226 | DoWorkWrapper, | |
188 | setenv('REPO_PROJECT', project.name) | 227 | self.ProjectArgs(projects, mirror, opt, cmd, shell, config)) |
189 | setenv('REPO_PATH', project.relpath) | 228 | pool.close() |
190 | setenv('REPO_REMOTE', project.remote.name) | 229 | for r in results_it: |
191 | setenv('REPO_LREV', project.GetRevisionId()) | 230 | rc = rc or r |
192 | setenv('REPO_RREV', project.revisionExpr) | 231 | if r != 0 and opt.abort_on_errors: |
193 | for a in project.annotations: | 232 | raise Exception('Aborting due to previous error') |
194 | setenv("REPO__%s" % (a.name), a.value) | 233 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): |
195 | 234 | # Catch KeyboardInterrupt raised inside and outside of workers | |
196 | if mirror: | 235 | print('Interrupted - terminating the pool') |
197 | setenv('GIT_DIR', project.gitdir) | 236 | pool.terminate() |
198 | cwd = project.gitdir | 237 | rc = rc or errno.EINTR |
199 | else: | 238 | except Exception as e: |
200 | cwd = project.worktree | 239 | # Catch any other exceptions raised |
201 | 240 | print('Got an error, terminating the pool: %r' % e, | |
202 | if not os.path.exists(cwd): | 241 | file=sys.stderr) |
203 | if (opt.project_header and opt.verbose) \ | 242 | pool.terminate() |
204 | or not opt.project_header: | 243 | rc = rc or getattr(e, 'errno', 1) |
205 | print('skipping %s/' % project.relpath, file=sys.stderr) | 244 | finally: |
206 | continue | 245 | pool.join() |
207 | |||
208 | if opt.project_header: | ||
209 | stdin = subprocess.PIPE | ||
210 | stdout = subprocess.PIPE | ||
211 | stderr = subprocess.PIPE | ||
212 | else: | ||
213 | stdin = None | ||
214 | stdout = None | ||
215 | stderr = None | ||
216 | |||
217 | p = subprocess.Popen(cmd, | ||
218 | cwd = cwd, | ||
219 | shell = shell, | ||
220 | env = env, | ||
221 | stdin = stdin, | ||
222 | stdout = stdout, | ||
223 | stderr = stderr) | ||
224 | |||
225 | if opt.project_header: | ||
226 | class sfd(object): | ||
227 | def __init__(self, fd, dest): | ||
228 | self.fd = fd | ||
229 | self.dest = dest | ||
230 | def fileno(self): | ||
231 | return self.fd.fileno() | ||
232 | |||
233 | empty = True | ||
234 | errbuf = '' | ||
235 | |||
236 | p.stdin.close() | ||
237 | s_in = [sfd(p.stdout, sys.stdout), | ||
238 | sfd(p.stderr, sys.stderr)] | ||
239 | |||
240 | for s in s_in: | ||
241 | flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) | ||
242 | fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) | ||
243 | |||
244 | while s_in: | ||
245 | in_ready, _out_ready, _err_ready = select.select(s_in, [], []) | ||
246 | for s in in_ready: | ||
247 | buf = s.fd.read(4096) | ||
248 | if not buf: | ||
249 | s.fd.close() | ||
250 | s_in.remove(s) | ||
251 | continue | ||
252 | |||
253 | if not opt.verbose: | ||
254 | if s.fd != p.stdout: | ||
255 | errbuf += buf | ||
256 | continue | ||
257 | |||
258 | if empty: | ||
259 | if first: | ||
260 | first = False | ||
261 | else: | ||
262 | out.nl() | ||
263 | |||
264 | if mirror: | ||
265 | project_header_path = project.name | ||
266 | else: | ||
267 | project_header_path = project.relpath | ||
268 | out.project('project %s/', project_header_path) | ||
269 | out.nl() | ||
270 | out.flush() | ||
271 | if errbuf: | ||
272 | sys.stderr.write(errbuf) | ||
273 | sys.stderr.flush() | ||
274 | errbuf = '' | ||
275 | empty = False | ||
276 | |||
277 | s.dest.write(buf) | ||
278 | s.dest.flush() | ||
279 | |||
280 | r = p.wait() | ||
281 | if r != 0: | ||
282 | if r != rc: | ||
283 | rc = r | ||
284 | if opt.abort_on_errors: | ||
285 | print("error: %s: Aborting due to previous error" % project.relpath, | ||
286 | file=sys.stderr) | ||
287 | sys.exit(r) | ||
288 | if rc != 0: | 246 | if rc != 0: |
289 | sys.exit(rc) | 247 | sys.exit(rc) |
248 | |||
249 | def ProjectArgs(self, projects, mirror, opt, cmd, shell, config): | ||
250 | for cnt, p in enumerate(projects): | ||
251 | try: | ||
252 | project = self._SerializeProject(p) | ||
253 | except Exception as e: | ||
254 | print('Project list error: %r' % e, | ||
255 | file=sys.stderr) | ||
256 | return | ||
257 | except KeyboardInterrupt: | ||
258 | print('Project list interrupted', | ||
259 | file=sys.stderr) | ||
260 | return | ||
261 | yield [mirror, opt, cmd, shell, cnt, config, project] | ||
262 | |||
263 | class WorkerKeyboardInterrupt(Exception): | ||
264 | """ Keyboard interrupt exception for worker processes. """ | ||
265 | pass | ||
266 | |||
267 | |||
268 | def InitWorker(): | ||
269 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
270 | |||
271 | def DoWorkWrapper(args): | ||
272 | """ A wrapper around the DoWork() method. | ||
273 | |||
274 | Catch the KeyboardInterrupt exceptions here and re-raise them as a different, | ||
275 | ``Exception``-based exception to stop it flooding the console with stacktraces | ||
276 | and making the parent hang indefinitely. | ||
277 | |||
278 | """ | ||
279 | project = args.pop() | ||
280 | try: | ||
281 | return DoWork(project, *args) | ||
282 | except KeyboardInterrupt: | ||
283 | print('%s: Worker interrupted' % project['name']) | ||
284 | raise WorkerKeyboardInterrupt() | ||
285 | |||
286 | |||
287 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | ||
288 | env = os.environ.copy() | ||
289 | def setenv(name, val): | ||
290 | if val is None: | ||
291 | val = '' | ||
292 | if hasattr(val, 'encode'): | ||
293 | val = val.encode() | ||
294 | env[name] = val | ||
295 | |||
296 | setenv('REPO_PROJECT', project['name']) | ||
297 | setenv('REPO_PATH', project['relpath']) | ||
298 | setenv('REPO_REMOTE', project['remote_name']) | ||
299 | setenv('REPO_LREV', project['lrev']) | ||
300 | setenv('REPO_RREV', project['rrev']) | ||
301 | setenv('REPO_I', str(cnt + 1)) | ||
302 | for name in project['annotations']: | ||
303 | setenv("REPO__%s" % (name), project['annotations'][name]) | ||
304 | |||
305 | if mirror: | ||
306 | setenv('GIT_DIR', project['gitdir']) | ||
307 | cwd = project['gitdir'] | ||
308 | else: | ||
309 | cwd = project['worktree'] | ||
310 | |||
311 | if not os.path.exists(cwd): | ||
312 | if (opt.project_header and opt.verbose) \ | ||
313 | or not opt.project_header: | ||
314 | print('skipping %s/' % project['relpath'], file=sys.stderr) | ||
315 | return | ||
316 | |||
317 | if opt.project_header: | ||
318 | stdin = subprocess.PIPE | ||
319 | stdout = subprocess.PIPE | ||
320 | stderr = subprocess.PIPE | ||
321 | else: | ||
322 | stdin = None | ||
323 | stdout = None | ||
324 | stderr = None | ||
325 | |||
326 | p = subprocess.Popen(cmd, | ||
327 | cwd=cwd, | ||
328 | shell=shell, | ||
329 | env=env, | ||
330 | stdin=stdin, | ||
331 | stdout=stdout, | ||
332 | stderr=stderr) | ||
333 | |||
334 | if opt.project_header: | ||
335 | out = ForallColoring(config) | ||
336 | out.redirect(sys.stdout) | ||
337 | class sfd(object): | ||
338 | def __init__(self, fd, dest): | ||
339 | self.fd = fd | ||
340 | self.dest = dest | ||
341 | def fileno(self): | ||
342 | return self.fd.fileno() | ||
343 | |||
344 | empty = True | ||
345 | errbuf = '' | ||
346 | |||
347 | p.stdin.close() | ||
348 | s_in = [sfd(p.stdout, sys.stdout), | ||
349 | sfd(p.stderr, sys.stderr)] | ||
350 | |||
351 | for s in s_in: | ||
352 | flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) | ||
353 | fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) | ||
354 | |||
355 | while s_in: | ||
356 | in_ready, _out_ready, _err_ready = select.select(s_in, [], []) | ||
357 | for s in in_ready: | ||
358 | buf = s.fd.read(4096) | ||
359 | if not buf: | ||
360 | s.fd.close() | ||
361 | s_in.remove(s) | ||
362 | continue | ||
363 | |||
364 | if not opt.verbose: | ||
365 | if s.fd != p.stdout: | ||
366 | errbuf += buf | ||
367 | continue | ||
368 | |||
369 | if empty and out: | ||
370 | if not cnt == 0: | ||
371 | out.nl() | ||
372 | |||
373 | if mirror: | ||
374 | project_header_path = project['name'] | ||
375 | else: | ||
376 | project_header_path = project['relpath'] | ||
377 | out.project('project %s/', project_header_path) | ||
378 | out.nl() | ||
379 | out.flush() | ||
380 | if errbuf: | ||
381 | sys.stderr.write(errbuf) | ||
382 | sys.stderr.flush() | ||
383 | errbuf = '' | ||
384 | empty = False | ||
385 | |||
386 | s.dest.write(buf) | ||
387 | s.dest.flush() | ||
388 | |||
389 | r = p.wait() | ||
390 | return r | ||