diff options
Diffstat (limited to 'Lib/test/test_ftplib.py')
-rw-r--r-- | Lib/test/test_ftplib.py | 142 |
1 files changed, 124 insertions, 18 deletions
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index 5b69b0a..78e0fb4 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -22,10 +22,25 @@ from test.support import HOST threading = support.import_module('threading') # the dummy data returned by server over the data channel when -# RETR, LIST and NLST commands are issued +# RETR, LIST, NLST, MLSD commands are issued RETR_DATA = 'abcde12345\r\n' * 1000 LIST_DATA = 'foo\r\nbar\r\n' NLST_DATA = 'foo\r\nbar\r\n' +MLSD_DATA = ("type=cdir;perm=el;unique==keVO1+ZF4; test\r\n" + "type=pdir;perm=e;unique==keVO1+d?3; ..\r\n" + "type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar\r\n" + "type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device\r\n" + "type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block\r\n" + "type=file;perm=awr;unique==keVO1+8G4; writable\r\n" + "type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous\r\n" + "type=dir;perm=;unique==keVO1+1t2; no-exec\r\n" + "type=file;perm=r;unique==keVO1+EG4; two words\r\n" + "type=file;perm=r;unique==keVO1+IH4; leading space\r\n" + "type=file;perm=r;unique==keVO1+1G4; file1\r\n" + "type=dir;perm=cpmel;unique==keVO1+7G4; incoming\r\n" + "type=file;perm=r;unique==keVO1+1G4; file2\r\n" + "type=file;perm=r;unique==keVO1+1G4; file3\r\n" + "type=file;perm=r;unique==keVO1+1G4; file4\r\n") class DummyDTPHandler(asynchat.async_chat): @@ -49,6 +64,11 @@ class DummyDTPHandler(asynchat.async_chat): self.dtp_conn_closed = True def push(self, what): + if self.baseclass.next_data is not None: + what = self.baseclass.next_data + self.baseclass.next_data = None + if not what: + return self.close_when_done() super(DummyDTPHandler, self).push(what.encode('ascii')) def handle_error(self): @@ -69,6 +89,7 @@ class DummyFTPHandler(asynchat.async_chat): self.last_received_cmd = None self.last_received_data = '' self.next_response = '' + self.next_data = None self.rest = None self.push('220 welcome') @@ -104,7 +125,7 @@ class DummyFTPHandler(asynchat.async_chat): addr = list(map(int, arg.split(','))) ip = '%d.%d.%d.%d' %tuple(addr[:4]) port = (addr[4] * 256) + addr[5] - s = socket.create_connection((ip, port), timeout=10) + s = socket.create_connection((ip, port), timeout=2) self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') @@ -122,7 +143,7 @@ class DummyFTPHandler(asynchat.async_chat): def cmd_eprt(self, arg): af, ip, port = arg.split(arg[0])[1:-1] port = int(port) - s = socket.create_connection((ip, port), timeout=10) + s = socket.create_connection((ip, port), timeout=2) self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') @@ -213,6 +234,14 @@ class DummyFTPHandler(asynchat.async_chat): self.dtp.push(NLST_DATA) self.dtp.close_when_done() + def cmd_opts(self, arg): + self.push('200 opts ok') + + def cmd_mlsd(self, arg): + self.push('125 mlsd ok') + self.dtp.push(MLSD_DATA) + self.dtp.close_when_done() + class DummyFTPServer(asyncore.dispatcher, threading.Thread): @@ -416,7 +445,7 @@ class TestFTPClass(TestCase): def setUp(self): self.server = DummyFTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP(timeout=10) + self.client = ftplib.FTP(timeout=2) self.client.connect(self.server.host, self.server.port) def tearDown(self): @@ -558,6 +587,64 @@ class TestFTPClass(TestCase): self.client.dir(lambda x: l.append(x)) self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', '')) + def test_mlsd(self): + list(self.client.mlsd()) + list(self.client.mlsd(path='/')) + list(self.client.mlsd(path='/', facts=['size', 'type'])) + + ls = list(self.client.mlsd()) + for name, facts in ls: + self.assertIsInstance(name, str) + self.assertIsInstance(facts, dict) + self.assertTrue(name) + self.assertIn('type', facts) + self.assertIn('perm', facts) + self.assertIn('unique', facts) + + def set_data(data): + self.server.handler_instance.next_data = data + + def test_entry(line, type=None, perm=None, unique=None, name=None): + type = 'type' if type is None else type + perm = 'perm' if perm is None else perm + unique = 'unique' if unique is None else unique + name = 'name' if name is None else name + set_data(line) + _name, facts = next(self.client.mlsd()) + self.assertEqual(_name, name) + self.assertEqual(facts['type'], type) + self.assertEqual(facts['perm'], perm) + self.assertEqual(facts['unique'], unique) + + # plain + test_entry('type=type;perm=perm;unique=unique; name\r\n') + # "=" in fact value + test_entry('type=ty=pe;perm=perm;unique=unique; name\r\n', type="ty=pe") + test_entry('type==type;perm=perm;unique=unique; name\r\n', type="=type") + test_entry('type=t=y=pe;perm=perm;unique=unique; name\r\n', type="t=y=pe") + test_entry('type=====;perm=perm;unique=unique; name\r\n', type="====") + # spaces in name + test_entry('type=type;perm=perm;unique=unique; na me\r\n', name="na me") + test_entry('type=type;perm=perm;unique=unique; name \r\n', name="name ") + test_entry('type=type;perm=perm;unique=unique; name\r\n', name=" name") + test_entry('type=type;perm=perm;unique=unique; n am e\r\n', name="n am e") + # ";" in name + test_entry('type=type;perm=perm;unique=unique; na;me\r\n', name="na;me") + test_entry('type=type;perm=perm;unique=unique; ;name\r\n', name=";name") + test_entry('type=type;perm=perm;unique=unique; ;name;\r\n', name=";name;") + test_entry('type=type;perm=perm;unique=unique; ;;;;\r\n', name=";;;;") + # case sensitiveness + set_data('Type=type;TyPe=perm;UNIQUE=unique; name\r\n') + _name, facts = next(self.client.mlsd()) + for x in facts: + self.assertTrue(x.islower()) + # no data (directory empty) + set_data('') + self.assertRaises(StopIteration, next, self.client.mlsd()) + set_data('') + for x in self.client.mlsd(): + self.fail("unexpected data %s" % data) + def test_makeport(self): with self.client.makeport(): # IPv4 is in use, just make sure send_eprt has not been used @@ -584,7 +671,7 @@ class TestFTPClass(TestCase): return True # base test - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.assertTrue(is_client_connected()) @@ -592,7 +679,7 @@ class TestFTPClass(TestCase): self.assertFalse(is_client_connected()) # QUIT sent inside the with block - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.client.quit() @@ -602,7 +689,7 @@ class TestFTPClass(TestCase): # force a wrong response code to be sent on QUIT: error_perm # is expected and the connection is supposed to be closed try: - with ftplib.FTP(timeout=10) as self.client: + with ftplib.FTP(timeout=2) as self.client: self.client.connect(self.server.host, self.server.port) self.client.sendcmd('noop') self.server.handler_instance.next_response = '550 error on quit' @@ -616,6 +703,30 @@ class TestFTPClass(TestCase): self.assertEqual(self.server.handler_instance.last_received_cmd, 'quit') self.assertFalse(is_client_connected()) + def test_source_address(self): + self.client.quit() + port = support.find_unused_port() + try: + self.client.connect(self.server.host, self.server.port, + source_address=(HOST, port)) + self.assertEqual(self.client.sock.getsockname()[1], port) + self.client.quit() + except IOError as e: + if e.errno == errno.EADDRINUSE: + self.skipTest("couldn't bind to port %d" % port) + raise + + def test_source_address_passive_connection(self): + port = support.find_unused_port() + self.client.source_address = (HOST, port) + try: + with self.client.transfercmd('list') as sock: + self.assertEqual(sock.getsockname()[1], port) + except IOError as e: + if e.errno == errno.EADDRINUSE: + self.skipTest("couldn't bind to port %d" % port) + raise + def test_parse257(self): self.assertEqual(ftplib.parse257('257 "/foo/bar"'), '/foo/bar') self.assertEqual(ftplib.parse257('257 "/foo/bar" created'), '/foo/bar') @@ -632,7 +743,7 @@ class TestFTPClass(TestCase): class TestIPv6Environment(TestCase): def setUp(self): - self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6) + self.server = DummyFTPServer(('::1', 0), af=socket.AF_INET6) self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) @@ -676,7 +787,7 @@ class TestTLS_FTPClassMixin(TestFTPClass): def setUp(self): self.server = DummyTLS_FTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP_TLS(timeout=10) + self.client = ftplib.FTP_TLS(timeout=2) self.client.connect(self.server.host, self.server.port) # enable TLS self.client.auth() @@ -689,7 +800,7 @@ class TestTLS_FTPClass(TestCase): def setUp(self): self.server = DummyTLS_FTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP_TLS(timeout=10) + self.client = ftplib.FTP_TLS(timeout=2) self.client.connect(self.server.host, self.server.port) def tearDown(self): @@ -749,7 +860,7 @@ class TestTLS_FTPClass(TestCase): self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE, keyfile=CERTFILE, context=ctx) - self.client = ftplib.FTP_TLS(context=ctx, timeout=10) + self.client = ftplib.FTP_TLS(context=ctx, timeout=2) self.client.connect(self.server.host, self.server.port) self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) self.client.auth() @@ -857,13 +968,8 @@ class TestTimeouts(TestCase): def test_main(): tests = [TestFTPClass, TestTimeouts] - if socket.has_ipv6: - try: - DummyFTPServer((HOST, 0), af=socket.AF_INET6) - except socket.error: - pass - else: - tests.append(TestIPv6Environment) + if support.IPV6_ENABLED: + tests.append(TestIPv6Environment) if ssl is not None: tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass]) |