Mercurial > genshi > mirror
comparison examples/trac/trac/web/_fcgi.py @ 39:93b4dcbafd7b trunk
Copy Trac to main branch.
author | cmlenz |
---|---|
date | Mon, 03 Jul 2006 18:53:27 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
38:ee669cb9cccc | 39:93b4dcbafd7b |
---|---|
1 # -*- coding: iso-8859-1 -*- | |
2 # | |
3 # Copyright (c) 2002, 2003, 2005 Allan Saddi <allan@saddi.com> | |
4 # All rights reserved. | |
5 # | |
6 # This software is licensed as described in the file COPYING, which | |
7 # you should have received as part of this distribution. The terms | |
8 # are also available at http://trac.edgewall.com/license.html. | |
9 # | |
10 # This software consists of voluntary contributions made by many | |
11 # individuals. For the exact contribution history, see the revision | |
12 # history and logs, available at http://projects.edgewall.com/trac/. | |
13 # | |
14 # Author: Allan Saddi <allan@saddi.com> | |
15 | |
16 """ | |
17 fcgi - a FastCGI/WSGI gateway. | |
18 | |
19 For more information about FastCGI, see <http://www.fastcgi.com/>. | |
20 | |
21 For more information about the Web Server Gateway Interface, see | |
22 <http://www.python.org/peps/pep-0333.html>. | |
23 | |
24 Example usage: | |
25 | |
26 #!/usr/bin/env python | |
27 from myapplication import app # Assume app is your WSGI application object | |
28 from fcgi import WSGIServer | |
29 WSGIServer(app).run() | |
30 | |
31 See the documentation for WSGIServer/Server for more information. | |
32 | |
33 On most platforms, fcgi will fallback to regular CGI behavior if run in a | |
34 non-FastCGI context. If you want to force CGI behavior, set the environment | |
35 variable FCGI_FORCE_CGI to "Y" or "y". | |
36 """ | |
37 | |
38 __author__ = 'Allan Saddi <allan@saddi.com>' | |
39 __version__ = '$Revision: 1797 $' | |
40 | |
41 import sys | |
42 import os | |
43 import signal | |
44 import struct | |
45 import StringIO | |
46 import select | |
47 import socket | |
48 import errno | |
49 import traceback | |
50 | |
51 try: | |
52 import thread | |
53 import threading | |
54 thread_available = True | |
55 except ImportError: | |
56 import dummy_thread as thread | |
57 import dummy_threading as threading | |
58 thread_available = False | |
59 | |
60 __all__ = ['WSGIServer'] | |
61 | |
# Constants from the spec.

# Descriptor number that Server._setupSocket() wraps with socket.fromfd()
# when no explicit bind address is given.
FCGI_LISTENSOCK_FILENO = 0

# Version number stamped into every Record header.
FCGI_VERSION_1 = 1

# Record types.
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE

# Request ID carried by Records that are not tied to a particular request.
FCGI_NULL_REQUEST_ID = 0

# Flag bit of FCGI_BEGIN_REQUEST; tested by Connection.end_request().
FCGI_KEEP_CONN = 1

# Roles.
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3

# Protocol status values sent in FCGI_END_REQUEST.
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3

# Variable names looked up when answering FCGI_GET_VALUES.
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'

# struct formats (network byte order) of the fixed-size wire structures.
FCGI_Header = '!BBHHBx'
FCGI_BeginRequestBody = '!HB5x'
FCGI_EndRequestBody = '!LB3x'
FCGI_UnknownTypeBody = '!B7x'

# Sizes derived from the formats above (each one works out to 8 bytes).
FCGI_HEADER_LEN = struct.calcsize(FCGI_Header)
FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
106 | |
if __debug__:
    import time

    # Set non-zero to write debug output to a file.
    DEBUG = 0
    DEBUGLOG = '/tmp/fcgi.log'

    def _debug(level, msg):
        """
        Appends msg to DEBUGLOG if DEBUG is at least level.

        Each entry is prefixed with a slice of time.ctime() followed by
        'fcgi: '. Logging is best-effort: a failure to open or write the
        log file is ignored so debug output can never break a request.
        """
        if DEBUG < level:
            return

        try:
            f = open(DEBUGLOG, 'a')
            try:
                f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
            finally:
                # Release the handle even when the write itself fails.
                f.close()
        except Exception:
            # Still best-effort, but no longer hides KeyboardInterrupt or
            # SystemExit on Pythons where those are not Exception subclasses.
            pass
