summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ftplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_ftplib.py')
-rw-r--r--Lib/test/test_ftplib.py260
1 files changed, 207 insertions, 53 deletions
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 71bc23e..6c95c49 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -16,16 +16,31 @@ try:
except ImportError:
ssl = None
-from unittest import TestCase
+from unittest import TestCase, skipUnless
from test import support
-from test.support import HOST
+from test.support import HOST, HOSTv6
threading = support.import_module('threading')
# the dummy data returned by server over the data channel when
-# RETR, LIST and NLST commands are issued
+# RETR, LIST, NLST, MLSD commands are issued
RETR_DATA = 'abcde12345\r\n' * 1000
LIST_DATA = 'foo\r\nbar\r\n'
NLST_DATA = 'foo\r\nbar\r\n'
+MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n"
+ "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n"
+ "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n"
+ "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n"
+ "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n"
+ "type=file;perm=awr;unique==keVO1+8G4; writable\r\n"
+ "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n"
+ "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n"
+ "type=file;perm=r;unique==keVO1+EG4; two words\r\n"
+ "type=file;perm=r;unique==keVO1+IH4; leading space\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file1\r\n"
+ "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file2\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file3\r\n"
+ "type=file;perm=r;unique==keVO1+1G4; file4\r\n")
class DummyDTPHandler(asynchat.async_chat):
@@ -49,6 +64,11 @@ class DummyDTPHandler(asynchat.async_chat):
self.dtp_conn_closed = True
def push(self, what):
+ if self.baseclass.next_data is not None:
+ what = self.baseclass.next_data
+ self.baseclass.next_data = None
+ if not what:
+ return self.close_when_done()
super(DummyDTPHandler, self).push(what.encode('ascii'))
def handle_error(self):
@@ -69,7 +89,9 @@ class DummyFTPHandler(asynchat.async_chat):
self.last_received_cmd = None
self.last_received_data = ''
self.next_response = ''
+ self.next_data = None
self.rest = None
+ self.next_retr_data = RETR_DATA
self.push('220 welcome')
def collect_incoming_data(self, data):
@@ -104,7 +126,7 @@ class DummyFTPHandler(asynchat.async_chat):
addr = list(map(int, arg.split(',')))
ip = '%d.%d.%d.%d' %tuple(addr[:4])
port = (addr[4] * 256) + addr[5]
- s = socket.create_connection((ip, port), timeout=10)
+ s = socket.create_connection((ip, port), timeout=2)
self.dtp = self.dtp_handler(s, baseclass=self)
self.push('200 active data connection established')
@@ -122,7 +144,7 @@ class DummyFTPHandler(asynchat.async_chat):
def cmd_eprt(self, arg):
af, ip, port = arg.split(arg[0])[1:-1]
port = int(port)
- s = socket.create_connection((ip, port), timeout=10)
+ s = socket.create_connection((ip, port), timeout=2)
self.dtp = self.dtp_handler(s, baseclass=self)
self.push('200 active data connection established')
@@ -199,7 +221,7 @@ class DummyFTPHandler(asynchat.async_chat):
offset = int(self.rest)
else:
offset = 0
- self.dtp.push(RETR_DATA[offset:])
+ self.dtp.push(self.next_retr_data[offset:])
self.dtp.close_when_done()
self.rest = None
@@ -213,6 +235,19 @@ class DummyFTPHandler(asynchat.async_chat):
self.dtp.push(NLST_DATA)
self.dtp.close_when_done()
+ def cmd_opts(self, arg):
+ self.push('200 opts ok')
+
+ def cmd_mlsd(self, arg):
+ self.push('125 mlsd ok')
+ self.dtp.push(MLSD_DATA)
+ self.dtp.close_when_done()
+
+ def cmd_setlongretr(self, arg):
+ # For testing. Next RETR will return long line.
+ self.next_retr_data = 'x' * int(arg)
+ self.push('125 setlongretr ok')
+
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
@@ -274,11 +309,11 @@ if ssl is not None:
_ssl_closing = False
def secure_connection(self):
- self.del_channel()
socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False,
certfile=CERTFILE, server_side=True,
do_handshake_on_connect=False,
ssl_version=ssl.PROTOCOL_SSLv23)
+ self.del_channel()
self.set_socket(socket)
self._ssl_accepting = True
@@ -313,7 +348,10 @@ if ssl is not None:
# http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
pass
self._ssl_closing = False
- super(SSLConnection, self).close()
+ if getattr(self, '_ccc', False) is False:
+ super(SSLConnection, self).close()
+ else:
+ pass
def handle_read_event(self):
if self._ssl_accepting:
@@ -381,12 +419,18 @@ if ssl is not None:
def __init__(self, conn):
DummyFTPHandler.__init__(self, conn)
self.secure_data_channel = False
+ self._ccc = False
def cmd_auth(self, line):
"""Set up secure control channel."""
self.push('234 AUTH TLS successful')
self.secure_connection()
+ def cmd_ccc(self, line):
+ self.push('220 Reverting back to clear-text')
+ self._ccc = True
+ self._do_ssl_shutdown()
+
def cmd_pbsz(self, line):
"""Negotiate size of buffer for secure data transfer.
For TLS/SSL the only valid value for the parameter is '0'.
@@ -416,13 +460,17 @@ class TestFTPClass(TestCase):
def setUp(self):
self.server = DummyFTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP(timeout=10)
+ self.client = ftplib.FTP(timeout=2)
self.client.connect(self.server.host, self.server.port)
def tearDown(self):
self.client.close()
self.server.stop()
+ def check_data(self, received, expected):
+ self.assertEqual(len(received), len(expected))
+ self.assertEqual(received, expected)
+
def test_getwelcome(self):
self.assertEqual(self.client.getwelcome(), '220 welcome')
@@ -487,6 +535,10 @@ class TestFTPClass(TestCase):
def test_rmd(self):
self.client.rmd('foo')
+ def test_cwd(self):
+ dir = self.client.cwd('/foo')
+ self.assertEqual(dir, '250 cwd ok')
+
def test_pwd(self):
dir = self.client.pwd()
self.assertEqual(dir, 'pwd ok')
@@ -504,7 +556,7 @@ class TestFTPClass(TestCase):
received.append(data.decode('ascii'))
received = []
self.client.retrbinary('retr', callback)
- self.assertEqual(''.join(received), RETR_DATA)
+ self.check_data(''.join(received), RETR_DATA)
def test_retrbinary_rest(self):
def callback(data):
@@ -512,20 +564,17 @@ class TestFTPClass(TestCase):
for rest in (0, 10, 20):
received = []
self.client.retrbinary('retr', callback, rest=rest)
- self.assertEqual(''.join(received), RETR_DATA[rest:],
- msg='rest test case %d %d %d' % (rest,
- len(''.join(received)),
- len(RETR_DATA[rest:])))
+ self.check_data(''.join(received), RETR_DATA[rest:])
def test_retrlines(self):
received = []
self.client.retrlines('retr', received.append)
- self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
+ self.check_data(''.join(received), RETR_DATA.replace('\r\n', ''))
def test_storbinary(self):
f = io.BytesIO(RETR_DATA.encode('ascii'))
self.client.storbinary('stor', f)
- self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
+ self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
# test new callback arg
flag = []
f.seek(0)
@@ -542,13 +591,18 @@ class TestFTPClass(TestCase):
def test_storlines(self):
f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii'))
self.client.storlines('stor', f)
- self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA)
+ self.check_data(self.server.handler_instance.last_received_data, RETR_DATA)
# test new callback arg
flag = []
f.seek(0)
self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
self.assertTrue(flag)
+ f = io.StringIO(RETR_DATA.replace('\r\n', '\n'))
+ # storlines() expects a binary file, not a text file
+ with support.check_warnings(('', BytesWarning), quiet=True):
+ self.assertRaises(TypeError, self.client.storlines, 'stor foo', f)
+
def test_nlst(self):
self.client.nlst()
self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
@@ -558,6 +612,64 @@ class TestFTPClass(TestCase):
self.client.dir(lambda x: l.append(x))
self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
+ def test_mlsd(self):
+ list(self.client.mlsd())
+ list(self.client.mlsd(path='/'))
+ list(self.client.mlsd(path='/', facts=['size', 'type']))
+
+ ls = list(self.client.mlsd())
+ for name, facts in ls:
+ self.assertIsInstance(name, str)
+ self.assertIsInstance(facts, dict)
+ self.assertTrue(name)
+ self.assertIn('type', facts)
+ self.assertIn('perm', facts)
+ self.assertIn('unique', facts)
+
+ def set_data(data):
+ self.server.handler_instance.next_data = data
+
+ def test_entry(line, type=None, perm=None, unique=None, name=None):
+ type = 'type' if type is None else type
+ perm = 'perm' if perm is None else perm
+ unique = 'unique' if unique is None else unique
+ name = 'name' if name is None else name
+ set_data(line)
+ _name, facts = next(self.client.mlsd())
+ self.assertEqual(_name, name)
+ self.assertEqual(facts['type'], type)
+ self.assertEqual(facts['perm'], perm)
+ self.assertEqual(facts['unique'], unique)
+
+ # plain
+ test_entry('type=type;perm=perm;unique=unique; name\r\n')
+ # "=" in fact value
+ test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe")
+ test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type")
+ test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe")
+ test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====")
+ # spaces in name
+ test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me")
+ test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ")
+ test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name")
+ test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e")
+ # ";" in name
+ test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me")
+ test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name")
+ test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;")
+ test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;")
+ # case sensitiveness
+ set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n')
+ _name, facts = next(self.client.mlsd())
+ for x in facts:
+ self.assertTrue(x.islower())
+ # no data (directory empty)
+ set_data('')
+ self.assertRaises(StopIteration, next, self.client.mlsd())
+ set_data('')
+ for x in self.client.mlsd():
+ self.fail("unexpected data %s" % data)
+
def test_makeport(self):
with self.client.makeport():
# IPv4 is in use, just make sure send_eprt has not been used
@@ -584,7 +696,7 @@ class TestFTPClass(TestCase):
return True
# base test
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.assertTrue(is_client_connected())
@@ -592,7 +704,7 @@ class TestFTPClass(TestCase):
self.assertFalse(is_client_connected())
# QUIT sent inside the with block
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.client.quit()
@@ -602,7 +714,7 @@ class TestFTPClass(TestCase):
# force a wrong response code to be sent on QUIT: error_perm
# is expected and the connection is supposed to be closed
try:
- with ftplib.FTP(timeout=10) as self.client:
+ with ftplib.FTP(timeout=2) as self.client:
self.client.connect(self.server.host, self.server.port)
self.client.sendcmd('noop')
self.server.handler_instance.next_response = '550 error on quit'
@@ -616,6 +728,30 @@ class TestFTPClass(TestCase):
self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit')
self.assertFalse(is_client_connected())
+ def test_source_address(self):
+ self.client.quit()
+ port = support.find_unused_port()
+ try:
+ self.client.connect(self.server.host, self.server.port,
+ source_address=(HOST, port))
+ self.assertEqual(self.client.sock.getsockname()[1], port)
+ self.client.quit()
+ except IOError as e:
+ if e.errno == errno.EADDRINUSE:
+ self.skipTest("couldn't bind to port %d" % port)
+ raise
+
+ def test_source_address_passive_connection(self):
+ port = support.find_unused_port()
+ self.client.source_address = (HOST, port)
+ try:
+ with self.client.transfercmd('list') as sock:
+ self.assertEqual(sock.getsockname()[1], port)
+ except IOError as e:
+ if e.errno == errno.EADDRINUSE:
+ self.skipTest("couldn't bind to port %d" % port)
+ raise
+
def test_parse257(self):
self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar')
self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar')
@@ -628,11 +764,26 @@ class TestFTPClass(TestCase):
self.assertEqual(ftplib.parse257('257 "/foo/b""ar"'), '/foo/b"ar')
self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar')
+ def test_line_too_long(self):
+ self.assertRaises(ftplib.Error, self.client.sendcmd,
+ 'x' * self.client.maxline * 2)
+
+ def test_retrlines_too_long(self):
+ self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
+ received = []
+ self.assertRaises(ftplib.Error,
+ self.client.retrlines, 'retr', received.append)
+
+ def test_storlines_too_long(self):
+ f = io.BytesIO(b'x' * self.client.maxline * 2)
+ self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
+
+@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
def setUp(self):
- self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
+ self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
self.server.start()
self.client = ftplib.FTP()
self.client.connect(self.server.host, self.server.port)
@@ -661,6 +812,7 @@ class TestIPv6Environment(TestCase):
received.append(data.decode('ascii'))
received = []
self.client.retrbinary('retr', callback)
+ self.assertEqual(len(''.join(received)), len(RETR_DATA))
self.assertEqual(''.join(received), RETR_DATA)
self.client.set_pasv(True)
retr()
@@ -668,6 +820,7 @@ class TestIPv6Environment(TestCase):
retr()
+@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first.
@@ -676,20 +829,21 @@ class TestTLS_FTPClassMixin(TestFTPClass):
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP_TLS(timeout=10)
+ self.client = ftplib.FTP_TLS(timeout=2)
self.client.connect(self.server.host, self.server.port)
# enable TLS
self.client.auth()
self.client.prot_p()
+@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests."""
def setUp(self):
self.server = DummyTLS_FTPServer((HOST, 0))
self.server.start()
- self.client = ftplib.FTP_TLS(timeout=10)
+ self.client = ftplib.FTP_TLS(timeout=2)
self.client.connect(self.server.host, self.server.port)
def tearDown(self):
@@ -749,7 +903,7 @@ class TestTLS_FTPClass(TestCase):
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
keyfile=CERTFILE, context=ctx)
- self.client = ftplib.FTP_TLS(context=ctx, timeout=10)
+ self.client = ftplib.FTP_TLS(context=ctx, timeout=2)
self.client.connect(self.server.host, self.server.port)
self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
self.client.auth()
@@ -761,52 +915,60 @@ class TestTLS_FTPClass(TestCase):
self.assertIs(sock.context, ctx)
self.assertIsInstance(sock, ssl.SSLSocket)
+ def test_ccc(self):
+ self.assertRaises(ValueError, self.client.ccc)
+ self.client.login(secure=True)
+ self.assertIsInstance(self.client.sock, ssl.SSLSocket)
+ self.client.ccc()
+ self.assertRaises(ValueError, self.client.sock.unwrap)
+
class TestTimeouts(TestCase):
def setUp(self):
self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.settimeout(10)
+ self.sock.settimeout(20)
self.port = support.bind_port(self.sock)
- threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
+ self.server_thread = threading.Thread(target=self.server)
+ self.server_thread.start()
# Wait for the server to be ready.
self.evt.wait()
self.evt.clear()
+ self.old_port = ftplib.FTP.port
ftplib.FTP.port = self.port
def tearDown(self):
- self.evt.wait()
- self.sock.close()
+ ftplib.FTP.port = self.old_port
+ self.server_thread.join()
- def server(self, evt, serv):
+ def server(self):
# This method sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
- serv.listen(5)
+ self.sock.listen(5)
# (1) Signal the caller that we are ready to accept the connection.
- evt.set()
+ self.evt.set()
try:
- conn, addr = serv.accept()
+ conn, addr = self.sock.accept()
except socket.timeout:
pass
else:
- conn.send(b"1 Hola mundo\n")
+ conn.sendall(b"1 Hola mundo\n")
+ conn.shutdown(socket.SHUT_WR)
# (2) Signal the caller that it is safe to close the socket.
- evt.set()
+ self.evt.set()
conn.close()
finally:
- serv.close()
- # (3) Signal the caller that we are done.
- evt.set()
+ self.sock.close()
def testTimeoutDefault(self):
# default -- use global socket timeout
- self.assertTrue(socket.getdefaulttimeout() is None)
+ self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
finally:
socket.setdefaulttimeout(None)
self.assertEqual(ftp.sock.gettimeout(), 30)
@@ -815,13 +977,13 @@ class TestTimeouts(TestCase):
def testTimeoutNone(self):
# no timeout -- do not use global socket timeout
- self.assertTrue(socket.getdefaulttimeout() is None)
+ self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost", timeout=None)
+ ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(None)
- self.assertTrue(ftp.sock.gettimeout() is None)
+ self.assertIsNone(ftp.sock.gettimeout())
self.evt.wait()
ftp.close()
@@ -856,17 +1018,9 @@ class TestTimeouts(TestCase):
def test_main():
- tests = [TestFTPClass, TestTimeouts]
- if socket.has_ipv6:
- try:
- DummyFTPServer((HOST, 0), af=socket.AF_INET6)
- except socket.error:
- pass
- else:
- tests.append(TestIPv6Environment)
-
- if ssl is not None:
- tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
+ tests = [TestFTPClass, TestTimeouts,
+ TestIPv6Environment,
+ TestTLS_FTPClassMixin, TestTLS_FTPClass]
thread_info = support.threading_setup()
try: