summaryrefslogtreecommitdiffstats
path: root/platform_utils.py
blob: 1c719b1d0d0996ea56978d80976e76ceb5dcf574 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import platform
import select

from Queue import Queue
from threading import Thread


def isWindows():
  """ Returns True when running with the native port of Python for Windows,
  False when running on any other platform (including the Cygwin port of
  Python).
  """
  # Note: The cygwin port of Python returns "CYGWIN_NT_xxx"
  return platform.system() == "Windows"


class FileDescriptorStreams(object):
  """ Platform agnostic abstraction enabling non-blocking I/O over a
  collection of file descriptors. This abstraction is required because
  fctnl(os.O_NONBLOCK) is not supported on Windows.
  """
  @classmethod
  def create(cls):
    """ Factory method: instantiates the concrete class according to the
    current platform.
    """
    if isWindows():
      return _FileDescriptorStreamsThreads()
    else:
      return _FileDescriptorStreamsNonBlocking()

  def __init__(self):
    self.streams = []

  def add(self, fd, dest, std_name):
    """ Wraps an existing file descriptor as a stream.
    """
    self.streams.append(self._create_stream(fd, dest, std_name))

  def remove(self, stream):
    """ Removes a stream, when done with it.
    """
    self.streams.remove(stream)

  @property
  def is_done(self):
    """ Returns True when all streams have been processed.
    """
    return len(self.streams) == 0

  def select(self):
    """ Returns the set of streams that have data available to read.
    The returned streams each expose a read() and a close() method.
    When done with a stream, call the remove(stream) method.
    """
    raise NotImplementedError

  def _create_stream(fd, dest, std_name):
    """ Creates a new stream wrapping an existing file descriptor.
    """
    raise NotImplementedError


class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
  """ Implementation of FileDescriptorStreams for platforms that support
  non blocking I/O.
  """
  class Stream(object):
    """ Encapsulates a file descriptor """
    def __init__(self, fd, dest, std_name):
      self.fd = fd
      self.dest = dest
      self.std_name = std_name
      self.set_non_blocking()

    def set_non_blocking(self):
      import fcntl
      flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
      fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def fileno(self):
      return self.fd.fileno()

    def read(self):
      return self.fd.read(4096)

    def close(self):
      self.fd.close()

  def _create_stream(self, fd, dest, std_name):
    return self.Stream(fd, dest, std_name)

  def select(self):
    ready_streams, _, _ = select.select(self.streams, [], [])
    return ready_streams


class _FileDescriptorStreamsThreads(FileDescriptorStreams):
  """ Implementation of FileDescriptorStreams for platforms that don't support
  non blocking I/O. This implementation requires creating threads issuing
  blocking read operations on file descriptors.
  """
  def __init__(self):
    super(_FileDescriptorStreamsThreads, self).__init__()
    # The queue is shared accross all threads so we can simulate the
    # behavior of the select() function
    self.queue = Queue(10)  # Limit incoming data from streams

  def _create_stream(self, fd, dest, std_name):
    return self.Stream(fd, dest, std_name, self.queue)

  def select(self):
    # Return only one stream at a time, as it is the most straighforward
    # thing to do and it is compatible with the select() function.
    item = self.queue.get()
    stream = item.stream
    stream.data = item.data
    return [stream]

  class QueueItem(object):
    """ Item put in the shared queue """
    def __init__(self, stream, data):
      self.stream = stream
      self.data = data

  class Stream(object):
    """ Encapsulates a file descriptor """
    def __init__(self, fd, dest, std_name, queue):
      self.fd = fd
      self.dest = dest
      self.std_name = std_name
      self.queue = queue
      self.data = None
      self.thread = Thread(target=self.read_to_queue)
      self.thread.daemon = True
      self.thread.start()

    def close(self):
      self.fd.close()

    def read(self):
      data = self.data
      self.data = None
      return data

    def read_to_queue(self):
      """ The thread function: reads everything from the file descriptor into
      the shared queue and terminates when reaching EOF.
      """
      for line in iter(self.fd.readline, b''):
        self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
      self.fd.close()
      self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, None))