comparison examples/trac/trac/web/_fcgi.py @ 39:93b4dcbafd7b trunk

Copy Trac to main branch.
author cmlenz
date Mon, 03 Jul 2006 18:53:27 +0000
parents
children
comparison
equal deleted inserted replaced
38:ee669cb9cccc 39:93b4dcbafd7b
1 # -*- coding: iso-8859-1 -*-
2 #
3 # Copyright (c) 2002, 2003, 2005 Allan Saddi <allan@saddi.com>
4 # All rights reserved.
5 #
6 # This software is licensed as described in the file COPYING, which
7 # you should have received as part of this distribution. The terms
8 # are also available at http://trac.edgewall.com/license.html.
9 #
10 # This software consists of voluntary contributions made by many
11 # individuals. For the exact contribution history, see the revision
12 # history and logs, available at http://projects.edgewall.com/trac/.
13 #
14 # Author: Allan Saddi <allan@saddi.com>
15
16 """
17 fcgi - a FastCGI/WSGI gateway.
18
19 For more information about FastCGI, see <http://www.fastcgi.com/>.
20
21 For more information about the Web Server Gateway Interface, see
22 <http://www.python.org/peps/pep-0333.html>.
23
24 Example usage:
25
26 #!/usr/bin/env python
27 from myapplication import app # Assume app is your WSGI application object
28 from fcgi import WSGIServer
29 WSGIServer(app).run()
30
31 See the documentation for WSGIServer/Server for more information.
32
33 On most platforms, fcgi will fall back to regular CGI behavior if run in a
34 non-FastCGI context. If you want to force CGI behavior, set the environment
35 variable FCGI_FORCE_CGI to "Y" or "y".
36 """
37
38 __author__ = 'Allan Saddi <allan@saddi.com>'
39 __version__ = '$Revision: 1797 $'
40
41 import sys
42 import os
43 import signal
44 import struct
45 import StringIO
46 import select
47 import socket
48 import errno
49 import traceback
50
51 try:
52 import thread
53 import threading
54 thread_available = True
55 except ImportError:
56 import dummy_thread as thread
57 import dummy_threading as threading
58 thread_available = False
59
60 __all__ = ['WSGIServer']
61
# Constants from the spec.

# File descriptor number of the listening socket handed to the application
# when it is started by the web server.
FCGI_LISTENSOCK_FILENO = 0

# Size, in bytes, of a packed record header (see FCGI_Header below).
FCGI_HEADER_LEN = 8

FCGI_VERSION_1 = 1

# Record types (1 through 11).
(FCGI_BEGIN_REQUEST,
 FCGI_ABORT_REQUEST,
 FCGI_END_REQUEST,
 FCGI_PARAMS,
 FCGI_STDIN,
 FCGI_STDOUT,
 FCGI_STDERR,
 FCGI_DATA,
 FCGI_GET_VALUES,
 FCGI_GET_VALUES_RESULT,
 FCGI_UNKNOWN_TYPE) = range(1, 12)
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE

# Request ID used by records that are not tied to a particular request.
FCGI_NULL_REQUEST_ID = 0

# Bit in the FCGI_BEGIN_REQUEST flags field.
FCGI_KEEP_CONN = 1

# Roles (1 through 3).
FCGI_RESPONDER, FCGI_AUTHORIZER, FCGI_FILTER = range(1, 4)

# Protocol status values carried by FCGI_END_REQUEST (0 through 3).
(FCGI_REQUEST_COMPLETE,
 FCGI_CANT_MPX_CONN,
 FCGI_OVERLOADED,
 FCGI_UNKNOWN_ROLE) = range(4)

# Variable names understood in FCGI_GET_VALUES queries.
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'

# struct format strings (network byte order) for the fixed-size structures.
FCGI_Header = '!BBHHBx'
FCGI_BeginRequestBody = '!HB5x'
FCGI_EndRequestBody = '!LB3x'
FCGI_UnknownTypeBody = '!B7x'

FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
106
if __debug__:
    import time

    # Set non-zero to write debug output to a file.
    DEBUG = 0
    DEBUGLOG = '/tmp/fcgi.log'

    def _debug(level, msg):
        """
        Append msg to DEBUGLOG if DEBUG is at least level.

        Each line is prefixed with a timestamp taken from time.ctime().
        Logging is strictly best-effort: failures to open or write the log
        are ignored so that debugging can never break request handling.
        """
        if DEBUG < level:
            return

        try:
            f = open(DEBUGLOG, 'a')
            try:
                f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
            finally:
                # Always release the file handle, even if the write failed.
                f.close()
        except Exception:
            # Deliberately ignored (see docstring). Not a bare except, so
            # interpreter-exit requests are not silently eaten.
            pass