124 | |
class InputStream(object):
    """
    File-like object representing FastCGI input streams (FCGI_STDIN and
    FCGI_DATA). Supports the minimum methods required by WSGI spec.

    Data is fed in through add_data(); an empty chunk marks end-of-stream.
    When a read needs more data than has arrived, the stream asks its
    connection to process further input (see _waitForData).
    """
    def __init__(self, conn):
        self._conn = conn

        # See Server.
        self._shrinkThreshold = conn.server.inputStreamShrinkThreshold

        self._buf = ''
        self._bufList = []      # Chunks from add_data() not yet merged.
        self._pos = 0           # Current read position.
        self._avail = 0         # Number of bytes currently available.

        self._eof = False       # True when server has sent EOF notification.

    def _shrinkBuffer(self):
        """Gets rid of already read data (since we can't rewind)."""
        if self._pos >= self._shrinkThreshold:
            self._buf = self._buf[self._pos:]
            self._avail -= self._pos
            self._pos = 0

            assert self._avail >= 0

    def _mergeBufList(self):
        """Folds the chunks queued by add_data() into the main buffer."""
        if self._bufList:
            self._buf += ''.join(self._bufList)
            self._bufList = []

    def _waitForData(self):
        """Waits for more data to become available."""
        self._conn.process_input()

    def read(self, n=-1):
        """
        Returns up to n bytes, or everything until EOF if n is negative.

        Waits for more data until either n bytes are available or the
        end-of-stream notification has arrived.
        """
        if self._pos == self._avail and self._eof:
            return ''
        while True:
            if n < 0 or (self._avail - self._pos) < n:
                # Not enough data available.
                if self._eof:
                    # And there's no more coming.
                    newPos = self._avail
                    break
                # Wait for more data.
                self._waitForData()
            else:
                newPos = self._pos + n
                break
        # Merge buffer list, if necessary.
        self._mergeBufList()
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readline(self, length=None):
        """
        Returns the next line, including its trailing newline if present.

        If length is a non-negative number, at most that many bytes are
        returned. None or a negative length means "no limit", matching
        the convention of regular file objects.
        """
        if self._pos == self._avail and self._eof:
            return ''
        while True:
            # Unfortunately, we need to merge the buffer list early.
            self._mergeBufList()
            # Find newline.
            i = self._buf.find('\n', self._pos)
            if i >= 0:
                newPos = i + 1
                break
            # Not found?
            if self._eof:
                # No more data coming.
                newPos = self._avail
                break
            # Wait for more to come.
            self._waitForData()
        # A negative length used to move the read position backwards and
        # return ''; treat it as "unlimited" instead.
        if length is not None and length >= 0:
            if self._pos + length < newPos:
                newPos = self._pos + length
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readlines(self, sizehint=0):
        """
        Returns a list of the remaining lines.

        If sizehint is positive, stops once at least that many bytes have
        been collected.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def __iter__(self):
        return self

    def next(self):
        """Iterator protocol: returns the next line or raises StopIteration."""
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def add_data(self, data):
        """Queues a chunk of incoming data; an empty chunk signals EOF."""
        if not data:
            self._eof = True
        else:
            self._bufList.append(data)
            self._avail += len(data)
240 | |
class MultiplexedInputStream(InputStream):
    """
    A version of InputStream meant to be used with MultiplexedConnections.
    Assumes the MultiplexedConnection (the producer) and the Request
    (the consumer) are running in different threads.
    """
    def __init__(self, conn):
        super(MultiplexedInputStream, self).__init__(conn)

        # A condition variable over a re-entrant lock. The lock arbitrates
        # access to this InputStream (it's used simultaneously by a Request
        # and its owning Connection object); the condition notifies the
        # Request thread that there is new data available.
        self._lock = threading.Condition(threading.RLock())

    def _locked(self, func, *args):
        """Calls func(*args) while holding the stream lock."""
        self._lock.acquire()
        try:
            return func(*args)
        finally:
            self._lock.release()

    def _waitForData(self):
        # Wait for notification from add_data().
        self._lock.wait()

    def read(self, n=-1):
        return self._locked(super(MultiplexedInputStream, self).read, n)

    def readline(self, length=None):
        return self._locked(super(MultiplexedInputStream, self).readline,
                            length)

    def add_data(self, data):
        parent_add = super(MultiplexedInputStream, self).add_data

        def storeAndNotify():
            # Store the chunk, then wake up a reader blocked in
            # _waitForData().
            parent_add(data)
            self._lock.notify()

        self._locked(storeAndNotify)
282 | |
class OutputStream(object):
    """
    FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
    write() or writelines() immediately result in Records being sent back
    to the server. Buffering should be done in a higher level!
    """
    def __init__(self, conn, req, type, buffered=False):
        self._conn = conn
        self._req = req
        self._type = type
        self._buffered = buffered
        self._bufList = [] # Used if buffered is True
        self.dataWritten = False
        self.closed = False

    def _write(self, data):
        """Sends data as one or more Records of this stream's type."""
        total = len(data)
        offset = 0
        while offset < total:
            # Each Record carries at most maxwrite minus the header size.
            limit = self._req.server.maxwrite - FCGI_HEADER_LEN
            size = min(total - offset, limit)

            rec = Record(self._type, self._req.requestId)
            rec.contentLength = size
            rec.contentData = data[offset:offset + size]
            self._conn.writeRecord(rec)

            offset += size

    def write(self, data):
        """Writes data, or queues it if this stream is buffered."""
        assert not self.closed

        if not data:
            return

        self.dataWritten = True

        if not self._buffered:
            self._write(data)
        else:
            self._bufList.append(data)

    def writelines(self, lines):
        """Writes every item of lines via write()."""
        assert not self.closed

        for item in lines:
            self.write(item)

    def flush(self):
        """Sends any queued data; a no-op for unbuffered streams."""
        # Only need to flush if this OutputStream is actually buffered.
        if not self._buffered:
            return
        pending = ''.join(self._bufList)
        self._bufList = []
        self._write(pending)

    # Though available, the following should NOT be called by WSGI apps.
    def close(self):
        """Sends end-of-stream notification, if necessary."""
        if self.closed or not self.dataWritten:
            return
        self.flush()
        # A Record with no content marks the end of the stream.
        self._conn.writeRecord(Record(self._type, self._req.requestId))
        self.closed = True
345 | |
class TeeOutputStream(object):
    """
    Fans writes out to several file-like objects: whatever is written here
    is written, in order, to every stream in the list given at creation.
    """
    def __init__(self, streamList):
        self._streamList = streamList

    def write(self, data):
        """Writes data to each underlying stream."""
        for stream in self._streamList:
            stream.write(data)

    def writelines(self, lines):
        """Writes each item of lines to every underlying stream."""
        for item in lines:
            self.write(item)

    def flush(self):
        """Flushes each underlying stream."""
        for stream in self._streamList:
            stream.flush()
