Add files via upload

This commit is contained in:
Harold Finch
2023-04-10 07:22:09 +02:00
committed by GitHub
parent 12cd50b598
commit d1a4c3e77a
51 changed files with 13206 additions and 0 deletions

33
web/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""web.py: makes web apps (http://webpy.org)"""
from . import ( # noqa: F401
db,
debugerror,
form,
http,
httpserver,
net,
session,
template,
utils,
webapi,
wsgi,
)
from .application import * # noqa: F401,F403
from .db import * # noqa: F401,F403
from .debugerror import * # noqa: F401,F403
from .http import * # noqa: F401,F403
from .httpserver import * # noqa: F401,F403
from .net import * # noqa: F401,F403
from .utils import * # noqa: F401,F403
from .webapi import * # noqa: F401,F403
from .wsgi import * # noqa: F401,F403
__version__ = "0.62"
__author__ = [
"Aaron Swartz <me@aaronsw.com>",
"Anand Chitipothu <anandology@gmail.com>",
]
__license__ = "public domain"
__contributors__ = "see http://webpy.org/changes"

813
web/application.py Normal file
View File

@@ -0,0 +1,813 @@
"""
Web application
(from web.py)
"""
import itertools
import os
import sys
import traceback
import wsgiref.handlers
from importlib import reload
from inspect import isclass
from io import BytesIO
from urllib.parse import unquote, urlencode, urlparse
from . import browser, httpserver, utils
from . import webapi as web
from . import wsgi
from .debugerror import debugerror
from .py3helpers import iteritems
from .utils import lstrips
__all__ = [
"application",
"auto_application",
"subdir_application",
"subdomain_application",
"loadhook",
"unloadhook",
"autodelegate",
]
class application:
"""
Application to delegate requests based on path.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self): return "hello"
>>>
>>> app.request("/hello").data
'hello'
"""
# PY3DOCTEST: b'hello'
def __init__(self, mapping=(), fvars={}, autoreload=None):
if autoreload is None:
autoreload = web.config.get("debug", False)
self.init_mapping(mapping)
self.fvars = fvars
self.processors = []
self.add_processor(loadhook(self._load))
self.add_processor(unloadhook(self._unload))
if autoreload:
def main_module_name():
mod = sys.modules["__main__"]
file = getattr(
mod, "__file__", None
) # make sure this works even from python interpreter
return file and os.path.splitext(os.path.basename(file))[0]
def modname(fvars):
"""find name of the module name from fvars."""
file, name = fvars.get("__file__"), fvars.get("__name__")
if file is None or name is None:
return None
if name == "__main__":
# Since the __main__ module can't be reloaded, the module has
# to be imported using its file name.
name = main_module_name()
return name
mapping_name = utils.dictfind(fvars, mapping)
module_name = modname(fvars)
def reload_mapping():
"""loadhook to reload mapping and fvars."""
mod = __import__(module_name, None, None, [""])
mapping = getattr(mod, mapping_name, None)
if mapping:
self.fvars = mod.__dict__
self.init_mapping(mapping)
self.add_processor(loadhook(Reloader()))
if mapping_name and module_name:
# when app is ran as part of a package, this puts the app into
# `sys.modules` correctly, otherwise the first change to the
# app module will not be picked up by Reloader
reload_mapping()
self.add_processor(loadhook(reload_mapping))
# load __main__ module usings its filename, so that it can be reloaded.
if main_module_name() and "__main__" in sys.argv:
try:
__import__(main_module_name())
except ImportError:
pass
def _load(self):
web.ctx.app_stack.append(self)
def _unload(self):
web.ctx.app_stack = web.ctx.app_stack[:-1]
if web.ctx.app_stack:
# this is a sub-application, revert ctx to earlier state.
oldctx = web.ctx.get("_oldctx")
if oldctx:
web.ctx.home = oldctx.home
web.ctx.homepath = oldctx.homepath
web.ctx.path = oldctx.path
web.ctx.fullpath = oldctx.fullpath
def _cleanup(self):
# Threads can be recycled by WSGI servers.
# Clearing up all thread-local state to avoid interefereing with subsequent requests.
utils.ThreadedDict.clear_all()
def init_mapping(self, mapping):
self.mapping = list(utils.group(mapping, 2))
def add_mapping(self, pattern, classname):
self.mapping.append((pattern, classname))
def add_processor(self, processor):
"""
Adds a processor to the application.
>>> urls = ("/(.*)", "echo")
>>> app = application(urls, globals())
>>> class echo:
... def GET(self, name): return name
...
>>>
>>> def hello(handler): return "hello, " + handler()
...
>>> app.add_processor(hello)
>>> app.request("/web.py").data
'hello, web.py'
"""
# PY3DOCTEST: b'hello, web.py'
self.processors.append(processor)
def request(
self,
localpart="/",
method="GET",
data=None,
host="0.0.0.0:8080",
headers=None,
https=False,
**kw,
):
"""Makes request to this application for the specified path and method.
Response will be a storage object with data, status and headers.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self):
... web.header('Content-Type', 'text/plain')
... return "hello"
...
>>> response = app.request("/hello")
>>> response.data
'hello'
>>> response.status
'200 OK'
>>> response.headers['Content-Type']
'text/plain'
To use https, use https=True.
>>> urls = ("/redirect", "redirect")
>>> app = application(urls, globals())
>>> class redirect:
... def GET(self): raise web.seeother("/foo")
...
>>> response = app.request("/redirect")
>>> response.headers['Location']
'http://0.0.0.0:8080/foo'
>>> response = app.request("/redirect", https=True)
>>> response.headers['Location']
'https://0.0.0.0:8080/foo'
The headers argument specifies HTTP headers as a mapping object
such as a dict.
>>> urls = ('/ua', 'uaprinter')
>>> class uaprinter:
... def GET(self):
... return 'your user-agent is ' + web.ctx.env['HTTP_USER_AGENT']
...
>>> app = application(urls, globals())
>>> app.request('/ua', headers = {
... 'User-Agent': 'a small jumping bean/1.0 (compatible)'
... }).data
'your user-agent is a small jumping bean/1.0 (compatible)'
"""
# PY3DOCTEST: b'hello'
# PY3DOCTEST: b'your user-agent is a small jumping bean/1.0 (compatible)'
_p = urlparse(localpart)
path = _p.path
maybe_query = _p.query
query = maybe_query or ""
if "env" in kw:
env = kw["env"]
else:
env = {}
env = dict(
env,
HTTP_HOST=host,
REQUEST_METHOD=method,
PATH_INFO=path,
QUERY_STRING=query,
HTTPS=str(https),
)
headers = headers or {}
for k, v in headers.items():
env["HTTP_" + k.upper().replace("-", "_")] = v
if "HTTP_CONTENT_LENGTH" in env:
env["CONTENT_LENGTH"] = env.pop("HTTP_CONTENT_LENGTH")
if "HTTP_CONTENT_TYPE" in env:
env["CONTENT_TYPE"] = env.pop("HTTP_CONTENT_TYPE")
if method not in ["HEAD", "GET"]:
data = data or ""
if isinstance(data, dict):
q = urlencode(data)
else:
q = data
env["wsgi.input"] = BytesIO(q.encode("utf-8"))
# if not env.get('CONTENT_TYPE', '').lower().startswith('multipart/') and 'CONTENT_LENGTH' not in env:
if "CONTENT_LENGTH" not in env:
env["CONTENT_LENGTH"] = len(q)
response = web.storage()
def start_response(status, headers):
response.status = status
response.headers = dict(headers)
response.header_items = headers
data = self.wsgifunc()(env, start_response)
response.data = b"".join(data)
return response
def browser(self):
return browser.AppBrowser(self)
def handle(self):
fn, args = self._match(self.mapping, web.ctx.path)
return self._delegate(fn, self.fvars, args)
def handle_with_processors(self):
def process(processors):
try:
if processors:
p, processors = processors[0], processors[1:]
return p(lambda: process(processors))
else:
return self.handle()
except web.HTTPError:
raise
except (KeyboardInterrupt, SystemExit):
raise
except:
print(traceback.format_exc(), file=web.debug)
raise self.internalerror()
# processors must be applied in the reverse order. (??)
return process(self.processors)
def wsgifunc(self, *middleware):
"""Returns a WSGI-compatible function for this application."""
def peep(iterator):
"""Peeps into an iterator by doing an iteration
and returns an equivalent iterator.
"""
# wsgi requires the headers first
# so we need to do an iteration
# and save the result for later
try:
firstchunk = next(iterator)
except StopIteration:
firstchunk = ""
return itertools.chain([firstchunk], iterator)
def wsgi(env, start_resp):
# clear threadlocal to avoid interference of previous requests
self._cleanup()
self.load(env)
try:
# allow uppercase methods only
if web.ctx.method.upper() != web.ctx.method:
raise web.nomethod()
result = self.handle_with_processors()
if result and hasattr(result, "__next__"):
result = peep(result)
else:
result = [result]
except web.HTTPError as e:
result = [e.data]
def build_result(result):
for r in result:
if isinstance(r, bytes):
yield r
else:
yield str(r).encode("utf-8")
result = build_result(result)
status, headers = web.ctx.status, web.ctx.headers
start_resp(status, headers)
def cleanup():
self._cleanup()
yield b"" # force this function to be a generator
return itertools.chain(result, cleanup())
for m in middleware:
wsgi = m(wsgi)
return wsgi
def run(self, *middleware):
"""
Starts handling requests. If called in a CGI or FastCGI context, it will follow
that protocol. If called from the command line, it will start an HTTP
server on the port named in the first command line argument, or, if there
is no argument, on port 8080.
`middleware` is a list of WSGI middleware which is applied to the resulting WSGI
function.
"""
return wsgi.runwsgi(self.wsgifunc(*middleware))
def stop(self):
"""Stops the http server started by run."""
if httpserver.server:
httpserver.server.stop()
httpserver.server = None
def cgirun(self, *middleware):
"""
Return a CGI handler. This is mostly useful with Google App Engine.
There you can just do:
main = app.cgirun()
"""
wsgiapp = self.wsgifunc(*middleware)
try:
from google.appengine.ext.webapp.util import run_wsgi_app
return run_wsgi_app(wsgiapp)
except ImportError:
# we're not running from within Google App Engine
return wsgiref.handlers.CGIHandler().run(wsgiapp)
def gaerun(self, *middleware):
"""
Starts the program in a way that will work with Google app engine,
no matter which version you are using (2.5 / 2.7)
If it is 2.5, just normally start it with app.gaerun()
If it is 2.7, make sure to change the app.yaml handler to point to the
global variable that contains the result of app.gaerun()
For example:
in app.yaml (where code.py is where the main code is located)
handlers:
- url: /.*
script: code.app
Make sure that the app variable is globally accessible
"""
wsgiapp = self.wsgifunc(*middleware)
try:
# check what version of python is running
version = sys.version_info[:2]
major = version[0]
minor = version[1]
if major != 2:
raise OSError("Google App Engine only supports python 2.5 and 2.7")
# if 2.7, return a function that can be run by gae
if minor == 7:
return wsgiapp
# if 2.5, use run_wsgi_app
elif minor == 5:
from google.appengine.ext.webapp.util import run_wsgi_app
return run_wsgi_app(wsgiapp)
else:
raise OSError("Not a supported platform, use python 2.5 or 2.7")
except ImportError:
return wsgiref.handlers.CGIHandler().run(wsgiapp)
def load(self, env):
"""Initializes ctx using env."""
ctx = web.ctx
ctx.clear()
ctx.status = "200 OK"
ctx.headers = []
ctx.output = ""
ctx.environ = ctx.env = env
ctx.host = env.get("HTTP_HOST")
if env.get("wsgi.url_scheme") in ["http", "https"]:
ctx.protocol = env["wsgi.url_scheme"]
elif env.get("HTTPS", "").lower() in ["on", "true", "1"]:
ctx.protocol = "https"
else:
ctx.protocol = "http"
ctx.homedomain = ctx.protocol + "://" + env.get("HTTP_HOST", "[unknown]")
ctx.homepath = os.environ.get("REAL_SCRIPT_NAME", env.get("SCRIPT_NAME", ""))
ctx.home = ctx.homedomain + ctx.homepath
# @@ home is changed when the request is handled to a sub-application.
# @@ but the real home is required for doing absolute redirects.
ctx.realhome = ctx.home
ctx.ip = env.get("REMOTE_ADDR")
ctx.method = env.get("REQUEST_METHOD")
try:
ctx.path = bytes(env.get("PATH_INFO"), "latin1").decode("utf8")
except UnicodeDecodeError: # If there are Unicode characters...
ctx.path = env.get("PATH_INFO")
# http://trac.lighttpd.net/trac/ticket/406 requires:
if env.get("SERVER_SOFTWARE", "").startswith(("lighttpd/", "nginx/")):
ctx.path = lstrips(env.get("REQUEST_URI").split("?")[0], ctx.homepath)
# Apache and CherryPy webservers unquote urls but lighttpd and nginx do not.
# Unquote explicitly for lighttpd and nginx to make ctx.path uniform across
# all servers.
ctx.path = unquote(ctx.path)
if env.get("QUERY_STRING"):
ctx.query = "?" + env.get("QUERY_STRING", "")
else:
ctx.query = ""
ctx.fullpath = ctx.path + ctx.query
for k, v in iteritems(ctx):
# convert all string values to unicode values and replace
# malformed data with a suitable replacement marker.
if isinstance(v, bytes):
ctx[k] = v.decode("utf-8", "replace")
# status must always be str
ctx.status = "200 OK"
ctx.app_stack = []
def _delegate(self, f, fvars, args=[]):
def handle_class(cls):
meth = web.ctx.method
if meth == "HEAD" and not hasattr(cls, meth):
meth = "GET"
if not hasattr(cls, meth):
raise web.nomethod(cls)
tocall = getattr(cls(), meth)
return tocall(*args)
if f is None:
raise web.notfound()
elif isinstance(f, application):
return f.handle_with_processors()
elif isclass(f):
return handle_class(f)
elif isinstance(f, str):
if f.startswith("redirect "):
url = f.split(" ", 1)[1]
if web.ctx.method == "GET":
x = web.ctx.env.get("QUERY_STRING", "")
if x:
url += "?" + x
raise web.redirect(url)
elif "." in f:
mod, cls = f.rsplit(".", 1)
mod = __import__(mod, None, None, [""])
cls = getattr(mod, cls)
else:
cls = fvars[f]
return handle_class(cls)
elif hasattr(f, "__call__"):
return f()
else:
return web.notfound()
def _match(self, mapping, value):
for pat, what in mapping:
if isinstance(what, application):
if value.startswith(pat):
f = lambda: self._delegate_sub_application(pat, what)
return f, None
else:
continue
elif isinstance(what, str):
what, result = utils.re_subm(rf"^{pat}\Z", what, value)
else:
result = utils.re_compile(rf"^{pat}\Z").match(value)
if result: # it's a match
return what, [x for x in result.groups()]
return None, None
def _delegate_sub_application(self, dir, app):
"""Deletes request to sub application `app` rooted at the directory `dir`.
The home, homepath, path and fullpath values in web.ctx are updated to mimic request
to the subapp and are restored after it is handled.
@@Any issues with when used with yield?
"""
web.ctx._oldctx = web.storage(web.ctx)
web.ctx.home += dir
web.ctx.homepath += dir
web.ctx.path = web.ctx.path[len(dir) :]
web.ctx.fullpath = web.ctx.fullpath[len(dir) :]
return app.handle_with_processors()
def get_parent_app(self):
if self in web.ctx.app_stack:
index = web.ctx.app_stack.index(self)
if index > 0:
return web.ctx.app_stack[index - 1]
def notfound(self):
"""Returns HTTPError with '404 not found' message"""
parent = self.get_parent_app()
if parent:
return parent.notfound()
else:
return web._NotFound()
def internalerror(self):
"""Returns HTTPError with '500 internal error' message"""
parent = self.get_parent_app()
if parent:
return parent.internalerror()
elif web.config.get("debug"):
return debugerror()
else:
return web._InternalError()
def with_metaclass(mcls):
def decorator(cls):
body = vars(cls).copy()
# clean out class body
body.pop("__dict__", None)
body.pop("__weakref__", None)
return mcls(cls.__name__, cls.__bases__, body)
return decorator
class auto_application(application):
"""Application similar to `application` but urls are constructed
automatically using metaclass.
>>> app = auto_application()
>>> class hello(app.page):
... def GET(self): return "hello, world"
...
>>> class foo(app.page):
... path = '/foo/.*'
... def GET(self): return "foo"
>>> app.request("/hello").data
'hello, world'
>>> app.request('/foo/bar').data
'foo'
"""
# PY3DOCTEST: b'hello, world'
# PY3DOCTEST: b'foo'
def __init__(self):
application.__init__(self)
class metapage(type):
def __init__(klass, name, bases, attrs):
type.__init__(klass, name, bases, attrs)
path = attrs.get("path", "/" + name)
# path can be specified as None to ignore that class
# typically required to create a abstract base class.
if path is not None:
self.add_mapping(path, klass)
@with_metaclass(metapage) # little hack needed for Py2 and Py3 compatibility
class page:
path = None
self.page = page
# The application class already has the required functionality of subdir_application
subdir_application = application
class subdomain_application(application):
r"""
Application to delegate requests based on the host.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self): return "hello"
>>>
>>> mapping = (r"hello\.example\.com", app)
>>> app2 = subdomain_application(mapping)
>>> app2.request("/hello", host="hello.example.com").data
'hello'
>>> response = app2.request("/hello", host="something.example.com")
>>> response.status
'404 Not Found'
>>> response.data
'not found'
"""
# PY3DOCTEST: b'hello'
# PY3DOCTEST: b'not found'
def handle(self):
host = web.ctx.host.split(":")[0] # strip port
fn, args = self._match(self.mapping, host)
return self._delegate(fn, self.fvars, args)
def _match(self, mapping, value):
for pat, what in mapping:
if isinstance(what, str):
what, result = utils.re_subm("^" + pat + "$", what, value)
else:
result = utils.re_compile("^" + pat + "$").match(value)
if result: # it's a match
return what, [x for x in result.groups()]
return None, None
def loadhook(h):
"""
Converts a load hook into an application processor.
>>> app = auto_application()
>>> def f(): "something done before handling request"
...
>>> app.add_processor(loadhook(f))
"""
def processor(handler):
h()
return handler()
return processor
def unloadhook(h):
"""
Converts an unload hook into an application processor.
>>> app = auto_application()
>>> def f(): "something done after handling request"
...
>>> app.add_processor(unloadhook(f))
"""
def processor(handler):
try:
result = handler()
except:
# run the hook even when handler raises some exception
h()
raise
if result and hasattr(result, "__next__"):
return wrap(result)
else:
h()
return result
def wrap(result):
def next_hook():
try:
return next(result)
except:
# call the hook at the and of iterator
h()
raise
result = iter(result)
while True:
try:
yield next_hook()
except StopIteration:
return
return processor
def autodelegate(prefix=""):
"""
Returns a method that takes one argument and calls the method named prefix+arg,
calling `notfound()` if there isn't one. Example:
urls = ('/prefs/(.*)', 'prefs')
class prefs:
GET = autodelegate('GET_')
def GET_password(self): pass
def GET_privacy(self): pass
`GET_password` would get called for `/prefs/password` while `GET_privacy` for
`GET_privacy` gets called for `/prefs/privacy`.
If a user visits `/prefs/password/change` then `GET_password(self, '/change')`
is called.
"""
def internal(self, arg):
if "/" in arg:
first, rest = arg.split("/", 1)
func = prefix + first
args = ["/" + rest]
else:
func = prefix + arg
args = []
if hasattr(self, func):
try:
return getattr(self, func)(*args)
except TypeError:
raise web.notfound()
else:
raise web.notfound()
return internal
class Reloader:
"""Checks to see if any loaded modules have changed on disk and,
if so, reloads them.
"""
"""File suffix of compiled modules."""
if sys.platform.startswith("java"):
SUFFIX = "$py.class"
else:
SUFFIX = ".pyc"
def __init__(self):
self.mtimes = {}
def __call__(self):
sys_modules = list(sys.modules.values())
for mod in sys_modules:
self.check(mod)
def check(self, mod):
# jython registers java packages as modules but they either
# don't have a __file__ attribute or its value is None
if not (mod and hasattr(mod, "__file__") and mod.__file__):
return
try:
mtime = os.stat(mod.__file__).st_mtime
except OSError:
return
if mod.__file__.endswith(self.__class__.SUFFIX) and os.path.exists(
mod.__file__[:-1]
):
mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
if mod not in self.mtimes:
self.mtimes[mod] = mtime
elif self.mtimes[mod] < mtime:
try:
reload(mod)
self.mtimes[mod] = mtime
except ImportError:
pass
if __name__ == "__main__":
import doctest
doctest.testmod()

295
web/browser.py Normal file
View File

@@ -0,0 +1,295 @@
"""Browser to test web applications.
(from web.py)
"""
import os
import webbrowser
from http.cookiejar import CookieJar
from io import BytesIO
from urllib.parse import urljoin
from urllib.request import HTTPCookieProcessor, HTTPError, HTTPHandler, Request
from urllib.request import build_opener as urllib_build_opener
from urllib.response import addinfourl
from .net import htmlunquote
from .utils import re_compile
DEBUG = False
__all__ = ["BrowserError", "Browser", "AppBrowser", "AppHandler"]
class BrowserError(Exception):
pass
class Browser:
def __init__(self):
self.cookiejar = CookieJar()
self._cookie_processor = HTTPCookieProcessor(self.cookiejar)
self.form = None
self.url = "http://0.0.0.0:8080/"
self.path = "/"
self.status = None
self.data = None
self._response = None
self._forms = None
@property
def text(self):
return self.data.decode("utf-8")
def reset(self):
"""Clears all cookies and history."""
self.cookiejar.clear()
def build_opener(self):
"""Builds the opener using (urllib2/urllib.request).build_opener.
Subclasses can override this function to prodive custom openers.
"""
return urllib_build_opener()
def do_request(self, req):
if DEBUG:
print("requesting", req.get_method(), req.get_full_url())
opener = self.build_opener()
opener.add_handler(self._cookie_processor)
try:
self._response = opener.open(req)
except HTTPError as e:
self._response = e
self.url = self._response.geturl()
self.path = Request(self.url).selector
self.data = self._response.read()
self.status = self._response.code
self._forms = None
self.form = None
return self.get_response()
def open(self, url, data=None, headers={}):
"""Opens the specified url."""
url = urljoin(self.url, url)
req = Request(url, data, headers)
return self.do_request(req)
def show(self):
"""Opens the current page in real web browser."""
f = open("page.html", "w")
f.write(self.data)
f.close()
url = "file://" + os.path.abspath("page.html")
webbrowser.open(url)
def get_response(self):
"""Returns a copy of the current response."""
return addinfourl(
BytesIO(self.data), self._response.info(), self._response.geturl()
)
def get_soup(self):
"""Returns beautiful soup of the current document."""
import BeautifulSoup
return BeautifulSoup.BeautifulSoup(self.data)
def get_text(self, e=None):
"""Returns content of e or the current document as plain text."""
e = e or self.get_soup()
return "".join(
[htmlunquote(c) for c in e.recursiveChildGenerator() if isinstance(c, str)]
)
def _get_links(self):
soup = self.get_soup()
return [a for a in soup.findAll(name="a")]
def get_links(
self, text=None, text_regex=None, url=None, url_regex=None, predicate=None
):
"""Returns all links in the document."""
return self._filter_links(
self._get_links(),
text=text,
text_regex=text_regex,
url=url,
url_regex=url_regex,
predicate=predicate,
)
def follow_link(
self,
link=None,
text=None,
text_regex=None,
url=None,
url_regex=None,
predicate=None,
):
if link is None:
links = self._filter_links(
self.get_links(),
text=text,
text_regex=text_regex,
url=url,
url_regex=url_regex,
predicate=predicate,
)
link = links and links[0]
if link:
return self.open(link["href"])
else:
raise BrowserError("No link found")
def find_link(
self, text=None, text_regex=None, url=None, url_regex=None, predicate=None
):
links = self._filter_links(
self.get_links(),
text=text,
text_regex=text_regex,
url=url,
url_regex=url_regex,
predicate=predicate,
)
return links and links[0] or None
def _filter_links(
self,
links,
text=None,
text_regex=None,
url=None,
url_regex=None,
predicate=None,
):
predicates = []
if text is not None:
predicates.append(lambda link: link.string == text)
if text_regex is not None:
predicates.append(
lambda link: re_compile(text_regex).search(link.string or "")
)
if url is not None:
predicates.append(lambda link: link.get("href") == url)
if url_regex is not None:
predicates.append(
lambda link: re_compile(url_regex).search(link.get("href", ""))
)
if predicate:
predicate.append(predicate)
def f(link):
for p in predicates:
if not p(link):
return False
return True
return [link for link in links if f(link)]
def get_forms(self):
"""Returns all forms in the current document.
The returned form objects implement the ClientForm.HTMLForm interface.
"""
if self._forms is None:
import ClientForm
self._forms = ClientForm.ParseResponse(
self.get_response(), backwards_compat=False
)
return self._forms
def select_form(self, name=None, predicate=None, index=0):
"""Selects the specified form."""
forms = self.get_forms()
if name is not None:
forms = [f for f in forms if f.name == name]
if predicate:
forms = [f for f in forms if predicate(f)]
if forms:
self.form = forms[index]
return self.form
else:
raise BrowserError("No form selected.")
def submit(self, **kw):
"""submits the currently selected form."""
if self.form is None:
raise BrowserError("No form selected.")
req = self.form.click(**kw)
return self.do_request(req)
def __getitem__(self, key):
return self.form[key]
def __setitem__(self, key, value):
self.form[key] = value
class AppBrowser(Browser):
"""Browser interface to test web.py apps.
b = AppBrowser(app)
b.open('/')
b.follow_link(text='Login')
b.select_form(name='login')
b['username'] = 'joe'
b['password'] = 'secret'
b.submit()
assert b.path == '/'
assert 'Welcome joe' in b.get_text()
"""
def __init__(self, app):
Browser.__init__(self)
self.app = app
def build_opener(self):
return urllib_build_opener(AppHandler(self.app))
class AppHandler(HTTPHandler):
"""urllib2 handler to handle requests using web.py application."""
handler_order = 100
https_request = HTTPHandler.do_request_
def __init__(self, app):
self.app = app
def http_open(self, req):
result = self.app.request(
localpart=req.selector,
method=req.get_method(),
host=req.host,
data=req.data,
headers=dict(req.header_items()),
https=(req.type == "https"),
)
return self._make_response(result, req.get_full_url())
def https_open(self, req):
return self.http_open(req)
def _make_response(self, result, url):
data = "\r\n".join([f"{k}: {v}" for k, v in result.header_items])
import email
headers = email.message_from_string(data)
response = addinfourl(BytesIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response

0
web/contrib/__init__.py Normal file
View File

146
web/contrib/template.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Interface to various templating engines.
"""
import os.path
__all__ = ["render_cheetah", "render_genshi", "render_mako", "cache"]
class render_cheetah:
"""Rendering interface to Cheetah Templates.
Example:
render = render_cheetah('templates')
render.hello(name="cheetah")
"""
def __init__(self, path):
# give error if Chetah is not installed
from Cheetah.Template import Template # noqa: F401
self.path = path
def __getattr__(self, name):
from Cheetah.Template import Template
path = os.path.join(self.path, name + ".html")
def template(**kw):
t = Template(file=path, searchList=[kw])
return t.respond()
return template
class render_genshi:
"""Rendering interface genshi templates.
Example:
for xml/html templates.
render = render_genshi(['templates/'])
render.hello(name='genshi')
For text templates:
render = render_genshi(['templates/'], type='text')
render.hello(name='genshi')
"""
def __init__(self, *a, **kwargs):
from genshi.template import TemplateLoader
self._type = kwargs.pop("type", None)
self._loader = TemplateLoader(*a, **kwargs)
def __getattr__(self, name):
# Assuming all templates are html
path = name + ".html"
if self._type == "text":
from genshi.template import TextTemplate
cls = TextTemplate
type = "text"
else:
cls = None
type = self._type
t = self._loader.load(path, cls=cls)
def template(**kw):
stream = t.generate(**kw)
if type:
return stream.render(type)
else:
return stream.render()
return template
class render_jinja:
"""Rendering interface to Jinja2 Templates
Example:
render= render_jinja('templates')
render.hello(name='jinja2')
"""
def __init__(self, *a, **kwargs):
extensions = kwargs.pop("extensions", [])
globals = kwargs.pop("globals", {})
from jinja2 import Environment, FileSystemLoader
self._lookup = Environment(
loader=FileSystemLoader(*a, **kwargs), extensions=extensions
)
self._lookup.globals.update(globals)
def __getattr__(self, name):
# Assuming all templates end with .html
path = name + ".html"
t = self._lookup.get_template(path)
return t.render
class render_mako:
"""Rendering interface to Mako Templates.
Example:
render = render_mako(directories=['templates'])
render.hello(name="mako")
"""
def __init__(self, *a, **kwargs):
from mako.lookup import TemplateLookup
self._lookup = TemplateLookup(*a, **kwargs)
def __getattr__(self, name):
# Assuming all templates are html
path = name + ".html"
t = self._lookup.get_template(path)
return t.render
class cache:
"""Cache for any rendering interface.
Example:
render = cache(render_cheetah("templates/"))
render.hello(name='cache')
"""
def __init__(self, render):
self._render = render
self._cache = {}
def __getattr__(self, name):
if name not in self._cache:
self._cache[name] = getattr(self._render, name)
return self._cache[name]

1742
web/db.py Normal file

File diff suppressed because it is too large Load Diff

377
web/debugerror.py Normal file
View File

@@ -0,0 +1,377 @@
"""
pretty debug errors
(part of web.py)
portions adapted from Django <djangoproject.com>
Copyright (c) 2005, the Lawrence Journal-World
Used under the modified BSD license:
http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
"""
__all__ = ["debugerror", "djangoerror", "emailerrors"]
import os
import os.path
import pprint
import sys
import traceback
from . import webapi as web
from .net import websafe
from .template import Template
from .utils import safestr, sendmail
def update_globals_template(t, globals):
t.t.__globals__.update(globals)
whereami = os.path.join(os.getcwd(), __file__)
whereami = os.path.sep.join(whereami.split(os.path.sep)[:-1])
djangoerror_t = """\
$def with (exception_type, exception_value, frames)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<meta name="robots" content="NONE,NOARCHIVE" />
<title>$exception_type at $ctx.path</title>
<style type="text/css">
html * { padding:0; margin:0; }
body * { padding:10px 20px; }
body * * { padding:0; }
body { font:small sans-serif; }
body>div { border-bottom:1px solid #ddd; }
h1 { font-weight:normal; }
h2 { margin-bottom:.8em; }
h2 span { font-size:80%; color:#666; font-weight:normal; }
h3 { margin:1em 0 .5em 0; }
h4 { margin:0 0 .5em 0; font-weight: normal; }
table {
border:1px solid #ccc; border-collapse: collapse; background:white; }
tbody td, tbody th { vertical-align:top; padding:2px 3px; }
thead th {
padding:1px 6px 1px 3px; background:#fefefe; text-align:left;
font-weight:normal; font-size:11px; border:1px solid #ddd; }
tbody th { text-align:right; color:#666; padding-right:.5em; }
table.vars { margin:5px 0 2px 40px; }
table.vars td, table.req td { font-family:monospace; }
table td.code { width:100%;}
table td.code div { overflow:hidden; }
table.source th { color:#666; }
table.source td {
font-family:monospace; white-space:pre; border-bottom:1px solid #eee; }
ul.traceback { list-style-type:none; }
ul.traceback li.frame { margin-bottom:1em; }
div.context { margin: 10px 0; }
div.context ol {
padding-left:30px; margin:0 10px; list-style-position: inside; }
div.context ol li {
font-family:monospace; white-space:pre; color:#666; cursor:pointer; }
div.context ol.context-line li { color:black; background-color:#ccc; }
div.context ol.context-line li span { float: right; }
div.commands { margin-left: 40px; }
div.commands a { color:black; text-decoration:none; }
#summary { background: #ffc; }
#summary h2 { font-weight: normal; color: #666; }
#explanation { background:#eee; }
#template, #template-not-exist { background:#f6f6f6; }
#template-not-exist ul { margin: 0 0 0 20px; }
#traceback { background:#eee; }
#requestinfo { background:#f6f6f6; padding-left:120px; }
#summary table { border:none; background:transparent; }
#requestinfo h2, #requestinfo h3 { position:relative; margin-left:-100px; }
#requestinfo h3 { margin-bottom:-1em; }
.error { background: #ffc; }
.specific { color:#cc3300; font-weight:bold; }
</style>
<script type="text/javascript">
//<!--
function getElementsByClassName(oElm, strTagName, strClassName){
// Written by Jonathan Snook, http://www.snook.ca/jon;
// Add-ons by Robert Nyman, http://www.robertnyman.com
var arrElements = (strTagName == "*" && document.all)? document.all :
oElm.getElementsByTagName(strTagName);
var arrReturnElements = new Array();
strClassName = strClassName.replace(/\\-/g, "\\-");
var oRegExp = new RegExp("(^|\\s)" + strClassName + "(\\s|$$)");
var oElement;
for(var i=0; i<arrElements.length; i++){
oElement = arrElements[i];
if(oRegExp.test(oElement.className)){
arrReturnElements.push(oElement);
}
}
return (arrReturnElements)
}
function hideAll(elems) {
for (var e = 0; e < elems.length; e++) {
elems[e].style.display = 'none';
}
}
window.onload = function() {
hideAll(getElementsByClassName(document, 'table', 'vars'));
hideAll(getElementsByClassName(document, 'ol', 'pre-context'));
hideAll(getElementsByClassName(document, 'ol', 'post-context'));
}
function toggle() {
for (var i = 0; i < arguments.length; i++) {
var e = document.getElementById(arguments[i]);
if (e) {
e.style.display = e.style.display == 'none' ? 'block' : 'none';
}
}
return false;
}
function varToggle(link, id) {
toggle('v' + id);
var s = link.getElementsByTagName('span')[0];
var uarr = String.fromCharCode(0x25b6);
var darr = String.fromCharCode(0x25bc);
s.innerHTML = s.innerHTML == uarr ? darr : uarr;
return false;
}
//-->
</script>
</head>
<body>
$def dicttable (d, kls='req', id=None):
$ items = d and list(d.items()) or []
$items.sort()
$:dicttable_items(items, kls, id)
$def dicttable_items(items, kls='req', id=None):
$if items:
<table class="$kls"
$if id: id="$id"
><thead><tr><th>Variable</th><th>Value</th></tr></thead>
<tbody>
$for k, v in items:
<tr><td>$k</td><td class="code"><div>$prettify(v)</div></td></tr>
</tbody>
</table>
$else:
<p>No data.</p>
<div id="summary">
<h1>$exception_type at $ctx.path</h1>
<h2>$exception_value</h2>
<table><tr>
<th>Python</th>
<td>$frames[0].filename in $frames[0].function, line $frames[0].lineno</td>
</tr><tr>
<th>Web</th>
<td>$ctx.method $ctx.home$ctx.path</td>
</tr></table>
</div>
<div id="traceback">
<h2>Traceback <span>(innermost first)</span></h2>
<ul class="traceback">
$for frame in frames:
<li class="frame">
<code>$frame.filename</code> in <code>$frame.function</code>
$if frame.context_line is not None:
<div class="context" id="c$frame.id">
$if frame.pre_context:
<ol start="$frame.pre_context_lineno" class="pre-context" id="pre$frame.id">
$for line in frame.pre_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
<ol start="$frame.lineno" class="context-line"><li onclick="toggle('pre$frame.id', 'post$frame.id')">$frame.context_line <span>...</span></li></ol>
$if frame.post_context:
<ol start='${frame.lineno + 1}' class="post-context" id="post$frame.id">
$for line in frame.post_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
</div>
$if frame.vars:
<div class="commands">
<a href='#' onclick="return varToggle(this, '$frame.id')"><span>&#x25b6;</span> Local vars</a>
$# $inspect.formatargvalues(*inspect.getargvalues(frame['tb'].tb_frame))
</div>
$:dicttable(frame.vars, kls='vars', id=('v' + str(frame.id)))
</li>
</ul>
</div>
<div id="requestinfo">
$if ctx.output or ctx.headers:
<h2>Response so far</h2>
<h3>HEADERS</h3>
$:dicttable_items(ctx.headers)
<h3>BODY</h3>
<p class="req" style="padding-bottom: 2em"><code>
$ctx.output
</code></p>
<h2>Request information</h2>
<h3>INPUT</h3>
$:dicttable(web.input(_unicode=False))
<h3 id="cookie-info">COOKIES</h3>
$:dicttable(web.cookies())
<h3 id="meta-info">META</h3>
$ newctx = [(k, v) for (k, v) in ctx.iteritems() if not k.startswith('_') and not isinstance(v, dict)]
$:dicttable(dict(newctx))
<h3 id="meta-info">ENVIRONMENT</h3>
$:dicttable(ctx.env)
</div>
<div id="explanation">
<p>
You're seeing this error because you have <code>web.config.debug</code>
set to <code>True</code>. Set that to <code>False</code> if you don't want to see this.
</p>
</div>
</body>
</html>
""" # noqa: W605
djangoerror_r = None
def djangoerror():
def _get_lines_from_file(filename, lineno, context_lines):
"""
Returns context_lines before and after lineno from file.
Returns (pre_context_lineno, pre_context, context_line, post_context).
"""
try:
source = open(filename).readlines()
lower_bound = max(0, lineno - context_lines)
upper_bound = lineno + context_lines
pre_context = [line.strip("\n") for line in source[lower_bound:lineno]]
context_line = source[lineno].strip("\n")
post_context = [
line.strip("\n") for line in source[lineno + 1 : upper_bound]
]
return lower_bound, pre_context, context_line, post_context
except (OSError, IndexError):
return None, [], None, []
exception_type, exception_value, tback = sys.exc_info()
frames = []
while tback is not None:
filename = tback.tb_frame.f_code.co_filename
function = tback.tb_frame.f_code.co_name
lineno = tback.tb_lineno - 1
# hack to get correct line number for templates
lineno += tback.tb_frame.f_locals.get("__lineoffset__", 0)
(
pre_context_lineno,
pre_context,
context_line,
post_context,
) = _get_lines_from_file(filename, lineno, 7)
if "__hidetraceback__" not in tback.tb_frame.f_locals:
frames.append(
web.storage(
{
"tback": tback,
"filename": filename,
"function": function,
"lineno": lineno,
"vars": tback.tb_frame.f_locals,
"id": id(tback),
"pre_context": pre_context,
"context_line": context_line,
"post_context": post_context,
"pre_context_lineno": pre_context_lineno,
}
)
)
tback = tback.tb_next
frames.reverse()
def prettify(x):
try:
out = pprint.pformat(x)
except Exception as e:
out = "[could not display: <" + e.__class__.__name__ + ": " + str(e) + ">]"
return out
global djangoerror_r
if djangoerror_r is None:
djangoerror_r = Template(djangoerror_t, filename=__file__, filter=websafe)
t = djangoerror_r
globals = {
"ctx": web.ctx,
"web": web,
"dict": dict,
"str": str,
"prettify": prettify,
}
update_globals_template(t, globals)
return t(exception_type, exception_value, frames)
def debugerror():
"""
A replacement for `internalerror` that presents a nice page with lots
of debug information for the programmer.
(Based on the beautiful 500 page from [Django](http://djangoproject.com/),
designed by [Wilson Miner](http://wilsonminer.com/).)
"""
return web._InternalError(djangoerror())
def emailerrors(to_address, olderror, from_address=None):
"""
Wraps the old `internalerror` handler (pass as `olderror`) to
additionally email all errors to `to_address`, to aid in
debugging production websites.
Emails contain a normal text traceback as well as an
attachment containing the nice `debugerror` page.
"""
from_address = from_address or to_address
def emailerrors_internal():
error = olderror()
tb = sys.exc_info()
error_name = tb[0]
error_value = tb[1]
tb_txt = "".join(traceback.format_exception(*tb))
path = web.ctx.path
request = web.ctx.method + " " + web.ctx.home + web.ctx.fullpath
message = f"\n{request}\n\n{tb_txt}\n\n"
sendmail(
"your buggy site <%s>" % from_address,
"the bugfixer <%s>" % to_address,
"bug: %(error_name)s: %(error_value)s (%(path)s)" % locals(),
message,
attachments=[dict(filename="bug.html", content=safestr(djangoerror()))],
)
return error
return emailerrors_internal
if __name__ == "__main__":
urls = ("/", "index")
from .application import application
app = application(urls, globals())
app.internalerror = debugerror
class index:
def GET(self):
thisdoesnotexist # noqa: F821
app.run()

690
web/form.py Normal file
View File

@@ -0,0 +1,690 @@
"""
HTML forms
(part of web.py)
"""
import copy
import re
from . import net, utils
from . import webapi as web
def attrget(obj, attr, value=None):
try:
if hasattr(obj, "has_key") and attr in obj:
return obj[attr]
except TypeError:
# Handle the case where has_key takes different number of arguments.
# This is the case with Model objects on appengine. See #134
pass
if (
hasattr(obj, "keys") and attr in obj
): # needed for Py3, has_key doesn't exist anymore
return obj[attr]
elif hasattr(obj, attr):
return getattr(obj, attr)
return value
class Form:
r"""
HTML form.
>>> f = Form(Textbox("x"))
>>> f.render()
u'<table>\n <tr><th><label for="x">x</label></th><td><input id="x" name="x" type="text"/></td></tr>\n</table>'
>>> f.fill(x="42")
True
>>> f.render()
u'<table>\n <tr><th><label for="x">x</label></th><td><input id="x" name="x" type="text" value="42"/></td></tr>\n</table>'
"""
def __init__(self, *inputs, **kw):
self.inputs = inputs
self.valid = True
self.note = None
self.validators = kw.pop("validators", [])
def __call__(self, x=None):
o = copy.deepcopy(self)
if x:
o.validates(x)
return o
def render(self):
out = ""
out += self.rendernote(self.note)
out += "<table>\n"
for i in self.inputs:
html = (
utils.safeunicode(i.pre)
+ i.render()
+ self.rendernote(i.note)
+ utils.safeunicode(i.post)
)
if i.is_hidden():
out += ' <tr style="display: none;"><th></th><td>%s</td></tr>\n' % (
html
)
else:
out += (
' <tr><th><label for="%s">%s</label></th><td>%s</td></tr>\n'
% (net.websafe(i.id), net.websafe(i.description), html)
)
out += "</table>"
return out
def render_css(self):
out = []
out.append(self.rendernote(self.note))
for i in self.inputs:
if not i.is_hidden():
out.append(
'<label for="%s">%s</label>'
% (net.websafe(i.id), net.websafe(i.description))
)
out.append(i.pre)
out.append(i.render())
out.append(self.rendernote(i.note))
out.append(i.post)
out.append("\n")
return "".join(out)
def rendernote(self, note):
if note:
return '<strong class="wrong">%s</strong>' % net.websafe(note)
else:
return ""
def validates(self, source=None, _validate=True, **kw):
source = source or kw or web.input()
out = True
for i in self.inputs:
v = attrget(source, i.name)
if _validate:
out = i.validate(v) and out
else:
i.set_value(v)
if _validate:
out = out and self._validate(source)
self.valid = out
return out
def _validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def fill(self, source=None, **kw):
return self.validates(source, _validate=False, **kw)
def __getitem__(self, i):
for x in self.inputs:
if x.name == i:
return x
raise KeyError(i)
def __getattr__(self, name):
# don't interfere with deepcopy
inputs = self.__dict__.get("inputs") or []
for x in inputs:
if x.name == name:
return x
raise AttributeError(name)
def get(self, i, default=None):
try:
return self[i]
except KeyError:
return default
def _get_d(self): # @@ should really be form.attr, no?
return utils.storage([(i.name, i.get_value()) for i in self.inputs])
d = property(_get_d)
class Input:
"""Generic input. Type attribute must be specified when called directly.
See also: <https://www.w3.org/TR/html52/sec-forms.html#the-input-element>
Currently only types which can be written inside one `<input />` tag are
supported.
- For checkbox, please use `Checkbox` class for better control.
- For radiobox, please use `Radio` class for better control.
>>> Input(name='foo', type='email', value="user@domain.com").render()
u'<input id="foo" name="foo" type="email" value="user@domain.com"/>'
>>> Input(name='foo', type='number', value="bar").render()
u'<input id="foo" name="foo" type="number" value="bar"/>'
>>> Input(name='num', type="number", min='0', max='10', step='2', value='5').render()
u'<input id="num" max="10" min="0" name="num" step="2" type="number" value="5"/>'
>>> Input(name='foo', type="tel", value='55512345').render()
u'<input id="foo" name="foo" type="tel" value="55512345"/>'
>>> Input(name='search', type="search", value='Search').render()
u'<input id="search" name="search" type="search" value="Search"/>'
>>> Input(name='search', type="search", value='Search', required='required', pattern='[a-z0-9]{2,30}', placeholder='Search...').render()
u'<input id="search" name="search" pattern="[a-z0-9]{2,30}" placeholder="Search..." required="required" type="search" value="Search"/>'
>>> Input(name='url', type="url", value='url').render()
u'<input id="url" name="url" type="url" value="url"/>'
>>> Input(name='range', type="range", min='0', max='10', step='2', value='5').render()
u'<input id="range" max="10" min="0" name="range" step="2" type="range" value="5"/>'
>>> Input(name='color', type="color").render()
u'<input id="color" name="color" type="color"/>'
>>> Input(name='f', type="file", accept=".doc,.docx,.xml").render()
u'<input accept=".doc,.docx,.xml" id="f" name="f" type="file"/>'
"""
def __init__(self, name, *validators, **attrs):
self.name = name
self.validators = validators
self.attrs = attrs = AttributeList(attrs)
self.type = attrs.pop("type", None)
self.description = attrs.pop("description", name)
self.value = attrs.pop("value", None)
self.pre = attrs.pop("pre", "")
self.post = attrs.pop("post", "")
self.note = None
self.id = attrs.setdefault("id", self.get_default_id())
if "class_" in attrs:
attrs["class"] = attrs["class_"]
del attrs["class_"]
def is_hidden(self):
return False
def get_type(self):
if self.type is not None:
return self.type
else:
raise AttributeError("missing attribute 'type'")
def get_default_id(self):
return self.name
def validate(self, value):
self.set_value(value)
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def set_value(self, value):
self.value = value
def get_value(self):
return self.value
def render(self):
attrs = self.attrs.copy()
attrs["type"] = self.get_type()
if self.value is not None:
attrs["value"] = self.value
attrs["name"] = self.name
attrs["id"] = self.id
return "<input %s/>" % attrs
def rendernote(self, note):
if note:
return '<strong class="wrong">%s</strong>' % net.websafe(note)
else:
return ""
def addatts(self):
# add leading space for backward-compatibility
return " " + str(self.attrs)
class AttributeList(dict):
"""List of attributes of input.
>>> a = AttributeList(type='text', name='x', value=20)
>>> a
<attrs: 'name="x" type="text" value="20"'>
"""
def copy(self):
return AttributeList(self)
def __str__(self):
return " ".join([f'{k}="{net.websafe(v)}"' for k, v in sorted(self.items())])
def __repr__(self):
return "<attrs: %s>" % repr(str(self))
class Textbox(Input):
"""Textbox input.
>>> Textbox(name='foo', value='bar').render()
u'<input id="foo" name="foo" type="text" value="bar"/>'
>>> Textbox(name='foo', value=0).render()
u'<input id="foo" name="foo" type="text" value="0"/>'
"""
def get_type(self):
return "text"
class Password(Input):
"""Password input.
>>> Password(name='password', value='secret').render()
u'<input id="password" name="password" type="password" value="secret"/>'
"""
def get_type(self):
return "password"
class Textarea(Input):
"""Textarea input.
>>> Textarea(name='foo', value='bar').render()
u'<textarea id="foo" name="foo">bar</textarea>'
"""
def render(self):
attrs = self.attrs.copy()
attrs["name"] = self.name
value = net.websafe(self.value or "")
return f"<textarea {attrs}>{value}</textarea>"
class Dropdown(Input):
r"""Dropdown/select input.
>>> Dropdown(name='foo', args=['a', 'b', 'c'], value='b').render()
u'<select id="foo" name="foo">\n <option value="a">a</option>\n <option selected="selected" value="b">b</option>\n <option value="c">c</option>\n</select>\n'
>>> Dropdown(name='foo', args=[('a', 'aa'), ('b', 'bb'), ('c', 'cc')], value='b').render()
u'<select id="foo" name="foo">\n <option value="a">aa</option>\n <option selected="selected" value="b">bb</option>\n <option value="c">cc</option>\n</select>\n'
"""
def __init__(self, name, args, *validators, **attrs):
self.args = args
super().__init__(name, *validators, **attrs)
def render(self):
attrs = self.attrs.copy()
attrs["name"] = self.name
x = "<select %s>\n" % attrs
for arg in self.args:
x += self._render_option(arg)
x += "</select>\n"
return x
def _render_option(self, arg, indent=" "):
if isinstance(arg, (tuple, list)):
value, desc = arg
else:
value, desc = arg, arg
value = utils.safestr(value)
if isinstance(self.value, (tuple, list)):
s_value = [utils.safestr(x) for x in self.value]
else:
s_value = utils.safestr(self.value)
if s_value == value or (isinstance(s_value, list) and value in s_value):
select_p = ' selected="selected"'
else:
select_p = ""
return indent + '<option{} value="{}">{}</option>\n'.format(
select_p,
net.websafe(value),
net.websafe(desc),
)
class GroupedDropdown(Dropdown):
r"""Grouped Dropdown/select input.
>>> GroupedDropdown(name='car_type', args=(('Swedish Cars', ('Volvo', 'Saab')), ('German Cars', ('Mercedes', 'Audi'))), value='Audi').render()
u'<select id="car_type" name="car_type">\n <optgroup label="Swedish Cars">\n <option value="Volvo">Volvo</option>\n <option value="Saab">Saab</option>\n </optgroup>\n <optgroup label="German Cars">\n <option value="Mercedes">Mercedes</option>\n <option selected="selected" value="Audi">Audi</option>\n </optgroup>\n</select>\n'
>>> GroupedDropdown(name='car_type', args=(('Swedish Cars', (('v', 'Volvo'), ('s', 'Saab'))), ('German Cars', (('m', 'Mercedes'), ('a', 'Audi')))), value='a').render()
u'<select id="car_type" name="car_type">\n <optgroup label="Swedish Cars">\n <option value="v">Volvo</option>\n <option value="s">Saab</option>\n </optgroup>\n <optgroup label="German Cars">\n <option value="m">Mercedes</option>\n <option selected="selected" value="a">Audi</option>\n </optgroup>\n</select>\n'
"""
def __init__(self, name, args, *validators, **attrs):
self.args = args
super().__init__(name, *validators, **attrs)
def render(self):
attrs = self.attrs.copy()
attrs["name"] = self.name
x = "<select %s>\n" % attrs
for label, options in self.args:
x += ' <optgroup label="%s">\n' % net.websafe(label)
for arg in options:
x += self._render_option(arg, indent=" ")
x += " </optgroup>\n"
x += "</select>\n"
return x
class Radio(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super().__init__(name, *validators, **attrs)
def render(self):
x = "<span>"
for idx, arg in enumerate(self.args, start=1):
if isinstance(arg, (tuple, list)):
value, desc = arg
else:
value, desc = arg, arg
attrs = self.attrs.copy()
attrs["name"] = self.name
attrs["type"] = "radio"
attrs["value"] = value
attrs["id"] = self.name + str(idx)
if self.value == value:
attrs["checked"] = "checked"
x += f"<input {attrs}/> {net.websafe(desc)}"
x += "</span>"
return x
class Checkbox(Input):
"""Checkbox input.
>>> Checkbox('foo', value='bar', checked=True).render()
u'<input checked="checked" id="foo_bar" name="foo" type="checkbox" value="bar"/>'
>>> Checkbox('foo', value='bar').render()
u'<input id="foo_bar" name="foo" type="checkbox" value="bar"/>'
>>> c = Checkbox('foo', value='bar')
>>> c.validate('on')
True
>>> c.render()
u'<input checked="checked" id="foo_bar" name="foo" type="checkbox" value="bar"/>'
"""
def __init__(self, name, *validators, **attrs):
self.checked = attrs.pop("checked", False)
Input.__init__(self, name, *validators, **attrs)
def get_default_id(self):
value = utils.safestr(self.value or "")
return self.name + "_" + value.replace(" ", "_")
def render(self):
attrs = self.attrs.copy()
attrs["type"] = "checkbox"
attrs["name"] = self.name
attrs["value"] = self.value
if self.checked:
attrs["checked"] = "checked"
return "<input %s/>" % attrs
def set_value(self, value):
self.checked = bool(value)
def get_value(self):
return self.checked
class Button(Input):
"""HTML Button.
>>> Button("save").render()
u'<button id="save" name="save">save</button>'
>>> Button("action", value="save", html="<b>Save Changes</b>").render()
u'<button id="action" name="action" value="save"><b>Save Changes</b></button>'
"""
def __init__(self, name, *validators, **attrs):
super().__init__(name, *validators, **attrs)
self.description = ""
def render(self):
attrs = self.attrs.copy()
attrs["name"] = self.name
if self.value is not None:
attrs["value"] = self.value
html = attrs.pop("html", None) or net.websafe(self.name)
return f"<button {attrs}>{html}</button>"
class Hidden(Input):
"""Hidden Input.
>>> Hidden(name='foo', value='bar').render()
u'<input id="foo" name="foo" type="hidden" value="bar"/>'
"""
def is_hidden(self):
return True
def get_type(self):
return "hidden"
class File(Input):
"""File input.
>>> File(name='f', accept=".doc,.docx,.xml").render()
u'<input accept=".doc,.docx,.xml" id="f" name="f" type="file"/>'
"""
def get_type(self):
return "file"
class Telephone(Input):
"""Telephone input.
See: <https://html.spec.whatwg.org/#telephone-state-(type=tel)>
>>> Telephone(name='tel', value='55512345').render()
u'<input id="tel" name="tel" type="tel" value="55512345"/>'
"""
def get_type(self):
return "tel"
class Email(Input):
"""Email input.
See: <https://html.spec.whatwg.org/#e-mail-state-(type=email)>
>>> Email(name='email', value='me@example.org').render()
u'<input id="email" name="email" type="email" value="me@example.org"/>'
"""
def get_type(self):
return "email"
class Date(Input):
"""Date input.
Note: Not supported by desktop Safari, Internet Explorer, or Opera Mini
See: <https://html.spec.whatwg.org/#date-state-(type=date)>
>>> Date(name='date', value='2020-04-01').render()
u'<input id="date" name="date" type="date" value="2020-04-01"/>'
"""
def get_type(self):
return "date"
class Time(Input):
"""Time input.
Note: Not supported by desktop Safari, Internet Explorer, or Opera Mini
See: <https://html.spec.whatwg.org/#time-state-(type=time)>
>>> Time(name='time', value='07:00').render()
u'<input id="time" name="time" type="time" value="07:00"/>'
"""
def get_type(self):
return "time"
class Search(Input):
"""Search input.
See: <https://html.spec.whatwg.org/#text-(type=text)-state-and-search-state-(type=search)>
>> Search(name='search', value='Search').render()
u'<input id="search" name="search" type="search" value="Search"/>'
>>> Search(name='search', value='Search', required='required', pattern='[a-z0-9]{2,30}', placeholder='Search...').render()
u'<input id="search" name="search" pattern="[a-z0-9]{2,30}" placeholder="Search..." required="required" type="search" value="Search"/>'
"""
def get_type(self):
return "search"
class Url(Input):
"""URL input.
See: <https://html.spec.whatwg.org/#url-state-(type=url)>
>>> Url(name='url', value='url').render()
u'<input id="url" name="url" type="url" value="url"/>'
"""
def get_type(self):
return "url"
class Number(Input):
"""Number input.
See: <https://html.spec.whatwg.org/#number-state-(type=number)>
>>> Number(name='num', min='0', max='10', step='2', value='5').render()
u'<input id="num" max="10" min="0" name="num" step="2" type="number" value="5"/>'
"""
def get_type(self):
return "number"
class Range(Input):
"""Range input.
See: <https://html.spec.whatwg.org/#range-state-(type=range)>
>>> Range(name='range', min='0', max='10', step='2', value='5').render()
u'<input id="range" max="10" min="0" name="range" step="2" type="range" value="5"/>'
"""
def get_type(self):
return "range"
class Color(Input):
"""Color input.
Note: Not supported by Internet Explorer or Opera Mini
See: <https://html.spec.whatwg.org/#color-stat://html.spec.whatwg.org/#color-state-(type=color)>
>>> Color(name='color').render()
u'<input id="color" name="color" type="color"/>'
"""
def get_type(self):
return "color"
class Datalist(Input):
"""Datalist input.
This is currently supported by Chrome, Firefox, Edge, and Opera. It is not
supported on Safari or Internet Explorer. Use it with caution.
Datalist cannot be used separately. It must be bound to an input.
<https://html.spec.whatwg.org/#the-datalist-element>
>>> Datalist(name='list', args=[('a', 'b'), ('c', 'd')]).render()
u'<datalist id="list" name="list"><option label="a" value="b"/><option label="c" value="d"/></datalist>'
>>> Datalist(name='list', args=[['a', 'b'], ['c', 'd']]).render()
u'<datalist id="list" name="list"><option label="a" value="b"/><option label="c" value="d"/></datalist>'
>>> Datalist(name='list', args=['a', 'b', 'c', 'd']).render()
u'<datalist id="list" name="list"><option value="a"/><option value="b"/><option value="c"/><option value="d"/></datalist>'
"""
def __init__(self, name, args, *validators, **kwargs):
self.args = args
super().__init__(name, *validators, **kwargs)
def render(self):
attrs = self.attrs.copy()
attrs["name"] = self.name
label_p = ""
x = "<datalist %s>" % attrs
for arg in self.args:
if isinstance(arg, (tuple, list)):
label_p = ' label="%s"' % net.websafe(arg[0])
label = net.websafe(arg[1])
else:
label = net.websafe(arg)
x += f'<option{label_p} value="{label}"/>'
x += "</datalist>"
return x
class Validator:
def __deepcopy__(self, memo):
return copy.copy(self)
def __init__(self, msg, test, jstest=None):
utils.autoassign(self, locals())
def valid(self, value):
try:
return self.test(value)
except:
return False
notnull = Validator("Required", bool)
class regexp(Validator):
def __init__(self, rexp, msg):
self.rexp = re.compile(rexp)
self.msg = msg
def valid(self, value):
return bool(self.rexp.match(value))
if __name__ == "__main__":
import doctest
doctest.testmod()

168
web/http.py Normal file
View File

@@ -0,0 +1,168 @@
"""
HTTP Utilities
(from web.py)
"""
__all__ = [
"expires",
"lastmodified",
"prefixurl",
"modified",
"changequery",
"url",
"profiler",
]
import datetime
from . import net, utils
from . import webapi as web
from .py3helpers import iteritems
try:
from urllib.parse import urlencode as urllib_urlencode
except ImportError:
from urllib import urlencode as urllib_urlencode
def prefixurl(base=""):
"""
Sorry, this function is really difficult to explain.
Maybe some other time.
"""
url = web.ctx.path.lstrip("/")
for i in range(url.count("/")):
base += "../"
if not base:
base = "./"
return base
def expires(delta):
"""
Outputs an `Expires` header for `delta` from now.
`delta` is a `timedelta` object or a number of seconds.
"""
if isinstance(delta, int):
delta = datetime.timedelta(seconds=delta)
date_obj = datetime.datetime.utcnow() + delta
web.header("Expires", net.httpdate(date_obj))
def lastmodified(date_obj):
"""Outputs a `Last-Modified` header for `datetime`."""
web.header("Last-Modified", net.httpdate(date_obj))
def modified(date=None, etag=None):
"""
Checks to see if the page has been modified since the version in the
requester's cache.
When you publish pages, you can include `Last-Modified` and `ETag`
with the date the page was last modified and an opaque token for
the particular version, respectively. When readers reload the page,
the browser sends along the modification date and etag value for
the version it has in its cache. If the page hasn't changed,
the server can just return `304 Not Modified` and not have to
send the whole page again.
This function takes the last-modified date `date` and the ETag `etag`
and checks the headers to see if they match. If they do, it returns
`True`, or otherwise it raises NotModified error. It also sets
`Last-Modified` and `ETag` output headers.
"""
n = {x.strip('" ') for x in web.ctx.env.get("HTTP_IF_NONE_MATCH", "").split(",")}
m = net.parsehttpdate(web.ctx.env.get("HTTP_IF_MODIFIED_SINCE", "").split(";")[0])
validate = False
if etag:
if "*" in n or etag in n:
validate = True
if date and m:
# we subtract a second because
# HTTP dates don't have sub-second precision
if date - datetime.timedelta(seconds=1) <= m:
validate = True
if date:
lastmodified(date)
if etag:
web.header("ETag", '"' + etag + '"')
if validate:
raise web.notmodified()
else:
return True
def urlencode(query, doseq=0):
"""
Same as urllib.urlencode, but supports unicode strings.
>>> urlencode({'text':'foo bar'})
'text=foo+bar'
>>> urlencode({'x': [1, 2]}, doseq=True)
'x=1&x=2'
"""
def convert(value, doseq=False):
if doseq and isinstance(value, list):
return [convert(v) for v in value]
else:
return utils.safestr(value)
query = {k: convert(v, doseq) for k, v in query.items()}
return urllib_urlencode(query, doseq=doseq)
def changequery(query=None, **kw):
"""
Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
`/foo?a=3&b=2` -- the same URL but with the arguments you requested
changed.
"""
if query is None:
query = web.rawinput(method="get")
for k, v in iteritems(kw):
if v is None:
query.pop(k, None)
else:
query[k] = v
out = web.ctx.path
if query:
out += "?" + urlencode(query, doseq=True)
return out
def url(path=None, doseq=False, **kw):
"""
Makes url by concatenating web.ctx.homepath and path and the
query string created using the arguments.
"""
if path is None:
path = web.ctx.path
if path.startswith("/"):
out = web.ctx.homepath + path
else:
out = path
if kw:
out += "?" + urlencode(kw, doseq=doseq)
return out
def profiler(app):
"""Outputs basic profiling information at the bottom of each response."""
from utils import profile
def profile_internal(e, o):
out, result = profile(app)(e, o)
return list(out) + ["<pre>" + net.websafe(result) + "</pre>"]
return profile_internal
if __name__ == "__main__":
import doctest
doctest.testmod()

306
web/httpserver.py Normal file
View File

@@ -0,0 +1,306 @@
import os
import posixpath
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer, SimpleHTTPRequestHandler
from io import BytesIO
from urllib import parse as urlparse
from urllib.parse import unquote
from . import utils
from . import webapi as web
__all__ = ["runsimple"]
def runbasic(func, server_address=("0.0.0.0", 8080)):
"""
Runs a simple HTTP server hosting WSGI app `func`. The directory `static/`
is hosted statically.
Based on [WsgiServer][ws] from [Colin Stewart][cs].
[ws]: http://www.owlfish.com/software/wsgiutils/documentation/wsgi-server-api.html
[cs]: http://www.owlfish.com/
"""
# Copyright (c) 2004 Colin Stewart (http://www.owlfish.com/)
# Modified somewhat for simplicity
# Used under the modified BSD license:
# http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
import errno
import socket
import traceback
import SocketServer
class WSGIHandler(SimpleHTTPRequestHandler):
def run_wsgi_app(self):
protocol, host, path, parameters, query, fragment = urlparse.urlparse(
"http://dummyhost%s" % self.path
)
# we only use path, query
env = {
"wsgi.version": (1, 0),
"wsgi.url_scheme": "http",
"wsgi.input": self.rfile,
"wsgi.errors": sys.stderr,
"wsgi.multithread": 1,
"wsgi.multiprocess": 0,
"wsgi.run_once": 0,
"REQUEST_METHOD": self.command,
"REQUEST_URI": self.path,
"PATH_INFO": path,
"QUERY_STRING": query,
"CONTENT_TYPE": self.headers.get("Content-Type", ""),
"CONTENT_LENGTH": self.headers.get("Content-Length", ""),
"REMOTE_ADDR": self.client_address[0],
"SERVER_NAME": self.server.server_address[0],
"SERVER_PORT": str(self.server.server_address[1]),
"SERVER_PROTOCOL": self.request_version,
}
for http_header, http_value in self.headers.items():
env["HTTP_%s" % http_header.replace("-", "_").upper()] = http_value
# Setup the state
self.wsgi_sent_headers = 0
self.wsgi_headers = []
try:
# We have there environment, now invoke the application
result = self.server.app(env, self.wsgi_start_response)
try:
try:
for data in result:
if data:
self.wsgi_write_data(data)
finally:
if hasattr(result, "close"):
result.close()
except OSError as socket_err:
# Catch common network errors and suppress them
if socket_err.args[0] in (errno.ECONNABORTED, errno.EPIPE):
return
except socket.timeout:
return
except:
print(traceback.format_exc(), file=web.debug)
if not self.wsgi_sent_headers:
# We must write out something!
self.wsgi_write_data(" ")
return
do_POST = run_wsgi_app
do_PUT = run_wsgi_app
do_DELETE = run_wsgi_app
def do_GET(self):
if self.path.startswith("/static/"):
SimpleHTTPRequestHandler.do_GET(self)
else:
self.run_wsgi_app()
def wsgi_start_response(self, response_status, response_headers, exc_info=None):
if self.wsgi_sent_headers:
raise Exception("Headers already sent and start_response called again!")
# Should really take a copy to avoid changes in the application....
self.wsgi_headers = (response_status, response_headers)
return self.wsgi_write_data
def wsgi_write_data(self, data):
if not self.wsgi_sent_headers:
status, headers = self.wsgi_headers
# Need to send header prior to data
status_code = status[: status.find(" ")]
status_msg = status[status.find(" ") + 1 :]
self.send_response(int(status_code), status_msg)
for header, value in headers:
self.send_header(header, value)
self.end_headers()
self.wsgi_sent_headers = 1
# Send the data
self.wfile.write(data)
class WSGIServer(SocketServer.ThreadingMixIn, HTTPServer):
def __init__(self, func, server_address):
HTTPServer.HTTPServer.__init__(self, server_address, WSGIHandler)
self.app = func
self.serverShuttingDown = 0
print("http://%s:%d/" % server_address)
WSGIServer(func, server_address).serve_forever()
# The WSGIServer instance.
# Made global so that it can be stopped in embedded mode.
server = None
def runsimple(func, server_address=("0.0.0.0", 8080)):
"""
Runs [CherryPy][cp] WSGI server hosting WSGI app `func`.
The directory `static/` is hosted statically.
[cp]: http://www.cherrypy.org
"""
global server
func = StaticMiddleware(func)
func = LogMiddleware(func)
server = WSGIServer(server_address, func)
if "/" in server_address[0]:
print("%s" % server_address)
else:
if server.ssl_adapter:
print("https://%s:%d/" % server_address)
else:
print("http://%s:%d/" % server_address)
try:
server.start()
except (KeyboardInterrupt, SystemExit):
server.stop()
server = None
def WSGIServer(server_address, wsgi_app):
"""Creates CherryPy WSGI server listening at `server_address` to serve `wsgi_app`.
This function can be overwritten to customize the webserver or use a different webserver.
"""
from cheroot import wsgi
server = wsgi.Server(server_address, wsgi_app, server_name="localhost")
server.nodelay = not sys.platform.startswith(
"java"
) # TCP_NODELAY isn't supported on the JVM
return server
class StaticApp(SimpleHTTPRequestHandler):
"""WSGI application for serving static files."""
def __init__(self, environ, start_response):
self.headers = []
self.environ = environ
self.start_response = start_response
self.directory = os.getcwd()
def send_response(self, status, msg=""):
# the int(status) call is needed because in Py3 status is an enum.IntEnum and we need the integer behind
self.status = str(int(status)) + " " + msg
def send_header(self, name, value):
self.headers.append((name, value))
def end_headers(self):
pass
def log_message(*a):
pass
def __iter__(self):
environ = self.environ
self.path = environ.get("PATH_INFO", "")
self.client_address = (
environ.get("REMOTE_ADDR", "-"),
environ.get("REMOTE_PORT", "-"),
)
self.command = environ.get("REQUEST_METHOD", "-")
self.wfile = BytesIO() # for capturing error
try:
path = self.translate_path(self.path)
etag = '"%s"' % os.path.getmtime(path)
client_etag = environ.get("HTTP_IF_NONE_MATCH")
self.send_header("ETag", etag)
if etag == client_etag:
self.send_response(304, "Not Modified")
self.start_response(self.status, self.headers)
return
except OSError:
pass # Probably a 404
f = self.send_head()
self.start_response(self.status, self.headers)
if f:
block_size = 16 * 1024
while True:
buf = f.read(block_size)
if not buf:
break
yield buf
f.close()
else:
value = self.wfile.getvalue()
yield value
class StaticMiddleware:
"""WSGI middleware for serving static files."""
def __init__(self, app, prefix="/static/"):
self.app = app
self.prefix = prefix
def __call__(self, environ, start_response):
path = environ.get("PATH_INFO", "")
path = self.normpath(path)
if path.startswith(self.prefix):
return StaticApp(environ, start_response)
else:
return self.app(environ, start_response)
def normpath(self, path):
path2 = posixpath.normpath(unquote(path))
if path.endswith("/"):
path2 += "/"
return path2
class LogMiddleware:
"""WSGI middleware for logging the status."""
def __init__(self, app):
self.app = app
self.format = '%s - - [%s] "%s %s %s" - %s'
f = BytesIO()
class FakeSocket:
def makefile(self, *a):
return f
# take log_date_time_string method from BaseHTTPRequestHandler
self.log_date_time_string = BaseHTTPRequestHandler(
FakeSocket(), None, None
).log_date_time_string
def __call__(self, environ, start_response):
def xstart_response(status, response_headers, *args):
out = start_response(status, response_headers, *args)
self.log(status, environ)
return out
return self.app(environ, xstart_response)
def log(self, status, environ):
outfile = environ.get("wsgi.errors", web.debug)
req = environ.get("PATH_INFO", "_")
protocol = environ.get("ACTUAL_SERVER_PROTOCOL", "-")
method = environ.get("REQUEST_METHOD", "-")
host = "{}:{}".format(
environ.get("REMOTE_ADDR", "-"),
environ.get("REMOTE_PORT", "-"),
)
time = self.log_date_time_string()
msg = self.format % (host, time, protocol, method, req, status)
print(utils.safestr(msg), file=outfile)

279
web/net.py Normal file
View File

@@ -0,0 +1,279 @@
"""
Network Utilities
(from web.py)
"""
import datetime
import re
import socket
import time
try:
from urllib.parse import quote
except ImportError:
from urllib import quote
__all__ = [
"validipaddr",
"validip6addr",
"validipport",
"validip",
"validaddr",
"urlquote",
"httpdate",
"parsehttpdate",
"htmlquote",
"htmlunquote",
"websafe",
]
def validip6addr(address):
"""
Returns True if `address` is a valid IPv6 address.
>>> validip6addr('::')
True
>>> validip6addr('aaaa:bbbb:cccc:dddd::1')
True
>>> validip6addr('1:2:3:4:5:6:7:8:9:10')
False
>>> validip6addr('12:10')
False
"""
try:
socket.inet_pton(socket.AF_INET6, address)
except (OSError, AttributeError, ValueError):
return False
return True
def validipaddr(address):
"""
Returns True if `address` is a valid IPv4 address.
>>> validipaddr('192.168.1.1')
True
>>> validipaddr('192.168. 1.1')
False
>>> validipaddr('192.168.1.800')
False
>>> validipaddr('192.168.1')
False
"""
try:
octets = address.split(".")
if len(octets) != 4:
return False
for x in octets:
if " " in x:
return False
if not (0 <= int(x) <= 255):
return False
except ValueError:
return False
return True
def validipport(port):
"""
Returns True if `port` is a valid IPv4 port.
>>> validipport('9000')
True
>>> validipport('foo')
False
>>> validipport('1000000')
False
"""
try:
if not (0 <= int(port) <= 65535):
return False
except ValueError:
return False
return True
def validip(ip, defaultaddr="0.0.0.0", defaultport=8080):
"""
Returns `(ip_address, port)` from string `ip_addr_port`
>>> validip('1.2.3.4')
('1.2.3.4', 8080)
>>> validip('80')
('0.0.0.0', 80)
>>> validip('192.168.0.1:85')
('192.168.0.1', 85)
>>> validip('::')
('::', 8080)
>>> validip('[::]:88')
('::', 88)
>>> validip('[::1]:80')
('::1', 80)
"""
addr = defaultaddr
port = defaultport
# Matt Boswell's code to check for ipv6 first
match = re.search(r"^\[([^]]+)\](?::(\d+))?$", ip) # check for [ipv6]:port
if match:
if validip6addr(match.group(1)):
if match.group(2):
if validipport(match.group(2)):
return (match.group(1), int(match.group(2)))
else:
return (match.group(1), port)
else:
if validip6addr(ip):
return (ip, port)
# end ipv6 code
ip = ip.split(":", 1)
if len(ip) == 1:
if not ip[0]:
pass
elif validipaddr(ip[0]):
addr = ip[0]
elif validipport(ip[0]):
port = int(ip[0])
else:
raise ValueError(":".join(ip) + " is not a valid IP address/port")
elif len(ip) == 2:
addr, port = ip
if not validipaddr(addr) or not validipport(port):
raise ValueError(":".join(ip) + " is not a valid IP address/port")
port = int(port)
else:
raise ValueError(":".join(ip) + " is not a valid IP address/port")
return (addr, port)
def validaddr(string_):
"""
Returns either (ip_address, port) or "/path/to/socket" from string_
>>> validaddr('/path/to/socket')
'/path/to/socket'
>>> validaddr('8000')
('0.0.0.0', 8000)
>>> validaddr('127.0.0.1')
('127.0.0.1', 8080)
>>> validaddr('127.0.0.1:8000')
('127.0.0.1', 8000)
>>> validip('[::1]:80')
('::1', 80)
>>> validaddr('fff')
Traceback (most recent call last):
...
ValueError: fff is not a valid IP address/port
"""
if "/" in string_:
return string_
else:
return validip(string_)
def urlquote(val):
"""
Quotes a string for use in a URL.
>>> urlquote('://?f=1&j=1')
'%3A//%3Ff%3D1%26j%3D1'
>>> urlquote(None)
''
>>> urlquote(u'\u203d')
'%E2%80%BD'
"""
if val is None:
return ""
val = str(val).encode("utf-8")
return quote(val)
def httpdate(date_obj):
"""
Formats a datetime object for use in HTTP headers.
>>> import datetime
>>> httpdate(datetime.datetime(1970, 1, 1, 1, 1, 1))
'Thu, 01 Jan 1970 01:01:01 GMT'
"""
return date_obj.strftime("%a, %d %b %Y %H:%M:%S GMT")
def parsehttpdate(string_):
"""
Parses an HTTP date into a datetime object.
>>> parsehttpdate('Thu, 01 Jan 1970 01:01:01 GMT')
datetime.datetime(1970, 1, 1, 1, 1, 1)
"""
try:
t = time.strptime(string_, "%a, %d %b %Y %H:%M:%S %Z")
except ValueError:
return None
return datetime.datetime(*t[:6])
def htmlquote(text):
r"""
Encodes `text` for raw use in HTML.
>>> htmlquote(u"<'&\">")
u'&lt;&#39;&amp;&quot;&gt;'
"""
text = text.replace("&", "&amp;") # Must be done first!
text = text.replace("<", "&lt;")
text = text.replace(">", "&gt;")
text = text.replace("'", "&#39;")
text = text.replace('"', "&quot;")
return text
def htmlunquote(text):
r"""
Decodes `text` that's HTML quoted.
>>> htmlunquote(u'&lt;&#39;&amp;&quot;&gt;')
u'<\'&">'
"""
text = text.replace("&quot;", '"')
text = text.replace("&#39;", "'")
text = text.replace("&gt;", ">")
text = text.replace("&lt;", "<")
text = text.replace("&amp;", "&") # Must be done last!
return text
def websafe(val):
r"""
Converts `val` so that it is safe for use in Unicode HTML.
>>> websafe("<'&\">")
u'&lt;&#39;&amp;&quot;&gt;'
>>> websafe(None)
u''
>>> websafe(u'\u203d') == u'\u203d'
True
"""
if val is None:
return ""
if isinstance(val, bytes):
val = val.decode("utf-8")
elif not isinstance(val, str):
val = str(val)
return htmlquote(val)
if __name__ == "__main__":
import doctest
doctest.testmod()

7
web/py3helpers.py Normal file
View File

@@ -0,0 +1,7 @@
"""Utilities for make the code run both on Python2 and Python3.
"""
# Dictionary iteration
iterkeys = lambda d: iter(d.keys())
itervalues = lambda d: iter(d.values())
iteritems = lambda d: iter(d.items())

457
web/session.py Normal file
View File

@@ -0,0 +1,457 @@
"""
Session Management
(from web.py)
"""
import datetime
import os
import os.path
import shutil
import threading
import time
from copy import deepcopy
from hashlib import sha1
from . import utils
from . import webapi as web
from .py3helpers import iteritems
try:
import cPickle as pickle
except ImportError:
import pickle
from base64 import decodebytes, encodebytes
__all__ = ["Session", "SessionExpired", "Store", "DiskStore", "DBStore", "MemoryStore"]
web.config.session_parameters = utils.storage(
{
"cookie_name": "webpy_session_id",
"cookie_domain": None,
"cookie_path": None,
"samesite": None,
"timeout": 86400, # 24 * 60 * 60, # 24 hours in seconds
"ignore_expiry": True,
"ignore_change_ip": True,
"secret_key": "fLjUfxqXtfNoIldA0A0J",
"expired_message": "Session expired",
"httponly": True,
"secure": False,
}
)
class SessionExpired(web.HTTPError):
def __init__(self, message):
web.HTTPError.__init__(self, "200 OK", {}, data=message)
class Session:
"""Session management for web.py"""
__slots__ = [
"store",
"_initializer",
"_last_cleanup_time",
"_config",
"_data",
"__getitem__",
"__setitem__",
"__delitem__",
]
def __init__(self, app, store, initializer=None):
self.store = store
self._initializer = initializer
self._last_cleanup_time = 0
self._config = utils.storage(web.config.session_parameters)
self._data = utils.threadeddict()
self.__getitem__ = self._data.__getitem__
self.__setitem__ = self._data.__setitem__
self.__delitem__ = self._data.__delitem__
if app:
app.add_processor(self._processor)
def __contains__(self, name):
return name in self._data
def __getattr__(self, name):
return getattr(self._data, name)
def __setattr__(self, name, value):
if name in self.__slots__:
object.__setattr__(self, name, value)
else:
setattr(self._data, name, value)
def __delattr__(self, name):
delattr(self._data, name)
def _processor(self, handler):
"""Application processor to setup session for every request"""
self._cleanup()
self._load()
try:
return handler()
finally:
self._save()
def _load(self):
"""Load the session from the store, by the id from cookie"""
cookie_name = self._config.cookie_name
self.session_id = web.cookies().get(cookie_name)
# protection against session_id tampering
if self.session_id and not self._valid_session_id(self.session_id):
self.session_id = None
self._check_expiry()
if self.session_id:
d = self.store[self.session_id]
self.update(d)
self._validate_ip()
if not self.session_id:
self.session_id = self._generate_session_id()
if self._initializer:
if isinstance(self._initializer, dict):
self.update(deepcopy(self._initializer))
elif hasattr(self._initializer, "__call__"):
self._initializer()
self.ip = web.ctx.ip
def _check_expiry(self):
# check for expiry
if self.session_id and self.session_id not in self.store:
if self._config.ignore_expiry:
self.session_id = None
else:
return self.expired()
def _validate_ip(self):
# check for change of IP
if self.session_id and self.get("ip", None) != web.ctx.ip:
if not self._config.ignore_change_ip:
return self.expired()
def _save(self):
current_values = dict(self._data)
del current_values["session_id"]
del current_values["ip"]
if not self.get("_killed"):
self._setcookie(self.session_id)
self.store[self.session_id] = dict(self._data)
else:
if web.cookies().get(self._config.cookie_name):
self._setcookie(self.session_id, expires=-1)
def _setcookie(self, session_id, expires="", **kw):
cookie_name = self._config.cookie_name
cookie_domain = self._config.cookie_domain
cookie_path = self._config.cookie_path
httponly = self._config.httponly
secure = self._config.secure
samesite = kw.get("samesite", self._config.get("samesite", None))
web.setcookie(
cookie_name,
session_id,
expires=expires,
domain=cookie_domain,
httponly=httponly,
secure=secure,
path=cookie_path,
samesite=samesite,
)
def _generate_session_id(self):
"""Generate a random id for session"""
while True:
rand = os.urandom(16)
now = time.time()
secret_key = self._config.secret_key
hashable = f"{rand}{now}{utils.safestr(web.ctx.ip)}{secret_key}"
session_id = sha1(hashable.encode("utf-8")).hexdigest()
if session_id not in self.store:
break
return session_id
def _valid_session_id(self, session_id):
rx = utils.re_compile("^[0-9a-fA-F]+$")
return rx.match(session_id)
def _cleanup(self):
"""Cleanup the stored sessions"""
current_time = time.time()
timeout = self._config.timeout
if current_time - self._last_cleanup_time > timeout:
self.store.cleanup(timeout)
self._last_cleanup_time = current_time
def expired(self):
"""Called when an expired session is atime"""
self._killed = True
self._save()
raise SessionExpired(self._config.expired_message)
def kill(self):
"""Kill the session, make it no longer available"""
del self.store[self.session_id]
self._killed = True
class Store:
"""Base class for session stores"""
def __contains__(self, key):
raise NotImplementedError()
def __getitem__(self, key):
raise NotImplementedError()
def __setitem__(self, key, value):
raise NotImplementedError()
def cleanup(self, timeout):
"""removes all the expired sessions"""
raise NotImplementedError()
def encode(self, session_dict):
"""encodes session dict as a string"""
pickled = pickle.dumps(session_dict)
return encodebytes(pickled)
def decode(self, session_data):
"""decodes the data to get back the session dict"""
if isinstance(session_data, str):
session_data = session_data.encode()
pickled = decodebytes(session_data)
return pickle.loads(pickled)
class DiskStore(Store):
"""
Store for saving a session on disk.
>>> import tempfile
>>> root = tempfile.mkdtemp()
>>> s = DiskStore(root)
>>> s['a'] = 'foo'
>>> s['a']
'foo'
>>> time.sleep(0.01)
>>> s.cleanup(0.01)
>>> s['a']
Traceback (most recent call last):
...
KeyError: 'a'
"""
def __init__(self, root):
# if the storage root doesn't exists, create it.
if not os.path.exists(root):
os.makedirs(os.path.abspath(root))
self.root = root
def _get_path(self, key):
if os.path.sep in key:
raise ValueError("Bad key: %s" % repr(key))
return os.path.join(self.root, key)
def __contains__(self, key):
path = self._get_path(key)
return os.path.exists(path)
def __getitem__(self, key):
path = self._get_path(key)
if os.path.exists(path):
with open(path, "rb") as fh:
pickled = fh.read()
return self.decode(pickled)
else:
raise KeyError(key)
def __setitem__(self, key, value):
path = self._get_path(key)
pickled = self.encode(value)
try:
tname = path + "." + threading.current_thread().getName()
f = open(tname, "wb")
try:
f.write(pickled)
finally:
f.close()
shutil.move(tname, path) # atomary operation
except OSError:
pass
def __delitem__(self, key):
path = self._get_path(key)
if os.path.exists(path):
os.remove(path)
def cleanup(self, timeout):
if not os.path.isdir(self.root):
return
now = time.time()
for f in os.listdir(self.root):
path = self._get_path(f)
atime = os.stat(path).st_atime
if now - atime > timeout:
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
class DBStore(Store):
"""Store for saving a session in database
Needs a table with the following columns:
session_id CHAR(128) UNIQUE NOT NULL,
atime DATETIME NOT NULL default current_timestamp,
data TEXT
"""
def __init__(self, db, table_name):
self.db = db
self.table = table_name
def __contains__(self, key):
data = self.db.select(self.table, where="session_id=$key", vars=locals())
return bool(list(data))
def __getitem__(self, key):
now = datetime.datetime.now()
try:
s = self.db.select(self.table, where="session_id=$key", vars=locals())[0]
self.db.update(
self.table, where="session_id=$key", atime=now, vars=locals()
)
except IndexError:
raise KeyError(key)
else:
return self.decode(s.data)
def __setitem__(self, key, value):
# Remove the leading `b` of bytes object (`b"..."`), otherwise encoded
# value is invalid base64 format.
pickled = self.encode(value).decode()
now = datetime.datetime.now()
if key in self:
self.db.update(
self.table,
where="session_id=$key",
data=pickled,
atime=now,
vars=locals(),
)
else:
self.db.insert(self.table, False, session_id=key, atime=now, data=pickled)
def __delitem__(self, key):
self.db.delete(self.table, where="session_id=$key", vars=locals())
def cleanup(self, timeout):
timeout = datetime.timedelta(
timeout / (24.0 * 60 * 60)
) # timedelta takes numdays as arg
last_allowed_time = datetime.datetime.now() - timeout
self.db.delete(self.table, where="$last_allowed_time > atime", vars=locals())
class ShelfStore:
"""Store for saving session using `shelve` module.
import shelve
store = ShelfStore(shelve.open('session.shelf'))
XXX: is shelve thread-safe?
"""
def __init__(self, shelf):
self.shelf = shelf
def __contains__(self, key):
return key in self.shelf
def __getitem__(self, key):
atime, v = self.shelf[key]
self[key] = v # update atime
return v
def __setitem__(self, key, value):
self.shelf[key] = time.time(), value
def __delitem__(self, key):
try:
del self.shelf[key]
except KeyError:
pass
def cleanup(self, timeout):
now = time.time()
for k in self.shelf:
atime, v = self.shelf[k]
if now - atime > timeout:
del self[k]
class MemoryStore(Store):
"""Store for saving a session in memory.
Useful where there is limited fs writes on the disk, like
flash memories
Data will be saved into a dict:
k: (time, pydata)
"""
def __init__(self, d_store=None):
if d_store is None:
d_store = {}
self.d_store = d_store
def __contains__(self, key):
return key in self.d_store
def __getitem__(self, key):
"""Return the value and update the last seen value"""
t, value = self.d_store[key]
self.d_store[key] = (time.time(), value)
return value
def __setitem__(self, key, value):
self.d_store[key] = (time.time(), value)
def __delitem__(self, key):
del self.d_store[key]
def cleanup(self, timeout):
now = time.time()
to_del = []
for k, (atime, value) in iteritems(self.d_store):
if now - atime > timeout:
to_del.append(k)
# to avoid exception on "dict change during iterations"
for k in to_del:
del self.d_store[k]
if __name__ == "__main__":
import doctest
doctest.testmod()

1742
web/template.py Normal file

File diff suppressed because it is too large Load Diff

55
web/test.py Normal file
View File

@@ -0,0 +1,55 @@
"""test utilities
(part of web.py)
"""
import doctest
import sys
import unittest
TestCase = unittest.TestCase
TestSuite = unittest.TestSuite
def load_modules(names):
return [__import__(name, None, None, "x") for name in names]
def module_suite(module, classnames=None):
"""Makes a suite from a module."""
if classnames:
return unittest.TestLoader().loadTestsFromNames(classnames, module)
elif hasattr(module, "suite"):
return module.suite()
else:
return unittest.TestLoader().loadTestsFromModule(module)
def doctest_suite(module_names):
"""Makes a test suite from doctests."""
suite = TestSuite()
for mod in load_modules(module_names):
suite.addTest(doctest.DocTestSuite(mod))
return suite
def suite(module_names):
"""Creates a suite from multiple modules."""
suite = TestSuite()
for mod in load_modules(module_names):
suite.addTest(module_suite(mod))
return suite
def runTests(suite):
runner = unittest.TextTestRunner()
return runner.run(suite)
def main(suite=None):
if not suite:
main_module = __import__("__main__")
# allow command line switches
args = [a for a in sys.argv[1:] if not a.startswith("-")]
suite = module_suite(main_module, args or None)
result = runTests(suite)
sys.exit(not result.wasSuccessful())

1622
web/utils.py Normal file

File diff suppressed because it is too large Load Diff

667
web/webapi.py Normal file
View File

@@ -0,0 +1,667 @@
"""
Web API (wrapper around WSGI)
(from web.py)
"""
import cgi
import pprint
import sys
import tempfile
from http.cookies import CookieError, Morsel, SimpleCookie
from io import BytesIO
from urllib.parse import quote, unquote, urljoin
from .utils import dictadd, intget, safestr, storage, storify, threadeddict
__all__ = [
"config",
"header",
"debug",
"input",
"data",
"setcookie",
"cookies",
"ctx",
"HTTPError",
# 200, 201, 202, 204
"OK",
"Created",
"Accepted",
"NoContent",
"ok",
"created",
"accepted",
"nocontent",
# 301, 302, 303, 304, 307
"Redirect",
"Found",
"SeeOther",
"NotModified",
"TempRedirect",
"redirect",
"found",
"seeother",
"notmodified",
"tempredirect",
# 400, 401, 403, 404, 405, 406, 409, 410, 412, 415, 451
"BadRequest",
"Unauthorized",
"Forbidden",
"NotFound",
"NoMethod",
"NotAcceptable",
"Conflict",
"Gone",
"PreconditionFailed",
"UnsupportedMediaType",
"UnavailableForLegalReasons",
"badrequest",
"unauthorized",
"forbidden",
"notfound",
"nomethod",
"notacceptable",
"conflict",
"gone",
"preconditionfailed",
"unsupportedmediatype",
"unavailableforlegalreasons",
# 500
"InternalError",
"internalerror",
]
config = storage()
config.__doc__ = """
A configuration object for various aspects of web.py.
`debug`
: when True, enables reloading, disabled template caching and sets internalerror to debugerror.
"""
class HTTPError(Exception):
def __init__(self, status, headers={}, data=""):
ctx.status = status
for k, v in headers.items():
header(k, v)
self.data = data
Exception.__init__(self, status)
def _status_code(status, data=None, classname=None, docstring=None):
if data is None:
data = status.split(" ", 1)[1]
classname = status.split(" ", 1)[1].replace(
" ", ""
) # 304 Not Modified -> NotModified
docstring = docstring or "`%s` status" % status
def __init__(self, data=data, headers={}):
HTTPError.__init__(self, status, headers, data)
# trick to create class dynamically with dynamic docstring.
return type(
classname, (HTTPError, object), {"__doc__": docstring, "__init__": __init__}
)
ok = OK = _status_code("200 OK", data="")
created = Created = _status_code("201 Created")
accepted = Accepted = _status_code("202 Accepted")
nocontent = NoContent = _status_code("204 No Content")
class Redirect(HTTPError):
"""A `301 Moved Permanently` redirect."""
def __init__(self, url, status="301 Moved Permanently", absolute=False):
"""
Returns a `status` redirect to the new URL.
`url` is joined with the base URL so that things like
`redirect("about") will work properly.
"""
newloc = urljoin(ctx.path, url)
if newloc.startswith("/"):
if absolute:
home = ctx.realhome
else:
home = ctx.home
newloc = home + newloc
headers = {"Content-Type": "text/html", "Location": newloc}
HTTPError.__init__(self, status, headers, "")
redirect = Redirect
class Found(Redirect):
"""A `302 Found` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, "302 Found", absolute=absolute)
found = Found
class SeeOther(Redirect):
"""A `303 See Other` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, "303 See Other", absolute=absolute)
seeother = SeeOther
class NotModified(HTTPError):
"""A `304 Not Modified` status."""
def __init__(self):
HTTPError.__init__(self, "304 Not Modified")
notmodified = NotModified
class TempRedirect(Redirect):
"""A `307 Temporary Redirect` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, "307 Temporary Redirect", absolute=absolute)
tempredirect = TempRedirect
class BadRequest(HTTPError):
"""`400 Bad Request` error."""
message = "bad request"
def __init__(self, message=None):
status = "400 Bad Request"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
badrequest = BadRequest
class Unauthorized(HTTPError):
"""`401 Unauthorized` error."""
message = "unauthorized"
def __init__(self, message=None):
status = "401 Unauthorized"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
unauthorized = Unauthorized
class Forbidden(HTTPError):
"""`403 Forbidden` error."""
message = "forbidden"
def __init__(self, message=None):
status = "403 Forbidden"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
forbidden = Forbidden
class _NotFound(HTTPError):
"""`404 Not Found` error."""
message = "not found"
def __init__(self, message=None):
status = "404 Not Found"
headers = {"Content-Type": "text/html; charset=utf-8"}
HTTPError.__init__(self, status, headers, message or self.message)
def NotFound(message=None):
"""Returns HTTPError with '404 Not Found' error from the active application."""
if message:
return _NotFound(message)
elif ctx.get("app_stack"):
return ctx.app_stack[-1].notfound()
else:
return _NotFound()
notfound = NotFound
class NoMethod(HTTPError):
"""A `405 Method Not Allowed` error."""
message = "method not allowed"
def __init__(self, cls=None):
status = "405 Method Not Allowed"
headers = {}
headers["Content-Type"] = "text/html"
methods = ["GET", "HEAD", "POST", "PUT", "DELETE"]
if cls:
methods = [method for method in methods if hasattr(cls, method)]
headers["Allow"] = ", ".join(methods)
HTTPError.__init__(self, status, headers, self.message)
nomethod = NoMethod
class NotAcceptable(HTTPError):
"""`406 Not Acceptable` error."""
message = "not acceptable"
def __init__(self, message=None):
status = "406 Not Acceptable"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
notacceptable = NotAcceptable
class Conflict(HTTPError):
"""`409 Conflict` error."""
message = "conflict"
def __init__(self, message=None):
status = "409 Conflict"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
conflict = Conflict
class Gone(HTTPError):
"""`410 Gone` error."""
message = "gone"
def __init__(self, message=None):
status = "410 Gone"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
gone = Gone
class PreconditionFailed(HTTPError):
"""`412 Precondition Failed` error."""
message = "precondition failed"
def __init__(self, message=None):
status = "412 Precondition Failed"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
preconditionfailed = PreconditionFailed
class UnsupportedMediaType(HTTPError):
"""`415 Unsupported Media Type` error."""
message = "unsupported media type"
def __init__(self, message=None):
status = "415 Unsupported Media Type"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
unsupportedmediatype = UnsupportedMediaType
class _UnavailableForLegalReasons(HTTPError):
"""`451 Unavailable For Legal Reasons` error."""
message = "unavailable for legal reasons"
def __init__(self, message=None):
status = "451 Unavailable For Legal Reasons"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
def UnavailableForLegalReasons(message=None):
"""Returns HTTPError with '415 Unavailable For Legal Reasons' error from the active application."""
if message:
return _UnavailableForLegalReasons(message)
elif ctx.get("app_stack"):
return ctx.app_stack[-1].unavailableforlegalreasons()
else:
return _UnavailableForLegalReasons()
unavailableforlegalreasons = UnavailableForLegalReasons
class _InternalError(HTTPError):
"""500 Internal Server Error`."""
message = "internal server error"
def __init__(self, message=None):
status = "500 Internal Server Error"
headers = {"Content-Type": "text/html"}
HTTPError.__init__(self, status, headers, message or self.message)
def InternalError(message=None):
"""Returns HTTPError with '500 internal error' error from the active application."""
if message:
return _InternalError(message)
elif ctx.get("app_stack"):
return ctx.app_stack[-1].internalerror()
else:
return _InternalError()
internalerror = InternalError
class cgiFieldStorage(cgi.FieldStorage):
"""
Subclass cgi.FieldStorage, as read_binary expects fp to return
bytes. If the headers do not contain a content-disposition with a
filename, cgi.FieldStorage's make_file will create a TemporaryFile
with `w+` flags. The write to that temporary file will fail, due
to incorrect encoding in Python 3.
"""
def make_file(self, binary=None):
"""
For backwards compatibility with Python 2, make_file accepted
a binary flag. This was unused, and removed in Python 3.
"""
return tempfile.TemporaryFile("wb+")
def header(hdr, value, unique=False):
"""
Adds the header `hdr: value` with the response.
If `unique` is True and a header with that name already exists,
it doesn't add a new one.
"""
hdr, value = safestr(hdr), safestr(value)
# protection against HTTP response splitting attack
if "\n" in hdr or "\r" in hdr or "\n" in value or "\r" in value:
raise ValueError("invalid characters in header")
if unique is True:
for h, v in ctx.headers:
if h.lower() == hdr.lower():
return
ctx.headers.append((hdr, value))
def rawinput(method=None):
"""Returns storage object with GET or POST arguments."""
method = method or "both"
def dictify(fs):
# hack to make web.input work with enctype='text/plain.
if fs.list is None:
fs.list = []
return {k: fs[k] for k in fs}
e = ctx.env.copy()
a = b = {}
if method.lower() in ["both", "post", "put", "patch"]:
if e["REQUEST_METHOD"] in ["POST", "PUT", "PATCH"]:
if e.get("CONTENT_TYPE", "").lower().startswith("multipart/"):
# since wsgi.input is directly passed to cgi.FieldStorage,
# it can not be called multiple times. Saving the FieldStorage
# object in ctx to allow calling web.input multiple times.
a = ctx.get("_fieldstorage")
if not a:
fp = e["wsgi.input"]
a = cgiFieldStorage(fp=fp, environ=e, keep_blank_values=1)
ctx._fieldstorage = a
else:
d = data()
if isinstance(d, str):
d = d.encode("utf-8")
fp = BytesIO(d)
a = cgiFieldStorage(fp=fp, environ=e, keep_blank_values=1)
a = dictify(a)
if method.lower() in ["both", "get"]:
e["REQUEST_METHOD"] = "GET"
b = dictify(cgiFieldStorage(environ=e, keep_blank_values=1))
def process_fieldstorage(fs):
if isinstance(fs, list):
return [process_fieldstorage(x) for x in fs]
elif fs.filename is None:
return fs.value
else:
return fs
return storage([(k, process_fieldstorage(v)) for k, v in dictadd(b, a).items()])
def input(*requireds, **defaults):
"""
Returns a `storage` object with the GET and POST arguments.
See `storify` for how `requireds` and `defaults` work.
"""
_method = defaults.pop("_method", "both")
out = rawinput(_method)
try:
defaults.setdefault("_unicode", True) # force unicode conversion by default.
return storify(out, *requireds, **defaults)
except KeyError:
raise badrequest()
def data():
"""Returns the data sent with the request."""
if "data" not in ctx:
if ctx.env.get("HTTP_TRANSFER_ENCODING") == "chunked":
ctx.data = ctx.env["wsgi.input"].read()
else:
cl = intget(ctx.env.get("CONTENT_LENGTH"), 0)
ctx.data = ctx.env["wsgi.input"].read(cl)
return ctx.data
def setcookie(
name,
value,
expires="",
domain=None,
secure=False,
httponly=False,
path=None,
samesite=None,
):
"""Sets a cookie."""
morsel = Morsel()
name, value = safestr(name), safestr(value)
morsel.set(name, value, quote(value))
if isinstance(expires, int) and expires < 0:
expires = -1000000000
morsel["expires"] = expires
morsel["path"] = path or ctx.homepath + "/"
if domain:
morsel["domain"] = domain
if secure:
morsel["secure"] = secure
if httponly:
morsel["httponly"] = True
value = morsel.OutputString()
if samesite and samesite.lower() in ("strict", "lax", "none"):
value += "; SameSite=%s" % samesite
header("Set-Cookie", value)
def parse_cookies(http_cookie):
r"""Parse a HTTP_COOKIE header and return dict of cookie names and decoded values.
>>> sorted(parse_cookies('').items())
[]
>>> sorted(parse_cookies('a=1').items())
[('a', '1')]
>>> sorted(parse_cookies('a=1%202').items())
[('a', '1 2')]
>>> sorted(parse_cookies('a=Z%C3%A9Z').items())
[('a', 'Z\xc3\xa9Z')]
>>> sorted(parse_cookies('a=1; b=2; c=3').items())
[('a', '1'), ('b', '2'), ('c', '3')]
# TODO: cclauss re-enable this test
# >>> sorted(parse_cookies('a=1; b=w("x")|y=z; c=3').items())
# [('a', '1'), ('b', 'w('), ('c', '3')]
>>> sorted(parse_cookies('a=1; b=w(%22x%22)|y=z; c=3').items())
[('a', '1'), ('b', 'w("x")|y=z'), ('c', '3')]
>>> sorted(parse_cookies('keebler=E=mc2').items())
[('keebler', 'E=mc2')]
>>> sorted(parse_cookies(r'keebler="E=mc2; L=\"Loves\"; fudge=\012;"').items())
[('keebler', 'E=mc2; L="Loves"; fudge=\n;')]
"""
# print "parse_cookies"
if '"' in http_cookie:
# HTTP_COOKIE has quotes in it, use slow but correct cookie parsing
cookie = SimpleCookie()
try:
cookie.load(http_cookie)
except CookieError:
# If HTTP_COOKIE header is malformed, try at least to load the cookies we can by
# first splitting on ';' and loading each attr=value pair separately
cookie = SimpleCookie()
for attr_value in http_cookie.split(";"):
try:
cookie.load(attr_value)
except CookieError:
pass
cookies = {k: unquote(v.value) for k, v in cookie.items()}
else:
# HTTP_COOKIE doesn't have quotes, use fast cookie parsing
cookies = {}
for key_value in http_cookie.split(";"):
key_value = key_value.split("=", 1)
if len(key_value) == 2:
key, value = key_value
cookies[key.strip()] = unquote(value.strip())
return cookies
def cookies(*requireds, **defaults):
"""Returns a `storage` object with all the request cookies in it.
See `storify` for how `requireds` and `defaults` work.
This is forgiving on bad HTTP_COOKIE input, it tries to parse at least
the cookies it can.
The values are converted to unicode if _unicode=True is passed.
"""
# parse cookie string and cache the result for next time.
if "_parsed_cookies" not in ctx:
http_cookie = ctx.env.get("HTTP_COOKIE", "")
ctx._parsed_cookies = parse_cookies(http_cookie)
try:
return storify(ctx._parsed_cookies, *requireds, **defaults)
except KeyError:
badrequest()
raise StopIteration()
def debug(*args):
"""
Prints a prettyprinted version of `args` to stderr.
"""
try:
out = ctx.environ["wsgi.errors"]
except:
out = sys.stderr
for arg in args:
print(pprint.pformat(arg), file=out)
return ""
def _debugwrite(x):
try:
out = ctx.environ["wsgi.errors"]
except:
out = sys.stderr
out.write(x)
debug.write = _debugwrite
ctx = context = threadeddict()
ctx.__doc__ = """
A `storage` object containing various information about the request:
`environ` (aka `env`)
: A dictionary containing the standard WSGI environment variables.
`host`
: The domain (`Host` header) requested by the user.
`home`
: The base path for the application.
`ip`
: The IP address of the requester.
`method`
: The HTTP method used.
`path`
: The path request.
`query`
: If there are no query arguments, the empty string. Otherwise, a `?` followed
by the query string.
`fullpath`
: The full path requested, including query arguments (`== path + query`).
### Response Data
`status` (default: "200 OK")
: The status code to be used in the response.
`headers`
: A list of 2-tuples to be used in the response.
`output`
: A string to be used as the response.
"""
if __name__ == "__main__":
import doctest
doctest.testmod()

87
web/wsgi.py Normal file
View File

@@ -0,0 +1,87 @@
"""
WSGI Utilities
(from web.py)
"""
import os
import sys
from . import httpserver
from . import webapi as web
from .net import validaddr
from .utils import intget, listget
def runfcgi(func, addr=("localhost", 8000)):
"""Runs a WSGI function as a FastCGI server."""
import flup.server.fcgi as flups
return flups.WSGIServer(func, multiplexed=True, bindAddress=addr, debug=False).run()
def runscgi(func, addr=("localhost", 4000)):
"""Runs a WSGI function as an SCGI server."""
import flup.server.scgi as flups
return flups.WSGIServer(func, bindAddress=addr, debug=False).run()
def runwsgi(func):
"""
Runs a WSGI-compatible `func` using FCGI, SCGI, or a simple web server,
as appropriate based on context and `sys.argv`.
"""
if "SERVER_SOFTWARE" in os.environ: # cgi
os.environ["FCGI_FORCE_CGI"] = "Y"
# PHP_FCGI_CHILDREN is used by lighttpd fastcgi
if "PHP_FCGI_CHILDREN" in os.environ or "SERVER_SOFTWARE" in os.environ:
return runfcgi(func, None)
if "fcgi" in sys.argv or "fastcgi" in sys.argv:
args = sys.argv[1:]
if "fastcgi" in args:
args.remove("fastcgi")
elif "fcgi" in args:
args.remove("fcgi")
if args:
return runfcgi(func, validaddr(args[0]))
else:
return runfcgi(func, None)
if "scgi" in sys.argv:
args = sys.argv[1:]
args.remove("scgi")
if args:
return runscgi(func, validaddr(args[0]))
else:
return runscgi(func)
server_addr = validaddr(listget(sys.argv, 1, ""))
if "PORT" in os.environ: # e.g. Heroku
server_addr = ("0.0.0.0", intget(os.environ["PORT"]))
return httpserver.runsimple(func, server_addr)
def _is_dev_mode():
# Some embedded python interpreters won't have sys.arv
# For details, see https://github.com/webpy/webpy/issues/87
argv = getattr(sys, "argv", [])
# quick hack to check if the program is running in dev mode.
if (
"SERVER_SOFTWARE" in os.environ
or "PHP_FCGI_CHILDREN" in os.environ
or "fcgi" in argv
or "fastcgi" in argv
or "mod_wsgi" in argv
):
return False
return True
# When running the builtin-server, enable debug mode if not already set.
web.config.setdefault("debug", _is_dev_mode())