124
class InputStream(object):
    """
    File-like object representing FastCGI input streams (FCGI_STDIN and
    FCGI_DATA). Supports the minimum methods required by WSGI spec.

    Data is pushed in by the owning connection through add_data(); an empty
    chunk marks end-of-stream. Readers that need more data than is buffered
    call _waitForData(), which asks the connection to process more input.
    """
    def __init__(self, conn):
        self._conn = conn

        # See Server.
        self._shrinkThreshold = conn.server.inputStreamShrinkThreshold

        self._buf = ''        # Merged, readable data.
        self._bufList = []    # Chunks received but not yet merged into _buf.
        self._pos = 0         # Current read position within _buf.
        self._avail = 0       # Number of bytes currently available
                              # (len(_buf) plus the pending chunks).

        self._eof = False     # True when server has sent EOF notification.

    def _mergePending(self):
        """Appends any chunks queued by add_data() to the main buffer."""
        if self._bufList:
            self._buf += ''.join(self._bufList)
            self._bufList = []

    def _shrinkBuffer(self):
        """Gets rid of already read data (since we can't rewind)."""
        if self._pos >= self._shrinkThreshold:
            self._buf = self._buf[self._pos:]
            self._avail -= self._pos
            self._pos = 0

            assert self._avail >= 0

    def _waitForData(self):
        """Waits for more data to become available."""
        self._conn.process_input()

    def read(self, n=-1):
        """
        Reads up to n bytes, waiting until that many are available or EOF
        has been signalled. A negative n reads everything up to EOF.
        """
        if self._pos == self._avail and self._eof:
            return ''
        while True:
            if n < 0 or (self._avail - self._pos) < n:
                # Not enough data available.
                if self._eof:
                    # And there's no more coming.
                    newPos = self._avail
                    break
                # Wait for more data.
                self._waitForData()
            else:
                newPos = self._pos + n
                break
        self._mergePending()
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readline(self, length=None):
        """
        Reads one line, including its trailing newline if present.

        If length is a non-negative number, at most that many bytes are
        returned. None or a negative length means "no limit", matching the
        usual file-object convention.
        """
        if self._pos == self._avail and self._eof:
            return ''
        limited = length is not None and length >= 0
        while True:
            # Unfortunately, we need to merge the buffer list early.
            self._mergePending()
            # Find newline.
            i = self._buf.find('\n', self._pos)
            if i >= 0:
                newPos = i + 1
                break
            if self._eof:
                # No newline and no more data coming.
                newPos = self._avail
                break
            if limited and (self._avail - self._pos) >= length:
                # No newline within the buffered data, but the caller's
                # limit is already satisfied: the result cannot change, so
                # don't block (or buffer more) waiting for a newline.
                newPos = self._avail
                break
            # Wait for more to come.
            self._waitForData()
        if limited and self._pos + length < newPos:
            newPos = self._pos + length
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readlines(self, sizehint=0):
        """
        Returns a list of lines. If sizehint is positive, stops once at
        least that many bytes have been collected; otherwise reads to EOF.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def __iter__(self):
        return self

    def next(self):
        """Returns the next line; raises StopIteration at EOF."""
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    # Same method under the name used by the newer iterator protocol.
    __next__ = next

    def add_data(self, data):
        """Queues a chunk of incoming data. An empty chunk signals EOF."""
        if not data:
            self._eof = True
        else:
            self._bufList.append(data)
            self._avail += len(data)
240
class MultiplexedInputStream(InputStream):
    """
    A version of InputStream meant to be used with MultiplexedConnections.
    Assumes the MultiplexedConnection (the producer) and the Request
    (the consumer) are running in different threads.
    """
    def __init__(self, conn):
        super(MultiplexedInputStream, self).__init__(conn)

        # A single condition variable on top of a re-entrant lock does two
        # jobs: it arbitrates access to this InputStream (which is used
        # simultaneously by a Request and its owning Connection object),
        # and it lets add_data() notify the Request thread that there is
        # new data available.
        self._lock = threading.Condition(threading.RLock())

    def _waitForData(self):
        # Called from read()/readline() with the lock held; blocks until
        # add_data() posts a notification.
        self._lock.wait()

    def read(self, n=-1):
        guard = self._lock
        guard.acquire()
        try:
            result = super(MultiplexedInputStream, self).read(n)
        finally:
            guard.release()
        return result

    def readline(self, length=None):
        guard = self._lock
        guard.acquire()
        try:
            result = super(MultiplexedInputStream, self).readline(length)
        finally:
            guard.release()
        return result

    def add_data(self, data):
        guard = self._lock
        guard.acquire()
        try:
            super(MultiplexedInputStream, self).add_data(data)
            # Wake up a reader blocked in _waitForData().
            guard.notify()
        finally:
            guard.release()
282
class OutputStream(object):
    """
    FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
    write() or writelines() immediately result in Records being sent back
    to the server. Buffering should be done in a higher level!

    conn is the connection used to send Records, req the owning request
    (its requestId and server.maxwrite are used), type the Record type to
    emit. If buffered is true, data is held until flush() or close().
    """
    # The record header stores contentLength in a 16-bit field, so a single
    # Record can never carry more than this many bytes of content.
    _MAX_CONTENT = 0xffff

    def __init__(self, conn, req, type, buffered=False):
        self._conn = conn
        self._req = req
        self._type = type
        self._buffered = buffered
        self._bufList = [] # Used if buffered is True
        self.dataWritten = False
        self.closed = False

    def _write(self, data):
        """Sends data to the server, split into suitably sized Records."""
        total = len(data)
        # Largest content size per Record: bounded by the server's
        # maxwrite (less the header), by the 16-bit contentLength field,
        # and from below by 1 so a tiny maxwrite cannot stall the loop.
        chunk = min(self._req.server.maxwrite - FCGI_HEADER_LEN,
                    self._MAX_CONTENT)
        chunk = max(chunk, 1)

        # Walk the data by offset rather than repeatedly re-slicing the
        # remainder, which would copy the tail once per Record.
        offset = 0
        while offset < total:
            piece = data[offset:offset + chunk]

            rec = Record(self._type, self._req.requestId)
            rec.contentLength = len(piece)
            rec.contentData = piece
            self._conn.writeRecord(rec)

            offset += len(piece)

    def write(self, data):
        """Writes data (or queues it, if this stream is buffered)."""
        assert not self.closed

        if not data:
            return

        self.dataWritten = True

        if self._buffered:
            self._bufList.append(data)
        else:
            self._write(data)

    def writelines(self, lines):
        """Writes each item of lines in turn."""
        assert not self.closed

        for line in lines:
            self.write(line)

    def flush(self):
        """Sends any queued data. A no-op for unbuffered streams."""
        # Only need to flush if this OutputStream is actually buffered.
        if self._buffered:
            data = ''.join(self._bufList)
            self._bufList = []
            self._write(data)

    # Though available, the following should NOT be called by WSGI apps.
    def close(self):
        """Sends end-of-stream notification, if necessary."""
        if not self.closed and self.dataWritten:
            self.flush()
            # An empty Record of this stream's type marks end-of-stream.
            rec = Record(self._type, self._req.requestId)
            self._conn.writeRecord(rec)
            self.closed = True
345
class TeeOutputStream(object):
    """
    Fan-out writer: everything written to it is forwarded, in order, to
    each of the file-like objects given at construction time.
    """
    def __init__(self, streamList):
        self._streamList = streamList

    def write(self, data):
        """Writes data to every underlying stream."""
        for stream in self._streamList:
            stream.write(data)

    def writelines(self, lines):
        """Writes each item of lines to every underlying stream."""
        for item in lines:
            self.write(item)

    def flush(self):
        """Flushes every underlying stream."""
        for stream in self._streamList:
            stream.flush()
365
class StdoutWrapper(object):
    """
    Thin proxy around a stdout-like file that records, in dataWritten,
    whether any non-empty data has been written through it.
    """
    def __init__(self, stdout):
        self._file = stdout
        self.dataWritten = False

    def write(self, data):
        """Passes data to the wrapped file, noting non-empty writes."""
        if data:
            self.dataWritten = True
        self._file.write(data)

    def writelines(self, lines):
        """Writes each item of lines through write()."""
        for item in lines:
            self.write(item)

    def __getattr__(self, name):
        # Anything not defined here is delegated to the wrapped file.
        return getattr(self._file, name)
385
def decode_pair(s, pos=0):
    """
    Decodes one name/value pair from s, starting at offset pos.

    Returns a tuple (newPos, (name, value)), where newPos is the offset
    just past the decoded pair.
    """
    # The pair starts with two lengths (name, then value). Each is either
    # a single byte, or -- when the high bit of the first byte is set --
    # four bytes in network order with that high bit masked off.
    lengths = []
    for _ in (0, 1):
        size = ord(s[pos])
        if size & 128:
            size = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
            pos += 4
        else:
            pos += 1
        lengths.append(size)
    nameLength, valueLength = lengths

    # The name bytes follow immediately, then the value bytes.
    nameEnd = pos + nameLength
    valueEnd = nameEnd + valueLength

    return (valueEnd, (s[pos:nameEnd], s[nameEnd:valueEnd]))
413
def encode_pair(name, value):
    """
    Encodes a name/value pair.

    Returns the encoded string: the name length, the value length, then
    the name and the value themselves.
    """
    prefix = []
    for length in (len(name), len(value)):
        if length < 128:
            # Short form: a single byte.
            prefix.append(chr(length))
        else:
            # Long form: four bytes, network order, high bit set.
            prefix.append(struct.pack('!L', length | 0x80000000))

    return ''.join(prefix) + name + value
433
class Record(object):
    """
    A FastCGI Record.

    Used for encoding/decoding records. The attributes mirror the fields
    of the record header (version, type, requestId, contentLength,
    paddingLength) plus the record's payload (contentData).
    """
    def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
        self.version = FCGI_VERSION_1
        self.type = type
        self.requestId = requestId
        self.contentLength = 0
        self.paddingLength = 0
        self.contentData = ''

    def _recvall(sock, length):
        """
        Attempts to receive length bytes from a socket, blocking if necessary.
        (Socket may be blocking or non-blocking.)

        Returns a tuple (data, receivedLength). receivedLength is smaller
        than length only if the peer closed the connection first.
        """
        dataList = []
        recvLen = 0
        while length:
            try:
                data = sock.recv(length)
            except socket.error:
                code = sys.exc_info()[1].args[0]
                if code in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Non-blocking socket with nothing to read yet.
                    select.select([sock], [], [])
                    continue
                if code == errno.EINTR:
                    # Interrupted by a signal before any data arrived;
                    # nothing was consumed, so simply try again.
                    continue
                raise
            if not data: # EOF
                break
            dataList.append(data)
            dataLen = len(data)
            recvLen += dataLen
            length -= dataLen
        return ''.join(dataList), recvLen
    _recvall = staticmethod(_recvall)

    def read(self, sock):
        """
        Read and decode a Record from a socket.

        Raises EOFError if the header or content cannot be read in full,
        or if receiving fails for any reason.
        """
        # NOTE: "except Exception" (rather than a bare except) keeps the
        # any-failure-means-EOF behaviour without also masking
        # interpreter-exit requests.
        try:
            header, length = self._recvall(sock, FCGI_HEADER_LEN)
        except Exception:
            raise EOFError

        if length < FCGI_HEADER_LEN:
            raise EOFError

        self.version, self.type, self.requestId, self.contentLength, \
                      self.paddingLength = struct.unpack(FCGI_Header, header)

        if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
                             'contentLength = %d' %
                             (sock.fileno(), self.type, self.requestId,
                              self.contentLength))

        if self.contentLength:
            try:
                self.contentData, length = self._recvall(sock,
                                                         self.contentLength)
            except Exception:
                raise EOFError

            if length < self.contentLength:
                raise EOFError

        if self.paddingLength:
            # Padding carries no information; read and discard it.
            try:
                self._recvall(sock, self.paddingLength)
            except Exception:
                raise EOFError

    def _sendall(sock, data):
        """
        Writes data to a socket and does not return until all the data is sent.

        A broken pipe (EPIPE) is silently ignored: the remaining data is
        dropped and the function returns.
        """
        length = len(data)
        while length:
            try:
                sent = sock.send(data)
            except socket.error:
                code = sys.exc_info()[1].args[0]
                if code == errno.EPIPE:
                    return # Don't bother raising an exception. Just ignore.
                if code in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Non-blocking socket whose send buffer is full.
                    select.select([], [sock], [])
                    continue
                if code == errno.EINTR:
                    # Interrupted by a signal before anything was sent.
                    continue
                raise
            data = data[sent:]
            length -= sent
    _sendall = staticmethod(_sendall)

    def write(self, sock):
        """Encode and write a Record to a socket."""
        # Pad the content up to the next multiple of 8 bytes.
        self.paddingLength = -self.contentLength & 7

        if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
                             'contentLength = %d' %
                             (sock.fileno(), self.type, self.requestId,
                              self.contentLength))

        header = struct.pack(FCGI_Header, self.version, self.type,
                             self.requestId, self.contentLength,
                             self.paddingLength)
        self._sendall(sock, header)
        if self.contentLength:
            self._sendall(sock, self.contentData)
        if self.paddingLength:
            self._sendall(sock, '\x00'*self.paddingLength)
544
class Request(object):
    """
    Represents a single FastCGI request.

    These objects are passed to your handler and are the main interface
    between your handler and the fcgi module. The methods should not
    be called by your handler. However, server, params, stdin, stdout,
    stderr, and data are free for your handler's use.
    """
    def __init__(self, conn, inputStreamClass):
        self._conn = conn
        self.server = conn.server

        # Filled in from FCGI_PARAMS records.
        self.params = {}

        # Streams, created in the same order as before: stdin, stdout,
        # stderr (buffered), then data.
        self.stdin = inputStreamClass(conn)
        self.stdout = OutputStream(conn, self, FCGI_STDOUT)
        self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
        self.data = inputStreamClass(conn)

    def run(self):
        """Runs the handler, flushes the streams, and ends the request."""
        try:
            outcome = self.server.handler(self)
            protocolStatus, appStatus = outcome
        except:
            # Report the failure on the request's stderr stream ...
            traceback.print_exc(file=self.stderr)
            self.stderr.flush()
            # ... and, if nothing has gone out on stdout yet, let the
            # server produce an error response.
            if not self.stdout.dataWritten:
                self.server.error(self)

            protocolStatus = FCGI_REQUEST_COMPLETE
            appStatus = 0

        if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
                             (protocolStatus, appStatus))

        self._flush()
        self._end(appStatus, protocolStatus)

    def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
        """Tells the connection that this request is finished."""
        self._conn.end_request(self, appStatus, protocolStatus)

    def _flush(self):
        """Closes (and thereby flushes) both output streams."""
        self.stdout.close()
        self.stderr.close()
588
class CGIRequest(Request):
    """
    A normal CGI request disguised as a FastCGI request.

    Parameters come from os.environ and the streams are the process's own
    standard streams, so a handler can treat it like any other Request.
    """
    def __init__(self, server):
        self.server = server

        # These are normally filled in by Connection.
        self.requestId = 1
        self.role = FCGI_RESPONDER
        self.flags = 0
        self.aborted = False

        self.params = dict(os.environ)
        self.stdin = sys.stdin
        self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
        self.stderr = sys.stderr
        self.data = StringIO.StringIO()

    def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
        # There is no connection to notify: ending the request means
        # ending the process, with appStatus as the exit status.
        sys.exit(appStatus)

    def _flush(self):
        # Not buffered, do nothing.
        pass
611
class Connection(object):
    """
    A Connection with the web server.

    Each Connection is associated with a single socket (which is
    connected to the web server) and is responsible for handling all
    the FastCGI message processing for that socket.
    """
    _multiplexed = False
    _inputStreamClass = InputStream

    def __init__(self, sock, addr, server):
        self._sock = sock
        self._addr = addr
        self.server = server

        # Active Requests for this Connection, mapped by request ID.
        self._requests = {}

    def _cleanupSocket(self):
        """Close the Connection's socket."""
        try:
            self._sock.shutdown(socket.SHUT_WR)
        except Exception:
            # Shutdown failed (for example, the socket is already closed);
            # nothing more to do.
            return
        try:
            # Drain whatever the peer still sends until it closes its end.
            while True:
                r, w, e = select.select([self._sock], [], [])
                if not r or not self._sock.recv(1024):
                    break
        except Exception:
            pass
        self._sock.close()

    def run(self):
        """Begin processing data from the socket."""
        self._keepGoing = True
        while self._keepGoing:
            try:
                self.process_input()
            except EOFError:
                break
            except (select.error, socket.error):
                err = sys.exc_info()[1]
                if err.args[0] == errno.EBADF: # Socket was closed by Request.
                    break
                raise

        self._cleanupSocket()

    def process_input(self):
        """Attempt to read a single Record from the socket and process it."""
        # Currently, any children Request threads notify this Connection
        # that it is no longer needed by closing the Connection's socket.
        # We need to put a timeout on select, otherwise we might get
        # stuck in it indefinitely... (I don't like this solution.)
        while self._keepGoing:
            try:
                r, w, e = select.select([self._sock], [], [], 1.0)
            except ValueError:
                # Sigh. ValueError gets thrown sometimes when passing select
                # a closed socket.
                raise EOFError
            if r: break
        if not self._keepGoing:
            return
        rec = Record()
        rec.read(self._sock)

        if rec.type == FCGI_GET_VALUES:
            self._do_get_values(rec)
        elif rec.type == FCGI_BEGIN_REQUEST:
            self._do_begin_request(rec)
        elif rec.type == FCGI_ABORT_REQUEST:
            self._do_abort_request(rec)
        elif rec.type == FCGI_PARAMS:
            self._do_params(rec)
        elif rec.type == FCGI_STDIN:
            self._do_stdin(rec)
        elif rec.type == FCGI_DATA:
            self._do_data(rec)
        elif rec.requestId == FCGI_NULL_REQUEST_ID:
            self._do_unknown_type(rec)
        else:
            # Need to complain about this.
            pass

    def writeRecord(self, rec):
        """
        Write a Record to the socket.
        """
        rec.write(self._sock)

    def end_request(self, req, appStatus=0,
                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
        """
        End a Request.

        Called by Request objects. An FCGI_END_REQUEST Record is
        sent to the web server. If the web server no longer requires
        the connection, the socket is closed, thereby ending this
        Connection (run() returns).

        If remove is false, req is not taken out of the table of active
        requests (used for requests that were never registered).
        """
        rec = Record(FCGI_END_REQUEST, req.requestId)
        rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
                                      protocolStatus)
        rec.contentLength = FCGI_EndRequestBody_LEN
        self.writeRecord(rec)

        if remove:
            del self._requests[req.requestId]

        if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)

        if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
            self._cleanupSocket()
            self._keepGoing = False

    def _do_get_values(self, inrec):
        """Handle an FCGI_GET_VALUES request from the web server."""
        outrec = Record(FCGI_GET_VALUES_RESULT)

        pos = 0
        while pos < inrec.contentLength:
            pos, (name, value) = decode_pair(inrec.contentData, pos)
            # Only variables the server knows about are answered.
            cap = self.server.capability.get(name)
            if cap is not None:
                outrec.contentData += encode_pair(name, str(cap))

        outrec.contentLength = len(outrec.contentData)
        # Send the reply we just built (this used to reference an
        # undefined name and raise NameError).
        self.writeRecord(outrec)

    def _do_begin_request(self, inrec):
        """Handle an FCGI_BEGIN_REQUEST from the web server."""
        role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)

        req = self.server.request_class(self, self._inputStreamClass)
        req.requestId, req.role, req.flags = inrec.requestId, role, flags
        req.aborted = False

        if not self._multiplexed and self._requests:
            # Can't multiplex requests.
            self.end_request(req, 0, FCGI_CANT_MPX_CONN, remove=False)
        else:
            self._requests[inrec.requestId] = req

    def _do_abort_request(self, inrec):
        """
        Handle an FCGI_ABORT_REQUEST from the web server.

        We just mark a flag in the associated Request.
        """
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.aborted = True

    def _start_request(self, req):
        """Run the request."""
        # Not multiplexed, so run it inline.
        req.run()

    def _do_params(self, inrec):
        """
        Handle an FCGI_PARAMS Record.

        If the last FCGI_PARAMS Record is received, start the request.
        """
        req = self._requests.get(inrec.requestId)
        if req is not None:
            if inrec.contentLength:
                pos = 0
                while pos < inrec.contentLength:
                    pos, (name, value) = decode_pair(inrec.contentData, pos)
                    req.params[name] = value
            else:
                # An empty FCGI_PARAMS Record ends the parameter stream.
                self._start_request(req)

    def _do_stdin(self, inrec):
        """Handle the FCGI_STDIN stream."""
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.stdin.add_data(inrec.contentData)

    def _do_data(self, inrec):
        """Handle the FCGI_DATA stream."""
        req = self._requests.get(inrec.requestId)
        if req is not None:
            req.data.add_data(inrec.contentData)

    def _do_unknown_type(self, inrec):
        """Handle an unknown request type. Respond accordingly."""
        outrec = Record(FCGI_UNKNOWN_TYPE)
        outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
        outrec.contentLength = FCGI_UnknownTypeBody_LEN
        # Send the reply we just built (this used to reference an
        # undefined name and raise NameError).
        self.writeRecord(outrec)
