summaryrefslogtreecommitdiffstats
path: root/subcmds
diff options
context:
space:
mode:
Diffstat (limited to 'subcmds')
-rw-r--r--subcmds/forall.py290
-rw-r--r--subcmds/status.py6
-rw-r--r--subcmds/sync.py47
-rw-r--r--subcmds/upload.py6
4 files changed, 205 insertions, 144 deletions
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 03ebcb21..7771ec16 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -14,7 +14,9 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import errno
17import fcntl 18import fcntl
19import multiprocessing
18import re 20import re
19import os 21import os
20import select 22import select
@@ -31,6 +33,7 @@ _CAN_COLOR = [
31 'log', 33 'log',
32] 34]
33 35
36
34class ForallColoring(Coloring): 37class ForallColoring(Coloring):
35 def __init__(self, config): 38 def __init__(self, config):
36 Coloring.__init__(self, config, 'forall') 39 Coloring.__init__(self, config, 'forall')
@@ -132,9 +135,31 @@ without iterating through the remaining projects.
132 g.add_option('-v', '--verbose', 135 g.add_option('-v', '--verbose',
133 dest='verbose', action='store_true', 136 dest='verbose', action='store_true',
134 help='Show command error messages') 137 help='Show command error messages')
138 g.add_option('-j', '--jobs',
139 dest='jobs', action='store', type='int', default=1,
140 help='number of commands to execute simultaneously')
135 141
136 def WantPager(self, opt): 142 def WantPager(self, opt):
137 return opt.project_header 143 return opt.project_header and opt.jobs == 1
144
145 def _SerializeProject(self, project):
146 """ Serialize a project._GitGetByExec instance.
147
148 project._GitGetByExec is not pickle-able. Instead of trying to pass it
149 around between processes, make a dict ourselves containing only the
150 attributes that we need.
151
152 """
153 return {
154 'name': project.name,
155 'relpath': project.relpath,
156 'remote_name': project.remote.name,
157 'lrev': project.GetRevisionId(),
158 'rrev': project.revisionExpr,
159 'annotations': dict((a.name, a.value) for a in project.annotations),
160 'gitdir': project.gitdir,
161 'worktree': project.worktree,
162 }
138 163
139 def Execute(self, opt, args): 164 def Execute(self, opt, args):
140 if not opt.command: 165 if not opt.command:
@@ -173,11 +198,7 @@ without iterating through the remaining projects.
173 # pylint: enable=W0631 198 # pylint: enable=W0631
174 199
175 mirror = self.manifest.IsMirror 200 mirror = self.manifest.IsMirror
176 out = ForallColoring(self.manifest.manifestProject.config)
177 out.redirect(sys.stdout)
178
179 rc = 0 201 rc = 0
180 first = True
181 202
182 if not opt.regex: 203 if not opt.regex:
183 projects = self.GetProjects(args) 204 projects = self.GetProjects(args)
@@ -186,113 +207,156 @@ without iterating through the remaining projects.
186 207
187 os.environ['REPO_COUNT'] = str(len(projects)) 208 os.environ['REPO_COUNT'] = str(len(projects))
188 209
189 for (cnt, project) in enumerate(projects): 210 pool = multiprocessing.Pool(opt.jobs)
190 env = os.environ.copy() 211 try:
191 def setenv(name, val): 212 config = self.manifest.manifestProject.config
192 if val is None: 213 results_it = pool.imap(
193 val = '' 214 DoWorkWrapper,
194 env[name] = val.encode() 215 [[mirror, opt, cmd, shell, cnt, config, self._SerializeProject(p)]
195 216 for cnt, p in enumerate(projects)]
196 setenv('REPO_PROJECT', project.name) 217 )
197 setenv('REPO_PATH', project.relpath) 218 pool.close()
198 setenv('REPO_REMOTE', project.remote.name) 219 for r in results_it:
199 setenv('REPO_LREV', project.GetRevisionId()) 220 rc = rc or r
200 setenv('REPO_RREV', project.revisionExpr) 221 if r != 0 and opt.abort_on_errors:
201 setenv('REPO_I', str(cnt + 1)) 222 raise Exception('Aborting due to previous error')
202 for a in project.annotations: 223 except (KeyboardInterrupt, WorkerKeyboardInterrupt):
203 setenv("REPO__%s" % (a.name), a.value) 224 # Catch KeyboardInterrupt raised inside and outside of workers
204 225 print('Interrupted - terminating the pool')
205 if mirror: 226 pool.terminate()
206 setenv('GIT_DIR', project.gitdir) 227 rc = rc or errno.EINTR
207 cwd = project.gitdir 228 except Exception as e:
208 else: 229 # Catch any other exceptions raised
209 cwd = project.worktree 230 print('Got an error, terminating the pool: %r' % e,
210 231 file=sys.stderr)
211 if not os.path.exists(cwd): 232 pool.terminate()
212 if (opt.project_header and opt.verbose) \ 233 rc = rc or getattr(e, 'errno', 1)
213 or not opt.project_header: 234 finally:
214 print('skipping %s/' % project.relpath, file=sys.stderr) 235 pool.join()
215 continue
216
217 if opt.project_header:
218 stdin = subprocess.PIPE
219 stdout = subprocess.PIPE
220 stderr = subprocess.PIPE
221 else:
222 stdin = None
223 stdout = None
224 stderr = None
225
226 p = subprocess.Popen(cmd,
227 cwd = cwd,
228 shell = shell,
229 env = env,
230 stdin = stdin,
231 stdout = stdout,
232 stderr = stderr)
233
234 if opt.project_header:
235 class sfd(object):
236 def __init__(self, fd, dest):
237 self.fd = fd
238 self.dest = dest
239 def fileno(self):
240 return self.fd.fileno()
241
242 empty = True
243 errbuf = ''
244
245 p.stdin.close()
246 s_in = [sfd(p.stdout, sys.stdout),
247 sfd(p.stderr, sys.stderr)]
248
249 for s in s_in:
250 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
251 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
252
253 while s_in:
254 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
255 for s in in_ready:
256 buf = s.fd.read(4096)
257 if not buf:
258 s.fd.close()
259 s_in.remove(s)
260 continue
261
262 if not opt.verbose:
263 if s.fd != p.stdout:
264 errbuf += buf
265 continue
266
267 if empty:
268 if first:
269 first = False
270 else:
271 out.nl()
272
273 if mirror:
274 project_header_path = project.name
275 else:
276 project_header_path = project.relpath
277 out.project('project %s/', project_header_path)
278 out.nl()
279 out.flush()
280 if errbuf:
281 sys.stderr.write(errbuf)
282 sys.stderr.flush()
283 errbuf = ''
284 empty = False
285
286 s.dest.write(buf)
287 s.dest.flush()
288
289 r = p.wait()
290 if r != 0:
291 if r != rc:
292 rc = r
293 if opt.abort_on_errors:
294 print("error: %s: Aborting due to previous error" % project.relpath,
295 file=sys.stderr)
296 sys.exit(r)
297 if rc != 0: 236 if rc != 0:
298 sys.exit(rc) 237 sys.exit(rc)
238
239
240class WorkerKeyboardInterrupt(Exception):
241 """ Keyboard interrupt exception for worker processes. """
242 pass
243
244
245def DoWorkWrapper(args):
246 """ A wrapper around the DoWork() method.
247
248 Catch the KeyboardInterrupt exceptions here and re-raise them as a different,
249 ``Exception``-based exception to stop it flooding the console with stacktraces
250 and making the parent hang indefinitely.
251
252 """
253 project = args.pop()
254 try:
255 return DoWork(project, *args)
256 except KeyboardInterrupt:
257 print('%s: Worker interrupted' % project['name'])
258 raise WorkerKeyboardInterrupt()
259
260
261def DoWork(project, mirror, opt, cmd, shell, cnt, config):
262 env = os.environ.copy()
263 def setenv(name, val):
264 if val is None:
265 val = ''
266 env[name] = val.encode()
267
268 setenv('REPO_PROJECT', project['name'])
269 setenv('REPO_PATH', project['relpath'])
270 setenv('REPO_REMOTE', project['remote_name'])
271 setenv('REPO_LREV', project['lrev'])
272 setenv('REPO_RREV', project['rrev'])
273 setenv('REPO_I', str(cnt + 1))
274 for name in project['annotations']:
275 setenv("REPO__%s" % (name), project['annotations'][name])
276
277 if mirror:
278 setenv('GIT_DIR', project['gitdir'])
279 cwd = project['gitdir']
280 else:
281 cwd = project['worktree']
282
283 if not os.path.exists(cwd):
284 if (opt.project_header and opt.verbose) \
285 or not opt.project_header:
286 print('skipping %s/' % project['relpath'], file=sys.stderr)
287 return
288
289 if opt.project_header:
290 stdin = subprocess.PIPE
291 stdout = subprocess.PIPE
292 stderr = subprocess.PIPE
293 else:
294 stdin = None
295 stdout = None
296 stderr = None
297
298 p = subprocess.Popen(cmd,
299 cwd=cwd,
300 shell=shell,
301 env=env,
302 stdin=stdin,
303 stdout=stdout,
304 stderr=stderr)
305
306 if opt.project_header:
307 out = ForallColoring(config)
308 out.redirect(sys.stdout)
309 class sfd(object):
310 def __init__(self, fd, dest):
311 self.fd = fd
312 self.dest = dest
313 def fileno(self):
314 return self.fd.fileno()
315
316 empty = True
317 errbuf = ''
318
319 p.stdin.close()
320 s_in = [sfd(p.stdout, sys.stdout),
321 sfd(p.stderr, sys.stderr)]
322
323 for s in s_in:
324 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
325 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
326
327 while s_in:
328 in_ready, _out_ready, _err_ready = select.select(s_in, [], [])
329 for s in in_ready:
330 buf = s.fd.read(4096)
331 if not buf:
332 s.fd.close()
333 s_in.remove(s)
334 continue
335
336 if not opt.verbose:
337 if s.fd != p.stdout:
338 errbuf += buf
339 continue
340
341 if empty and out:
342 if not cnt == 0:
343 out.nl()
344
345 if mirror:
346 project_header_path = project['name']
347 else:
348 project_header_path = project['relpath']
349 out.project('project %s/', project_header_path)
350 out.nl()
351 out.flush()
352 if errbuf:
353 sys.stderr.write(errbuf)
354 sys.stderr.flush()
355 errbuf = ''
356 empty = False
357
358 s.dest.write(buf)
359 s.dest.flush()
360
361 r = p.wait()
362 return r
diff --git a/subcmds/status.py b/subcmds/status.py
index 41c4429a..b42675e0 100644
--- a/subcmds/status.py
+++ b/subcmds/status.py
@@ -113,7 +113,7 @@ the following meanings:
113 try: 113 try:
114 state = project.PrintWorkTreeStatus(output) 114 state = project.PrintWorkTreeStatus(output)
115 if state == 'CLEAN': 115 if state == 'CLEAN':
116 clean_counter.next() 116 next(clean_counter)
117 finally: 117 finally:
118 sem.release() 118 sem.release()
119 119
@@ -141,7 +141,7 @@ the following meanings:
141 for project in all_projects: 141 for project in all_projects:
142 state = project.PrintWorkTreeStatus() 142 state = project.PrintWorkTreeStatus()
143 if state == 'CLEAN': 143 if state == 'CLEAN':
144 counter.next() 144 next(counter)
145 else: 145 else:
146 sem = _threading.Semaphore(opt.jobs) 146 sem = _threading.Semaphore(opt.jobs)
147 threads_and_output = [] 147 threads_and_output = []
@@ -164,7 +164,7 @@ the following meanings:
164 t.join() 164 t.join()
165 output.dump(sys.stdout) 165 output.dump(sys.stdout)
166 output.close() 166 output.close()
167 if len(all_projects) == counter.next(): 167 if len(all_projects) == next(counter):
168 print('nothing to commit (working directory clean)') 168 print('nothing to commit (working directory clean)')
169 169
170 if opt.orphans: 170 if opt.orphans:
diff --git a/subcmds/sync.py b/subcmds/sync.py
index a0a68960..6f77310f 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -14,10 +14,10 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import json
17import netrc 18import netrc
18from optparse import SUPPRESS_HELP 19from optparse import SUPPRESS_HELP
19import os 20import os
20import pickle
21import re 21import re
22import shutil 22import shutil
23import socket 23import socket
@@ -760,7 +760,7 @@ class _FetchTimes(object):
760 _ALPHA = 0.5 760 _ALPHA = 0.5
761 761
762 def __init__(self, manifest): 762 def __init__(self, manifest):
763 self._path = os.path.join(manifest.repodir, '.repopickle_fetchtimes') 763 self._path = os.path.join(manifest.repodir, '.repo_fetchtimes.json')
764 self._times = None 764 self._times = None
765 self._seen = set() 765 self._seen = set()
766 766
@@ -779,22 +779,17 @@ class _FetchTimes(object):
779 def _Load(self): 779 def _Load(self):
780 if self._times is None: 780 if self._times is None:
781 try: 781 try:
782 f = open(self._path, 'rb') 782 f = open(self._path)
783 except IOError:
784 self._times = {}
785 return self._times
786 try:
787 try: 783 try:
788 self._times = pickle.load(f) 784 self._times = json.load(f)
789 except IOError: 785 finally:
790 try: 786 f.close()
791 os.remove(self._path) 787 except (IOError, ValueError):
792 except OSError: 788 try:
793 pass 789 os.remove(self._path)
794 self._times = {} 790 except OSError:
795 finally: 791 pass
796 f.close() 792 self._times = {}
797 return self._times
798 793
799 def Save(self): 794 def Save(self):
800 if self._times is None: 795 if self._times is None:
@@ -808,13 +803,13 @@ class _FetchTimes(object):
808 del self._times[name] 803 del self._times[name]
809 804
810 try: 805 try:
811 f = open(self._path, 'wb') 806 f = open(self._path, 'w')
812 try: 807 try:
813 pickle.dump(self._times, f) 808 json.dump(self._times, f, indent=2)
814 except (IOError, OSError, pickle.PickleError): 809 finally:
815 try: 810 f.close()
816 os.remove(self._path) 811 except (IOError, TypeError):
817 except OSError: 812 try:
818 pass 813 os.remove(self._path)
819 finally: 814 except OSError:
820 f.close() 815 pass
diff --git a/subcmds/upload.py b/subcmds/upload.py
index e2fa261e..0ee36df1 100644
--- a/subcmds/upload.py
+++ b/subcmds/upload.py
@@ -25,10 +25,12 @@ from git_command import GitCommand
25from project import RepoHook 25from project import RepoHook
26 26
27from pyversion import is_python3 27from pyversion import is_python3
28# pylint:disable=W0622
28if not is_python3(): 29if not is_python3():
29 # pylint:disable=W0622
30 input = raw_input 30 input = raw_input
31 # pylint:enable=W0622 31else:
32 unicode = str
33# pylint:enable=W0622
32 34
33UNUSUAL_COMMIT_THRESHOLD = 5 35UNUSUAL_COMMIT_THRESHOLD = 5
34 36