summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_asyncore.py77
1 files changed, 38 insertions, 39 deletions
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index d05462b..270e9cc 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -755,50 +755,49 @@ class BaseTestAPI:
def test_set_reuse_addr(self):
if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
self.skipTest("Not applicable to AF_UNIX sockets.")
- sock = socket.socket(self.family)
- try:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- except OSError:
- unittest.skip("SO_REUSEADDR not supported on this platform")
- else:
- # if SO_REUSEADDR succeeded for sock we expect asyncore
- # to do the same
- s = asyncore.dispatcher(socket.socket(self.family))
- self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR))
- s.socket.close()
- s.create_socket(self.family)
- s.set_reuse_addr()
- self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR))
- finally:
- sock.close()
+
+ with socket.socket(self.family) as sock:
+ try:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except OSError:
+ unittest.skip("SO_REUSEADDR not supported on this platform")
+ else:
+ # if SO_REUSEADDR succeeded for sock we expect asyncore
+ # to do the same
+ s = asyncore.dispatcher(socket.socket(self.family))
+ self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
+ s.socket.close()
+ s.create_socket(self.family)
+ s.set_reuse_addr()
+ self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.reap_threads
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
- if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
- server = BaseServer(self.family, self.addr)
- t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
- count=500))
- t.start()
- def cleanup():
- t.join(timeout=TIMEOUT)
- if t.is_alive():
- self.fail("join() timed out")
- self.addCleanup(cleanup)
-
- s = socket.socket(self.family, socket.SOCK_STREAM)
- s.settimeout(.2)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
- struct.pack('ii', 1, 0))
- try:
- s.connect(server.address)
- except OSError:
- pass
- finally:
- s.close()
+ if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
+ self.skipTest("test specific to AF_INET and AF_INET6")
+
+ server = BaseServer(self.family, self.addr)
+ t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
+ count=500), name="ident")
+ t.start()
+ try:
+ with socket.socket(self.family, socket.SOCK_STREAM) as s:
+ s.settimeout(.2)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', 1, 0))
+
+ try:
+ s.connect(server.address)
+ except OSError:
+ pass
+ finally:
+ t.join(timeout=TIMEOUT)
+ if t.is_alive():
+ self.fail("join() timed out")
class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET