diff options
Diffstat (limited to 'Lib/test/test_ftplib.py')
-rw-r--r-- | Lib/test/test_ftplib.py | 180 |
1 files changed, 152 insertions, 28 deletions
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index 71bc23e..299a146 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -22,10 +22,25 @@ from test.support import HOST threading = support.import_module('threading') # the dummy data returned by server over the data channel when -# RETR, LIST and NLST commands are issued +# RETR, LIST, NLST, MLSD commands are issued RETR_DATA = 'abcde12345\r\n' * 1000 LIST_DATA = 'foo\r\nbar\r\n' NLST_DATA = 'foo\r\nbar\r\n' +MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n" + "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n" + "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n" + "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n" + "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n" + "type=file;perm=awr;unique==keVO1+8G4; writable\r\n" + "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n" + "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n" + "type=file;perm=r;unique==keVO1+EG4; two words\r\n" + "type=file;perm=r;unique==keVO1+IH4; leading space\r\n" + "type=file;perm=r;unique==keVO1+1G4; file1\r\n" + "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n" + "type=file;perm=r;unique==keVO1+1G4; file2\r\n" + "type=file;perm=r;unique==keVO1+1G4; file3\r\n" + "type=file;perm=r;unique==keVO1+1G4; file4\r\n") class DummyDTPHandler(asynchat.async_chat): @@ -49,6 +64,11 @@ class DummyDTPHandler(asynchat.async_chat): self.dtp_conn_closed = True def push(self, what): + if self.baseclass.next_data is not None: + what = self.baseclass.next_data + self.baseclass.next_data = None + if not what: + return self.close_when_done() super(DummyDTPHandler, self).push(what.encode('ascii')) def handle_error(self): @@ -69,6 +89,7 @@ class DummyFTPHandler(asynchat.async_chat): self.last_received_cmd = None self.last_received_data = '' self.next_response = '' + self.next_data = None self.rest = None self.push('220 welcome') @@ -104,7 +125,7 @@ class DummyFTPHandler(asynchat.async_chat): addr = list(map(int, arg.split(','))) ip = '%d.%d.%d.%d' %tuple(addr[:4]) port = (addr[4] * 256) + addr[5] - s = socket.create_connection((ip, port), timeout=10) + s = socket.create_connection((ip, port), timeout=2) self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') @@ -122,7 +143,7 @@ class DummyFTPHandler(asynchat.async_chat): def cmd_eprt(self, arg): af, ip, port = arg.split(arg[0])[1:-1] port = int(port) - s = socket.create_connection((ip, port), timeout=10) + s = socket.create_connection((ip, port), timeout=2) self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') @@ -213,6 +234,14 @@ class DummyFTPHandler(asynchat.async_chat): self.dtp.push(NLST_DATA) self.dtp.close_when_done() + def cmd_opts(self, arg): + self.push('200 opts ok') + + def cmd_mlsd(self, arg): + self.push('125 mlsd ok') + self.dtp.push(MLSD_DATA) + self.dtp.close_when_done() + class DummyFTPServer(asyncore.dispatcher, threading.Thread): @@ -274,11 +303,11 @@ if ssl is not None: _ssl_closing = False def secure_connection(self): - self.del_channel() socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False, certfile=CERTFILE, server_side=True, do_handshake_on_connect=False, ssl_version=ssl.PROTOCOL_SSLv23) + self.del_channel() self.set_socket(socket) self._ssl_accepting = True @@ -313,7 +342,10 @@ if ssl is not None: # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html pass self._ssl_closing = False - super(SSLConnection, self).close() + if getattr(self, '_ccc', False) is False: + super(SSLConnection, self).close() + else: + pass def handle_read_event(self): if self._ssl_accepting: @@ -381,12 +413,18 @@ if ssl is not None: def __init__(self, conn): DummyFTPHandler.__init__(self, conn) self.secure_data_channel = False + self._ccc = False def cmd_auth(self, line): """Set up secure control channel.""" self.push('234 AUTH TLS successful') self.secure_connection() + def cmd_ccc(self, line): + self.push('220 Reverting back to clear-text') + self._ccc = True + self._do_ssl_shutdown() + def cmd_pbsz(self, line): """Negotiate size of buffer for secure data transfer. For TLS/SSL the only valid value for the parameter is '0'. @@ -416,13 +454,17 @@ class TestFTPClass(TestCase): def setUp(self): self.server = DummyFTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP(timeout=10) + self.client = ftplib.FTP(timeout=2) self.client.connect(self.server.host, self.server.port) def tearDown(self): self.client.close() self.server.stop() + def check_data(self, received, expected): + self.assertEqual(len(received), len(expected)) + self.assertEqual(received, expected) + def test_getwelcome(self): self.assertEqual(self.client.getwelcome(), '220 welcome') @@ -504,7 +546,7 @@ class TestFTPClass(TestCase): received.append(data.decode('ascii')) received = [] self.client.retrbinary('retr', callback) - self.assertEqual(''.join(received), RETR_DATA) + self.check_data(''.join(received), RETR_DATA) def test_retrbinary_rest(self): def callback(data): @@ -512,20 +554,17 @@ class TestFTPClass(TestCase): for rest in (0, 10, 20): received = [] self.client.retrbinary('retr', callback, rest=rest) - self.assertEqual(''.join(received), RETR_DATA[rest:], - msg='rest test case %d %d %d' % (rest, - len(''.join(received)), - len(RETR_DATA[rest:]))) + self.check_data(''.join(received), RETR_DATA[rest:]) def test_retrlines(self): received = [] self.client.retrlines('retr', received.append) - self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', '')) + self.check_data(''.join(received), RETR_DATA.replace('\r\n', '')) def test_storbinary(self): f = io.BytesIO(RETR_DATA.encode('ascii')) self.client.storbinary('stor', f) - self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA) + self.check_data(self.server.handler_instance.last_received_data, RETR_DATA) # test new callback arg flag = [] f.seek(0) @@ -542,7 +581,7 @@ class TestFTPClass(TestCase): def test_storlines(self): f = io.BytesIO(RETR_DATA.replace('\r\n', '\n').encode('ascii')) self.client.storlines('stor', f) - self.assertEqual(self.server.handler_instance.last_received_data, RETR_DATA) + self.check_data(self.server.handler_instance.last_received_data, RETR_DATA) # test new callback arg flag = [] f.seek(0) @@ -558,6 +597,64 @@ class TestFTPClass(TestCase): self.client.dir(lambda x: l.append(x)) self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', '')) + def test_mlsd(self): + list(self.client.mlsd()) + list(self.client.mlsd(path='/')) + list(self.client.mlsd(path='/', facts=['size', 'type'])) + + ls = list(self.client.mlsd()) + for name, facts in ls: + self.assertIsInstance(name, str) + self.assertIsInstance(facts, dict) + self.assertTrue(name) + self.assertIn('type', facts) + self.assertIn('perm', facts) + self.assertIn('unique', facts) + + def set_data(data): + self.server.handler_instance.next_data = data + + def test_entry(line, type=None, perm=None, unique=None, name=None): + type = 'type' if type is None else type + perm = 'perm' if perm is None else perm + unique = 'unique' if unique is None else unique + name = 'name' if name is None else name + set_data(line) + _name, facts = next(self.client.mlsd()) + self.assertEqual(_name, name) + self.assertEqual(facts['type'], type) + self.assertEqual(facts['perm'], perm) + self.assertEqual(facts['unique'], unique) + + # plain + test_entry('type=type;perm=perm;unique=unique; name\r\n') + # "=" in fact value + test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe") + test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type") + test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe") + test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====") + # spaces in name + test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me") + test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ") + test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name") + test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e") + # ";" in name + test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me") + test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name") + test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;") + test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;") + # case sensitiveness + set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n') + _name, facts = next(self.client.mlsd()) + for x in facts: + self.assertTrue(x.islower()) + # no data (directory empty) + set_data('') + self.assertRaises(StopIteration, next, self.client.mlsd()) + set_data('') + for x in self.client.mlsd(): + self.fail("unexpected data %s" % data) + def test_makeport(self): with self.client.makeport(): # IPv4 is in use, just make sure send_eprt has not been used @@ -584,7 +681,7 @@ class TestFTPClass(TestCase): return True # base test - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.assertTrue(is_client_connected()) @@ -592,7 +689,7 @@ class TestFTPClass(TestCase): self.assertFalse(is_client_connected()) # QUIT sent inside the with block - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.client.quit() @@ -602,7 +699,7 @@ class TestFTPClass(TestCase): # force a wrong response code to be sent on QUIT: error_perm # is expected and the connection is supposed to be closed try: - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.server.handler_instance.next_response = '550 error on quit' @@ -616,6 +713,30 @@ class TestFTPClass(TestCase): self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') self.assertFalse(is_client_connected()) + def test_source_address(self): + self.client.quit() + port = support.find_unused_port() + try: + self.client.connect(self.server.host, self.server.port, + source_address=(HOST, port)) + self.assertEqual(self.client.sock.getsockname()[1], port) + self.client.quit() + except IOError as e: + if e.errno == errno.EADDRINUSE: + self.skipTest("couldn't bind to port %d" % port) + raise + + def test_source_address_passive_connection(self): + port = support.find_unused_port() + self.client.source_address = (HOST, port) + try: + with self.client.transfercmd('list') as sock: + self.assertEqual(sock.getsockname()[1], port) + except IOError as e: + if e.errno == errno.EADDRINUSE: + self.skipTest("couldn't bind to port %d" % port) + raise + def test_parse257(self): self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') @@ -632,7 +753,7 @@ class TestFTPClass(TestCase): class TestIPv6Environment(TestCase): def setUp(self): - self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6) + self.server = DummyFTPServer(('::1', 0), af=socket.AF_INET6) self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) @@ -661,6 +782,7 @@ class TestIPv6Environment(TestCase): received.append(data.decode('ascii')) received = [] self.client.retrbinary('retr', callback) + self.assertEqual(len(''.join(received)), len(RETR_DATA)) self.assertEqual(''.join(received), RETR_DATA) self.client.set_pasv(True) retr() @@ -676,7 +798,7 @@ class TestTLS_FTPClassMixin(TestFTPClass): def setUp(self): self.server = DummyTLS_FTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP_TLS(timeout=10) + self.client = ftplib.FTP_TLS(timeout=2) self.client.connect(self.server.host, self.server.port) # enable TLS self.client.auth() @@ -689,7 +811,7 @@ class TestTLS_FTPClass(TestCase): def setUp(self): self.server = DummyTLS_FTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP_TLS(timeout=10) + self.client = ftplib.FTP_TLS(timeout=2) self.client.connect(self.server.host, self.server.port) def tearDown(self): @@ -749,7 +871,7 @@ class TestTLS_FTPClass(TestCase): self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE, keyfile=CERTFILE, context=ctx) - self.client = ftplib.FTP_TLS(context=ctx, timeout=10) + self.client = ftplib.FTP_TLS(context=ctx, timeout=2) self.client.connect(self.server.host, self.server.port) self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) self.client.auth() @@ -761,6 +883,13 @@ class TestTLS_FTPClass(TestCase): self.assertIs(sock.context, ctx) self.assertIsInstance(sock, ssl.SSLSocket) + def test_ccc(self): + self.assertRaises(ValueError, self.client.ccc) + self.client.login(secure=True) + self.assertIsInstance(self.client.sock, ssl.SSLSocket) + self.client.ccc() + self.assertRaises(ValueError, self.client.sock.unwrap) + class TestTimeouts(TestCase): @@ -857,13 +986,8 @@ class TestTimeouts(TestCase): def test_main(): tests = [TestFTPClass, TestTimeouts] - if socket.has_ipv6: - try: - DummyFTPServer((HOST, 0), af=socket.AF_INET6) - except socket.error: - pass - else: - tests.append(TestIPv6Environment) + if support.IPV6_ENABLED: + tests.append(TestIPv6Environment) if ssl is not None: tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass]) |