added temporary static web server
This commit is contained in:
parent
e9327b1ebb
commit
d984ebbec4
1 changed files with 242 additions and 0 deletions
242
static.py
Normal file
242
static.py
Normal file
|
|
@ -0,0 +1,242 @@
|
|||
#!/usr/bin/env python2.4
|
||||
"""static - A stupidly simple WSGI way to serve static (or mixed) content.
|
||||
|
||||
(See the docstrings of the various functions and classes.)
|
||||
|
||||
Copyright (C) 2006-2009 Luke Arno - http://lukearno.com/
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation; either
|
||||
version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to:
|
||||
|
||||
The Free Software Foundation, Inc.,
|
||||
51 Franklin Street, Fifth Floor,
|
||||
Boston, MA 02110-1301, USA.
|
||||
|
||||
Luke Arno can be found at http://lukearno.com/
|
||||
|
||||
"""
|
||||
|
||||
import mimetypes
|
||||
import rfc822
|
||||
import time
|
||||
import string
|
||||
import sys
|
||||
from os import path, stat, getcwd
|
||||
from wsgiref import util
|
||||
from wsgiref.headers import Headers
|
||||
from wsgiref.simple_server import make_server
|
||||
from optparse import OptionParser
|
||||
|
||||
try: from pkg_resources import resource_filename, Requirement
|
||||
except: pass
|
||||
|
||||
try: import kid
|
||||
except: pass
|
||||
|
||||
|
||||
class MagicError(Exception): pass
|
||||
|
||||
|
||||
class StatusApp:
|
||||
"""Used by WSGI apps to return some HTTP status."""
|
||||
|
||||
def __init__(self, status, message=None):
|
||||
self.status = status
|
||||
if message is None:
|
||||
self.message = status
|
||||
else:
|
||||
self.message = message
|
||||
|
||||
def __call__(self, environ, start_response, headers=[]):
|
||||
if self.message:
|
||||
Headers(headers).add_header('Content-type', 'text/plain')
|
||||
start_response(self.status, headers)
|
||||
if environ['REQUEST_METHOD'] == 'HEAD':
|
||||
return [""]
|
||||
else:
|
||||
return [self.message]
|
||||
|
||||
|
||||
class Cling(object):
|
||||
"""A stupidly simple way to serve static content via WSGI.
|
||||
|
||||
Serve the file of the same path as PATH_INFO in self.datadir.
|
||||
|
||||
Look up the Content-type in self.content_types by extension
|
||||
or use 'text/plain' if the extension is not found.
|
||||
|
||||
Serve up the contents of the file or delegate to self.not_found.
|
||||
"""
|
||||
|
||||
block_size = 16 * 4096
|
||||
index_file = 'index.html'
|
||||
not_found = StatusApp('404 Not Found')
|
||||
not_modified = StatusApp('304 Not Modified', "")
|
||||
moved_permanently = StatusApp('301 Moved Permanently')
|
||||
method_not_allowed = StatusApp('405 Method Not Allowed')
|
||||
|
||||
def __init__(self, root, **kw):
|
||||
"""Just set the root and any other attribs passes via **kw."""
|
||||
self.root = root
|
||||
for k, v in kw.iteritems():
|
||||
setattr(self, k, v)
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
"""Respond to a request when called in the usual WSGI way."""
|
||||
if environ['REQUEST_METHOD'] not in ('GET', 'HEAD'):
|
||||
headers = [('Allow', 'GET, HEAD')]
|
||||
return self.method_not_allowed(environ, start_response, headers)
|
||||
path_info = environ.get('PATH_INFO', '')
|
||||
full_path = self._full_path(path_info)
|
||||
if not self._is_under_root(full_path):
|
||||
return self.not_found(environ, start_response)
|
||||
if path.isdir(full_path):
|
||||
if full_path[-1] <> '/' or full_path == self.root:
|
||||
location = util.request_uri(environ, include_query=False) + '/'
|
||||
if environ.get('QUERY_STRING'):
|
||||
location += '?' + environ.get('QUERY_STRING')
|
||||
headers = [('Location', location)]
|
||||
return self.moved_permanently(environ, start_response, headers)
|
||||
else:
|
||||
full_path = self._full_path(path_info + self.index_file)
|
||||
content_type = self._guess_type(full_path)
|
||||
try:
|
||||
etag, last_modified = self._conditions(full_path, environ)
|
||||
headers = [('Date', rfc822.formatdate(time.time())),
|
||||
('Last-Modified', last_modified),
|
||||
('ETag', etag)]
|
||||
if_modified = environ.get('HTTP_IF_MODIFIED_SINCE')
|
||||
if if_modified and (rfc822.parsedate(if_modified)
|
||||
>= rfc822.parsedate(last_modified)):
|
||||
return self.not_modified(environ, start_response, headers)
|
||||
if_none = environ.get('HTTP_IF_NONE_MATCH')
|
||||
if if_none and (if_none == '*' or etag in if_none):
|
||||
return self.not_modified(environ, start_response, headers)
|
||||
file_like = self._file_like(full_path)
|
||||
headers.append(('Content-Type', content_type))
|
||||
start_response("200 OK", headers)
|
||||
if environ['REQUEST_METHOD'] == 'GET':
|
||||
return self._body(full_path, environ, file_like)
|
||||
else:
|
||||
return ['']
|
||||
except (IOError, OSError), e:
|
||||
print e
|
||||
return self.not_found(environ, start_response)
|
||||
|
||||
def _full_path(self, path_info):
|
||||
"""Return the full path from which to read."""
|
||||
return self.root + path_info
|
||||
|
||||
def _is_under_root(self, full_path):
|
||||
"""Guard against arbitrary file retrieval."""
|
||||
if (path.abspath(full_path) + path.sep)\
|
||||
.startswith(path.abspath(self.root) + path.sep):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _guess_type(self, full_path):
|
||||
"""Guess the mime type using the mimetypes module."""
|
||||
return mimetypes.guess_type(full_path)[0] or 'text/plain'
|
||||
|
||||
def _conditions(self, full_path, environ):
|
||||
"""Return a tuple of etag, last_modified by mtime from stat."""
|
||||
mtime = stat(full_path).st_mtime
|
||||
return str(mtime), rfc822.formatdate(mtime)
|
||||
|
||||
def _file_like(self, full_path):
|
||||
"""Return the appropriate file object."""
|
||||
return open(full_path, 'rb')
|
||||
|
||||
def _body(self, full_path, environ, file_like):
|
||||
"""Return an iterator over the body of the response."""
|
||||
way_to_send = environ.get('wsgi.file_wrapper', iter_and_close)
|
||||
return way_to_send(file_like, self.block_size)
|
||||
|
||||
|
||||
def iter_and_close(file_like, block_size):
|
||||
"""Yield file contents by block then close the file."""
|
||||
while 1:
|
||||
try:
|
||||
block = file_like.read(block_size)
|
||||
if block: yield block
|
||||
else: raise StopIteration
|
||||
except StopIteration, si:
|
||||
file_like.close()
|
||||
return
|
||||
|
||||
|
||||
def cling_wrap(package_name, dir_name, **kw):
|
||||
"""Return a Cling that serves from the given package and dir_name.
|
||||
|
||||
This uses pkg_resources.resource_filename which is not the
|
||||
recommended way, since it extracts the files.
|
||||
|
||||
I think this works fine unless you have some _very_ serious
|
||||
requirements for static content, in which case you probably
|
||||
shouldn't be serving it through a WSGI app, IMHO. YMMV.
|
||||
"""
|
||||
resource = Requirement.parse(package_name)
|
||||
return Cling(resource_filename(resource, dir_name), **kw)
|
||||
|
||||
|
||||
def command():
|
||||
parser = OptionParser(usage="%prog DIR [HOST][:][PORT]",
|
||||
version="static 0.3.6")
|
||||
options, args = parser.parse_args()
|
||||
if len(args) in (1, 2):
|
||||
if len(args) == 2:
|
||||
parts = args[1].split(":")
|
||||
if len(parts) == 1:
|
||||
host = parts[0]
|
||||
port = None
|
||||
elif len(parts) == 2:
|
||||
host, port = parts
|
||||
else:
|
||||
sys.exit("Invalid host:port specification.")
|
||||
elif len(args) == 1:
|
||||
host, port = None, None
|
||||
if not host:
|
||||
host = '0.0.0.0'
|
||||
if not port:
|
||||
port = 9999
|
||||
try:
|
||||
port = int(port)
|
||||
except:
|
||||
sys.exit("Invalid host:port specification.")
|
||||
app = Cling(args[0])
|
||||
try:
|
||||
make_server(host, port, app).serve_forever()
|
||||
except KeyboardInterrupt, ki:
|
||||
print "Cio, baby!"
|
||||
except:
|
||||
sys.exit("Problem initializing server.")
|
||||
else:
|
||||
parser.print_help(sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def test():
|
||||
from wsgiref.validate import validator
|
||||
app = Cling(getcwd())
|
||||
try:
|
||||
print "Serving " + getcwd() + " to http://localhost:9999"
|
||||
make_server('localhost', 9999, validator(app)).serve_forever()
|
||||
except KeyboardInterrupt, ki:
|
||||
print ""
|
||||
print "Ciao, baby!"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test()
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue