diff options
author | Mike Frysinger <vapier@google.com> | 2021-02-16 17:21:22 -0500 |
---|---|---|
committer | Mike Frysinger <vapier@google.com> | 2021-02-24 01:45:57 +0000 |
commit | f0925c482f4bd73aa1c6f8e0816955dedddbc678 (patch) | |
tree | 01952e3af4f6cfdd3d902d2071ca7d3dd665b2fb /platform_utils.py | |
parent | be24a54d9ce7e8819f6cb6857fb5db51b9d62bbc (diff) | |
download | git-repo-f0925c482f4bd73aa1c6f8e0816955dedddbc678.tar.gz |
platform_utils: delete unused FileDescriptorStreams APIs
Now that we've converted the few users of this over to subprocess APIs,
we don't need this anymore. It's been a bit hairy to maintain across
different operating systems, so there's no desire to bring it back.
Using multiprocessing Pool to batch things has been working better in
general anyways.
Change-Id: I10769e96f60ecf27a80d8cc2aa0d1b199085252e
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297682
Reviewed-by: Michael Mortensen <mmortensen@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
Diffstat (limited to 'platform_utils.py')
-rw-r--r-- | platform_utils.py | 158 |
1 files changed, 0 insertions, 158 deletions
diff --git a/platform_utils.py b/platform_utils.py index a280982a..00c51d9b 100644 --- a/platform_utils.py +++ b/platform_utils.py | |||
@@ -15,11 +15,8 @@ | |||
15 | import errno | 15 | import errno |
16 | import os | 16 | import os |
17 | import platform | 17 | import platform |
18 | from queue import Queue | ||
19 | import select | ||
20 | import shutil | 18 | import shutil |
21 | import stat | 19 | import stat |
22 | from threading import Thread | ||
23 | 20 | ||
24 | 21 | ||
25 | def isWindows(): | 22 | def isWindows(): |
@@ -31,161 +28,6 @@ def isWindows(): | |||
31 | return platform.system() == "Windows" | 28 | return platform.system() == "Windows" |
32 | 29 | ||
33 | 30 | ||
34 | class FileDescriptorStreams(object): | ||
35 | """ Platform agnostic abstraction enabling non-blocking I/O over a | ||
36 | collection of file descriptors. This abstraction is required because | ||
37 | fctnl(os.O_NONBLOCK) is not supported on Windows. | ||
38 | """ | ||
39 | @classmethod | ||
40 | def create(cls): | ||
41 | """ Factory method: instantiates the concrete class according to the | ||
42 | current platform. | ||
43 | """ | ||
44 | if isWindows(): | ||
45 | return _FileDescriptorStreamsThreads() | ||
46 | else: | ||
47 | return _FileDescriptorStreamsNonBlocking() | ||
48 | |||
49 | def __init__(self): | ||
50 | self.streams = [] | ||
51 | |||
52 | def add(self, fd, dest, std_name): | ||
53 | """ Wraps an existing file descriptor as a stream. | ||
54 | """ | ||
55 | self.streams.append(self._create_stream(fd, dest, std_name)) | ||
56 | |||
57 | def remove(self, stream): | ||
58 | """ Removes a stream, when done with it. | ||
59 | """ | ||
60 | self.streams.remove(stream) | ||
61 | |||
62 | @property | ||
63 | def is_done(self): | ||
64 | """ Returns True when all streams have been processed. | ||
65 | """ | ||
66 | return len(self.streams) == 0 | ||
67 | |||
68 | def select(self): | ||
69 | """ Returns the set of streams that have data available to read. | ||
70 | The returned streams each expose a read() and a close() method. | ||
71 | When done with a stream, call the remove(stream) method. | ||
72 | """ | ||
73 | raise NotImplementedError | ||
74 | |||
75 | def _create_stream(self, fd, dest, std_name): | ||
76 | """ Creates a new stream wrapping an existing file descriptor. | ||
77 | """ | ||
78 | raise NotImplementedError | ||
79 | |||
80 | |||
81 | class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams): | ||
82 | """ Implementation of FileDescriptorStreams for platforms that support | ||
83 | non blocking I/O. | ||
84 | """ | ||
85 | def __init__(self): | ||
86 | super(_FileDescriptorStreamsNonBlocking, self).__init__() | ||
87 | self._poll = select.poll() | ||
88 | self._fd_to_stream = {} | ||
89 | |||
90 | class Stream(object): | ||
91 | """ Encapsulates a file descriptor """ | ||
92 | |||
93 | def __init__(self, fd, dest, std_name): | ||
94 | self.fd = fd | ||
95 | self.dest = dest | ||
96 | self.std_name = std_name | ||
97 | self.set_non_blocking() | ||
98 | |||
99 | def set_non_blocking(self): | ||
100 | import fcntl | ||
101 | flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) | ||
102 | fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) | ||
103 | |||
104 | def fileno(self): | ||
105 | return self.fd.fileno() | ||
106 | |||
107 | def read(self): | ||
108 | return self.fd.read(4096) | ||
109 | |||
110 | def close(self): | ||
111 | self.fd.close() | ||
112 | |||
113 | def _create_stream(self, fd, dest, std_name): | ||
114 | stream = self.Stream(fd, dest, std_name) | ||
115 | self._fd_to_stream[stream.fileno()] = stream | ||
116 | self._poll.register(stream, select.POLLIN) | ||
117 | return stream | ||
118 | |||
119 | def remove(self, stream): | ||
120 | self._poll.unregister(stream) | ||
121 | del self._fd_to_stream[stream.fileno()] | ||
122 | super(_FileDescriptorStreamsNonBlocking, self).remove(stream) | ||
123 | |||
124 | def select(self): | ||
125 | return [self._fd_to_stream[fd] for fd, _ in self._poll.poll()] | ||
126 | |||
127 | |||
128 | class _FileDescriptorStreamsThreads(FileDescriptorStreams): | ||
129 | """ Implementation of FileDescriptorStreams for platforms that don't support | ||
130 | non blocking I/O. This implementation requires creating threads issuing | ||
131 | blocking read operations on file descriptors. | ||
132 | """ | ||
133 | |||
134 | def __init__(self): | ||
135 | super(_FileDescriptorStreamsThreads, self).__init__() | ||
136 | # The queue is shared accross all threads so we can simulate the | ||
137 | # behavior of the select() function | ||
138 | self.queue = Queue(10) # Limit incoming data from streams | ||
139 | |||
140 | def _create_stream(self, fd, dest, std_name): | ||
141 | return self.Stream(fd, dest, std_name, self.queue) | ||
142 | |||
143 | def select(self): | ||
144 | # Return only one stream at a time, as it is the most straighforward | ||
145 | # thing to do and it is compatible with the select() function. | ||
146 | item = self.queue.get() | ||
147 | stream = item.stream | ||
148 | stream.data = item.data | ||
149 | return [stream] | ||
150 | |||
151 | class QueueItem(object): | ||
152 | """ Item put in the shared queue """ | ||
153 | |||
154 | def __init__(self, stream, data): | ||
155 | self.stream = stream | ||
156 | self.data = data | ||
157 | |||
158 | class Stream(object): | ||
159 | """ Encapsulates a file descriptor """ | ||
160 | |||
161 | def __init__(self, fd, dest, std_name, queue): | ||
162 | self.fd = fd | ||
163 | self.dest = dest | ||
164 | self.std_name = std_name | ||
165 | self.queue = queue | ||
166 | self.data = None | ||
167 | self.thread = Thread(target=self.read_to_queue) | ||
168 | self.thread.daemon = True | ||
169 | self.thread.start() | ||
170 | |||
171 | def close(self): | ||
172 | self.fd.close() | ||
173 | |||
174 | def read(self): | ||
175 | data = self.data | ||
176 | self.data = None | ||
177 | return data | ||
178 | |||
179 | def read_to_queue(self): | ||
180 | """ The thread function: reads everything from the file descriptor into | ||
181 | the shared queue and terminates when reaching EOF. | ||
182 | """ | ||
183 | for line in iter(self.fd.readline, b''): | ||
184 | self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line)) | ||
185 | self.fd.close() | ||
186 | self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, b'')) | ||
187 | |||
188 | |||
189 | def symlink(source, link_name): | 31 | def symlink(source, link_name): |
190 | """Creates a symbolic link pointing to source named link_name. | 32 | """Creates a symbolic link pointing to source named link_name. |
191 | Note: On Windows, source must exist on disk, as the implementation needs | 33 | Note: On Windows, source must exist on disk, as the implementation needs |