diff options
-rw-r--r-- | subcmds/sync.py | 84 |
1 files changed, 70 insertions, 14 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index 36ef16db..eac0556d 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
@@ -39,6 +39,10 @@ from project import R_HEADS | |||
39 | from project import SyncBuffer | 39 | from project import SyncBuffer |
40 | from progress import Progress | 40 | from progress import Progress |
41 | 41 | ||
42 | class _FetchError(Exception): | ||
43 | """Internal error thrown in _FetchHelper() when we don't want stack trace.""" | ||
44 | pass | ||
45 | |||
42 | class Sync(Command, MirrorSafeCommand): | 46 | class Sync(Command, MirrorSafeCommand): |
43 | jobs = 1 | 47 | jobs = 1 |
44 | common = True | 48 | common = True |
@@ -135,20 +139,60 @@ later is required to fix a server side protocol bug. | |||
135 | dest='repo_upgraded', action='store_true', | 139 | dest='repo_upgraded', action='store_true', |
136 | help=SUPPRESS_HELP) | 140 | help=SUPPRESS_HELP) |
137 | 141 | ||
138 | def _FetchHelper(self, opt, project, lock, fetched, pm, sem): | 142 | def _FetchHelper(self, opt, project, lock, fetched, pm, sem, err_event): |
139 | if not project.Sync_NetworkHalf(quiet=opt.quiet): | 143 | """Main function of the fetch threads when jobs are > 1. |
140 | print >>sys.stderr, 'error: Cannot fetch %s' % project.name | 144 | |
141 | if opt.force_broken: | 145 | Args: |
142 | print >>sys.stderr, 'warn: --force-broken, continuing to sync' | 146 | opt: Program options returned from optparse. See _Options(). |
143 | else: | 147 | project: Project object for the project to fetch. |
144 | sem.release() | 148 | lock: Lock for accessing objects that are shared amongst multiple |
145 | sys.exit(1) | 149 | _FetchHelper() threads. |
150 | fetched: set object that we will add project.gitdir to when we're done | ||
151 | (with our lock held). | ||
152 | pm: Instance of a Project object. We will call pm.update() (with our | ||
153 | lock held). | ||
154 | sem: We'll release() this semaphore when we exit so that another thread | ||
155 | can be started up. | ||
156 | err_event: We'll set this event in the case of an error (after printing | ||
157 | out info about the error). | ||
158 | """ | ||
159 | # We'll set to true once we've locked the lock. | ||
160 | did_lock = False | ||
161 | |||
162 | # Encapsulate everything in a try/except/finally so that: | ||
163 | # - We always set err_event in the case of an exception. | ||
164 | # - We always make sure we call sem.release(). | ||
165 | # - We always make sure we unlock the lock if we locked it. | ||
166 | try: | ||
167 | success = project.Sync_NetworkHalf(quiet=opt.quiet) | ||
146 | 168 | ||
147 | lock.acquire() | 169 | # Lock around all the rest of the code, since printing, updating a set |
148 | fetched.add(project.gitdir) | 170 | # and Progress.update() are not thread safe. |
149 | pm.update() | 171 | lock.acquire() |
150 | lock.release() | 172 | did_lock = True |
151 | sem.release() | 173 | |
174 | if not success: | ||
175 | print >>sys.stderr, 'error: Cannot fetch %s' % project.name | ||
176 | if opt.force_broken: | ||
177 | print >>sys.stderr, 'warn: --force-broken, continuing to sync' | ||
178 | else: | ||
179 | raise _FetchError() | ||
180 | |||
181 | fetched.add(project.gitdir) | ||
182 | pm.update() | ||
183 | except BaseException, e: | ||
184 | # Notify the _Fetch() function about all errors. | ||
185 | err_event.set() | ||
186 | |||
187 | # If we got our own _FetchError, we don't want a stack trace. | ||
188 | # However, if we got something else (something in Sync_NetworkHalf?), | ||
189 | # we'd like one (so re-raise after we've set err_event). | ||
190 | if not isinstance(e, _FetchError): | ||
191 | raise | ||
192 | finally: | ||
193 | if did_lock: | ||
194 | lock.release() | ||
195 | sem.release() | ||
152 | 196 | ||
153 | def _Fetch(self, projects, opt): | 197 | def _Fetch(self, projects, opt): |
154 | fetched = set() | 198 | fetched = set() |
@@ -169,7 +213,13 @@ later is required to fix a server side protocol bug. | |||
169 | threads = set() | 213 | threads = set() |
170 | lock = _threading.Lock() | 214 | lock = _threading.Lock() |
171 | sem = _threading.Semaphore(self.jobs) | 215 | sem = _threading.Semaphore(self.jobs) |
216 | err_event = _threading.Event() | ||
172 | for project in projects: | 217 | for project in projects: |
218 | # Check for any errors before starting any new threads. | ||
219 | # ...we'll let existing threads finish, though. | ||
220 | if err_event.is_set(): | ||
221 | break | ||
222 | |||
173 | sem.acquire() | 223 | sem.acquire() |
174 | t = _threading.Thread(target = self._FetchHelper, | 224 | t = _threading.Thread(target = self._FetchHelper, |
175 | args = (opt, | 225 | args = (opt, |
@@ -177,13 +227,19 @@ later is required to fix a server side protocol bug. | |||
177 | lock, | 227 | lock, |
178 | fetched, | 228 | fetched, |
179 | pm, | 229 | pm, |
180 | sem)) | 230 | sem, |
231 | err_event)) | ||
181 | threads.add(t) | 232 | threads.add(t) |
182 | t.start() | 233 | t.start() |
183 | 234 | ||
184 | for t in threads: | 235 | for t in threads: |
185 | t.join() | 236 | t.join() |
186 | 237 | ||
238 | # If we saw an error, exit with code 1 so that other scripts can check. | ||
239 | if err_event.is_set(): | ||
240 | print >>sys.stderr, '\nerror: Exited sync due to fetch errors' | ||
241 | sys.exit(1) | ||
242 | |||
187 | pm.end() | 243 | pm.end() |
188 | for project in projects: | 244 | for project in projects: |
189 | project.bare_git.gc('--auto') | 245 | project.bare_git.gc('--auto') |