summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ftplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_ftplib.py')
-rw-r--r--Lib/test/test_ftplib.py180
1 files changed, 152 insertions, 28 deletions
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 71bc23e..299a146 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -22,10 +22,25 @@ from test.support import HOST
threading = support.import_module('threading')
# the dummy data returned by server over the data channel when
-# RETR, LIST and NLST commands are issued
+# RETR, LIST, NLST, MLSD commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
+MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
+ "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
+ "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
+ "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
+ "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
+ "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
+ "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
+ "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
+ "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
+ "type=file;perm=r;unique==keVO1+IH4; leading space\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
+ "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
class DummyDTPHandler(asynchat.async_chat):
@@ -49,6 +64,11 @@ class DummyDTPHandler(asynchat.async_chat):
self.dtp_conn_closed = True
def push(self, what):
+ if self.baseclass.next_data is not None:
+ what = self.baseclass.next_data
+ self.baseclass.next_data = None
+ if not what:
+ return self.close_when_done()
super(DummyDTPHandler, self).push(what.encode('ascii'))
def handle_error(self):
@@ -69,6 +89,7 @@ class DummyFTPHandler(asynchat.async_chat):
self.last_received_cmd = None
self.last_received_data = ''
self.next_response = ''
+ self.next_data = None
self.rest = None
self.push('220 welcome')
@@ -104,7 +125,7 @@ class DummyFTPHandler(asynchat.async_chat):
addr = list(map(int, arg.split(',')))
ip = '%d.%d.%d.%d' %tuple(addr[:4])
port = (addr[4] * 256) + addr[5]
- s = socket.create_connection((ip, port), timeout=10)
+ s = socket.create_connection((ip, port), timeout=2)
self.dtp = self.dtp_handler(s, baseclass=self)
self.push('200 active data connection established')
@@ -122,7 +143,7 @@ class DummyFTPHandler(asynchat.async_chat):
def cmd_eprt(self, arg):
af, ip, port = arg.split(arg[0])[1:-1]
port = int(port)
- s = socket.create_connection((ip, port), timeout=10)
+ s = socket.create_connection((ip, port), timeout=2)
self.dtp = self.dtp_handler(s, baseclass=self)
self.push('200 active data connection established')
@@ -213,6 +234,14 @@ class DummyFTPHandler(asynchat.async_chat):
self.dtp.push(NLST_DATA)
self.dtp.close_when_done()
+ def cmd_opts(self, arg):
+ self.push('200 opts ok')
+
+ def cmd_mlsd(self, arg):
+ self.push('125 mlsd ok')
+ self.dtp.push(MLSD_DATA)
+ self.dtp.close_when_done()
+
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
@@ -274,11 +303,11 @@ if ssl is not None:
_ssl_closing = False
def secure_connection(self):
- self.del_channel()
socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False,
certfile=CERTFILE, server_side=True,
do_handshake_on_connect=False,
ssl_version=ssl.PROTOCOL_SSLv23)
+ self.del_channel()
self.set_socket(socket)
self._ssl_accepting = True
@@ -313,7 +342,10 @@ if ssl is not None:
# http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
pass
self._ssl_closing = False
- super(SSLConnection, self).close()
+ if getattr(self, '_ccc', False) is False:
+ super(SSLConnection, self).close()
+ else:
+ pass
def handle_read_event(self):
if self._ssl_accepting:
@@ -381,12 +413,18 @@ if ssl is not None:
def __init__(self, conn):
DummyFTPHandler.__init__(self, conn)
self.secure_data_channel = False
+ self._ccc = False
def cmd_auth(self, line):
"""Set up secure control channel."""
self.push('234 AUTH TLS successful')
self.secure_connection()
+ def cmd_ccc(self, line):
+ self.push('220 Reverting back to clear-text')
+ self._ccc = True
+ self._do_ssl_shutdown()
+
def cmd_pbsz(self, line):
"""Negotiate size of buffer for secure data transfer.
For TLS/SSL the only valid value for the parameter is '0'.
@@ -416,13 +454,17 @@ class TestFTPClass(TestCase):
def setUp(self):
self.server = DummyFTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP(timeout=10)
+ self.client = ftplib.FTP(timeout=2)
self.client.connect(self.server.host, self.server.port)
def tearDown(self):
self.client.close()
self.server.stop()
+ def check_data(self, received, expected):
+ self.assertEqual(len(received), len(expected))
+ self.assertEqual(received, expected)
+
def test_getwelcome(self):
self.assertEqual(self.client.getwelcome(), '220 welcome')
@@ -504,7 +546,7 @@ class TestFTPClass(TestCase):
received.append(data.decode('ascii'))
received = []
self.client.retrbinary('retr', callback)
- self.assertEqual(''.join(received), RETR_DATA)
+ self.check_data(''.join(received), RETR_DATA)
def test_retrbinary_rest(self):
def callback(data):
@@ -512,20 +554,17 @@ class TestFTPClass(TestCase):
for rest in (0, 10, 20):
received = []
self.client.retrbinary('retr', callback, rest=rest)
- self.assertEqual(''.join(received), RETR_DATA[rest:],
- msg='rest test case %d %d %d' % (rest,
- len(''.join(received)),
- len(RETR_DATA[rest:])))
+ self.check_data(''.join(received), RETR_DATA[rest:])
def test_retrlines(self):
received = []
self.client.retrlines('retr', received.append)
- self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
+ self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
def test_storbinary(self):
f = io.BytesIO(RETR_DATA.encode('ascii'))
self.client.storbinary('stor', f)
- self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
+ self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
# test new callback arg
flag = []
f.seek(0)
@@ -542,7 +581,7 @@ class TestFTPClass(TestCase):
def test_storlines(self):
f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
self.client.storlines('stor', f)
- self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
+ self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
# test new callback arg
flag = []
f.seek(0)
@@ -558,6 +597,64 @@ class TestFTPClass(TestCase):
self.client.dir(lambda x: l.append(x))
self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
+ def test_mlsd(self):
+ list(self.client.mlsd())
+ list(self.client.mlsd(path='/'))
+ list(self.client.mlsd(path='/', facts=['size', 'type']))
+
+ ls = list(self.client.mlsd())
+ for name, facts in ls:
+ self.assertIsInstance(name, str)
+ self.assertIsInstance(facts, dict)
+ self.assertTrue(name)
+ self.assertIn('type', facts)
+ self.assertIn('perm', facts)
+ self.assertIn('unique', facts)
+
+ def set_data(data):
+ self.server.handler_instance.next_data = data
+
+ def test_entry(line, type=None, perm=None, unique=None, name=None):
+ type = 'type' if type is None else type
+ perm = 'perm' if perm is None else perm
+ unique = 'unique' if unique is None else unique
+ name = 'name' if name is None else name
+ set_data(line)
+ _name, facts = next(self.client.mlsd())
+ self.assertEqual(_name, name)
+ self.assertEqual(facts['type'], type)
+ self.assertEqual(facts['perm'], perm)
+ self.assertEqual(facts['unique'], unique)
+
+ # plain
+ test_entry('type=type;perm=perm;unique=unique; name\r\n')
+ # "=" in fact value
+ test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
+ test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
+ test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
+ test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
+ # spaces in name
+ test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
+ test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
+ test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name")
+ test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
+ # ";" in name
+ test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
+ test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
+ test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
+ test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
+ # case sensitiveness
+ set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
+ _name, facts = next(self.client.mlsd())
+ for x in facts:
+ self.assertTrue(x.islower())
+ # no data (directory empty)
+ set_data('')
+ self.assertRaises(StopIteration, next, self.client.mlsd())
+ set_data('')
+ for x in self.client.mlsd():
+ self.fail("unexpected data %s" % data)
+
def test_makeport(self):
with self.client.makeport():
# IPv4 is in use, just make sure send_eprt has not been used
@@ -584,7 +681,7 @@ class TestFTPClass(TestCase):
return True
# base test
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.assertTrue(is_client_connected())
@@ -592,7 +689,7 @@ class TestFTPClass(TestCase):
self.assertFalse(is_client_connected())
# QUIT sent inside the with block
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.client.quit()
@@ -602,7 +699,7 @@ class TestFTPClass(TestCase):
# force a wrong response code to be sent on QUIT: error_perm
# is expected and the connection is supposed to be closed
try:
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.server.handler_instance.next_response = '550 error on quit'
@@ -616,6 +713,30 @@ class TestFTPClass(TestCase):
self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
self.assertFalse(is_client_connected())
+ def test_source_address(self):
+ self.client.quit()
+ port = support.find_unused_port()
+ try:
+ self.client.connect(self.server.host, self.server.port,
+ source_address=(HOST, port))
+ self.assertEqual(self.client.sock.getsockname()[1], port)
+ self.client.quit()
+ except IOError as e:
+ if e.errno == errno.EADDRINUSE:
+ self.skipTest("couldn't bind to port %d" % port)
+ raise
+
+ def test_source_address_passive_connection(self):
+ port = support.find_unused_port()
+ self.client.source_address = (HOST, port)
+ try:
+ with self.client.transfercmd('list') as sock:
+ self.assertEqual(sock.getsockname()[1], port)
+ except IOError as e:
+ if e.errno == errno.EADDRINUSE:
+ self.skipTest("couldn't bind to port %d" % port)
+ raise
+
def test_parse257(self):
self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
@@ -632,7 +753,7 @@ class TestFTPClass(TestCase):
class TestIPv6Environment(TestCase):
def setUp(self):
- self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
+ self.server = DummyFTPServer(('::1', 0), af=socket.AF_INET6)
self.server.start()
self.client = ftplib.FTP()
self.client.connect(self.server.host, self.server.port)
@@ -661,6 +782,7 @@ class TestIPv6Environment(TestCase):
received.append(data.decode('ascii'))
received = []
self.client.retrbinary('retr', callback)
+ self.assertEqual(len(''.join(received)), len(RETR_DATA))
self.assertEqual(''.join(received), RETR_DATA)
self.client.set_pasv(True)
retr()
@@ -676,7 +798,7 @@ class TestTLS_FTPClassMixin(TestFTPClass):
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP_TLS(timeout=10)
+ self.client = ftplib.FTP_TLS(timeout=2)
self.client.connect(self.server.host, self.server.port)
# enable TLS
self.client.auth()
@@ -689,7 +811,7 @@ class TestTLS_FTPClass(TestCase):
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP_TLS(timeout=10)
+ self.client = ftplib.FTP_TLS(timeout=2)
self.client.connect(self.server.host, self.server.port)
def tearDown(self):
@@ -749,7 +871,7 @@ class TestTLS_FTPClass(TestCase):
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
keyfile=CERTFILE, context=ctx)
- self.client = ftplib.FTP_TLS(context=ctx, timeout=10)
+ self.client = ftplib.FTP_TLS(context=ctx, timeout=2)
self.client.connect(self.server.host, self.server.port)
self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
self.client.auth()
@@ -761,6 +883,13 @@ class TestTLS_FTPClass(TestCase):
self.assertIs(sock.context, ctx)
self.assertIsInstance(sock, ssl.SSLSocket)
+ def test_ccc(self):
+ self.assertRaises(ValueError, self.client.ccc)
+ self.client.login(secure=True)
+ self.assertIsInstance(self.client.sock, ssl.SSLSocket)
+ self.client.ccc()
+ self.assertRaises(ValueError, self.client.sock.unwrap)
+
class TestTimeouts(TestCase):
@@ -857,13 +986,8 @@ class TestTimeouts(TestCase):
def test_main():
tests = [TestFTPClass, TestTimeouts]
- if socket.has_ipv6:
- try:
- DummyFTPServer((HOST, 0), af=socket.AF_INET6)
- except socket.error:
- pass
- else:
- tests.append(TestIPv6Environment)
+ if support.IPV6_ENABLED:
+ tests.append(TestIPv6Environment)
if ssl is not None:
tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])