summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--git_command.py29
-rw-r--r--platform_utils.py169
-rw-r--r--subcmds/forall.py29
3 files changed, 186 insertions, 41 deletions
diff --git a/git_command.py b/git_command.py
index 9f7d2930..dfa6a924 100644
--- a/git_command.py
+++ b/git_command.py
@@ -14,14 +14,14 @@
14# limitations under the License. 14# limitations under the License.
15 15
16from __future__ import print_function 16from __future__ import print_function
17import fcntl
18import os 17import os
19import select
20import sys 18import sys
21import subprocess 19import subprocess
22import tempfile 20import tempfile
23from signal import SIGTERM 21from signal import SIGTERM
22
24from error import GitError 23from error import GitError
24import platform_utils
25from trace import REPO_TRACE, IsTrace, Trace 25from trace import REPO_TRACE, IsTrace, Trace
26from wrapper import Wrapper 26from wrapper import Wrapper
27 27
@@ -78,16 +78,6 @@ def terminate_ssh_clients():
78 78
79_git_version = None 79_git_version = None
80 80
81class _sfd(object):
82 """select file descriptor class"""
83 def __init__(self, fd, dest, std_name):
84 assert std_name in ('stdout', 'stderr')
85 self.fd = fd
86 self.dest = dest
87 self.std_name = std_name
88 def fileno(self):
89 return self.fd.fileno()
90
91class _GitCall(object): 81class _GitCall(object):
92 def version(self): 82 def version(self):
93 p = GitCommand(None, ['--version'], capture_stdout=True) 83 p = GitCommand(None, ['--version'], capture_stdout=True)
@@ -253,19 +243,16 @@ class GitCommand(object):
253 243
254 def _CaptureOutput(self): 244 def _CaptureOutput(self):
255 p = self.process 245 p = self.process
256 s_in = [_sfd(p.stdout, sys.stdout, 'stdout'), 246 s_in = platform_utils.FileDescriptorStreams.create()
257 _sfd(p.stderr, sys.stderr, 'stderr')] 247 s_in.add(p.stdout, sys.stdout, 'stdout')
248 s_in.add(p.stderr, sys.stderr, 'stderr')
258 self.stdout = '' 249 self.stdout = ''
259 self.stderr = '' 250 self.stderr = ''
260 251
261 for s in s_in: 252 while not s_in.is_done:
262 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) 253 in_ready = s_in.select()
263 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
264
265 while s_in:
266 in_ready, _, _ = select.select(s_in, [], [])
267 for s in in_ready: 254 for s in in_ready:
268 buf = s.fd.read(4096) 255 buf = s.read()
269 if not buf: 256 if not buf:
270 s_in.remove(s) 257 s_in.remove(s)
271 continue 258 continue
diff --git a/platform_utils.py b/platform_utils.py
new file mode 100644
index 00000000..1c719b1d
--- /dev/null
+++ b/platform_utils.py
@@ -0,0 +1,169 @@
1#
2# Copyright (C) 2016 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import os
17import platform
18import select
19
20from Queue import Queue
21from threading import Thread
22
23
24def isWindows():
25 """ Returns True when running with the native port of Python for Windows,
26 False when running on any other platform (including the Cygwin port of
27 Python).
28 """
29 # Note: The cygwin port of Python returns "CYGWIN_NT_xxx"
30 return platform.system() == "Windows"
31
32
33class FileDescriptorStreams(object):
34 """ Platform agnostic abstraction enabling non-blocking I/O over a
35 collection of file descriptors. This abstraction is required because
36 fctnl(os.O_NONBLOCK) is not supported on Windows.
37 """
38 @classmethod
39 def create(cls):
40 """ Factory method: instantiates the concrete class according to the
41 current platform.
42 """
43 if isWindows():
44 return _FileDescriptorStreamsThreads()
45 else:
46 return _FileDescriptorStreamsNonBlocking()
47
48 def __init__(self):
49 self.streams = []
50
51 def add(self, fd, dest, std_name):
52 """ Wraps an existing file descriptor as a stream.
53 """
54 self.streams.append(self._create_stream(fd, dest, std_name))
55
56 def remove(self, stream):
57 """ Removes a stream, when done with it.
58 """
59 self.streams.remove(stream)
60
61 @property
62 def is_done(self):
63 """ Returns True when all streams have been processed.
64 """
65 return len(self.streams) == 0
66
67 def select(self):
68 """ Returns the set of streams that have data available to read.
69 The returned streams each expose a read() and a close() method.
70 When done with a stream, call the remove(stream) method.
71 """
72 raise NotImplementedError
73
74 def _create_stream(fd, dest, std_name):
75 """ Creates a new stream wrapping an existing file descriptor.
76 """
77 raise NotImplementedError
78
79
80class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
81 """ Implementation of FileDescriptorStreams for platforms that support
82 non blocking I/O.
83 """
84 class Stream(object):
85 """ Encapsulates a file descriptor """
86 def __init__(self, fd, dest, std_name):
87 self.fd = fd
88 self.dest = dest
89 self.std_name = std_name
90 self.set_non_blocking()
91
92 def set_non_blocking(self):
93 import fcntl
94 flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
95 fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
96
97 def fileno(self):
98 return self.fd.fileno()
99
100 def read(self):
101 return self.fd.read(4096)
102
103 def close(self):
104 self.fd.close()
105
106 def _create_stream(self, fd, dest, std_name):
107 return self.Stream(fd, dest, std_name)
108
109 def select(self):
110 ready_streams, _, _ = select.select(self.streams, [], [])
111 return ready_streams
112
113
114class _FileDescriptorStreamsThreads(FileDescriptorStreams):
115 """ Implementation of FileDescriptorStreams for platforms that don't support
116 non blocking I/O. This implementation requires creating threads issuing
117 blocking read operations on file descriptors.
118 """
119 def __init__(self):
120 super(_FileDescriptorStreamsThreads, self).__init__()
121 # The queue is shared accross all threads so we can simulate the
122 # behavior of the select() function
123 self.queue = Queue(10) # Limit incoming data from streams
124
125 def _create_stream(self, fd, dest, std_name):
126 return self.Stream(fd, dest, std_name, self.queue)
127
128 def select(self):
129 # Return only one stream at a time, as it is the most straighforward
130 # thing to do and it is compatible with the select() function.
131 item = self.queue.get()
132 stream = item.stream
133 stream.data = item.data
134 return [stream]
135
136 class QueueItem(object):
137 """ Item put in the shared queue """
138 def __init__(self, stream, data):
139 self.stream = stream
140 self.data = data
141
142 class Stream(object):
143 """ Encapsulates a file descriptor """
144 def __init__(self, fd, dest, std_name, queue):
145 self.fd = fd
146 self.dest = dest
147 self.std_name = std_name
148 self.queue = queue
149 self.data = None
150 self.thread = Thread(target=self.read_to_queue)
151 self.thread.daemon = True
152 self.thread.start()
153
154 def close(self):
155 self.fd.close()
156
157 def read(self):
158 data = self.data
159 self.data = None
160 return data
161
162 def read_to_queue(self):
163 """ The thread function: reads everything from the file descriptor into
164 the shared queue and terminates when reaching EOF.
165 """
166 for line in iter(self.fd.readline, b''):
167 self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
168 self.fd.close()
169 self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, None))
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 07ee8d58..2c12c55f 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,17 +15,16 @@
15 15
16from __future__ import print_function 16from __future__ import print_function
17import errno 17import errno
18import fcntl
19import multiprocessing 18import multiprocessing
20import re 19import re
21import os 20import os
22import select
23import signal 21import signal
24import sys 22import sys
25import subprocess 23import subprocess
26 24
27from color import Coloring 25from color import Coloring
28from command import Command, MirrorSafeCommand 26from command import Command, MirrorSafeCommand
27import platform_utils
29 28
30_CAN_COLOR = [ 29_CAN_COLOR = [
31 'branch', 30 'branch',
@@ -344,35 +343,25 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config):
344 if opt.project_header: 343 if opt.project_header:
345 out = ForallColoring(config) 344 out = ForallColoring(config)
346 out.redirect(sys.stdout) 345 out.redirect(sys.stdout)
347 class sfd(object):
348 def __init__(self, fd, dest):
349 self.fd = fd
350 self.dest = dest
351 def fileno(self):
352 return self.fd.fileno()
353
354 empty = True 346 empty = True
355 errbuf = '' 347 errbuf = ''
356 348
357 p.stdin.close() 349 p.stdin.close()
358 s_in = [sfd(p.stdout, sys.stdout), 350 s_in = platform_utils.FileDescriptorStreams.create()
359 sfd(p.stderr, sys.stderr)] 351 s_in.add(p.stdout, sys.stdout, 'stdout')
360 352 s_in.add(p.stderr, sys.stderr, 'stderr')
361 for s in s_in:
362 flags = fcntl.fcntl(s.fd, fcntl.F_GETFL)
363 fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
364 353
365 while s_in: 354 while not s_in.is_done:
366 in_ready, _out_ready, _err_ready = select.select(s_in, [], []) 355 in_ready = s_in.select()
367 for s in in_ready: 356 for s in in_ready:
368 buf = s.fd.read(4096) 357 buf = s.read()
369 if not buf: 358 if not buf:
370 s.fd.close() 359 s.close()
371 s_in.remove(s) 360 s_in.remove(s)
372 continue 361 continue
373 362
374 if not opt.verbose: 363 if not opt.verbose:
375 if s.fd != p.stdout: 364 if s.std_name == 'stderr':
376 errbuf += buf 365 errbuf += buf
377 continue 366 continue
378 367