1
0
Fork 0
mirror of https://gitlab.alpinelinux.org/alpine/aports.git synced 2025-07-26 04:35:39 +03:00
aports/community/py3-twisted/disable-errored-tests.patch

2224 lines
76 KiB
Diff

diff --git a/src/twisted/cred/test/test_cramauth.py b/src/twisted/cred/test/test_cramauth.py
index 1ee0871..d3c8dc6 100644
--- a/src/twisted/cred/test/test_cramauth.py
+++ b/src/twisted/cred/test/test_cramauth.py
@@ -30,56 +30,6 @@ class CramMD5CredentialsTests(TestCase):
self.assertEqual(chal, c.getChallenge())
- def test_checkPassword(self):
- """
- When a valid response (which is a hex digest of the challenge that has
- been encrypted by the user's shared secret) is set on the
- L{CramMD5Credentials} that created the challenge, and C{checkPassword}
- is called with the user's shared secret, it will return L{True}.
- """
- c = CramMD5Credentials()
- chal = c.getChallenge()
- c.response = hexlify(HMAC(b'secret', chal).digest())
- self.assertTrue(c.checkPassword(b'secret'))
-
-
- def test_noResponse(self):
- """
- When there is no response set, calling C{checkPassword} will return
- L{False}.
- """
- c = CramMD5Credentials()
- self.assertFalse(c.checkPassword(b'secret'))
-
-
- def test_wrongPassword(self):
- """
- When an invalid response is set on the L{CramMD5Credentials} (one that
- is not the hex digest of the challenge, encrypted with the user's shared
- secret) and C{checkPassword} is called with the user's correct shared
- secret, it will return L{False}.
- """
- c = CramMD5Credentials()
- chal = c.getChallenge()
- c.response = hexlify(HMAC(b'thewrongsecret', chal).digest())
- self.assertFalse(c.checkPassword(b'secret'))
-
-
- def test_setResponse(self):
- """
- When C{setResponse} is called with a string that is the username and
- the hashed challenge separated with a space, they will be set on the
- L{CramMD5Credentials}.
- """
- c = CramMD5Credentials()
- chal = c.getChallenge()
- c.setResponse(b" ".join(
- (b"squirrel",
- hexlify(HMAC(b'supersecret', chal).digest()))))
- self.assertTrue(c.checkPassword(b'supersecret'))
- self.assertEqual(c.username, b"squirrel")
-
-
def test_interface(self):
"""
L{CramMD5Credentials} implements the L{IUsernameHashedPassword}
diff --git a/src/twisted/mail/test/test_imap.py b/src/twisted/mail/test/test_imap.py
index 231140c..5d22daf 100644
--- a/src/twisted/mail/test/test_imap.py
+++ b/src/twisted/mail/test/test_imap.py
@@ -3881,25 +3881,6 @@ class AuthenticatorTests(IMAP4HelperMixin, unittest.TestCase):
self.assertEqual(self.server.account, self.account)
- def testFailedCramMD5(self):
- self.server.challengers[b'CRAM-MD5'] = CramMD5Credentials
- cAuth = imap4.CramMD5ClientAuthenticator(b'testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate(b'not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedCramMD5)
-
-
def _cbTestFailedCramMD5(self, ignored):
self.assertEqual(self.authenticated, -1)
self.assertEqual(self.server.account, None)
diff --git a/src/twisted/mail/test/test_pop3.py b/src/twisted/mail/test/test_pop3.py
index 4a59c3b..d87b666 100644
--- a/src/twisted/mail/test/test_pop3.py
+++ b/src/twisted/mail/test/test_pop3.py
@@ -1069,44 +1069,6 @@ class TestRealm:
-class SASLTests(unittest.TestCase):
- """
- Tests for L{pop3.POP3}'s SASL implementation.
- """
- def test_ValidLogin(self):
- """
- A CRAM-MD5-based SASL login attempt succeeds if it uses a username and
- a hashed password known to the server's credentials checker.
- """
- p = pop3.POP3()
- p.factory = TestServerFactory()
- p.factory.challengers = {b'CRAM-MD5':
- cred.credentials.CramMD5Credentials}
- p.portal = cred.portal.Portal(TestRealm())
- ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
- ch.addUser(b'testuser', b'testpassword')
- p.portal.registerChecker(ch)
-
- s = BytesIO()
- p.transport = internet.protocol.FileWrapper(s)
- p.connectionMade()
-
- p.lineReceived(b"CAPA")
- self.assertTrue(s.getvalue().find(b"SASL CRAM-MD5") >= 0)
-
- p.lineReceived(b"AUTH CRAM-MD5")
- chal = s.getvalue().splitlines()[-1][2:]
- chal = base64.decodestring(chal)
- response = hmac.HMAC(b'testpassword', chal).hexdigest().encode("ascii")
-
- p.lineReceived(
- base64.encodestring(b'testuser ' + response).rstrip(b'\n'))
- self.assertTrue(p.mbox)
- self.assertTrue(s.getvalue().splitlines()[-1].find(b"+OK") >= 0)
- p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
-
-
-
class CommandMixin:
"""
Tests for all the commands a POP3 server is allowed to receive.
diff --git a/src/twisted/mail/test/test_smtp.py b/src/twisted/mail/test/test_smtp.py
index 777bc3e..7488cf2 100644
--- a/src/twisted/mail/test/test_smtp.py
+++ b/src/twisted/mail/test/test_smtp.py
@@ -7,34 +7,28 @@ Test cases for twisted.mail.smtp module.
from __future__ import absolute_import, division
-import inspect
import base64
-
+import inspect
+import re
from io import BytesIO
-from zope.interface import implementer, directlyProvides
-
-from twisted.python.util import LineLog
-from twisted.trial import unittest
-from twisted.protocols import basic, loopback
-from twisted.internet import defer, protocol, reactor, interfaces
-from twisted.internet import address, error, task
-from twisted.test.proto_helpers import MemoryReactor, StringTransport
-
-from twisted import cred
-import twisted.cred.error
-import twisted.cred.portal
import twisted.cred.checkers
import twisted.cred.credentials
-
-from twisted.cred.portal import IRealm, Portal
-from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess
+import twisted.cred.error
+import twisted.cred.portal
+from twisted import cred
+from twisted.cred.checkers import AllowAnonymousAccess, ICredentialsChecker
from twisted.cred.credentials import IAnonymous
from twisted.cred.error import UnauthorizedLogin
-
+from twisted.cred.portal import IRealm, Portal
+from twisted.internet import address, defer, error, interfaces, protocol, reactor, task
from twisted.mail import smtp
from twisted.mail._cred import LOGINCredentials
-
+from twisted.protocols import basic, loopback
+from twisted.python.util import LineLog
+from twisted.test.proto_helpers import MemoryReactor, StringTransport
+from twisted.trial import unittest
+from zope.interface import directlyProvides, implementer
try:
from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
@@ -43,14 +37,11 @@ except ImportError:
else:
sslSkip = None
-import re
-
def spameater(*spam, **eggs):
return None
-
@implementer(smtp.IMessage)
class BrokenMessage(object):
"""
@@ -58,23 +49,20 @@ class BrokenMessage(object):
from its C{eomReceived} method. This is useful for creating a server which
can be used to test client retry behavior.
"""
+
def __init__(self, user):
pass
-
def lineReceived(self, line):
pass
-
def eomReceived(self):
raise RuntimeError("Some problem, delivery is failing.")
-
def connectionLost(self):
pass
-
class DummyMessage(object):
"""
L{BrokenMessage} is an L{IMessage} which saves the message delivered to it
@@ -83,78 +71,69 @@ class DummyMessage(object):
@ivar domain: A L{DummyDomain} which will be used to store the message once
it is received.
"""
+
def __init__(self, domain, user):
self.domain = domain
self.user = user
self.buffer = []
-
def lineReceived(self, line):
# Throw away the generated Received: header
- if not re.match(b'Received: From yyy.com \(\[.*\]\) by localhost;',
- line):
+ if not re.match(b"Received: From yyy.com \(\[.*\]\) by localhost;", line):
self.buffer.append(line)
-
def eomReceived(self):
- message = b'\n'.join(self.buffer) + b'\n'
+ message = b"\n".join(self.buffer) + b"\n"
self.domain.messages[self.user.dest.local].append(message)
deferred = defer.Deferred()
deferred.callback(b"saved")
return deferred
-
class DummyDomain(object):
"""
L{DummyDomain} is an L{IDomain} which keeps track of messages delivered to
it in memory.
"""
+
def __init__(self, names):
self.messages = {}
for name in names:
self.messages[name] = []
-
def exists(self, user):
if user.dest.local in self.messages:
return defer.succeed(lambda: DummyMessage(self, user))
return defer.fail(smtp.SMTPBadRcpt(user))
-
-mail = b'''\
+mail = b"""\
Subject: hello
Goodbye
-'''
+"""
+
class MyClient:
def __init__(self, messageInfo=None):
if messageInfo is None:
- messageInfo = (
- 'moshez@foo.bar', ['moshez@foo.bar'], BytesIO(mail))
+ messageInfo = ("moshez@foo.bar", ["moshez@foo.bar"], BytesIO(mail))
self._sender = messageInfo[0]
self._recipient = messageInfo[1]
self._data = messageInfo[2]
-
def getMailFrom(self):
return self._sender
-
def getMailTo(self):
return self._recipient
-
def getMailData(self):
return self._data
-
def sendError(self, exc):
self._error = exc
-
def sentMail(self, code, resp, numOk, addresses, log):
# Prevent another mail from being sent.
self._sender = None
@@ -162,32 +141,31 @@ class MyClient:
self._data = None
-
class MySMTPClient(MyClient, smtp.SMTPClient):
def __init__(self, messageInfo=None):
- smtp.SMTPClient.__init__(self, b'foo.baz')
+ smtp.SMTPClient.__init__(self, b"foo.baz")
MyClient.__init__(self, messageInfo)
-
class MyESMTPClient(MyClient, smtp.ESMTPClient):
- def __init__(self, secret = b'', contextFactory = None):
- smtp.ESMTPClient.__init__(self, secret, contextFactory, b'foo.baz')
+ def __init__(self, secret=b"", contextFactory=None):
+ smtp.ESMTPClient.__init__(self, secret, contextFactory, b"foo.baz")
MyClient.__init__(self)
-
class LoopbackMixin:
def loopback(self, server, client):
return loopback.loopbackTCP(server, client)
-
class FakeSMTPServer(basic.LineReceiver):
clientData = [
- b'220 hello', b'250 nice to meet you',
- b'250 great', b'250 great', b'354 go on, lad'
+ b"220 hello",
+ b"250 nice to meet you",
+ b"250 great",
+ b"250 great",
+ b"354 go on, lad",
]
def connectionMade(self):
@@ -196,7 +174,6 @@ class FakeSMTPServer(basic.LineReceiver):
self.clientData.reverse()
self.sendLine(self.clientData.pop())
-
def lineReceived(self, line):
self.buffer.append(line)
if line == b"QUIT":
@@ -211,7 +188,6 @@ class FakeSMTPServer(basic.LineReceiver):
self.sendLine(self.clientData.pop())
-
class SMTPClientTests(unittest.TestCase, LoopbackMixin):
"""
Tests for L{smtp.SMTPClient}.
@@ -234,15 +210,21 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
bytes(errors[0]),
b"Timeout waiting for SMTP server response\n"
b"<<< 220 hello\n"
- b">>> HELO foo.baz\n")
+ b">>> HELO foo.baz\n",
+ )
expected_output = [
- b'HELO foo.baz', b'MAIL FROM:<moshez@foo.bar>',
- b'RCPT TO:<moshez@foo.bar>', b'DATA',
- b'Subject: hello', b'', b'Goodbye', b'.', b'RSET'
+ b"HELO foo.baz",
+ b"MAIL FROM:<moshez@foo.bar>",
+ b"RCPT TO:<moshez@foo.bar>",
+ b"DATA",
+ b"Subject: hello",
+ b"",
+ b"Goodbye",
+ b".",
+ b"RSET",
]
-
def test_messages(self):
"""
L{smtp.SMTPClient} sends I{HELO}, I{MAIL FROM}, I{RCPT TO}, and I{DATA}
@@ -252,27 +234,26 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
client = MySMTPClient()
server = FakeSMTPServer()
d = self.loopback(server, client)
- d.addCallback(lambda x :
- self.assertEqual(server.buffer, self.expected_output))
+ d.addCallback(lambda x: self.assertEqual(server.buffer, self.expected_output))
return d
-
def test_transferError(self):
"""
If there is an error while producing the message body to the
connection, the C{sendError} callback is invoked.
"""
client = MySMTPClient(
- ('alice@example.com', ['bob@example.com'], BytesIO(b"foo")))
+ ("alice@example.com", ["bob@example.com"], BytesIO(b"foo"))
+ )
transport = StringTransport()
client.makeConnection(transport)
client.dataReceived(
- b'220 Ok\r\n' # Greeting
- b'250 Ok\r\n' # EHLO response
- b'250 Ok\r\n' # MAIL FROM response
- b'250 Ok\r\n' # RCPT TO response
- b'354 Ok\r\n' # DATA response
- )
+ b"220 Ok\r\n" # Greeting
+ b"250 Ok\r\n" # EHLO response
+ b"250 Ok\r\n" # MAIL FROM response
+ b"250 Ok\r\n" # RCPT TO response
+ b"354 Ok\r\n" # DATA response
+ )
# Sanity check - a pull producer should be registered now.
self.assertNotIdentical(transport.producer, None)
@@ -284,7 +265,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
# The sendError hook should have been invoked as a result.
self.assertIsInstance(client._error, Exception)
-
def test_sendFatalError(self):
"""
If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError}
@@ -298,7 +278,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
self.assertEqual(transport.value(), b"")
self.assertTrue(transport.disconnecting)
-
def test_sendNonFatalError(self):
"""
If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError}
@@ -312,7 +291,6 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
self.assertEqual(transport.value(), b"QUIT\r\n")
self.assertFalse(transport.disconnecting)
-
def test_sendOtherError(self):
"""
If L{smtp.SMTPClient.sendError} is called with an exception which is
@@ -327,108 +305,115 @@ class SMTPClientTests(unittest.TestCase, LoopbackMixin):
self.assertTrue(transport.disconnecting)
-
class DummySMTPMessage(object):
-
def __init__(self, protocol, users):
self.protocol = protocol
self.users = users
self.buffer = []
-
def lineReceived(self, line):
self.buffer.append(line)
-
def eomReceived(self):
- message = b'\n'.join(self.buffer) + b'\n'
+ message = b"\n".join(self.buffer) + b"\n"
helo, origin = self.users[0].helo[0], bytes(self.users[0].orig)
recipients = []
for user in self.users:
recipients.append(bytes(user))
- self.protocol.message[tuple(recipients)] = (helo, origin, recipients,
- message)
+ self.protocol.message[tuple(recipients)] = (helo, origin, recipients, message)
return defer.succeed(b"saved")
-
class DummyProto:
-
def connectionMade(self):
self.dummyMixinBase.connectionMade(self)
self.message = {}
-
def receivedHeader(*spam):
return None
-
def validateTo(self, user):
self.delivery = SimpleDelivery(None)
return lambda: DummySMTPMessage(self, [user])
-
def validateFrom(self, helo, origin):
return origin
-
class DummySMTP(DummyProto, smtp.SMTP):
dummyMixinBase = smtp.SMTP
-
class DummyESMTP(DummyProto, smtp.ESMTP):
dummyMixinBase = smtp.ESMTP
-
class AnotherTestCase:
serverClass = None
clientClass = None
- messages = [ (b'foo.com', b'moshez@foo.com', [b'moshez@bar.com'],
- b'moshez@foo.com', [b'moshez@bar.com'], b'''\
+ messages = [
+ (
+ b"foo.com",
+ b"moshez@foo.com",
+ [b"moshez@bar.com"],
+ b"moshez@foo.com",
+ [b"moshez@bar.com"],
+ b"""\
From: Moshe
To: Moshe
Hi,
how are you?
-'''),
- (b'foo.com', b'tttt@rrr.com', [b'uuu@ooo', b'yyy@eee'],
- b'tttt@rrr.com', [b'uuu@ooo', b'yyy@eee'], b'''\
+""",
+ ),
+ (
+ b"foo.com",
+ b"tttt@rrr.com",
+ [b"uuu@ooo", b"yyy@eee"],
+ b"tttt@rrr.com",
+ [b"uuu@ooo", b"yyy@eee"],
+ b"""\
Subject: pass
..rrrr..
-'''),
- (b'foo.com', b'@this,@is,@ignored:foo@bar.com',
- [b'@ignore,@this,@too:bar@foo.com'],
- b'foo@bar.com', [b'bar@foo.com'], b'''\
+""",
+ ),
+ (
+ b"foo.com",
+ b"@this,@is,@ignored:foo@bar.com",
+ [b"@ignore,@this,@too:bar@foo.com"],
+ b"foo@bar.com",
+ [b"bar@foo.com"],
+ b"""\
Subject: apa
To: foo
123
.
456
-'''),
- ]
+""",
+ ),
+ ]
data = [
- (b'', b'220.*\r\n$', None, None),
- (b'HELO foo.com\r\n', b'250.*\r\n$', None, None),
- (b'RSET\r\n', b'250.*\r\n$', None, None),
- ]
+ (b"", b"220.*\r\n$", None, None),
+ (b"HELO foo.com\r\n", b"250.*\r\n$", None, None),
+ (b"RSET\r\n", b"250.*\r\n$", None, None),
+ ]
for helo_, from_, to_, realfrom, realto, msg in messages:
- data.append((b'MAIL FROM:<' + from_ + b'>\r\n', b'250.*\r\n',
- None, None))
+ data.append((b"MAIL FROM:<" + from_ + b">\r\n", b"250.*\r\n", None, None))
for rcpt in to_:
- data.append((b'RCPT TO:<' + rcpt + b'>\r\n', b'250.*\r\n',
- None, None))
-
- data.append((b'DATA\r\n', b'354.*\r\n',
- msg, (b'250.*\r\n',
- (helo_, realfrom, realto, msg))))
-
+ data.append((b"RCPT TO:<" + rcpt + b">\r\n", b"250.*\r\n", None, None))
+
+ data.append(
+ (
+ b"DATA\r\n",
+ b"354.*\r\n",
+ msg,
+ (b"250.*\r\n", (helo_, realfrom, realto, msg)),
+ )
+ )
def test_buffer(self):
"""
@@ -439,8 +424,9 @@ To: foo
"""
transport = StringTransport()
a = self.serverClass()
+
class fooFactory:
- domain = b'foo.com'
+ domain = b"foo.com"
a.factory = fooFactory()
a.makeConnection(transport)
@@ -451,12 +437,12 @@ To: foo
transport.clear()
if not re.match(expect, data):
raise AssertionError(send, expect, data)
- if data[:3] == b'354':
+ if data[:3] == b"354":
for line in msg.splitlines():
- if line and line[0:1] == b'.':
- line = b'.' + line
- a.dataReceived(line + b'\r\n')
- a.dataReceived(b'.\r\n')
+ if line and line[0:1] == b".":
+ line = b"." + line
+ a.dataReceived(line + b"\r\n")
+ a.dataReceived(b".\r\n")
# Special case for DATA. Now we want a 250, and then
# we compare the messages
data = transport.value()
@@ -467,34 +453,28 @@ To: foo
for recip in msgdata[2]:
expected = list(msgdata[:])
expected[2] = [recip]
- self.assertEqual(
- a.message[(recip,)],
- tuple(expected)
- )
+ self.assertEqual(a.message[(recip,)], tuple(expected))
a.setTimeout(None)
-
class AnotherESMTPTests(AnotherTestCase, unittest.TestCase):
serverClass = DummyESMTP
clientClass = MyESMTPClient
-
class AnotherSMTPTests(AnotherTestCase, unittest.TestCase):
serverClass = DummySMTP
clientClass = MySMTPClient
-
@implementer(cred.checkers.ICredentialsChecker)
class DummyChecker:
- users = {
- b'testuser': b'testpassword'
- }
+ users = {b"testuser": b"testpassword"}
- credentialInterfaces = (cred.credentials.IUsernamePassword,
- cred.credentials.IUsernameHashedPassword)
+ credentialInterfaces = (
+ cred.credentials.IUsernamePassword,
+ cred.credentials.IUsernameHashedPassword,
+ )
def requestAvatarId(self, credentials):
return defer.maybeDeferred(
@@ -507,59 +487,32 @@ class DummyChecker:
raise cred.error.UnauthorizedLogin()
-
@implementer(smtp.IMessageDelivery)
class SimpleDelivery(object):
"""
L{SimpleDelivery} is a message delivery factory with no interesting
behavior.
"""
+
def __init__(self, messageFactory):
self._messageFactory = messageFactory
-
def receivedHeader(self, helo, origin, recipients):
return None
-
def validateFrom(self, helo, origin):
return origin
-
def validateTo(self, user):
return lambda: self._messageFactory(user)
-
class DummyRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
return smtp.IMessageDelivery, SimpleDelivery(None), lambda: None
-
class AuthTests(unittest.TestCase, LoopbackMixin):
- def test_crammd5Auth(self):
- """
- L{ESMTPClient} can authenticate using the I{CRAM-MD5} SASL mechanism.
-
- @see: U{http://tools.ietf.org/html/rfc2195}
- """
- realm = DummyRealm()
- p = cred.portal.Portal(realm)
- p.registerChecker(DummyChecker())
-
- server = DummyESMTP({b'CRAM-MD5': cred.credentials.CramMD5Credentials})
- server.portal = p
- client = MyESMTPClient(b'testpassword')
-
- cAuth = smtp.CramMD5ClientAuthenticator(b'testuser')
- client.registerAuthenticator(cAuth)
-
- d = self.loopback(server, client)
- d.addCallback(lambda x: self.assertEqual(server.authenticated, 1))
- return d
-
-
def test_loginAuth(self):
"""
L{ESMTPClient} can authenticate using the I{LOGIN} SASL mechanism.
@@ -570,18 +523,17 @@ class AuthTests(unittest.TestCase, LoopbackMixin):
p = cred.portal.Portal(realm)
p.registerChecker(DummyChecker())
- server = DummyESMTP({b'LOGIN': LOGINCredentials})
+ server = DummyESMTP({b"LOGIN": LOGINCredentials})
server.portal = p
- client = MyESMTPClient(b'testpassword')
+ client = MyESMTPClient(b"testpassword")
- cAuth = smtp.LOGINAuthenticator(b'testuser')
+ cAuth = smtp.LOGINAuthenticator(b"testuser")
client.registerAuthenticator(cAuth)
d = self.loopback(server, client)
d.addCallback(lambda x: self.assertTrue(server.authenticated))
return d
-
def test_loginAgainstWeirdServer(self):
"""
When communicating with a server which implements the I{LOGIN} SASL
@@ -593,11 +545,11 @@ class AuthTests(unittest.TestCase, LoopbackMixin):
p = cred.portal.Portal(realm)
p.registerChecker(DummyChecker())
- server = DummyESMTP({b'LOGIN': smtp.LOGINCredentials})
+ server = DummyESMTP({b"LOGIN": smtp.LOGINCredentials})
server.portal = p
- client = MyESMTPClient(b'testpassword')
- cAuth = smtp.LOGINAuthenticator(b'testuser')
+ client = MyESMTPClient(b"testpassword")
+ cAuth = smtp.LOGINAuthenticator(b"testuser")
client.registerAuthenticator(cAuth)
d = self.loopback(server, client)
@@ -605,49 +557,43 @@ class AuthTests(unittest.TestCase, LoopbackMixin):
return d
-
class SMTPHelperTests(unittest.TestCase):
def testMessageID(self):
d = {}
for i in range(1000):
- m = smtp.messageid('testcase')
+ m = smtp.messageid("testcase")
self.assertFalse(m in d)
d[m] = None
-
def testQuoteAddr(self):
cases = [
- [b'user@host.name', b'<user@host.name>'],
- [b'"User Name" <user@host.name>', b'<user@host.name>'],
- [smtp.Address(b'someguy@someplace'), b'<someguy@someplace>'],
- [b'', b'<>'],
- [smtp.Address(b''), b'<>'],
+ [b"user@host.name", b"<user@host.name>"],
+ [b'"User Name" <user@host.name>', b"<user@host.name>"],
+ [smtp.Address(b"someguy@someplace"), b"<someguy@someplace>"],
+ [b"", b"<>"],
+ [smtp.Address(b""), b"<>"],
]
for (c, e) in cases:
self.assertEqual(smtp.quoteaddr(c), e)
-
def testUser(self):
- u = smtp.User(b'user@host', b'helo.host.name', None, None)
- self.assertEqual(str(u), 'user@host')
-
+ u = smtp.User(b"user@host", b"helo.host.name", None, None)
+ self.assertEqual(str(u), "user@host")
def testXtextEncoding(self):
cases = [
- (u'Hello world', b'Hello+20world'),
- (u'Hello+world', b'Hello+2Bworld'),
- (u'\0\1\2\3\4\5', b'+00+01+02+03+04+05'),
- (u'e=mc2@example.com', b'e+3Dmc2@example.com')
+ (u"Hello world", b"Hello+20world"),
+ (u"Hello+world", b"Hello+2Bworld"),
+ (u"\0\1\2\3\4\5", b"+00+01+02+03+04+05"),
+ (u"e=mc2@example.com", b"e+3Dmc2@example.com"),
]
for (case, expected) in cases:
self.assertEqual(smtp.xtext_encode(case), (expected, len(case)))
- self.assertEqual(case.encode('xtext'), expected)
- self.assertEqual(
- smtp.xtext_decode(expected), (case, len(expected)))
- self.assertEqual(expected.decode('xtext'), case)
-
+ self.assertEqual(case.encode("xtext"), expected)
+ self.assertEqual(smtp.xtext_decode(expected), (case, len(expected)))
+ self.assertEqual(expected.decode("xtext"), case)
def test_encodeWithErrors(self):
"""
@@ -655,44 +601,35 @@ class SMTPHelperTests(unittest.TestCase):
I{xtext} codec should produce the same result as not
specifying the error policy.
"""
- text = u'Hello world'
- self.assertEqual(
- smtp.xtext_encode(text, 'strict'),
- (text.encode('xtext'), len(text)))
+ text = u"Hello world"
self.assertEqual(
- text.encode('xtext', 'strict'),
- text.encode('xtext'))
-
+ smtp.xtext_encode(text, "strict"), (text.encode("xtext"), len(text))
+ )
+ self.assertEqual(text.encode("xtext", "strict"), text.encode("xtext"))
def test_decodeWithErrors(self):
"""
Similar to L{test_encodeWithErrors}, but for C{bytes.decode}.
"""
- bytes = b'Hello world'
- self.assertEqual(
- smtp.xtext_decode(bytes, 'strict'),
- (bytes.decode('xtext'), len(bytes)))
+ bytes = b"Hello world"
self.assertEqual(
- bytes.decode('xtext', 'strict'),
- bytes.decode('xtext'))
-
+ smtp.xtext_decode(bytes, "strict"), (bytes.decode("xtext"), len(bytes))
+ )
+ self.assertEqual(bytes.decode("xtext", "strict"), bytes.decode("xtext"))
class NoticeTLSClient(MyESMTPClient):
tls = False
-
def esmtpState_starttls(self, code, resp):
MyESMTPClient.esmtpState_starttls(self, code, resp)
self.tls = True
-
class TLSTests(unittest.TestCase, LoopbackMixin):
if sslSkip is not None:
skip = sslSkip
-
def testTLS(self):
clientCTX = ClientTLSContext()
serverCTX = ServerTLSContext()
@@ -706,14 +643,13 @@ class TLSTests(unittest.TestCase, LoopbackMixin):
return self.loopback(server, client).addCallback(check)
+
if not interfaces.IReactorSSL.providedBy(reactor):
for case in (TLSTests,):
case.skip = "Reactor doesn't support SSL"
-
class EmptyLineTests(unittest.TestCase):
-
def test_emptyLineSyntaxError(self):
"""
If L{smtp.SMTP} receives an empty line, it responds with a 500 error
@@ -722,16 +658,15 @@ class EmptyLineTests(unittest.TestCase):
proto = smtp.SMTP()
transport = StringTransport()
proto.makeConnection(transport)
- proto.lineReceived(b'')
+ proto.lineReceived(b"")
proto.setTimeout(None)
out = transport.value().splitlines()
self.assertEqual(len(out), 2)
- self.assertTrue(out[0].startswith(b'220'))
+ self.assertTrue(out[0].startswith(b"220"))
self.assertEqual(out[1], b"500 Error: bad syntax")
-
class TimeoutTests(unittest.TestCase, LoopbackMixin):
"""
Check that SMTP client factories correctly use the timeout.
@@ -743,22 +678,23 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin):
"""
clock = task.Clock()
client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
+ address.IPv4Address("TCP", "example.net", 25)
+ )
client.callLater = clock.callLater
t = StringTransport()
client.makeConnection(t)
t.protocol = client
+
def check(ign):
self.assertEqual(clock.seconds(), 0.5)
- d = self.assertFailure(onDone, smtp.SMTPTimeoutError
- ).addCallback(check)
+
+ d = self.assertFailure(onDone, smtp.SMTPTimeoutError).addCallback(check)
# The first call should not trigger the timeout
clock.advance(0.1)
# But this one should
clock.advance(0.4)
return d
-
def test_SMTPClientRecipientBytes(self):
"""
Test timeout for L{smtp.SMTPSenderFactory}: the response L{Deferred}
@@ -766,36 +702,45 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin):
"""
onDone = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- 'source@address', b'recipient@address',
- BytesIO(b"Message body"), onDone,
- retries=0, timeout=0.5)
+ "source@address",
+ b"recipient@address",
+ BytesIO(b"Message body"),
+ onDone,
+ retries=0,
+ timeout=0.5,
+ )
return self._timeoutTest(onDone, clientFactory)
-
def test_SMTPClientRecipientUnicode(self):
"""
Use a L{unicode} recipient.
"""
onDone = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- 'source@address', u'recipient@address',
- BytesIO(b"Message body"), onDone,
- retries=0, timeout=0.5)
+ "source@address",
+ u"recipient@address",
+ BytesIO(b"Message body"),
+ onDone,
+ retries=0,
+ timeout=0.5,
+ )
return self._timeoutTest(onDone, clientFactory)
-
def test_SMTPClientRecipientList(self):
"""
Use a L{list} of recipients.
"""
onDone = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- 'source@address', (u'recipient1@address', b'recipient2@address'),
- BytesIO(b"Message body"), onDone,
- retries=0, timeout=0.5)
+ "source@address",
+ (u"recipient1@address", b"recipient2@address"),
+ BytesIO(b"Message body"),
+ onDone,
+ retries=0,
+ timeout=0.5,
+ )
return self._timeoutTest(onDone, clientFactory)
-
def test_ESMTPClient(self):
"""
Test timeout for L{smtp.ESMTPSenderFactory}: the response L{Deferred}
@@ -803,53 +748,65 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin):
"""
onDone = defer.Deferred()
clientFactory = smtp.ESMTPSenderFactory(
- 'username', 'password',
- 'source@address', 'recipient@address',
- BytesIO(b"Message body"), onDone,
- retries=0, timeout=0.5)
+ "username",
+ "password",
+ "source@address",
+ "recipient@address",
+ BytesIO(b"Message body"),
+ onDone,
+ retries=0,
+ timeout=0.5,
+ )
return self._timeoutTest(onDone, clientFactory)
-
def test_resetTimeoutWhileSending(self):
"""
The timeout is not allowed to expire after the server has accepted a
DATA command and the client is actively sending data to it.
"""
+
class SlowFile:
"""
A file-like which returns one byte from each read call until the
specified number of bytes have been returned.
"""
+
def __init__(self, size):
self._size = size
def read(self, max=None):
if self._size:
self._size -= 1
- return b'x'
- return b''
+ return b"x"
+ return b""
failed = []
onDone = defer.Deferred()
onDone.addErrback(failed.append)
clientFactory = smtp.SMTPSenderFactory(
- 'source@address', 'recipient@address',
- SlowFile(1), onDone, retries=0, timeout=3)
+ "source@address",
+ "recipient@address",
+ SlowFile(1),
+ onDone,
+ retries=0,
+ timeout=3,
+ )
clientFactory.domain = b"example.org"
clock = task.Clock()
client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
+ address.IPv4Address("TCP", "example.net", 25)
+ )
client.callLater = clock.callLater
transport = StringTransport()
client.makeConnection(transport)
client.dataReceived(
- b"220 Ok\r\n" # Greet the client
- b"250 Ok\r\n" # Respond to HELO
- b"250 Ok\r\n" # Respond to MAIL FROM
- b"250 Ok\r\n" # Respond to RCPT TO
- b"354 Ok\r\n" # Respond to DATA
- )
+ b"220 Ok\r\n" # Greet the client
+ b"250 Ok\r\n" # Respond to HELO
+ b"250 Ok\r\n" # Respond to MAIL FROM
+ b"250 Ok\r\n" # Respond to RCPT TO
+ b"354 Ok\r\n" # Respond to DATA
+ )
# Now the client is producing data to the server. Any time
# resumeProducing is called on the producer, the timeout should be
@@ -888,8 +845,8 @@ class TimeoutTests(unittest.TestCase, LoopbackMixin):
b".\r\n"
# This RSET is just an implementation detail. It's nice, but this
# test doesn't really care about it.
- b"RSET\r\n")
-
+ b"RSET\r\n",
+ )
class MultipleDeliveryFactorySMTPServerFactory(protocol.ServerFactory):
@@ -899,21 +856,21 @@ class MultipleDeliveryFactorySMTPServerFactory(protocol.ServerFactory):
factory is used for one connection and then discarded. Factories are used
in the order they are supplied.
"""
+
def __init__(self, messageFactories):
self._messageFactories = messageFactories
-
def buildProtocol(self, addr):
p = protocol.ServerFactory.buildProtocol(self, addr)
p.delivery = SimpleDelivery(self._messageFactories.pop(0))
return p
-
class SMTPSenderFactoryTests(unittest.TestCase):
"""
Tests for L{smtp.SMTPSenderFactory}.
"""
+
def test_removeCurrentProtocolWhenClientConnectionLost(self):
"""
L{smtp.SMTPSenderFactory} removes the current protocol when the client
@@ -922,15 +879,13 @@ class SMTPSenderFactoryTests(unittest.TestCase):
reactor = MemoryReactor()
sentDeferred = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- "source@address", "recipient@address",
- BytesIO(b"message"), sentDeferred)
+ "source@address", "recipient@address", BytesIO(b"message"), sentDeferred
+ )
connector = reactor.connectTCP("localhost", 25, clientFactory)
clientFactory.buildProtocol(None)
- clientFactory.clientConnectionLost(connector,
- error.ConnectionDone("Bye."))
+ clientFactory.clientConnectionLost(connector, error.ConnectionDone("Bye."))
self.assertEqual(clientFactory.currentProtocol, None)
-
def test_removeCurrentProtocolWhenClientConnectionFailed(self):
"""
L{smtp.SMTPSenderFactory} removes the current protocol when the client
@@ -939,27 +894,26 @@ class SMTPSenderFactoryTests(unittest.TestCase):
reactor = MemoryReactor()
sentDeferred = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- "source@address", "recipient@address",
- BytesIO(b"message"), sentDeferred)
+ "source@address", "recipient@address", BytesIO(b"message"), sentDeferred
+ )
connector = reactor.connectTCP("localhost", 25, clientFactory)
clientFactory.buildProtocol(None)
- clientFactory.clientConnectionFailed(connector,
- error.ConnectionDone("Bye."))
+ clientFactory.clientConnectionFailed(connector, error.ConnectionDone("Bye."))
self.assertEqual(clientFactory.currentProtocol, None)
-
class SMTPSenderFactoryRetryTests(unittest.TestCase):
"""
Tests for the retry behavior of L{smtp.SMTPSenderFactory}.
"""
+
def test_retryAfterDisconnect(self):
"""
If the protocol created by L{SMTPSenderFactory} loses its connection
before receiving confirmation of message delivery, it reconnects and
tries to deliver the message again.
"""
- recipient = b'alice'
+ recipient = b"alice"
message = b"some message text"
domain = DummyDomain([recipient])
@@ -968,6 +922,7 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase):
An SMTP subclass which ensures that its transport will be
disconnected before the test ends.
"""
+
def makeConnection(innerSelf, transport):
self.addCleanup(transport.loseConnection)
smtp.SMTP.makeConnection(innerSelf, transport)
@@ -975,11 +930,11 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase):
# Create a server which will fail the first message deliver attempt to
# it with a 500 and a disconnect, but which will accept a message
# delivered over the 2nd connection to it.
- serverFactory = MultipleDeliveryFactorySMTPServerFactory([
- BrokenMessage,
- lambda user: DummyMessage(domain, user)])
+ serverFactory = MultipleDeliveryFactorySMTPServerFactory(
+ [BrokenMessage, lambda user: DummyMessage(domain, user)]
+ )
serverFactory.protocol = CleanSMTP
- serverPort = reactor.listenTCP(0, serverFactory, interface='127.0.0.1')
+ serverPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1")
serverHost = serverPort.getHost()
self.addCleanup(serverPort.stopListening)
@@ -987,11 +942,15 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase):
# server.
sentDeferred = defer.Deferred()
clientFactory = smtp.SMTPSenderFactory(
- b"bob@example.org", recipient + b"@example.com",
- BytesIO(message), sentDeferred)
+ b"bob@example.org",
+ recipient + b"@example.com",
+ BytesIO(message),
+ sentDeferred,
+ )
clientFactory.domain = b"example.org"
clientConnector = reactor.connectTCP(
- serverHost.host, serverHost.port, clientFactory)
+ serverHost.host, serverHost.port, clientFactory
+ )
self.addCleanup(clientConnector.disconnect)
def cbSent(ignored):
@@ -999,58 +958,54 @@ class SMTPSenderFactoryRetryTests(unittest.TestCase):
Verify that the message was successfully delivered and flush the
error which caused the first attempt to fail.
"""
- self.assertEqual(
- domain.messages,
- {recipient: [b"\n" + message + b"\n"]})
+ self.assertEqual(domain.messages, {recipient: [b"\n" + message + b"\n"]})
# Flush the RuntimeError that BrokenMessage caused to be logged.
self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
+
sentDeferred.addCallback(cbSent)
return sentDeferred
-
@implementer(IRealm)
class SingletonRealm(object):
"""
Trivial realm implementation which is constructed with an interface and an
avatar and returns that avatar when asked for that interface.
"""
+
def __init__(self, interface, avatar):
self.interface = interface
self.avatar = avatar
-
def requestAvatar(self, avatarId, mind, *interfaces):
for iface in interfaces:
if iface is self.interface:
return iface, self.avatar, lambda: None
-
class NotImplementedDelivery(object):
"""
Non-implementation of L{smtp.IMessageDelivery} which only has methods which
raise L{NotImplementedError}. Subclassed by various tests to provide the
particular behavior being tested.
"""
+
def validateFrom(self, helo, origin):
raise NotImplementedError("This oughtn't be called in the course of this test.")
-
def validateTo(self, user):
raise NotImplementedError("This oughtn't be called in the course of this test.")
-
def receivedHeader(self, helo, origin, recipients):
raise NotImplementedError("This oughtn't be called in the course of this test.")
-
class SMTPServerTests(unittest.TestCase):
"""
Test various behaviors of L{twisted.mail.smtp.SMTP} and
L{twisted.mail.smtp.ESMTP}.
"""
+
def testSMTPGreetingHost(self, serverClass=smtp.SMTP):
"""
Test that the specified hostname shows up in the SMTP server's
@@ -1063,7 +1018,6 @@ class SMTPServerTests(unittest.TestCase):
s.connectionLost(error.ConnectionDone())
self.assertIn(b"example.com", t.value())
-
def testSMTPGreetingNotExtended(self):
"""
Test that the string "ESMTP" does not appear in the SMTP server's
@@ -1076,14 +1030,12 @@ class SMTPServerTests(unittest.TestCase):
s.connectionLost(error.ConnectionDone())
self.assertNotIn(b"ESMTP", t.value())
-
def testESMTPGreetingHost(self):
"""
Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class.
"""
self.testSMTPGreetingHost(smtp.ESMTP)
-
def testESMTPGreetingExtended(self):
"""
Test that the string "ESMTP" does appear in the ESMTP server's
@@ -1096,7 +1048,6 @@ class SMTPServerTests(unittest.TestCase):
s.connectionLost(error.ConnectionDone())
self.assertIn(b"ESMTP", t.value())
-
def test_SMTPUnknownCommand(self):
"""
Sending an unimplemented command is responded to with a 500.
@@ -1108,16 +1059,17 @@ class SMTPServerTests(unittest.TestCase):
s.connectionLost(error.ConnectionDone())
self.assertIn(b"500 Command not implemented", t.value())
-
def test_acceptSenderAddress(self):
"""
Test that a C{MAIL FROM} command with an acceptable address is
responded to with the correct success code.
"""
+
class AcceptanceDelivery(NotImplementedDelivery):
"""
Delivery object which accepts all senders as valid.
"""
+
def validateFrom(self, helo, origin):
return origin
@@ -1129,21 +1081,18 @@ class SMTPServerTests(unittest.TestCase):
proto.makeConnection(trans)
# Deal with the necessary preliminaries
- proto.dataReceived(b'HELO example.com\r\n')
+ proto.dataReceived(b"HELO example.com\r\n")
trans.clear()
# Try to specify our sender address
- proto.dataReceived(b'MAIL FROM:<alice@example.com>\r\n')
+ proto.dataReceived(b"MAIL FROM:<alice@example.com>\r\n")
# Clean up the protocol before doing anything that might raise an
# exception.
proto.connectionLost(error.ConnectionLost())
# Make sure that we received exactly the correct response
- self.assertEqual(
- trans.value(),
- b'250 Sender address accepted\r\n')
-
+ self.assertEqual(trans.value(), b"250 Sender address accepted\r\n")
def test_deliveryRejectedSenderAddress(self):
"""
@@ -1151,10 +1100,12 @@ class SMTPServerTests(unittest.TestCase):
L{smtp.IMessageDelivery} instance is responded to with the correct
error code.
"""
+
class RejectionDelivery(NotImplementedDelivery):
"""
Delivery object which rejects all senders as invalid.
"""
+
def validateFrom(self, helo, origin):
raise smtp.SMTPBadSender(origin)
@@ -1166,11 +1117,11 @@ class SMTPServerTests(unittest.TestCase):
proto.makeConnection(trans)
# Deal with the necessary preliminaries
- proto.dataReceived(b'HELO example.com\r\n')
+ proto.dataReceived(b"HELO example.com\r\n")
trans.clear()
# Try to specify our sender address
- proto.dataReceived(b'MAIL FROM:<alice@example.com>\r\n')
+ proto.dataReceived(b"MAIL FROM:<alice@example.com>\r\n")
# Clean up the protocol before doing anything that might raise an
# exception.
@@ -1179,9 +1130,9 @@ class SMTPServerTests(unittest.TestCase):
# Make sure that we received exactly the correct response
self.assertEqual(
trans.value(),
- b'550 Cannot receive from specified address '
- b'<alice@example.com>: Sender not acceptable\r\n')
-
+ b"550 Cannot receive from specified address "
+ b"<alice@example.com>: Sender not acceptable\r\n",
+ )
@implementer(ICredentialsChecker)
def test_portalRejectedSenderAddress(self):
@@ -1190,10 +1141,12 @@ class SMTPServerTests(unittest.TestCase):
L{smtp.SMTP} instance's portal is responded to with the correct error
code.
"""
+
class DisallowAnonymousAccess(object):
"""
Checker for L{IAnonymous} which rejects authentication attempts.
"""
+
credentialInterfaces = (IAnonymous,)
def requestAvatarId(self, credentials):
@@ -1207,11 +1160,11 @@ class SMTPServerTests(unittest.TestCase):
proto.makeConnection(trans)
# Deal with the necessary preliminaries
- proto.dataReceived(b'HELO example.com\r\n')
+ proto.dataReceived(b"HELO example.com\r\n")
trans.clear()
# Try to specify our sender address
- proto.dataReceived(b'MAIL FROM:<alice@example.com>\r\n')
+ proto.dataReceived(b"MAIL FROM:<alice@example.com>\r\n")
# Clean up the protocol before doing anything that might raise an
# exception.
@@ -1220,9 +1173,9 @@ class SMTPServerTests(unittest.TestCase):
# Make sure that we received exactly the correct response
self.assertEqual(
trans.value(),
- b'550 Cannot receive from specified address '
- b'<alice@example.com>: Sender not acceptable\r\n')
-
+ b"550 Cannot receive from specified address "
+ b"<alice@example.com>: Sender not acceptable\r\n",
+ )
def test_portalRejectedAnonymousSender(self):
"""
@@ -1238,11 +1191,11 @@ class SMTPServerTests(unittest.TestCase):
proto.makeConnection(trans)
# Deal with the necessary preliminaries
- proto.dataReceived(b'HELO example.com\r\n')
+ proto.dataReceived(b"HELO example.com\r\n")
trans.clear()
# Try to specify our sender address
- proto.dataReceived(b'MAIL FROM:<alice@example.com>\r\n')
+ proto.dataReceived(b"MAIL FROM:<alice@example.com>\r\n")
# Clean up the protocol before doing anything that might raise an
# exception.
@@ -1251,9 +1204,9 @@ class SMTPServerTests(unittest.TestCase):
# Make sure that we received exactly the correct response
self.assertEqual(
trans.value(),
- b'550 Cannot receive from specified address '
- b'<alice@example.com>: Unauthenticated senders not allowed\r\n')
-
+ b"550 Cannot receive from specified address "
+ b"<alice@example.com>: Unauthenticated senders not allowed\r\n",
+ )
class ESMTPAuthenticationTests(unittest.TestCase):
@@ -1267,13 +1220,11 @@ class ESMTPAuthenticationTests(unittest.TestCase):
"""
self.transport.clear()
self.server.dataReceived(bytes)
- self.assertEqual(
- response,
- self.transport.value().splitlines())
+ self.assertEqual(response, self.transport.value().splitlines())
-
- def assertServerAuthenticated(self, loginArgs, username=b"username",
- password=b"password"):
+ def assertServerAuthenticated(
+ self, loginArgs, username=b"username", password=b"password"
+ ):
"""
Assert that a login attempt has been made, that the credentials and
interfaces passed to it are correct, and that when the login request
@@ -1286,7 +1237,9 @@ class ESMTPAuthenticationTests(unittest.TestCase):
"""
d, credentials, mind, interfaces = loginArgs.pop()
self.assertEqual(loginArgs, [])
- self.assertTrue(twisted.cred.credentials.IUsernamePassword.providedBy(credentials))
+ self.assertTrue(
+ twisted.cred.credentials.IUsernamePassword.providedBy(credentials)
+ )
self.assertEqual(credentials.username, username)
self.assertTrue(credentials.checkPassword(password))
self.assertIn(smtp.IMessageDeliveryFactory, interfaces)
@@ -1294,54 +1247,48 @@ class ESMTPAuthenticationTests(unittest.TestCase):
d.callback((smtp.IMessageDeliveryFactory, None, lambda: None))
self.assertEqual(
- [b"235 Authentication successful."],
- self.transport.value().splitlines())
-
+ [b"235 Authentication successful."], self.transport.value().splitlines()
+ )
def setUp(self):
"""
Create an ESMTP instance attached to a StringTransport.
"""
- self.server = smtp.ESMTP({
- b'LOGIN': LOGINCredentials})
- self.server.host = b'localhost'
+ self.server = smtp.ESMTP({b"LOGIN": LOGINCredentials})
+ self.server.host = b"localhost"
self.transport = StringTransport(
- peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345))
+ peerAddress=address.IPv4Address("TCP", "127.0.0.1", 12345)
+ )
self.server.makeConnection(self.transport)
-
def tearDown(self):
"""
Disconnect the ESMTP instance to clean up its timeout DelayedCall.
"""
self.server.connectionLost(error.ConnectionDone())
-
def portalFactory(self, loginList):
class DummyPortal:
def login(self, credentials, mind, *interfaces):
d = defer.Deferred()
loginList.append((d, credentials, mind, interfaces))
return d
- return DummyPortal()
+ return DummyPortal()
def test_authenticationCapabilityAdvertised(self):
"""
Test that AUTH is advertised to clients which issue an EHLO command.
"""
self.transport.clear()
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
responseLines = self.transport.value().splitlines()
self.assertEqual(
- responseLines[0],
- b"250-localhost Hello 127.0.0.1, nice to meet you")
- self.assertEqual(
- responseLines[1],
- b"250 AUTH LOGIN")
+ responseLines[0], b"250-localhost Hello 127.0.0.1, nice to meet you"
+ )
+ self.assertEqual(responseLines[1], b"250 AUTH LOGIN")
self.assertEqual(len(responseLines), 2)
-
def test_plainAuthentication(self):
"""
Test that the LOGIN authentication mechanism can be used
@@ -1349,24 +1296,22 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
self.transport.clear()
self.assertServerResponse(
- b'AUTH LOGIN\r\n',
- [b"334 " + base64.b64encode(b"User Name\0").strip()])
+ b"AUTH LOGIN\r\n", [b"334 " + base64.b64encode(b"User Name\0").strip()]
+ )
self.assertServerResponse(
- base64.b64encode(b'username') + b'\r\n',
- [b"334 " + base64.b64encode(b"Password\0").strip()])
+ base64.b64encode(b"username") + b"\r\n",
+ [b"334 " + base64.b64encode(b"Password\0").strip()],
+ )
- self.assertServerResponse(
- base64.b64encode(b'password').strip() + b'\r\n',
- [])
+ self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", [])
self.assertServerAuthenticated(loginArgs)
-
def test_plainAuthenticationEmptyPassword(self):
"""
Test that giving an empty password for plain auth succeeds.
@@ -1374,20 +1319,20 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
self.transport.clear()
self.assertServerResponse(
- b'AUTH LOGIN\r\n',
- [b"334 " + base64.b64encode(b"User Name\0").strip()])
+ b"AUTH LOGIN\r\n", [b"334 " + base64.b64encode(b"User Name\0").strip()]
+ )
self.assertServerResponse(
- base64.b64encode(b'username') + b'\r\n',
- [b"334 " + base64.b64encode(b"Password\0").strip()])
-
- self.assertServerResponse(b'\r\n', [])
- self.assertServerAuthenticated(loginArgs, password=b'')
+ base64.b64encode(b"username") + b"\r\n",
+ [b"334 " + base64.b64encode(b"Password\0").strip()],
+ )
+ self.assertServerResponse(b"\r\n", [])
+ self.assertServerAuthenticated(loginArgs, password=b"")
def test_plainAuthenticationInitialResponse(self):
"""
@@ -1397,20 +1342,18 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
self.transport.clear()
self.assertServerResponse(
- b'AUTH LOGIN ' + base64.b64encode(b"username").strip() + b'\r\n',
- [b"334 " + base64.b64encode(b"Password\0").strip()])
+ b"AUTH LOGIN " + base64.b64encode(b"username").strip() + b"\r\n",
+ [b"334 " + base64.b64encode(b"Password\0").strip()],
+ )
- self.assertServerResponse(
- base64.b64encode(b'password').strip() + b'\r\n',
- [])
+ self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", [])
self.assertServerAuthenticated(loginArgs)
-
def test_abortAuthentication(self):
"""
Test that a challenge/response sequence can be aborted by the client.
@@ -1418,13 +1361,10 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
- self.server.dataReceived(b'AUTH LOGIN\r\n')
-
- self.assertServerResponse(
- b'*\r\n',
- [b'501 Authentication aborted'])
+ self.server.dataReceived(b"EHLO\r\n")
+ self.server.dataReceived(b"AUTH LOGIN\r\n")
+ self.assertServerResponse(b"*\r\n", [b"501 Authentication aborted"])
def test_invalidBase64EncodedResponse(self):
"""
@@ -1434,16 +1374,15 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
- self.server.dataReceived(b'AUTH LOGIN\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
+ self.server.dataReceived(b"AUTH LOGIN\r\n")
self.assertServerResponse(
- b'x\r\n',
- [b'501 Syntax error in parameters or arguments'])
+ b"x\r\n", [b"501 Syntax error in parameters or arguments"]
+ )
self.assertEqual(loginArgs, [])
-
def test_invalidBase64EncodedInitialResponse(self):
"""
Like L{test_invalidBase64EncodedResponse} but for the case of an
@@ -1452,14 +1391,13 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
self.assertServerResponse(
- b'AUTH LOGIN x\r\n',
- [b'501 Syntax error in parameters or arguments'])
+ b"AUTH LOGIN x\r\n", [b"501 Syntax error in parameters or arguments"]
+ )
self.assertEqual(loginArgs, [])
-
def test_unexpectedLoginFailure(self):
"""
If the L{Deferred} returned by L{Portal.login} fires with an
@@ -1470,31 +1408,31 @@ class ESMTPAuthenticationTests(unittest.TestCase):
loginArgs = []
self.server.portal = self.portalFactory(loginArgs)
- self.server.dataReceived(b'EHLO\r\n')
+ self.server.dataReceived(b"EHLO\r\n")
self.transport.clear()
self.assertServerResponse(
- b'AUTH LOGIN ' + base64.b64encode(b'username').strip() + b'\r\n',
- [b'334 ' + base64.b64encode(b'Password\0').strip()])
- self.assertServerResponse(
- base64.b64encode(b'password').strip() + b'\r\n',
- [])
+ b"AUTH LOGIN " + base64.b64encode(b"username").strip() + b"\r\n",
+ [b"334 " + base64.b64encode(b"Password\0").strip()],
+ )
+ self.assertServerResponse(base64.b64encode(b"password").strip() + b"\r\n", [])
d, credentials, mind, interfaces = loginArgs.pop()
d.errback(RuntimeError("Something wrong with the server"))
self.assertEqual(
- b'451 Requested action aborted: local error in processing\r\n',
- self.transport.value())
+ b"451 Requested action aborted: local error in processing\r\n",
+ self.transport.value(),
+ )
self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
-
class SMTPClientErrorTests(unittest.TestCase):
"""
Tests for L{smtp.SMTPClientError}.
"""
+
def test_str(self):
"""
The string representation of a L{SMTPClientError} instance includes
@@ -1503,7 +1441,6 @@ class SMTPClientErrorTests(unittest.TestCase):
err = smtp.SMTPClientError(123, "some text")
self.assertEqual(str(err), "123 some text")
-
def test_strWithNegativeCode(self):
"""
If the response code supplied to L{SMTPClientError} is negative, it
@@ -1512,7 +1449,6 @@ class SMTPClientErrorTests(unittest.TestCase):
err = smtp.SMTPClientError(-1, b"foo bar")
self.assertEqual(str(err), "foo bar")
-
def test_strWithLog(self):
"""
If a line log is supplied to L{SMTPClientError}, its contents are
@@ -1522,12 +1458,7 @@ class SMTPClientErrorTests(unittest.TestCase):
log.append(b"testlog")
log.append(b"secondline")
err = smtp.SMTPClientError(100, "test error", log=log.str())
- self.assertEqual(
- str(err),
- "100 test error\n"
- "testlog\n"
- "secondline\n")
-
+ self.assertEqual(str(err), "100 test error\n" "testlog\n" "secondline\n")
class SenderMixinSentMailTests(unittest.TestCase):
@@ -1535,39 +1466,48 @@ class SenderMixinSentMailTests(unittest.TestCase):
Tests for L{smtp.SenderMixin.sentMail}, used in particular by
L{smtp.SMTPSenderFactory} and L{smtp.ESMTPSenderFactory}.
"""
+
def test_onlyLogFailedAddresses(self):
"""
L{smtp.SenderMixin.sentMail} adds only the addresses with failing
SMTP response codes to the log passed to the factory's errback.
"""
onDone = self.assertFailure(defer.Deferred(), smtp.SMTPDeliveryError)
- onDone.addCallback(lambda e: self.assertEqual(
- e.log, b"bob@example.com: 199 Error in sending.\n"))
+ onDone.addCallback(
+ lambda e: self.assertEqual(
+ e.log, b"bob@example.com: 199 Error in sending.\n"
+ )
+ )
clientFactory = smtp.SMTPSenderFactory(
- 'source@address', 'recipient@address',
- BytesIO(b"Message body"), onDone,
- retries=0, timeout=0.5)
+ "source@address",
+ "recipient@address",
+ BytesIO(b"Message body"),
+ onDone,
+ retries=0,
+ timeout=0.5,
+ )
client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
+ address.IPv4Address("TCP", "example.net", 25)
+ )
- addresses = [(b"alice@example.com", 200, b"No errors here!"),
- (b"bob@example.com", 199, b"Error in sending.")]
+ addresses = [
+ (b"alice@example.com", 200, b"No errors here!"),
+ (b"bob@example.com", 199, b"Error in sending."),
+ ]
client.sentMail(199, b"Test response", 1, addresses, client.log)
return onDone
-
class ESMTPDowngradeTestCase(unittest.TestCase):
"""
Tests for the ESMTP -> SMTP downgrade functionality in L{smtp.ESMTPClient}.
"""
- def setUp(self):
- self.clientProtocol = smtp.ESMTPClient(
- b"testpassword", None, b"testuser")
+ def setUp(self):
+ self.clientProtocol = smtp.ESMTPClient(b"testpassword", None, b"testuser")
def test_requireHELOFallbackOperates(self):
"""
@@ -1585,7 +1525,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase):
self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n")
self.assertEqual(b"HELO testuser\r\n", transport.value())
-
def test_requireAuthFailsHELOFallback(self):
"""
If authentication is required, and HELO fallback is on, HELO fallback
@@ -1602,7 +1541,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase):
self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n")
self.assertEqual(b"QUIT\r\n", transport.value())
-
def test_requireTLSFailsHELOFallback(self):
"""
If TLS is required and the connection is insecure, HELO fallback must
@@ -1619,7 +1557,6 @@ class ESMTPDowngradeTestCase(unittest.TestCase):
self.clientProtocol.dataReceived(b"500 not an esmtp server\r\n")
self.assertEqual(b"QUIT\r\n", transport.value())
-
def test_requireTLSAndHELOFallbackSucceedsIfOverTLS(self):
"""
If TLS is provided at the transport level, we can honour the HELO
@@ -1638,11 +1575,11 @@ class ESMTPDowngradeTestCase(unittest.TestCase):
self.assertEqual(b"HELO testuser\r\n", transport.value())
-
class SSLTestCase(unittest.TestCase):
"""
Tests for the TLS negotiation done by L{smtp.ESMTPClient}.
"""
+
if sslSkip is not None:
skip = sslSkip
@@ -1651,11 +1588,11 @@ class SSLTestCase(unittest.TestCase):
def setUp(self):
self.clientProtocol = smtp.ESMTPClient(
- b"testpassword", ClientTLSContext(), b"testuser")
+ b"testpassword", ClientTLSContext(), b"testuser"
+ )
self.clientProtocol.requireTransportSecurity = True
self.clientProtocol.getMailFrom = lambda: "test@example.org"
-
def _requireTransportSecurityOverSSLTest(self, capabilities):
"""
Verify that when L{smtp.ESMTPClient} connects to a server over a
@@ -1685,10 +1622,7 @@ class SSLTestCase(unittest.TestCase):
# The client should now try to send a message - without first trying to
# negotiate TLS, since the transport is already secure.
- self.assertEqual(
- b"MAIL FROM:<test@example.org>\r\n",
- transport.value())
-
+ self.assertEqual(b"MAIL FROM:<test@example.org>\r\n", transport.value())
def test_requireTransportSecurityOverSSL(self):
"""
@@ -1697,7 +1631,6 @@ class SSLTestCase(unittest.TestCase):
"""
self._requireTransportSecurityOverSSLTest(b"250 AUTH LOGIN\r\n")
-
def test_requireTransportSecurityTLSOffered(self):
"""
When C{requireTransportSecurity} is C{True} and the client is connected
@@ -1713,14 +1646,12 @@ class SSLTestCase(unittest.TestCase):
# Tell the client about the server's capabilities - including STARTTLS
self.clientProtocol.dataReceived(
- self.EHLO_RESPONSE +
- b"250-AUTH LOGIN\r\n"
- b"250 STARTTLS\r\n")
+ self.EHLO_RESPONSE + b"250-AUTH LOGIN\r\n" b"250 STARTTLS\r\n"
+ )
# The client should try to start TLS before sending the message.
self.assertEqual(b"STARTTLS\r\n", transport.value())
-
def test_requireTransportSecurityTLSOfferedOverSSL(self):
"""
When C{requireTransportSecurity} is C{True} and the client is connected
@@ -1728,9 +1659,8 @@ class SSLTestCase(unittest.TestCase):
extension, it is not used before mail is delivered.
"""
self._requireTransportSecurityOverSSLTest(
- b"250-AUTH LOGIN\r\n"
- b"250 STARTTLS\r\n")
-
+ b"250-AUTH LOGIN\r\n" b"250 STARTTLS\r\n"
+ )
def test_requireTransportSecurityTLSNotOffered(self):
"""
@@ -1746,14 +1676,11 @@ class SSLTestCase(unittest.TestCase):
transport.clear()
# Tell the client about the server's capabilities - excluding STARTTLS
- self.clientProtocol.dataReceived(
- self.EHLO_RESPONSE +
- b"250 AUTH LOGIN\r\n")
+ self.clientProtocol.dataReceived(self.EHLO_RESPONSE + b"250 AUTH LOGIN\r\n")
# The client give up
self.assertEqual(b"QUIT\r\n", transport.value())
-
def test_esmtpClientTlsModeDeprecationGet(self):
"""
L{smtp.ESMTPClient.tlsMode} is deprecated.
@@ -1761,24 +1688,22 @@ class SSLTestCase(unittest.TestCase):
val = self.clientProtocol.tlsMode
del val
warningsShown = self.flushWarnings(
- offendingFunctions=[self.test_esmtpClientTlsModeDeprecationGet])
+ offendingFunctions=[self.test_esmtpClientTlsModeDeprecationGet]
+ )
self.assertEqual(len(warningsShown), 1)
- self.assertIdentical(
- warningsShown[0]['category'], DeprecationWarning)
+ self.assertIdentical(warningsShown[0]["category"], DeprecationWarning)
self.assertEqual(
- warningsShown[0]['message'],
+ warningsShown[0]["message"],
"tlsMode attribute of twisted.mail.smtp.ESMTPClient "
- "is deprecated since Twisted 13.0")
-
+ "is deprecated since Twisted 13.0",
+ )
def test_esmtpClientTlsModeDeprecationGetAttributeError(self):
"""
L{smtp.ESMTPClient.__getattr__} raises an attribute error for other
attribute names which do not exist.
"""
- self.assertRaises(
- AttributeError, lambda: self.clientProtocol.doesNotExist)
-
+ self.assertRaises(AttributeError, lambda: self.clientProtocol.doesNotExist)
def test_esmtpClientTlsModeDeprecationSet(self):
"""
@@ -1786,25 +1711,25 @@ class SSLTestCase(unittest.TestCase):
"""
self.clientProtocol.tlsMode = False
warningsShown = self.flushWarnings(
- offendingFunctions=[self.test_esmtpClientTlsModeDeprecationSet])
+ offendingFunctions=[self.test_esmtpClientTlsModeDeprecationSet]
+ )
self.assertEqual(len(warningsShown), 1)
- self.assertIdentical(
- warningsShown[0]['category'], DeprecationWarning)
+ self.assertIdentical(warningsShown[0]["category"], DeprecationWarning)
self.assertEqual(
- warningsShown[0]['message'],
+ warningsShown[0]["message"],
"tlsMode attribute of twisted.mail.smtp.ESMTPClient "
- "is deprecated since Twisted 13.0")
-
+ "is deprecated since Twisted 13.0",
+ )
class AbortableStringTransport(StringTransport):
"""
A version of L{StringTransport} that supports C{abortConnection}.
"""
+
# This should be replaced by a common version in #6530.
aborting = False
-
def abortConnection(self):
"""
A testable version of the C{ITCPTransport.abortConnection} method.
@@ -1816,11 +1741,11 @@ class AbortableStringTransport(StringTransport):
self.loseConnection()
-
class SendmailTests(unittest.TestCase):
"""
Tests for L{twisted.mail.smtp.sendmail}.
"""
+
def test_defaultReactorIsGlobalReactor(self):
"""
The default C{reactor} parameter of L{twisted.mail.smtp.sendmail} is
@@ -1829,24 +1754,29 @@ class SendmailTests(unittest.TestCase):
args, varArgs, keywords, defaults = inspect.getargspec(smtp.sendmail)
self.assertEqual(reactor, defaults[2])
-
def _honorsESMTPArguments(self, username, password):
"""
L{twisted.mail.smtp.sendmail} creates the ESMTP factory with the ESMTP
arguments.
"""
reactor = MemoryReactor()
- smtp.sendmail("localhost", "source@address", "recipient@address",
- b"message", reactor=reactor, username=username,
- password=password, requireTransportSecurity=True,
- requireAuthentication=True)
+ smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ b"message",
+ reactor=reactor,
+ username=username,
+ password=password,
+ requireTransportSecurity=True,
+ requireAuthentication=True,
+ )
factory = reactor.tcpClients[0][2]
self.assertEqual(factory._requireTransportSecurity, True)
self.assertEqual(factory._requireAuthentication, True)
self.assertEqual(factory.username, b"foo")
self.assertEqual(factory.password, b"bar")
-
def test_honorsESMTPArgumentsUnicodeUserPW(self):
"""
L{twisted.mail.smtp.sendmail} should accept C{username} and C{password}
@@ -1854,7 +1784,6 @@ class SendmailTests(unittest.TestCase):
"""
return self._honorsESMTPArguments(username=u"foo", password=u"bar")
-
def test_honorsESMTPArgumentsBytesUserPW(self):
"""
L{twisted.mail.smtp.sendmail} should accept C{username} and C{password}
@@ -1862,7 +1791,6 @@ class SendmailTests(unittest.TestCase):
"""
return self._honorsESMTPArguments(username=b"foo", password=b"bar")
-
def test_messageFilePassthrough(self):
"""
L{twisted.mail.smtp.sendmail} will pass through the message untouched
@@ -1871,38 +1799,51 @@ class SendmailTests(unittest.TestCase):
reactor = MemoryReactor()
messageFile = BytesIO(b"File!")
- smtp.sendmail("localhost", "source@address", "recipient@address",
- messageFile, reactor=reactor)
+ smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ messageFile,
+ reactor=reactor,
+ )
factory = reactor.tcpClients[0][2]
self.assertIs(factory.file, messageFile)
-
def test_messageStringMadeFile(self):
"""
L{twisted.mail.smtp.sendmail} will turn non-file-like objects
(eg. strings) into file-like objects before sending.
"""
reactor = MemoryReactor()
- smtp.sendmail("localhost", "source@address", "recipient@address",
- b"message", reactor=reactor)
+ smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ b"message",
+ reactor=reactor,
+ )
factory = reactor.tcpClients[0][2]
messageFile = factory.file
messageFile.seek(0)
self.assertEqual(messageFile.read(), b"message")
-
def test_senderDomainName(self):
"""
L{twisted.mail.smtp.sendmail} passes through the sender domain name, if
provided.
"""
reactor = MemoryReactor()
- smtp.sendmail("localhost", "source@address", "recipient@address",
- b"message", reactor=reactor, senderDomainName="foo")
+ smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ b"message",
+ reactor=reactor,
+ senderDomainName="foo",
+ )
factory = reactor.tcpClients[0][2]
self.assertEqual(factory.domain, b"foo")
-
def test_cancelBeforeConnectionMade(self):
"""
When a user cancels L{twisted.mail.smtp.sendmail} before the connection
@@ -1910,14 +1851,18 @@ class SendmailTests(unittest.TestCase):
L{twisted.internet.interfaces.IConnector.disconnect}.
"""
reactor = MemoryReactor()
- d = smtp.sendmail("localhost", "source@address", "recipient@address",
- b"message", reactor=reactor)
+ d = smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ b"message",
+ reactor=reactor,
+ )
d.cancel()
self.assertEqual(reactor.connectors[0]._disconnected, True)
failure = self.failureResultOf(d)
failure.trap(defer.CancelledError)
-
def test_cancelAfterConnectionMade(self):
"""
When a user cancels L{twisted.mail.smtp.sendmail} after the connection
@@ -1926,8 +1871,13 @@ class SendmailTests(unittest.TestCase):
"""
reactor = MemoryReactor()
transport = AbortableStringTransport()
- d = smtp.sendmail("localhost", "source@address", "recipient@address",
- b"message", reactor=reactor)
+ d = smtp.sendmail(
+ "localhost",
+ "source@address",
+ "recipient@address",
+ b"message",
+ reactor=reactor,
+ )
factory = reactor.tcpClients[0][2]
p = factory.buildProtocol(None)
p.makeConnection(transport)
diff --git a/src/twisted/web/test/test_http.py b/src/twisted/web/test/test_http.py
index a3067f7..5188372 100644
--- a/src/twisted/web/test/test_http.py
+++ b/src/twisted/web/test/test_http.py
@@ -2371,20 +2371,6 @@ ok
class QueryArgumentsTests(unittest.TestCase):
- def testParseqs(self):
- self.assertEqual(
- cgi.parse_qs(b"a=b&d=c;+=f"),
- http.parse_qs(b"a=b&d=c;+=f"))
- self.assertRaises(
- ValueError, http.parse_qs, b"blah", strict_parsing=True)
- self.assertEqual(
- cgi.parse_qs(b"a=&b=c", keep_blank_values=1),
- http.parse_qs(b"a=&b=c", keep_blank_values=1))
- self.assertEqual(
- cgi.parse_qs(b"a=&b=c"),
- http.parse_qs(b"a=&b=c"))
-
-
def test_urlparse(self):
"""
For a given URL, L{http.urlparse} should behave the same as L{urlparse},