summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMike Frysinger <vapier@google.com>2021-02-16 17:21:22 -0500
committerMike Frysinger <vapier@google.com>2021-02-24 01:45:57 +0000
commitf0925c482f4bd73aa1c6f8e0816955dedddbc678 (patch)
tree01952e3af4f6cfdd3d902d2071ca7d3dd665b2fb
parentbe24a54d9ce7e8819f6cb6857fb5db51b9d62bbc (diff)
downloadgit-repo-f0925c482f4bd73aa1c6f8e0816955dedddbc678.tar.gz
platform_utils: delete unused FileDescriptorStreams APIs
Now that we've converted the few users of this over to subprocess APIs, we don't need this anymore. It's been a bit hairy to maintain across different operating systems, so there's no desire to bring it back. Using multiprocessing Pool to batch things has been working better in general anyways. Change-Id: I10769e96f60ecf27a80d8cc2aa0d1b199085252e Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297682 Reviewed-by: Michael Mortensen <mmortensen@google.com> Tested-by: Mike Frysinger <vapier@google.com>
-rw-r--r--platform_utils.py158
1 files changed, 0 insertions, 158 deletions
diff --git a/platform_utils.py b/platform_utils.py
index a280982a..00c51d9b 100644
--- a/platform_utils.py
+++ b/platform_utils.py
@@ -15,11 +15,8 @@
15import errno 15import errno
16import os 16import os
17import platform 17import platform
18from queue import Queue
19import select
20import shutil 18import shutil
21import stat 19import stat
22from threading import Thread
23 20
24 21
25def isWindows(): 22def isWindows():
@@ -31,161 +28,6 @@ def isWindows():
31 return platform.system() == "Windows" 28 return platform.system() == "Windows"
32 29
33 30
34class FileDescriptorStreams(object):
35 """ Platform agnostic abstraction enabling non-blocking I/O over a
36 collection of file descriptors. This abstraction is required because
37 fctnl(os.O_NONBLOCK) is not supported on Windows.
38 """
39 @classmethod
40 def create(cls):
41 """ Factory method: instantiates the concrete class according to the
42 current platform.
43 """
44 if isWindows():
45 return _FileDescriptorStreamsThreads()
46 else:
47 return _FileDescriptorStreamsNonBlocking()
48
49 def __init__(self):
50 self.streams = []
51
52 def add(self, fd, dest, std_name):
53 """ Wraps an existing file descriptor as a stream.
54 """
55 self.streams.append(self._create_stream(fd, dest, std_name))
56
57 def remove(self, stream):
58 """ Removes a stream, when done with it.
59 """
60 self.streams.remove(stream)
61
62 @property
63 def is_done(self):
64 """ Returns True when all streams have been processed.
65 """
66 return len(self.streams) == 0
67
68 def select(self):
69 """ Returns the set of streams that have data available to read.
70 The returned streams each expose a read() and a close() method.
71 When done with a stream, call the remove(stream) method.
72 """
73 raise NotImplementedError
74
75 def _create_stream(self, fd, dest, std_name):
76 """ Creates a new stream wrapping an existing file descriptor.
77 """
78 raise NotImplementedError
79
80
81class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
82 """ Implementation of FileDescriptorStreams for platforms that support
83 non blocking I/O.
84 """
85 def __init__(self):
86 super(_FileDescriptorStreamsNonBlocking, self).__init__()
87 self._poll = select.poll()
88 self._fd_to_stream = {}
89
90 class Stream(object):
91 """ Encapsulates a file descriptor """
92
93 def __init__(self, fd, dest, std_name):
94 self.fd = fd
95 self.dest = dest
96 self.std_name = std_name
97 self.set_non_blocking()
98
99 def set_non_blocking(self):
100 import fcntl
101 flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
102 fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
103
104 def fileno(self):
105 return self.fd.fileno()
106
107 def read(self):
108 return self.fd.read(4096)
109
110 def close(self):
111 self.fd.close()
112
113 def _create_stream(self, fd, dest, std_name):
114 stream = self.Stream(fd, dest, std_name)
115 self._fd_to_stream[stream.fileno()] = stream
116 self._poll.register(stream, select.POLLIN)
117 return stream
118
119 def remove(self, stream):
120 self._poll.unregister(stream)
121 del self._fd_to_stream[stream.fileno()]
122 super(_FileDescriptorStreamsNonBlocking, self).remove(stream)
123
124 def select(self):
125 return [self._fd_to_stream[fd] for fd, _ in self._poll.poll()]
126
127
128class _FileDescriptorStreamsThreads(FileDescriptorStreams):
129 """ Implementation of FileDescriptorStreams for platforms that don't support
130 non blocking I/O. This implementation requires creating threads issuing
131 blocking read operations on file descriptors.
132 """
133
134 def __init__(self):
135 super(_FileDescriptorStreamsThreads, self).__init__()
136 # The queue is shared accross all threads so we can simulate the
137 # behavior of the select() function
138 self.queue = Queue(10) # Limit incoming data from streams
139
140 def _create_stream(self, fd, dest, std_name):
141 return self.Stream(fd, dest, std_name, self.queue)
142
143 def select(self):
144 # Return only one stream at a time, as it is the most straighforward
145 # thing to do and it is compatible with the select() function.
146 item = self.queue.get()
147 stream = item.stream
148 stream.data = item.data
149 return [stream]
150
151 class QueueItem(object):
152 """ Item put in the shared queue """
153
154 def __init__(self, stream, data):
155 self.stream = stream
156 self.data = data
157
158 class Stream(object):
159 """ Encapsulates a file descriptor """
160
161 def __init__(self, fd, dest, std_name, queue):
162 self.fd = fd
163 self.dest = dest
164 self.std_name = std_name
165 self.queue = queue
166 self.data = None
167 self.thread = Thread(target=self.read_to_queue)
168 self.thread.daemon = True
169 self.thread.start()
170
171 def close(self):
172 self.fd.close()
173
174 def read(self):
175 data = self.data
176 self.data = None
177 return data
178
179 def read_to_queue(self):
180 """ The thread function: reads everything from the file descriptor into
181 the shared queue and terminates when reaching EOF.
182 """
183 for line in iter(self.fd.readline, b''):
184 self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
185 self.fd.close()
186 self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, b''))
187
188
189def symlink(source, link_name): 31def symlink(source, link_name):
190 """Creates a symbolic link pointing to source named link_name. 32 """Creates a symbolic link pointing to source named link_name.
191 Note: On Windows, source must exist on disk, as the implementation needs 33 Note: On Windows, source must exist on disk, as the implementation needs