From 2e7029116204cf2d6f516e4514091f0b492bc689 Mon Sep 17 00:00:00 2001 From: Renaud Paquay Date: Tue, 1 Nov 2016 11:23:38 -0700 Subject: Make "git command" and "forall" work on Windows Python on Windows does not support non blocking file operations. To work around this issue, we instead use Threads and a Queue to simulate non-blocking calls. This happens only when running with the native Windows version of Python, meaning Linux and Cygwin are not affected by this change. Change-Id: I4ce23827b096c5138f67a85c721f58a12279bb6f --- git_command.py | 29 +++------- platform_utils.py | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ subcmds/forall.py | 29 +++------- 3 files changed, 186 insertions(+), 41 deletions(-) create mode 100644 platform_utils.py diff --git a/git_command.py b/git_command.py index 9f7d2930..dfa6a924 100644 --- a/git_command.py +++ b/git_command.py @@ -14,14 +14,14 @@ # limitations under the License. from __future__ import print_function -import fcntl import os -import select import sys import subprocess import tempfile from signal import SIGTERM + from error import GitError +import platform_utils from trace import REPO_TRACE, IsTrace, Trace from wrapper import Wrapper @@ -78,16 +78,6 @@ def terminate_ssh_clients(): _git_version = None -class _sfd(object): - """select file descriptor class""" - def __init__(self, fd, dest, std_name): - assert std_name in ('stdout', 'stderr') - self.fd = fd - self.dest = dest - self.std_name = std_name - def fileno(self): - return self.fd.fileno() - class _GitCall(object): def version(self): p = GitCommand(None, ['--version'], capture_stdout=True) @@ -253,19 +243,16 @@ class GitCommand(object): def _CaptureOutput(self): p = self.process - s_in = [_sfd(p.stdout, sys.stdout, 'stdout'), - _sfd(p.stderr, sys.stderr, 'stderr')] + s_in = platform_utils.FileDescriptorStreams.create() + s_in.add(p.stdout, sys.stdout, 'stdout') + s_in.add(p.stderr, sys.stderr, 'stderr') self.stdout = '' self.stderr = '' - for s in 
s_in: - flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) - fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - - while s_in: - in_ready, _, _ = select.select(s_in, [], []) + while not s_in.is_done: + in_ready = s_in.select() for s in in_ready: - buf = s.fd.read(4096) + buf = s.read() if not buf: s_in.remove(s) continue diff --git a/platform_utils.py b/platform_utils.py new file mode 100644 index 00000000..1c719b1d --- /dev/null +++ b/platform_utils.py @@ -0,0 +1,169 @@ +# +# Copyright (C) 2016 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import platform +import select + +from Queue import Queue +from threading import Thread + + +def isWindows(): + """ Returns True when running with the native port of Python for Windows, + False when running on any other platform (including the Cygwin port of + Python). + """ + # Note: The cygwin port of Python returns "CYGWIN_NT_xxx" + return platform.system() == "Windows" + + +class FileDescriptorStreams(object): + """ Platform agnostic abstraction enabling non-blocking I/O over a + collection of file descriptors. This abstraction is required because + fcntl(os.O_NONBLOCK) is not supported on Windows. + """ + @classmethod + def create(cls): + """ Factory method: instantiates the concrete class according to the + current platform. 
+ """ + if isWindows(): + return _FileDescriptorStreamsThreads() + else: + return _FileDescriptorStreamsNonBlocking() + + def __init__(self): + self.streams = [] + + def add(self, fd, dest, std_name): + """ Wraps an existing file descriptor as a stream. + """ + self.streams.append(self._create_stream(fd, dest, std_name)) + + def remove(self, stream): + """ Removes a stream, when done with it. + """ + self.streams.remove(stream) + + @property + def is_done(self): + """ Returns True when all streams have been processed. + """ + return len(self.streams) == 0 + + def select(self): + """ Returns the set of streams that have data available to read. + The returned streams each expose a read() and a close() method. + When done with a stream, call the remove(stream) method. + """ + raise NotImplementedError + + def _create_stream(fd, dest, std_name): + """ Creates a new stream wrapping an existing file descriptor. + """ + raise NotImplementedError + + +class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams): + """ Implementation of FileDescriptorStreams for platforms that support + non blocking I/O. + """ + class Stream(object): + """ Encapsulates a file descriptor """ + def __init__(self, fd, dest, std_name): + self.fd = fd + self.dest = dest + self.std_name = std_name + self.set_non_blocking() + + def set_non_blocking(self): + import fcntl + flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + def fileno(self): + return self.fd.fileno() + + def read(self): + return self.fd.read(4096) + + def close(self): + self.fd.close() + + def _create_stream(self, fd, dest, std_name): + return self.Stream(fd, dest, std_name) + + def select(self): + ready_streams, _, _ = select.select(self.streams, [], []) + return ready_streams + + +class _FileDescriptorStreamsThreads(FileDescriptorStreams): + """ Implementation of FileDescriptorStreams for platforms that don't support + non blocking I/O. 
This implementation requires creating threads issuing + blocking read operations on file descriptors. + """ + def __init__(self): + super(_FileDescriptorStreamsThreads, self).__init__() + # The queue is shared across all threads so we can simulate the + # behavior of the select() function + self.queue = Queue(10) # Limit incoming data from streams + + def _create_stream(self, fd, dest, std_name): + return self.Stream(fd, dest, std_name, self.queue) + + def select(self): + # Return only one stream at a time, as it is the most straightforward + # thing to do and it is compatible with the select() function. + item = self.queue.get() + stream = item.stream + stream.data = item.data + return [stream] + + class QueueItem(object): + """ Item put in the shared queue """ + def __init__(self, stream, data): + self.stream = stream + self.data = data + + class Stream(object): + """ Encapsulates a file descriptor """ + def __init__(self, fd, dest, std_name, queue): + self.fd = fd + self.dest = dest + self.std_name = std_name + self.queue = queue + self.data = None + self.thread = Thread(target=self.read_to_queue) + self.thread.daemon = True + self.thread.start() + + def close(self): + self.fd.close() + + def read(self): + data = self.data + self.data = None + return data + + def read_to_queue(self): + """ The thread function: reads everything from the file descriptor into + the shared queue and terminates when reaching EOF. 
+ """ + for line in iter(self.fd.readline, b''): + self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line)) + self.fd.close() + self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, None)) diff --git a/subcmds/forall.py b/subcmds/forall.py index 07ee8d58..2c12c55f 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py @@ -15,17 +15,16 @@ from __future__ import print_function import errno -import fcntl import multiprocessing import re import os -import select import signal import sys import subprocess from color import Coloring from command import Command, MirrorSafeCommand +import platform_utils _CAN_COLOR = [ 'branch', @@ -344,35 +343,25 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config): if opt.project_header: out = ForallColoring(config) out.redirect(sys.stdout) - class sfd(object): - def __init__(self, fd, dest): - self.fd = fd - self.dest = dest - def fileno(self): - return self.fd.fileno() - empty = True errbuf = '' p.stdin.close() - s_in = [sfd(p.stdout, sys.stdout), - sfd(p.stderr, sys.stderr)] - - for s in s_in: - flags = fcntl.fcntl(s.fd, fcntl.F_GETFL) - fcntl.fcntl(s.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + s_in = platform_utils.FileDescriptorStreams.create() + s_in.add(p.stdout, sys.stdout, 'stdout') + s_in.add(p.stderr, sys.stderr, 'stderr') - while s_in: - in_ready, _out_ready, _err_ready = select.select(s_in, [], []) + while not s_in.is_done: + in_ready = s_in.select() for s in in_ready: - buf = s.fd.read(4096) + buf = s.read() if not buf: - s.fd.close() + s.close() s_in.remove(s) continue if not opt.verbose: - if s.fd != p.stdout: + if s.std_name == 'stderr': errbuf += buf continue -- cgit v1.2.3-54-g00ecf