365 | |
class StdoutWrapper(object):
    """
    Proxy around a stdout-like file that remembers, in dataWritten, whether
    any non-empty data has been written through it.
    """
    def __init__(self, stdout):
        self._file = stdout
        self.dataWritten = False

    def write(self, data):
        """Passes data to the wrapped file, noting non-empty writes."""
        if data:
            self.dataWritten = True
        self._file.write(data)

    def writelines(self, lines):
        """Writes each item of lines via write()."""
        for item in lines:
            self.write(item)

    def __getattr__(self, name):
        # Anything not defined here is looked up on the wrapped file.
        return getattr(self._file, name)
385 | |
def decode_pair(s, pos=0):
    """
    Decodes one name/value pair from s, starting at offset pos.

    Returns (newPos, (name, value)) where newPos is the offset just past
    the decoded pair.
    """
    lengths = []
    # The name length comes first, then the value length. Each is a single
    # byte, or four bytes (big-endian, top bit set) when that bit is set
    # in the first byte.
    for _ in (0, 1):
        size = ord(s[pos])
        if size & 128:
            size = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
            pos += 4
        else:
            pos += 1
        lengths.append(size)
    nameLength, valueLength = lengths

    valueStart = pos + nameLength
    end = valueStart + valueLength

    return (end, (s[pos:valueStart], s[valueStart:end]))
413 | |
def encode_pair(name, value):
    """
    Encodes a name/value pair.

    Returns the two encoded lengths followed by name and value.
    """
    parts = []
    for size in (len(name), len(value)):
        if size < 128:
            # Short form: a single byte.
            parts.append(chr(size))
        else:
            # Long form: four bytes, big-endian, with the top bit set.
            parts.append(struct.pack('!L', size | 0x80000000))
    parts.append(name)
    parts.append(value)
    return ''.join(parts)
433 | |
class Record(object):
    """
    A FastCGI Record.

    Holds the header fields and content of one Record and knows how to
    read itself from, and write itself to, a socket.
    """
    def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
        self.version = FCGI_VERSION_1
        self.type = type
        self.requestId = requestId
        self.contentLength = 0
        self.paddingLength = 0
        self.contentData = ''

    def _recvall(sock, length):
        """
        Attempts to receive length bytes from a socket, blocking if necessary.
        (Socket may be blocking or non-blocking.)

        Returns (data, number of bytes received); the count is smaller
        than length if EOF was reached first.
        """
        chunks = []
        received = 0
        remaining = length
        while remaining:
            try:
                chunk = sock.recv(remaining)
            except socket.error:
                # sys.exc_info() is used instead of binding the exception
                # in the except clause so this parses on any Python version.
                if sys.exc_info()[1].args[0] != errno.EAGAIN:
                    raise
                # Non-blocking socket with nothing to read yet: wait.
                select.select([sock], [], [])
                continue
            if not chunk: # EOF
                break
            chunks.append(chunk)
            received += len(chunk)
            remaining -= len(chunk)
        return ''.join(chunks), received
    _recvall = staticmethod(_recvall)

    def _recvOrEOF(self, sock, length):
        """Like _recvall, but reports any failure as EOFError."""
        try:
            return self._recvall(sock, length)
        except:
            # Deliberately catch-all: every receive failure is treated as
            # the connection having gone away.
            raise EOFError

    def read(self, sock):
        """Read and decode a Record from a socket."""
        header, got = self._recvOrEOF(sock, FCGI_HEADER_LEN)

        if got < FCGI_HEADER_LEN:
            raise EOFError

        (self.version, self.type, self.requestId, self.contentLength,
         self.paddingLength) = struct.unpack(FCGI_Header, header)

        if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
                             'contentLength = %d' %
                             (sock.fileno(), self.type, self.requestId,
                              self.contentLength))

        if self.contentLength:
            self.contentData, got = self._recvOrEOF(sock, self.contentLength)

            if got < self.contentLength:
                raise EOFError

        if self.paddingLength:
            # Padding is read and discarded.
            self._recvOrEOF(sock, self.paddingLength)

    def _sendall(sock, data):
        """
        Writes data to a socket and does not return until all the data is sent.
        """
        remaining = len(data)
        while remaining:
            try:
                sent = sock.send(data)
            except socket.error:
                code = sys.exc_info()[1].args[0]
                if code == errno.EPIPE:
                    return # Don't bother raising an exception. Just ignore.
                if code == errno.EAGAIN:
                    # Non-blocking socket not writable yet: wait.
                    select.select([], [sock], [])
                    continue
                raise
            data = data[sent:]
            remaining -= sent
    _sendall = staticmethod(_sendall)

    def write(self, sock):
        """Encode and write a Record to a socket."""
        # Pad the content up to the next multiple of 8 bytes.
        self.paddingLength = -self.contentLength & 7

        if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
                             'contentLength = %d' %
                             (sock.fileno(), self.type, self.requestId,
                              self.contentLength))

        self._sendall(sock, struct.pack(FCGI_Header, self.version, self.type,
                                        self.requestId, self.contentLength,
                                        self.paddingLength))
        if self.contentLength:
            self._sendall(sock, self.contentData)
        if self.paddingLength:
            self._sendall(sock, '\x00'*self.paddingLength)
