130 lines
4.2 KiB
Python
130 lines
4.2 KiB
Python
|
from hashlib import md5
|
||
|
import unittest
|
||
|
|
||
|
from tornado.escape import utf8
|
||
|
from tornado.testing import AsyncHTTPTestCase
|
||
|
from tornado.test import httpclient_test
|
||
|
from tornado.web import Application, RequestHandler
|
||
|
|
||
|
|
||
|
try:
|
||
|
import pycurl
|
||
|
except ImportError:
|
||
|
pycurl = None # type: ignore
|
||
|
|
||
|
if pycurl is not None:
|
||
|
from tornado.curl_httpclient import CurlAsyncHTTPClient
|
||
|
|
||
|
|
||
|
@unittest.skipIf(pycurl is None, "pycurl module not present")
|
||
|
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
|
||
|
def get_http_client(self):
|
||
|
client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
|
||
|
# make sure AsyncHTTPClient magic doesn't give us the wrong class
|
||
|
self.assertTrue(isinstance(client, CurlAsyncHTTPClient))
|
||
|
return client
|
||
|
|
||
|
|
||
|
class DigestAuthHandler(RequestHandler):
|
||
|
def initialize(self, username, password):
|
||
|
self.username = username
|
||
|
self.password = password
|
||
|
|
||
|
def get(self):
|
||
|
realm = "test"
|
||
|
opaque = "asdf"
|
||
|
# Real implementations would use a random nonce.
|
||
|
nonce = "1234"
|
||
|
|
||
|
auth_header = self.request.headers.get("Authorization", None)
|
||
|
if auth_header is not None:
|
||
|
auth_mode, params = auth_header.split(" ", 1)
|
||
|
assert auth_mode == "Digest"
|
||
|
param_dict = {}
|
||
|
for pair in params.split(","):
|
||
|
k, v = pair.strip().split("=", 1)
|
||
|
if v[0] == '"' and v[-1] == '"':
|
||
|
v = v[1:-1]
|
||
|
param_dict[k] = v
|
||
|
assert param_dict["realm"] == realm
|
||
|
assert param_dict["opaque"] == opaque
|
||
|
assert param_dict["nonce"] == nonce
|
||
|
assert param_dict["username"] == self.username
|
||
|
assert param_dict["uri"] == self.request.path
|
||
|
h1 = md5(
|
||
|
utf8("%s:%s:%s" % (self.username, realm, self.password))
|
||
|
).hexdigest()
|
||
|
h2 = md5(
|
||
|
utf8("%s:%s" % (self.request.method, self.request.path))
|
||
|
).hexdigest()
|
||
|
digest = md5(utf8("%s:%s:%s" % (h1, nonce, h2))).hexdigest()
|
||
|
if digest == param_dict["response"]:
|
||
|
self.write("ok")
|
||
|
else:
|
||
|
self.write("fail")
|
||
|
else:
|
||
|
self.set_status(401)
|
||
|
self.set_header(
|
||
|
"WWW-Authenticate",
|
||
|
'Digest realm="%s", nonce="%s", opaque="%s"' % (realm, nonce, opaque),
|
||
|
)
|
||
|
|
||
|
|
||
|
class CustomReasonHandler(RequestHandler):
|
||
|
def get(self):
|
||
|
self.set_status(200, "Custom reason")
|
||
|
|
||
|
|
||
|
class CustomFailReasonHandler(RequestHandler):
|
||
|
def get(self):
|
||
|
self.set_status(400, "Custom reason")
|
||
|
|
||
|
|
||
|
@unittest.skipIf(pycurl is None, "pycurl module not present")
|
||
|
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
|
||
|
def setUp(self):
|
||
|
super().setUp()
|
||
|
self.http_client = self.create_client()
|
||
|
|
||
|
def get_app(self):
|
||
|
return Application(
|
||
|
[
|
||
|
("/digest", DigestAuthHandler, {"username": "foo", "password": "bar"}),
|
||
|
(
|
||
|
"/digest_non_ascii",
|
||
|
DigestAuthHandler,
|
||
|
{"username": "foo", "password": "barユ£"},
|
||
|
),
|
||
|
("/custom_reason", CustomReasonHandler),
|
||
|
("/custom_fail_reason", CustomFailReasonHandler),
|
||
|
]
|
||
|
)
|
||
|
|
||
|
def create_client(self, **kwargs):
|
||
|
return CurlAsyncHTTPClient(
|
||
|
force_instance=True, defaults=dict(allow_ipv6=False), **kwargs
|
||
|
)
|
||
|
|
||
|
def test_digest_auth(self):
|
||
|
response = self.fetch(
|
||
|
"/digest", auth_mode="digest", auth_username="foo", auth_password="bar"
|
||
|
)
|
||
|
self.assertEqual(response.body, b"ok")
|
||
|
|
||
|
def test_custom_reason(self):
|
||
|
response = self.fetch("/custom_reason")
|
||
|
self.assertEqual(response.reason, "Custom reason")
|
||
|
|
||
|
def test_fail_custom_reason(self):
|
||
|
response = self.fetch("/custom_fail_reason")
|
||
|
self.assertEqual(str(response.error), "HTTP 400: Custom reason")
|
||
|
|
||
|
def test_digest_auth_non_ascii(self):
|
||
|
response = self.fetch(
|
||
|
"/digest_non_ascii",
|
||
|
auth_mode="digest",
|
||
|
auth_username="foo",
|
||
|
auth_password="barユ£",
|
||
|
)
|
||
|
self.assertEqual(response.body, b"ok")
|