806
class MultiplexedConnection(Connection):
    """
    A version of Connection capable of handling multiple requests
    simultaneously.

    Requests are started in their own threads, so every method that
    touches the request table or writes to the socket is wrapped in a
    shared re-entrant lock.
    """
    _multiplexed = True
    _inputStreamClass = MultiplexedInputStream

    def __init__(self, sock, addr, server):
        super(MultiplexedConnection, self).__init__(sock, addr, server)

        # Used to arbitrate access to self._requests.
        lock = threading.RLock()

        # Notification is posted everytime a request completes, allowing us
        # to quit cleanly.
        self._lock = threading.Condition(lock)

    def _cleanupSocket(self):
        # Wait for any outstanding requests before closing the socket.
        self._lock.acquire()
        try:
            while self._requests:
                self._lock.wait()
        finally:
            # Release even if wait() is interrupted, so the lock is never
            # left held.
            self._lock.release()

        super(MultiplexedConnection, self)._cleanupSocket()

    def writeRecord(self, rec):
        # Must use locking to prevent intermingling of Records from different
        # threads.
        self._lock.acquire()
        try:
            # Probably faster than calling super. ;)
            rec.write(self._sock)
        finally:
            self._lock.release()

    def end_request(self, req, appStatus=0,
                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self).end_request(req, appStatus,
                                                           protocolStatus,
                                                           remove)
            # Wake up _cleanupSocket(), which may be waiting for the
            # request table to empty.
            self._lock.notify()
        finally:
            self._lock.release()

    def _do_begin_request(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_begin_request(inrec)
        finally:
            self._lock.release()

    def _do_abort_request(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_abort_request(inrec)
        finally:
            self._lock.release()

    def _start_request(self, req):
        # Each request runs in a thread of its own.
        thread.start_new_thread(req.run, ())

    def _do_params(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_params(inrec)
        finally:
            self._lock.release()

    def _do_stdin(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_stdin(inrec)
        finally:
            self._lock.release()

    def _do_data(self, inrec):
        self._lock.acquire()
        try:
            super(MultiplexedConnection, self)._do_data(inrec)
        finally:
            self._lock.release()
892
class Server(object):
    """
    The FastCGI server.

    Waits for connections from the web server, processing each
    request.

    If run in a normal CGI context, it will instead instantiate a
    CGIRequest and run the handler through there.
    """
    request_class = Request
    cgirequest_class = CGIRequest

    # Limits the size of the InputStream's string buffer to this size + the
    # server's maximum Record size. Since the InputStream is not seekable,
    # we throw away already-read data once this certain amount has been read.
    inputStreamShrinkThreshold = 102400 - 8192

    def __init__(self, handler=None, maxwrite=8192, bindAddress=None,
                 multiplexed=False):
        """
        handler, if present, must reference a function or method that
        takes one argument: a Request object. If handler is not
        specified at creation time, Server *must* be subclassed.
        (The handler method below is abstract.)

        maxwrite is the maximum number of bytes (per Record) to write
        to the server. I've noticed mod_fastcgi has a relatively small
        receive buffer (8K or so).

        bindAddress, if present, must either be a string or a 2-tuple. If
        present, run() will open its own listening socket. You would use
        this if you wanted to run your application as an 'external' FastCGI
        app. (i.e. the webserver would no longer be responsible for starting
        your app) If a string, it will be interpreted as a filename and a UNIX
        socket will be opened. If a tuple, the first element, a string,
        is the interface name/IP to bind to, and the second element (an int)
        is the port number.

        Set multiplexed to True if you want to handle multiple requests
        per connection. Some FastCGI backends (namely mod_fastcgi) don't
        multiplex requests at all, so by default this is off (which saves
        on thread creation/locking overhead). If threads aren't available,
        this keyword is ignored; it's not possible to multiplex requests
        at all.
        """
        if handler is not None:
            self.handler = handler
        self.maxwrite = maxwrite
        if thread_available:
            try:
                import resource
                # Attempt to glean the maximum number of connections
                # from the OS.
                maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
            except ImportError:
                maxConns = 100 # Just some made up number.
            maxReqs = maxConns
            if multiplexed:
                self._connectionClass = MultiplexedConnection
                maxReqs *= 5 # Another made up number.
            else:
                self._connectionClass = Connection
            # Values reported in answer to FCGI_GET_VALUES queries.
            self.capability = {
                FCGI_MAX_CONNS: maxConns,
                FCGI_MAX_REQS: maxReqs,
                FCGI_MPXS_CONNS: multiplexed and 1 or 0
            }
        else:
            self._connectionClass = Connection
            self.capability = {
                # If threads aren't available, these are pretty much correct.
                FCGI_MAX_CONNS: 1,
                FCGI_MAX_REQS: 1,
                FCGI_MPXS_CONNS: 0
            }
        self._bindAddress = bindAddress

    def _setupSocket(self):
        """
        Returns the listening socket: either the one inherited from the
        web server or, if a bindAddress was given, a newly bound one.

        When started without a bindAddress in a plain CGI context, the
        request is served as CGI instead and the process exits.
        """
        if self._bindAddress is None: # Run as a normal FastCGI?
            isFCGI = True

            sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
                                 socket.SOCK_STREAM)
            try:
                sock.getpeername()
            except socket.error:
                code = sys.exc_info()[1].args[0]
                if code == errno.ENOTSOCK:
                    # Not a socket, assume CGI context.
                    isFCGI = False
                elif code != errno.ENOTCONN:
                    raise

            # FastCGI/CGI discrimination is broken on Mac OS X.
            # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
            # if you want to run your app as a simple CGI. (You can do
            # this with Apache's mod_env [not loaded by default in OS X
            # client, ha ha] and the SetEnv directive.)
            if not isFCGI or \
               os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
                req = self.cgirequest_class(self)
                req.run()
                sys.exit(0)
        else:
            # Run as a server
            if type(self._bindAddress) is str:
                # Unix socket
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                # Remove a stale socket file, if any, so bind() can succeed.
                try:
                    os.unlink(self._bindAddress)
                except OSError:
                    pass
            else:
                # INET socket
                assert type(self._bindAddress) is tuple
                assert len(self._bindAddress) == 2
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            sock.bind(self._bindAddress)
            sock.listen(socket.SOMAXCONN)

        return sock

    def _cleanupSocket(self, sock):
        """Closes the main socket."""
        sock.close()

    def _installSignalHandlers(self):
        """Installs our SIGHUP/SIGINT/SIGTERM handlers, saving the old ones."""
        self._oldSIGs = [(x,signal.getsignal(x)) for x in
                         (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
        signal.signal(signal.SIGHUP, self._hupHandler)
        signal.signal(signal.SIGINT, self._intHandler)
        signal.signal(signal.SIGTERM, self._intHandler)

    def _restoreSignalHandlers(self):
        """Puts back the handlers saved by _installSignalHandlers()."""
        for signum,handler in self._oldSIGs:
            signal.signal(signum, handler)

    def _hupHandler(self, signum, frame):
        self._hupReceived = True
        self._keepGoing = False

    def _intHandler(self, signum, frame):
        self._keepGoing = False

    def run(self, timeout=1.0):
        """
        The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
        SIGHUP was received, False otherwise.

        timeout is the number of seconds to wait for a new connection
        before calling _mainloopPeriodic() and checking for exit.

        If the environment variable FCGI_WEB_SERVER_ADDRS is set (a
        comma-separated list of addresses), connections from any other
        address are closed immediately.
        """
        web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
        if web_server_addrs is not None:
            web_server_addrs = [addr.strip()
                                for addr in web_server_addrs.split(',')]

        sock = self._setupSocket()

        self._keepGoing = True
        self._hupReceived = False

        # Install signal handlers.
        self._installSignalHandlers()

        try:
            while self._keepGoing:
                try:
                    r, w, e = select.select([sock], [], [], timeout)
                except select.error:
                    if sys.exc_info()[1].args[0] == errno.EINTR:
                        # Interrupted by a signal; re-check _keepGoing.
                        continue
                    raise

                if r:
                    try:
                        clientSock, addr = sock.accept()
                    except socket.error:
                        code = sys.exc_info()[1].args[0]
                        if code in (errno.EINTR, errno.EAGAIN):
                            continue
                        raise

                    if web_server_addrs and \
                           (len(addr) != 2 or addr[0] not in web_server_addrs):
                        clientSock.close()
                        continue

                    # Instantiate a new Connection and begin processing
                    # FastCGI messages (either in a new thread or this
                    # thread).
                    conn = self._connectionClass(clientSock, addr, self)
                    thread.start_new_thread(conn.run, ())

                self._mainloopPeriodic()
        finally:
            # Even if the loop died with an exception, don't leave our
            # signal handlers installed or the listening socket open.

            # Restore signal handlers.
            self._restoreSignalHandlers()

            self._cleanupSocket(sock)

        return self._hupReceived

    def _mainloopPeriodic(self):
        """
        Called with just about each iteration of the main loop. Meant to
        be overridden.
        """
        pass

    def _exit(self, reload=False):
        """
        Protected convenience method for subclasses to force an exit. Not
        really thread-safe, which is why it isn't public.
        """
        if self._keepGoing:
            self._keepGoing = False
            self._hupReceived = reload

    def handler(self, req):
        """
        Default handler, which just raises an exception. Unless a handler
        is passed at initialization time, this must be implemented by
        a subclass.
        """
        raise NotImplementedError(self.__class__.__name__ + '.handler')

    def error(self, req):
        """
        Called by Request if an exception occurs within the handler. May and
        should be overridden.

        The default writes an HTML traceback page (via cgitb) to the
        request's stdout.
        """
        import cgitb
        req.stdout.write('Content-Type: text/html\r\n\r\n' +
                         cgitb.html(sys.exc_info()))
1124
1125 class WSGIServer(Server):
1126 """
1127 FastCGI server that supports the Web Server Gateway Interface. See
1128 <http://www.python.org/peps/pep-0333.html>.
1129 """
1130 def __init__(self, application, environ=None, multithreaded=True, **kw):
1131 """
1132 environ, if present, must be a dictionary-like object. Its
1133 contents will be copied into application's environ. Useful
1134 for passing application-specific variables.
1135
1136 Set multithreaded to False if your application is not MT-safe.
1137 """
1138 if kw.has_key('handler'):
1139 del kw['handler'] # Doesn't make sense to let this through
1140 super(WSGIServer, self).__init__(**kw)
1141
1142 if environ is None:
1143 environ = {}
1144
1145 self.application = application
1146 self.environ = environ
1147 self.multithreaded = multithreaded
1148
1149 # Used to force single-threadedness
1150 self._app_lock = thread.allocate_lock()
1151
1152 def handler(self, req):
1153 """Special handler for WSGI."""
1154 if req.role != FCGI_RESPONDER:
1155 return FCGI_UNKNOWN_ROLE, 0
1156
1157 # Mostly taken from example CGI gateway.
1158 environ = req.params
1159 environ.update(self.environ)
1160
1161 environ['wsgi.version'] = (1,0)
1162 environ['wsgi.input'] = req.stdin
1163 if self._bindAddress is None:
1164 stderr = req.stderr
1165 else:
1166 stderr = TeeOutputStream((sys.stderr, req.stderr))
1167 environ['wsgi.errors'] = stderr
1168 environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
1169 thread_available and self.multithreaded
1170 # Rationale for the following: If started by the web server
1171 # (self._bindAddress is None) in either FastCGI or CGI mode, the
1172 # possibility of being spawned multiple times simultaneously is quite
1173 # real. And, if started as an external server, multiple copies may be
1174 # spawned for load-balancing/redundancy. (Though I don't think
1175 # mod_fastcgi supports this?)
1176 environ['wsgi.multiprocess'] = True
1177 environ['wsgi.run_once'] = isinstance(req, CGIRequest)
1178
1179 if environ.get('HTTPS', 'off') in ('on', '1'):
1180 environ['wsgi.url_scheme'] = 'https'
1181 else:
1182 environ['wsgi.url_scheme'] = 'http'
1183
1184 self._sanitizeEnv(environ)
1185
1186 headers_set = []
1187 headers_sent = []
1188 result = None
1189
1190 def write(data):
1191 assert type(data) is str, 'write() argument must be string'
1192 assert headers_set, 'write() before start_response()'
1193
1194 if not headers_sent:
1195 status, responseHeaders = headers_sent[:] = headers_set
1196 found = False
1197 for header,value in responseHeaders:
1198 if header.lower() == 'content-length':
1199 found = True
1200 break
1201 if not found and result is not None:
1202 try:
1203 if len(result) == 1:
1204 responseHeaders.append(('Content-Length',
1205 str(len(data))))
1206 except:
1207 pass
1208 s = 'Status: %s\r\n' % status
1209 for header in responseHeaders:
1210 s += '%s: %s\r\n' % header
1211 s += '\r\n'
1212 req.stdout.write(s)
1213
1214 req.stdout.write(data)
1215 req.stdout.flush()
1216
1217 def start_response(status, response_headers, exc_info=None):
1218 if exc_info:
1219 try:
1220 if headers_sent:
1221 # Re-raise if too late
1222 raise exc_info[0], exc_info[1], exc_info[2]
1223 finally:
1224 exc_info = None # avoid dangling circular ref
1225 else:
1226 assert not headers_set, 'Headers already set!'
1227
1228 assert type(status) is str, 'Status must be a string'
1229 assert len(status) >= 4, 'Status must be at least 4 characters'
1230 assert int(status[:3]), 'Status must begin with 3-digit code'
1231 assert status[3] == ' ', 'Status must have a space after code'
1232 assert type(response_headers) is list, 'Headers must be a list'
1233 if __debug__:
1234 for name,val in response_headers:
1235 assert type(name) is str, 'Header names must be strings'
1236 assert type(val) is str, 'Header values must be strings'
1237
1238 headers_set[:] = [status, response_headers]
1239 return write
1240
1241 if not self.multithreaded:
1242 self._app_lock.acquire()
1243 try:
1244 result = self.application(environ, start_response)
1245 try:
1246 for data in result:
1247 if data:
1248 write(data)
1249 if not headers_sent:
1250 write('') # in case body was empty
1251 finally:
1252 if hasattr(result, 'close'):
1253 result.close()
1254 finally:
1255 if not self.multithreaded:
1256 self._app_lock.release()
1257
1258 return FCGI_REQUEST_COMPLETE, 0
1259
1260 def _sanitizeEnv(self, environ):
1261 """Ensure certain values are present, if required by WSGI."""
1262 if not environ.has_key('SCRIPT_NAME'):
1263 environ['SCRIPT_NAME'] = ''
1264 if not environ.has_key('PATH_INFO'):
1265 environ['PATH_INFO'] = ''
1266
1267 # If any of these are missing, it probably signifies a broken
1268 # server...
1269 for name,default in [('REQUEST_METHOD', 'GET'),
1270 ('SERVER_NAME', 'localhost'),
1271 ('SERVER_PORT', '80'),
1272 ('SERVER_PROTOCOL', 'HTTP/1.0')]:
1273 if not environ.has_key(name):
1274 environ['wsgi.errors'].write('%s: missing FastCGI param %s '
1275 'required by WSGI!\n' %
1276 (self.__class__.__name__, name))
1277 environ[name] = default
1278
if __name__ == '__main__':
    def test_app(environ, start_response):
        """
        Demo WSGI application: renders the request environ, plus any
        submitted form fields, as an HTML table.

        Probably not the most efficient example.
        """
        import cgi
        start_response('200 OK', [('Content-Type', 'text/html')])
        yield '<html><head><title>Hello World!</title></head>\n' \
              '<body>\n' \
              '<p>Hello World!</p>\n' \
              '<table border="1">'
        names = list(environ.keys())
        names.sort()
        for name in names:
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cgi.escape(str(name)), cgi.escape(repr(environ[name])))

        form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
                                keep_blank_values=1)
        # FieldStorage.list can be None rather than a list; treat that as
        # "no fields" instead of failing while iterating it.
        fields = form.list or []
        if fields:
            yield '<tr><th colspan="2">Form data</th></tr>'

        for field in fields:
            # Submitted names and values are untrusted input: escape them,
            # as is done for the environ values above, so they cannot
            # inject markup into the page.
            yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
                cgi.escape(str(field.name)), cgi.escape(str(field.value)))

        yield '</table>\n' \
              '</body></html>\n'

    WSGIServer(test_app).run()
Copyright (C) 2012-2017 Edgewall Software