Diffstat (limited to 'ssh.py')

-rw-r--r--   ssh.py   282
1 file changed, 151 insertions(+), 131 deletions(-)
@@ -15,25 +15,20 @@
 """Common SSH management logic."""
 
 import functools
+import multiprocessing
 import os
 import re
 import signal
 import subprocess
 import sys
 import tempfile
-try:
-  import threading as _threading
-except ImportError:
-  import dummy_threading as _threading
 import time
 
 import platform_utils
 from repo_trace import Trace
 
 
-_ssh_proxy_path = None
-_ssh_sock_path = None
-_ssh_clients = []
+PROXY_PATH = os.path.join(os.path.dirname(__file__), 'git_ssh')
 
 
 def _run_ssh_version():
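
The switch from the optional threading import to multiprocessing is the heart of this change: the old module kept its master/client bookkeeping in plain globals behind a thread lock, while the new code shares that bookkeeping across worker processes through a multiprocessing manager. A minimal standalone sketch of that standard-library pattern follows; the names and values are illustrative, not taken from repo.

import multiprocessing


def _worker(masters, master_keys):
  # Mutations made through the manager proxies are visible to every process.
  masters.append(multiprocessing.current_process().pid)
  master_keys['review.example.com:29418'] = True  # dict used as a set


if __name__ == '__main__':
  manager = multiprocessing.Manager()
  masters = manager.list()      # cf. ProxyManager._masters below
  master_keys = manager.dict()  # cf. ProxyManager._master_keys below

  workers = [multiprocessing.Process(target=_worker, args=(masters, master_keys))
             for _ in range(2)]
  for p in workers:
    p.start()
  for p in workers:
    p.join()
  print(list(masters), sorted(master_keys))  # two pids, one shared key
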
@@ -62,68 +57,104 @@ def version():
     sys.exit(1)
 
 
-def proxy():
-  global _ssh_proxy_path
-  if _ssh_proxy_path is None:
-    _ssh_proxy_path = os.path.join(
-        os.path.dirname(__file__),
-        'git_ssh')
-  return _ssh_proxy_path
+URI_SCP = re.compile(r'^([^@:]*@?[^:/]{1,}):')
+URI_ALL = re.compile(r'^([a-z][a-z+-]*)://([^@/]*@?[^/]*)/')
 
 
-def add_client(p):
-  _ssh_clients.append(p)
+class ProxyManager:
+  """Manage various ssh clients & masters that we spawn.
 
+  This will take care of sharing state between multiprocessing children, and
+  make sure that if we crash, we don't leak any of the ssh sessions.
 
-def remove_client(p):
-  try:
-    _ssh_clients.remove(p)
-  except ValueError:
-    pass
-
+  The code should work with a single-process scenario too, and not add too much
+  overhead due to the manager.
+  """
 
-def _terminate_clients():
-  global _ssh_clients
-  for p in _ssh_clients:
+  # Path to the ssh program to run which will pass our master settings along.
+  # Set here more as a convenience API.
+  proxy = PROXY_PATH
+
+  def __init__(self, manager):
+    # Protect access to the list of active masters.
+    self._lock = multiprocessing.Lock()
+    # List of active masters (pid).  These will be spawned on demand, and we are
+    # responsible for shutting them all down at the end.
+    self._masters = manager.list()
+    # Set of active masters indexed by "host:port" information.
+    # The value isn't used, but multiprocessing doesn't provide a set class.
+    self._master_keys = manager.dict()
+    # Whether ssh masters are known to be broken, so we give up entirely.
+    self._master_broken = manager.Value('b', False)
+    # List of active ssh sessions.  Clients will be added & removed as
+    # connections finish, so this list is just for safety & cleanup if we crash.
+    self._clients = manager.list()
+    # Path to directory for holding master sockets.
+    self._sock_path = None
+
+  def __enter__(self):
+    """Enter a new context."""
+    return self
+
+  def __exit__(self, exc_type, exc_value, traceback):
+    """Exit a context & clean up all resources."""
+    self.close()
+
+  def add_client(self, proc):
+    """Track a new ssh session."""
+    self._clients.append(proc.pid)
+
+  def remove_client(self, proc):
+    """Remove a completed ssh session."""
     try:
-      os.kill(p.pid, signal.SIGTERM)
-      p.wait()
-    except OSError:
+      self._clients.remove(proc.pid)
+    except ValueError:
       pass
-  _ssh_clients = []
-
-
-_master_processes = []
-_master_keys = set()
-_ssh_master = True
-_master_keys_lock = None
-
-
-def init():
-  """Should be called once at the start of repo to init ssh master handling.
-
-  At the moment, all we do is to create our lock.
-  """
-  global _master_keys_lock
-  assert _master_keys_lock is None, "Should only call init once"
-  _master_keys_lock = _threading.Lock()
-
-
-def _open_ssh(host, port=None):
-  global _ssh_master
-
-  # Bail before grabbing the lock if we already know that we aren't going to
-  # try creating new masters below.
-  if sys.platform in ('win32', 'cygwin'):
-    return False
-
-  # Acquire the lock.  This is needed to prevent opening multiple masters for
-  # the same host when we're running "repo sync -jN" (for N > 1) _and_ the
-  # manifest <remote fetch="ssh://xyz"> specifies a different host from the
-  # one that was passed to repo init.
-  _master_keys_lock.acquire()
-  try:
 
+  def add_master(self, proc):
+    """Track a new master connection."""
+    self._masters.append(proc.pid)
+
+  def _terminate(self, procs):
+    """Kill all |procs|."""
+    for pid in procs:
+      try:
+        os.kill(pid, signal.SIGTERM)
+        os.waitpid(pid, 0)
+      except OSError:
+        pass
+
+    # The multiprocessing.list() API doesn't provide many standard list()
+    # methods, so we have to manually clear the list.
+    while True:
+      try:
+        procs.pop(0)
+      except:
+        break
+
+  def close(self):
+    """Close this active ssh session.
+
+    Kill all ssh clients & masters we created, and nuke the socket dir.
+    """
+    self._terminate(self._clients)
+    self._terminate(self._masters)
+
+    d = self.sock(create=False)
+    if d:
+      try:
+        platform_utils.rmdir(os.path.dirname(d))
+      except OSError:
+        pass
+
+  def _open_unlocked(self, host, port=None):
+    """Make sure a ssh master session exists for |host| & |port|.
+
+    If one doesn't exist already, we'll create it.
+
+    We won't grab any locks, so the caller has to do that.  This helps keep the
+    business logic of actually creating the master separate from grabbing locks.
+    """
     # Check to see whether we already think that the master is running; if we
     # think it's already running, return right away.
    if port is not None:
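
The new class takes an externally created multiprocessing manager and is meant to be used as a context manager, so close() always runs and no masters or socket directories are leaked. A sketch of how a caller might drive it, assuming the class is imported from this ssh module and using the preconnect()/sock() methods added further down in the diff; the manifest URL is illustrative.

import multiprocessing

import ssh


def preconnect_all(urls):
  """Warm up ssh control masters before doing any real network work."""
  manager = multiprocessing.Manager()
  with ssh.ProxyManager(manager) as proxy:
    for url in urls:
      # Only ssh://, git+ssh://, ssh+git:// and scp-style URLs spawn a master.
      proxy.preconnect(url)
    # Worker processes would do their fetching here, wrapping their own
    # ssh-backed git invocations with proxy.add_client()/remove_client().
  # Leaving the with-block calls close(): all tracked masters and clients are
  # killed and the temporary socket directory is removed.


if __name__ == '__main__':
  preconnect_all(['ssh://git@review.example.com:29418/platform/manifest'])
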
@@ -131,17 +162,15 @@ def _open_ssh(host, port=None):
     else:
       key = host
 
-    if key in _master_keys:
+    if key in self._master_keys:
       return True
 
-    if not _ssh_master or 'GIT_SSH' in os.environ:
+    if self._master_broken.value or 'GIT_SSH' in os.environ:
       # Failed earlier, so don't retry.
       return False
 
     # We will make two calls to ssh; this is the common part of both calls.
-    command_base = ['ssh',
-                    '-o', 'ControlPath %s' % sock(),
-                    host]
+    command_base = ['ssh', '-o', 'ControlPath %s' % self.sock(), host]
     if port is not None:
       command_base[1:1] = ['-p', str(port)]
 
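
The collapsed command_base line behaves exactly like the three-line version it replaces, and the command_base[1:1] slice assignment splices the port option in right after the ssh executable. A small worked example with made-up values for the socket path and host:

# Worked example of the list surgery above (values are illustrative).
sock_path = '/tmp/ssh-abc123/master-%C'
host, port = 'review.example.com', 29418

command_base = ['ssh', '-o', 'ControlPath %s' % sock_path, host]
if port is not None:
  command_base[1:1] = ['-p', str(port)]

# command_base is now:
#   ['ssh', '-p', '29418', '-o', 'ControlPath /tmp/ssh-abc123/master-%C',
#    'review.example.com']
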
@@ -161,7 +190,7 @@ def _open_ssh(host, port=None):
       if not isnt_running:
         # Our double-check found that the master _was_ in fact running.  Add to
         # the list of keys.
-        _master_keys.add(key)
+        self._master_keys[key] = True
         return True
     except Exception:
       # Ignore exceptions.  We will fall back to the normal command and print
@@ -173,7 +202,7 @@ def _open_ssh(host, port=None):
       Trace(': %s', ' '.join(command))
       p = subprocess.Popen(command)
     except Exception as e:
-      _ssh_master = False
+      self._master_broken.value = True
       print('\nwarn: cannot enable ssh control master for %s:%s\n%s'
             % (host, port, str(e)), file=sys.stderr)
       return False
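
Replacing the _ssh_master module global with a manager-backed Value is what makes the "give up on control masters" decision stick across processes: a child flipping a plain global only changes its own copy of the module. A self-contained sketch of the difference; the process layout is illustrative.

import multiprocessing

_ssh_master = True  # old style: a plain module global


def _fail_old():
  global _ssh_master
  _ssh_master = False  # only this child's copy changes


def _fail_new(master_broken):
  master_broken.value = True  # visible to the parent and all siblings


if __name__ == '__main__':
  manager = multiprocessing.Manager()
  master_broken = manager.Value('b', False)
  for target, args in ((_fail_old, ()), (_fail_new, (master_broken,))):
    p = multiprocessing.Process(target=target, args=args)
    p.start()
    p.join()
  print(_ssh_master, master_broken.value)  # -> True True
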
@@ -183,75 +212,66 @@ def _open_ssh(host, port=None):
     if ssh_died:
       return False
 
-    _master_processes.append(p)
-    _master_keys.add(key)
+    self.add_master(p)
+    self._master_keys[key] = True
     return True
-  finally:
-    _master_keys_lock.release()
 
+  def _open(self, host, port=None):
+    """Make sure a ssh master session exists for |host| & |port|.
 
-def close():
-  global _master_keys_lock
-
-  _terminate_clients()
-
-  for p in _master_processes:
-    try:
-      os.kill(p.pid, signal.SIGTERM)
-      p.wait()
-    except OSError:
-      pass
-  del _master_processes[:]
-  _master_keys.clear()
-
-  d = sock(create=False)
-  if d:
-    try:
-      platform_utils.rmdir(os.path.dirname(d))
-    except OSError:
-      pass
-
-  # We're done with the lock, so we can delete it.
-  _master_keys_lock = None
+    If one doesn't exist already, we'll create it.
 
+    This will obtain any necessary locks to avoid inter-process races.
+    """
+    # Bail before grabbing the lock if we already know that we aren't going to
+    # try creating new masters below.
+    if sys.platform in ('win32', 'cygwin'):
+      return False
 
-URI_SCP = re.compile(r'^([^@:]*@?[^:/]{1,}):')
-URI_ALL = re.compile(r'^([a-z][a-z+-]*)://([^@/]*@?[^/]*)/')
+    # Acquire the lock.  This is needed to prevent opening multiple masters for
+    # the same host when we're running "repo sync -jN" (for N > 1) _and_ the
+    # manifest <remote fetch="ssh://xyz"> specifies a different host from the
+    # one that was passed to repo init.
+    with self._lock:
+      return self._open_unlocked(host, port)
+
+  def preconnect(self, url):
+    """If |uri| will create a ssh connection, setup the ssh master for it."""
+    m = URI_ALL.match(url)
+    if m:
+      scheme = m.group(1)
+      host = m.group(2)
+      if ':' in host:
+        host, port = host.split(':')
+      else:
+        port = None
+      if scheme in ('ssh', 'git+ssh', 'ssh+git'):
+        return self._open(host, port)
+      return False
 
+    m = URI_SCP.match(url)
+    if m:
+      host = m.group(1)
+      return self._open(host)
 
-def preconnect(url):
-  m = URI_ALL.match(url)
-  if m:
-    scheme = m.group(1)
-    host = m.group(2)
-    if ':' in host:
-      host, port = host.split(':')
-    else:
-      port = None
-    if scheme in ('ssh', 'git+ssh', 'ssh+git'):
-      return _open_ssh(host, port)
     return False
 
-  m = URI_SCP.match(url)
-  if m:
-    host = m.group(1)
-    return _open_ssh(host)
-
-  return False
-
-def sock(create=True):
-  global _ssh_sock_path
-  if _ssh_sock_path is None:
-    if not create:
-      return None
-    tmp_dir = '/tmp'
-    if not os.path.exists(tmp_dir):
-      tmp_dir = tempfile.gettempdir()
-    if version() < (6, 7):
-      tokens = '%r@%h:%p'
-    else:
-      tokens = '%C'  # hash of %l%h%p%r
-    _ssh_sock_path = os.path.join(
-        tempfile.mkdtemp('', 'ssh-', tmp_dir),
-        'master-' + tokens)
-  return _ssh_sock_path
+  def sock(self, create=True):
+    """Return the path to the ssh socket dir.
+
+    This has all the master sockets so clients can talk to them.
+    """
+    if self._sock_path is None:
+      if not create:
+        return None
+      tmp_dir = '/tmp'
+      if not os.path.exists(tmp_dir):
+        tmp_dir = tempfile.gettempdir()
+      if version() < (6, 7):
+        tokens = '%r@%h:%p'
+      else:
+        tokens = '%C'  # hash of %l%h%p%r
+      self._sock_path = os.path.join(
+          tempfile.mkdtemp('', 'ssh-', tmp_dir),
+          'master-' + tokens)
+    return self._sock_path
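
Note that the path sock() returns still contains an unexpanded token ('%C', or '%r@%h:%p' for OpenSSH older than 6.7): it is a ControlPath template that ssh itself expands per connection, not a file the code creates. A small sketch of the same selection logic; the helper name and the version tuple are assumptions for illustration only.

import os
import tempfile


def sock_template(openssh_version):
  """Mirror of sock(): pick a ControlPath template for |openssh_version|."""
  tmp_dir = '/tmp' if os.path.exists('/tmp') else tempfile.gettempdir()
  # %C is a hash of %l%h%p%r; older OpenSSH releases don't support it.
  tokens = '%r@%h:%p' if openssh_version < (6, 7) else '%C'
  return os.path.join(tempfile.mkdtemp('', 'ssh-', tmp_dir), 'master-' + tokens)


print(sock_template((7, 9)))  # e.g. /tmp/ssh-abc123/master-%C
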