544 | |
class Request(object):
    """
    Represents a single FastCGI request.

    These objects are passed to your handler and are the main interface
    between your handler and the fcgi module. The methods should not
    be called by your handler. However, server, params, stdin, stdout,
    stderr, and data are free for your handler's use.
    """
    def __init__(self, conn, inputStreamClass):
        self._conn = conn

        self.server = conn.server
        self.params = {}
        # Input streams: FCGI_STDIN and FCGI_DATA.
        self.stdin = inputStreamClass(conn)
        self.data = inputStreamClass(conn)
        # Output streams; only stderr is buffered.
        self.stdout = OutputStream(conn, self, FCGI_STDOUT)
        self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)

    def run(self):
        """Runs the handler, flushes the streams, and ends the request."""
        try:
            protocolStatus, appStatus = self.server.handler(self)
        except:
            # Report the failure on stderr; the error page is only
            # produced if nothing has been sent on stdout yet.
            traceback.print_exc(file=self.stderr)
            self.stderr.flush()
            if not self.stdout.dataWritten:
                self.server.error(self)

            protocolStatus = FCGI_REQUEST_COMPLETE
            appStatus = 0

        if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
                             (protocolStatus, appStatus))

        self._flush()
        self._end(appStatus, protocolStatus)

    def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
        """Tells the owning connection that this request is finished."""
        self._conn.end_request(self, appStatus, protocolStatus)

    def _flush(self):
        """Closes both output streams."""
        self.stdout.close()
        self.stderr.close()
588 | |
class CGIRequest(Request):
    """A normal CGI request disguised as a FastCGI request."""
    def __init__(self, server):
        # Request.__init__ is not called: there is no connection, so the
        # streams and parameters come from the process environment instead.
        self.server = server
        self.params = dict(os.environ)
        self.stdin = sys.stdin
        self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
        self.stderr = sys.stderr
        self.data = StringIO.StringIO()

        # These are normally filled in by Connection.
        self.requestId = 1
        self.role = FCGI_RESPONDER
        self.flags = 0
        self.aborted = False

    def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
        # Ending a CGI request means ending the process.
        sys.exit(appStatus)

    def _flush(self):
        # Not buffered, do nothing.
        pass
611 | |
class Connection(object):
    """
    A Connection with the web server.

    Each Connection is associated with a single socket (which is
    connected to the web server) and is responsible for handling all
    the FastCGI message processing for that socket.
    """
    _multiplexed = False
    _inputStreamClass = InputStream

    def __init__(self, sock, addr, server):
        self._sock = sock
        self._addr = addr
        self.server = server

        # Active Requests for this Connection, mapped by request ID.
        self._requests = {}

    def _cleanupSocket(self):
        """Close the Connection's socket."""
        try:
            self._sock.shutdown(socket.SHUT_WR)
        except:
            return
        try:
            # Drain whatever the peer still sends until it closes its side.
            while True:
                r, w, e = select.select([self._sock], [], [])
                if not r or not self._sock.recv(1024):
                    break
        except:
            pass
        self._sock.close()

    def run(self):
        """Begin processing data from the socket."""
        self._keepGoing = True
        while self._keepGoing:
            try:
                self.process_input()
            except EOFError:
                break
            except (select.error, socket.error):
                # sys.exc_info() is used instead of binding the exception
                # in the except clause so this parses on any Python version.
                if sys.exc_info()[1].args[0] == errno.EBADF:
                    # Socket was closed by Request.
                    break
                raise

        self._cleanupSocket()

    def process_input(self):
        """Attempt to read a single Record from the socket and process it."""
        # Currently, any children Request threads notify this Connection
        # that it is no longer needed by closing the Connection's socket.
        # We need to put a timeout on select, otherwise we might get
        # stuck in it indefinitely... (I don't like this solution.)
        while self._keepGoing:
            try:
                r, w, e = select.select([self._sock], [], [], 1.0)
            except ValueError:
                # Sigh. ValueError gets thrown sometimes when passing select
                # a closed socket.
                raise EOFError
            if r: break
        if not self._keepGoing:
            return
        rec = Record()
        rec.read(self._sock)

        if rec.type == FCGI_GET_VALUES:
            self._do_get_values(rec)
        elif rec.type == FCGI_BEGIN_REQUEST:
            self._do_begin_request(rec)
        elif rec.type == FCGI_ABORT_REQUEST:
            self._do_abort_request(rec)
        elif rec.type == FCGI_PARAMS:
            self._do_params(rec)
        elif rec.type == FCGI_STDIN:
            self._do_stdin(rec)
        elif rec.type == FCGI_DATA:
            self._do_data(rec)
        elif rec.requestId == FCGI_NULL_REQUEST_ID:
            self._do_unknown_type(rec)
        else:
            # Need to complain about this.
            pass

    def writeRecord(self, rec):
        """
        Write a Record to the socket.
        """
        rec.write(self._sock)

    def end_request(self, req, appStatus=0,
                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
        """
        End a Request.

        Called by Request objects. An FCGI_END_REQUEST Record is
        sent to the web server. If the web server no longer requires
        the connection, the socket is closed, thereby ending this
        Connection (run() returns).

        If remove is false, req is not deleted from the table of active
        requests (used for requests that were never registered).
        """
        rec = Record(FCGI_END_REQUEST, req.requestId)
        rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
                                      protocolStatus)
        rec.contentLength = FCGI_EndRequestBody_LEN
        self.writeRecord(rec)

        if remove:
            del self._requests[req.requestId]

        if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)

        if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
            self._cleanupSocket()
            self._keepGoing = False

    def _do_get_values(self, inrec):
        """Handle an FCGI_GET_VALUES request from the web server."""
        outrec = Record(FCGI_GET_VALUES_RESULT)

        pos = 0
        while pos < inrec.contentLength:
            pos, (name, value) = decode_pair(inrec.contentData, pos)
            # Only names present in the server's capability table are
            # answered; the rest are left out of the reply.
            cap = self.server.capability.get(name)
            if cap is not None:
                outrec.contentData += encode_pair(name, str(cap))

        outrec.contentLength = len(outrec.contentData)
        # Send the reply just built (this used to reference an undefined
        # name and raised NameError).
        self.writeRecord(outrec)

    def _do_begin_request(self, inrec):
        """Handle an FCGI_BEGIN_REQUEST from the web server."""
        role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)

        req = self.server.request_class(self, self._inputStreamClass)
        req.requestId, req.role, req.flags = inrec.requestId, role, flags
        req.aborted = False

        if not self._multiplexed and self._requests:
            # Can't multiplex requests.
            self.end_request(req, 0, FCGI_CANT_MPX_CONN, remove=False)
        else:
            self._requests[inrec.requestId] = req

    def _do_abort_request(self, inrec):
        """
        Handle an FCGI_ABORT_REQUEST from the web server.

        We just mark a flag in the associated Request.
        """
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.aborted = True

    def _start_request(self, req):
        """Run the request."""
        # Not multiplexed, so run it inline.
        req.run()

    def _do_params(self, inrec):
        """
        Handle an FCGI_PARAMS Record.

        If the last FCGI_PARAMS Record is received, start the request.
        """
        req = self._requests.get(inrec.requestId)
        if req is not None:
            if inrec.contentLength:
                pos = 0
                while pos < inrec.contentLength:
                    pos, (name, value) = decode_pair(inrec.contentData, pos)
                    req.params[name] = value
            else:
                # An empty FCGI_PARAMS Record ends the parameter stream.
                self._start_request(req)

    def _do_stdin(self, inrec):
        """Handle the FCGI_STDIN stream."""
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.stdin.add_data(inrec.contentData)

    def _do_data(self, inrec):
        """Handle the FCGI_DATA stream."""
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.data.add_data(inrec.contentData)

    def _do_unknown_type(self, inrec):
        """Handle an unknown request type. Respond accordingly."""
        outrec = Record(FCGI_UNKNOWN_TYPE)
        outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
        outrec.contentLength = FCGI_UnknownTypeBody_LEN
        # Send the reply just built (this used to reference an undefined
        # name and raised NameError).
        self.writeRecord(outrec)
806 | |
class MultiplexedConnection(Connection):
    """
    A version of Connection capable of handling multiple requests
    simultaneously.

    Requests are started in their own threads (see _start_request), so
    every method that touches the request table or the socket takes the
    connection lock first.
    """
    _multiplexed = True
    _inputStreamClass = MultiplexedInputStream

    def __init__(self, sock, addr, server):
        super(MultiplexedConnection, self).__init__(sock, addr, server)

        # Used to arbitrate access to self._requests.
        lock = threading.RLock()

        # Notification is posted everytime a request completes, allowing us
        # to quit cleanly.
        self._lock = threading.Condition(lock)

    def _cleanupSocket(self):
        # Wait for any outstanding requests before closing the socket.
        self._lock.acquire()
        try:
            while self._requests:
                self._lock.wait()
        finally:
            # Release even if wait() is interrupted, as every other method
            # of this class does.
            self._lock.release()

        super(MultiplexedConnection, self)._cleanupSocket()

    def writeRecord(self, rec):
        # Must use locking to prevent intermingling of Records from different
        # threads.
        self._lock.acquire()
        try:
            # Probably faster than calling super. ;)
            rec.write(self._sock)
        finally:
            self._lock.release()

    def end_request(self, req, appStatus=0,
                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self).end_request(req, appStatus,
                                                           protocolStatus,
                                                           remove)
            # Wake up _cleanupSocket(), which waits for the request table
            # to drain.
            self._lock.notify()
        finally:
            self._lock.release()

    def _do_begin_request(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_begin_request(inrec)
        finally:
            self._lock.release()

    def _do_abort_request(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_abort_request(inrec)
        finally:
            self._lock.release()

    def _start_request(self, req):
        # Run the request in a thread of its own.
        thread.start_new_thread(req.run, ())

    def _do_params(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_params(inrec)
        finally:
            self._lock.release()

    def _do_stdin(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_stdin(inrec)
        finally:
            self._lock.release()

    def _do_data(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_data(inrec)
        finally:
            self._lock.release()
892 | |
893 class Server(object): | |
894 """ | |
895 The FastCGI server. | |
896 | |
897 Waits for connections from the web server, processing each | |
898 request. | |
899 | |
900 If run in a normal CGI context, it will instead instantiate a | |
901 CGIRequest and run the handler through there. | |
902 """ | |
903 request_class = Request | |
904 cgirequest_class = CGIRequest | |
905 | |
906 # Limits the size of the InputStream's string buffer to this size + the | |
907 # server's maximum Record size. Since the InputStream is not seekable, | |
908 # we throw away already-read data once this certain amount has been read. | |
909 inputStreamShrinkThreshold = 102400 - 8192 | |
910 | |
911 def __init__(self, handler=None, maxwrite=8192, bindAddress=None, | |
912 multiplexed=False): | |
913 """ | |
914 handler, if present, must reference a function or method that | |
915 takes one argument: a Request object. If handler is not | |
916 specified at creation time, Server *must* be subclassed. | |
917 (The handler method below is abstract.) | |
918 | |
919 maxwrite is the maximum number of bytes (per Record) to write | |
920 to the server. I've noticed mod_fastcgi has a relatively small | |
921 receive buffer (8K or so). | |
922 | |
923 bindAddress, if present, must either be a string or a 2-tuple. If | |
924 present, run() will open its own listening socket. You would use | |
925 this if you wanted to run your application as an 'external' FastCGI | |
926 app. (i.e. the webserver would no longer be responsible for starting | |
927 your app) If a string, it will be interpreted as a filename and a UNIX | |
928 socket will be opened. If a tuple, the first element, a string, | |
929 is the interface name/IP to bind to, and the second element (an int) | |
930 is the port number. | |
931 | |
932 Set multiplexed to True if you want to handle multiple requests | |
933 per connection. Some FastCGI backends (namely mod_fastcgi) don't | |
934 multiplex requests at all, so by default this is off (which saves | |
935 on thread creation/locking overhead). If threads aren't available, | |
936 this keyword is ignored; it's not possible to multiplex requests | |
937 at all. | |
938 """ | |
939 if handler is not None: | |
940 self.handler = handler | |
941 self.maxwrite = maxwrite | |
942 if thread_available: | |
943 try: | |
944 import resource | |
945 # Attempt to glean the maximum number of connections | |
946 # from the OS. | |
947 maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] | |
948 except ImportError: | |
949 maxConns = 100 # Just some made up number. | |
950 maxReqs = maxConns | |
951 if multiplexed: | |
952 self._connectionClass = MultiplexedConnection | |
953 maxReqs *= 5 # Another made up number. | |
954 else: | |
955 self._connectionClass = Connection | |
956 self.capability = { | |
957 FCGI_MAX_CONNS: maxConns, | |
958 FCGI_MAX_REQS: maxReqs, | |
959 FCGI_MPXS_CONNS: multiplexed and 1 or 0 | |
960 } | |
961 else: | |
962 self._connectionClass = Connection | |
963 self.capability = { | |
964 # If threads aren't available, these are pretty much correct. | |
965 FCGI_MAX_CONNS: 1, | |
966 FCGI_MAX_REQS: 1, | |
967 FCGI_MPXS_CONNS: 0 | |
968 } | |
969 self._bindAddress = bindAddress | |
970 | |
def _setupSocket(self):
    """
    Create and return the socket the main loop will accept connections on.

    With a bind address configured, a listening socket is created: a Unix
    domain socket when the address is a str (any stale socket file is
    removed first), otherwise an INET socket for a 2-tuple address.

    Without a bind address the socket is obtained from the
    FCGI_LISTENSOCK_FILENO descriptor. If that descriptor is not a socket,
    or the FCGI_FORCE_CGI environment variable starts with "Y"/"y", the
    request is handled as plain CGI and the process exits with status 0.
    """
    address = self._bindAddress

    if address is not None:
        # Run as a server.
        if type(address) is str:
            # Unix socket
            listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                os.unlink(address)
            except OSError:
                pass
        else:
            # INET socket
            assert type(address) is tuple
            assert len(address) == 2
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        listener.bind(address)
        listener.listen(socket.SOMAXCONN)
        return listener

    # Run as a normal FastCGI: the listening socket is inherited.
    listener = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
                             socket.SOCK_STREAM)
    is_fcgi = True
    try:
        listener.getpeername()
    except socket.error as err:
        code = err.args[0]
        if code == errno.ENOTSOCK:
            # Not a socket, assume CGI context.
            is_fcgi = False
        elif code != errno.ENOTCONN:
            raise

    # FastCGI/CGI discrimination is broken on Mac OS X.
    # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
    # if you want to run your app as a simple CGI. (You can do
    # this with Apache's mod_env [not loaded by default in OS X
    # client, ha ha] and the SetEnv directive.)
    force_cgi = os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y')
    if force_cgi or not is_fcgi:
        cgi_request = self.cgirequest_class(self)
        cgi_request.run()
        sys.exit(0)

    return listener
1016 | |
def _cleanupSocket(self, sock):
    """Close `sock`, the main listening socket handed back by run()."""
    sock.close()
1020 | |
def _installSignalHandlers(self):
    """
    Install the server's signal handlers and remember the previous ones.

    SIGHUP is routed to _hupHandler; SIGINT and SIGTERM are routed to
    _intHandler. The handlers that were active beforehand are stored in
    self._oldSIGs as (signum, handler) pairs for _restoreSignalHandlers.
    """
    routes = [(signal.SIGINT, self._intHandler),
              (signal.SIGTERM, self._intHandler)]
    # SIGHUP is only defined on Unix; referencing it unconditionally
    # raises AttributeError elsewhere, so only hook it when it exists.
    if hasattr(signal, 'SIGHUP'):
        routes.insert(0, (signal.SIGHUP, self._hupHandler))

    # Snapshot every previous handler before replacing any of them.
    self._oldSIGs = [(signum, signal.getsignal(signum))
                     for signum, _ in routes]
    for signum, new_handler in routes:
        signal.signal(signum, new_handler)
1027 | |
def _restoreSignalHandlers(self):
    """
    Reinstall the signal handlers recorded in self._oldSIGs.

    self._oldSIGs holds (signum, handler) pairs as returned by
    signal.getsignal().
    """
    for signum, previous in self._oldSIGs:
        # signal.getsignal() returns None when the previous handler was
        # not installed from Python; signal.signal() rejects None with a
        # TypeError, so such entries cannot be restored and are skipped.
        if previous is None:
            continue
        signal.signal(signum, previous)
1031 | |
def _hupHandler(self, signum, frame):
    """Signal handler: record that a hangup arrived and stop the main loop."""
    self._hupReceived, self._keepGoing = True, False
1035 | |
def _intHandler(self, signum, frame):
    """Signal handler: ask the main loop to stop (no reload requested)."""
    self._keepGoing = False
1038 | |
def run(self, timeout=1.0):
    """
    The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
    SIGHUP was received, False otherwise.

    timeout is passed to select.select() and therefore bounds how long
    one iteration waits for a connection before _mainloopPeriodic() is
    called again.

    If the FCGI_WEB_SERVER_ADDRS environment variable is set, it is read
    as a comma-separated list of addresses; connections whose peer
    address is not a 2-tuple with a listed host are closed immediately.

    The previous signal handlers are restored and the listening socket is
    cleaned up even when the loop is left through an exception.
    """
    web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
    if web_server_addrs is not None:
        web_server_addrs = [item.strip()
                            for item in web_server_addrs.split(',')]

    sock = self._setupSocket()
    try:
        self._keepGoing = True
        self._hupReceived = False

        # Install signal handlers.
        self._installSignalHandlers()
        try:
            while self._keepGoing:
                try:
                    readable, _, _ = select.select([sock], [], [], timeout)
                except select.error as err:
                    # Interrupted by a signal: re-check _keepGoing.
                    if err.args[0] == errno.EINTR:
                        continue
                    raise

                if readable:
                    try:
                        clientSock, addr = sock.accept()
                    except socket.error as err:
                        if err.args[0] in (errno.EINTR, errno.EAGAIN):
                            continue
                        raise

                    if web_server_addrs and \
                       (len(addr) != 2 or addr[0] not in web_server_addrs):
                        clientSock.close()
                        continue

                    # Instantiate a new Connection and begin processing
                    # FastCGI messages (either in a new thread or this
                    # thread).
                    conn = self._connectionClass(clientSock, addr, self)
                    thread.start_new_thread(conn.run, ())

                self._mainloopPeriodic()
        finally:
            # Restore signal handlers, even if the loop raised.
            self._restoreSignalHandlers()
    finally:
        self._cleanupSocket(sock)

    return self._hupReceived
1091 | |
def _mainloopPeriodic(self):
    """
    Hook invoked by run() on just about every pass through the main
    loop. The default does nothing; subclasses are meant to override it.
    """
    return None
1098 | |
def _exit(self, reload=False):
    """
    Protected helper that lets subclasses stop the main loop. `reload`
    becomes the value run() returns. Has no effect if the loop has
    already been told to stop. Not really thread-safe, hence not public.
    """
    if not self._keepGoing:
        return
    self._keepGoing = False
    self._hupReceived = reload
1107 | |
def handler(self, req):
    """
    Placeholder request handler that always raises NotImplementedError.
    A real handler must be supplied at initialization time or provided
    by a subclass.
    """
    missing = self.__class__.__name__ + '.handler'
    raise NotImplementedError(missing)
1115 | |
def error(self, req):
    """
    Report the exception currently being handled to the client as an
    HTML traceback page written to req.stdout. Called by Request when the
    handler raises; may and should be overridden.
    """
    import cgitb
    traceback_page = cgitb.html(sys.exc_info())
    req.stdout.write('Content-Type: text/html\r\n\r\n' + traceback_page)
1124 | |
class WSGIServer(Server):
    """
    FastCGI server that supports the Web Server Gateway Interface. See
    <http://www.python.org/peps/pep-0333.html>.

    Each FastCGI request is turned into a WSGI environ, handed to the
    wrapped application, and the application's output is written to the
    request's stdout stream.
    """
    def __init__(self, application, environ=None, multithreaded=True, **kw):
        """
        application is the WSGI callable invoked for every request.

        environ, if present, must be a dictionary-like object. Its
        contents will be copied into application's environ. Useful
        for passing application-specific variables.

        Set multithreaded to False if your application is not MT-safe.

        Remaining keyword arguments are forwarded to Server.__init__,
        except 'handler', which is discarded.
        """
        if kw.has_key('handler'):
            del kw['handler'] # Doesn't make sense to let this through
        super(WSGIServer, self).__init__(**kw)

        if environ is None:
            environ = {}

        self.application = application
        self.environ = environ
        self.multithreaded = multithreaded

        # Used to force single-threadedness
        self._app_lock = thread.allocate_lock()

    def handler(self, req):
        """
        Special handler for WSGI.

        Returns (FCGI_UNKNOWN_ROLE, 0) for any role other than
        FCGI_RESPONDER. Otherwise runs the application against req and
        returns (FCGI_REQUEST_COMPLETE, 0).
        """
        if req.role != FCGI_RESPONDER:
            return FCGI_UNKNOWN_ROLE, 0

        # Mostly taken from example CGI gateway.
        # Note: environ is req.params itself, so it is modified in place.
        environ = req.params
        environ.update(self.environ)

        environ['wsgi.version'] = (1,0)
        environ['wsgi.input'] = req.stdin
        if self._bindAddress is None:
            stderr = req.stderr
        else:
            # With an explicit bind address, errors go through a
            # TeeOutputStream over sys.stderr and the request's stderr
            # (presumably writing to both -- confirm in TeeOutputStream).
            stderr = TeeOutputStream((sys.stderr, req.stderr))
        environ['wsgi.errors'] = stderr
        environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
                                      thread_available and self.multithreaded
        # Rationale for the following: If started by the web server
        # (self._bindAddress is None) in either FastCGI or CGI mode, the
        # possibility of being spawned multiple times simultaneously is quite
        # real. And, if started as an external server, multiple copies may be
        # spawned for load-balancing/redundancy. (Though I don't think
        # mod_fastcgi supports this?)
        environ['wsgi.multiprocess'] = True
        environ['wsgi.run_once'] = isinstance(req, CGIRequest)

        if environ.get('HTTPS', 'off') in ('on', '1'):
            environ['wsgi.url_scheme'] = 'https'
        else:
            environ['wsgi.url_scheme'] = 'http'

        self._sanitizeEnv(environ)

        # Shared state for the two closures below. headers_set holds
        # [status, response_headers] once start_response() has been called;
        # headers_sent receives the same pair once they have been written.
        headers_set = []
        headers_sent = []
        result = None

        def write(data):
            """
            Write one chunk of response body to req.stdout, emitting the
            status line and headers first if that has not happened yet.
            """
            assert type(data) is str, 'write() argument must be string'
            assert headers_set, 'write() before start_response()'

            if not headers_sent:
                # Chained assignment: copy headers_set into headers_sent
                # (marking the headers as sent) and unpack it.
                status, responseHeaders = headers_sent[:] = headers_set
                found = False
                for header,value in responseHeaders:
                    if header.lower() == 'content-length':
                        found = True
                        break
                # If the application gave no Content-Length and its result
                # reports exactly one element, this first chunk is the whole
                # body, so its length can be supplied here.
                if not found and result is not None:
                    try:
                        if len(result) == 1:
                            responseHeaders.append(('Content-Length',
                                                    str(len(data))))
                    except:
                        # Best effort only; e.g. len() is not supported by
                        # every iterable an application may return.
                        pass
                s = 'Status: %s\r\n' % status
                for header in responseHeaders:
                    s += '%s: %s\r\n' % header
                s += '\r\n'
                req.stdout.write(s)

            req.stdout.write(data)
            req.stdout.flush()

        def start_response(status, response_headers, exc_info=None):
            """
            WSGI start_response callable: validate and record the status
            and headers, and return the write() callable. With exc_info,
            the exception is re-raised if headers were already sent.
            """
            if exc_info:
                try:
                    if headers_sent:
                        # Re-raise if too late
                        raise exc_info[0], exc_info[1], exc_info[2]
                finally:
                    exc_info = None # avoid dangling circular ref
            else:
                assert not headers_set, 'Headers already set!'

            assert type(status) is str, 'Status must be a string'
            assert len(status) >= 4, 'Status must be at least 4 characters'
            assert int(status[:3]), 'Status must begin with 3-digit code'
            assert status[3] == ' ', 'Status must have a space after code'
            assert type(response_headers) is list, 'Headers must be a list'
            if __debug__:
                for name,val in response_headers:
                    assert type(name) is str, 'Header names must be strings'
                    assert type(val) is str, 'Header values must be strings'

            headers_set[:] = [status, response_headers]
            return write

        # Serialize calls into the application when it is not MT-safe.
        if not self.multithreaded:
            self._app_lock.acquire()
        try:
            result = self.application(environ, start_response)
            try:
                for data in result:
                    if data:
                        write(data)
                if not headers_sent:
                    write('') # in case body was empty
            finally:
                # Call the iterable's close() hook if it provides one.
                if hasattr(result, 'close'):
                    result.close()
        finally:
            if not self.multithreaded:
                self._app_lock.release()

        return FCGI_REQUEST_COMPLETE, 0

    def _sanitizeEnv(self, environ):
        """
        Ensure certain values are present, if required by WSGI.

        SCRIPT_NAME and PATH_INFO default to the empty string. For each of
        REQUEST_METHOD, SERVER_NAME, SERVER_PORT and SERVER_PROTOCOL that
        is missing, a warning is written to environ['wsgi.errors'] and a
        default value is filled in. environ is modified in place.
        """
        if not environ.has_key('SCRIPT_NAME'):
            environ['SCRIPT_NAME'] = ''
        if not environ.has_key('PATH_INFO'):
            environ['PATH_INFO'] = ''

        # If any of these are missing, it probably signifies a broken
        # server...
        for name,default in [('REQUEST_METHOD', 'GET'),
                             ('SERVER_NAME', 'localhost'),
                             ('SERVER_PORT', '80'),
                             ('SERVER_PROTOCOL', 'HTTP/1.0')]:
            if not environ.has_key(name):
                environ['wsgi.errors'].write('%s: missing FastCGI param %s '
                                             'required by WSGI!\n' %
                                             (self.__class__.__name__, name))
                environ[name] = default
1278 | |
if __name__ == '__main__':
    def test_app(environ, start_response):
        """
        Demo WSGI application: renders the request environ, followed by
        any submitted form fields, as an HTML table. Probably not the most
        efficient example.
        """
        import cgi
        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<html><head><title>Hello World!</title></head>\n' \
              '<body>\n' \
              '<p>Hello World!</p>\n' \
              '<table border="1">'
        for name in sorted(environ.keys()):
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cgi.escape(str(name)), cgi.escape(repr(environ[name])))

        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                                keep_blank_values=1)
        # form.list is None (not an empty list) when the request body is
        # not form data, so the loop must stay inside this check.
        if form.list:
            yield '<tr><th colspan="2">Form data</th></tr>'

            for field in form.list:
                # Field names and values come straight from the client;
                # escape them before embedding them in the page.
                yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                    cgi.escape(str(field.name)),
                    cgi.escape(str(field.value)))

        yield '</table>\n' \
              '</body></html>\n'

    WSGIServer(test_app).run()