summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2018-02-25 16:32:14 (GMT)
committerGitHub <noreply@github.com>2018-02-25 16:32:14 (GMT)
commita19fb3c6aaa7632410d1d9dcb395d7101d124da4 (patch)
tree476902dc75526cc8bb22c41cf213416384c36805 /Lib/asyncio
parent5fb632e83136399bad9427ee23ec8b771695290a (diff)
downloadcpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.zip
cpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.tar.gz
cpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.tar.bz2
bpo-32622: Native sendfile on windows (#5565)
* Support sendfile on Windows Proactor event loop naively.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/proactor_events.py70
-rw-r--r--Lib/asyncio/windows_events.py22
2 files changed, 91 insertions, 1 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 10ca6f8..b675c82 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -6,11 +6,14 @@ proactor is only implemented on Windows with IOCP.
__all__ = 'BaseProactorEventLoop',
+import io
+import os
import socket
import warnings
from . import base_events
from . import constants
+from . import events
from . import futures
from . import protocols
from . import sslproto
@@ -107,6 +110,11 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._force_close(exc)
def _force_close(self, exc):
+ if self._empty_waiter is not None:
+ if exc is None:
+ self._empty_waiter.set_result(None)
+ else:
+ self._empty_waiter.set_exception(exc)
if self._closing:
return
self._closing = True
@@ -327,6 +335,10 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
_start_tls_compatible = True
+ def __init__(self, *args, **kw):
+ super().__init__(*args, **kw)
+ self._empty_waiter = None
+
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError(
@@ -334,6 +346,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
f"not {type(data).__name__}")
if self._eof_written:
raise RuntimeError('write_eof() already called')
+ if self._empty_waiter is not None:
+ raise RuntimeError('unable to write; sendfile is in progress')
if not data:
return
@@ -393,6 +407,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
self._maybe_pause_protocol()
else:
self._write_fut.add_done_callback(self._loop_writing)
+ if self._empty_waiter is not None and self._write_fut is None:
+ self._empty_waiter.set_result(None)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
@@ -407,6 +423,17 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
def abort(self):
self._force_close(None)
+ def _make_empty_waiter(self):
+ if self._empty_waiter is not None:
+ raise RuntimeError("Empty waiter is already set")
+ self._empty_waiter = self._loop.create_future()
+ if self._write_fut is None:
+ self._empty_waiter.set_result(None)
+ return self._empty_waiter
+
+ def _reset_empty_waiter(self):
+ self._empty_waiter = None
+
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
def __init__(self, *args, **kw):
@@ -447,7 +474,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport,
transports.Transport):
"""Transport for connected sockets."""
- _sendfile_compatible = constants._SendfileMode.FALLBACK
+ _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def _set_extra(self, sock):
self._extra['socket'] = sock
@@ -556,6 +583,47 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
async def sock_accept(self, sock):
return await self._proactor.accept(sock)
+ async def _sock_sendfile_native(self, sock, file, offset, count):
+ try:
+ fileno = file.fileno()
+ except (AttributeError, io.UnsupportedOperation) as err:
+ raise events.SendfileNotAvailableError("not a regular file")
+ try:
+ fsize = os.fstat(fileno).st_size
+ except OSError as err:
+ raise events.SendfileNotAvailableError("not a regular file")
+ blocksize = count if count else fsize
+ if not blocksize:
+ return 0 # empty file
+
+ blocksize = min(blocksize, 0xffff_ffff)
+ end_pos = min(offset + count, fsize) if count else fsize
+ offset = min(offset, fsize)
+ total_sent = 0
+ try:
+ while True:
+ blocksize = min(end_pos - offset, blocksize)
+ if blocksize <= 0:
+ return total_sent
+ await self._proactor.sendfile(sock, file, offset, blocksize)
+ offset += blocksize
+ total_sent += blocksize
+ finally:
+ if total_sent > 0:
+ file.seek(offset)
+
+ async def _sendfile_native(self, transp, file, offset, count):
+ resume_reading = transp.is_reading()
+ transp.pause_reading()
+ await transp._make_empty_waiter()
+ try:
+ return await self.sock_sendfile(transp._sock, file, offset, count,
+ fallback=False)
+ finally:
+ transp._reset_empty_waiter()
+ if resume_reading:
+ transp.resume_reading()
+
def _close_self_pipe(self):
if self._self_reading_future is not None:
self._self_reading_future.cancel()
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index f91fcdd..d22edec 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -4,6 +4,7 @@ import _overlapped
import _winapi
import errno
import math
+import msvcrt
import socket
import struct
import weakref
@@ -527,6 +528,27 @@ class IocpProactor:
return self._register(ov, conn, finish_connect)
+ def sendfile(self, sock, file, offset, count):
+ self._register_with_iocp(sock)
+ ov = _overlapped.Overlapped(NULL)
+ offset_low = offset & 0xffff_ffff
+ offset_high = (offset >> 32) & 0xffff_ffff
+ ov.TransmitFile(sock.fileno(),
+ msvcrt.get_osfhandle(file.fileno()),
+ offset_low, offset_high,
+ count, 0, 0)
+
+ def finish_sendfile(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+ _overlapped.ERROR_OPERATION_ABORTED):
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+ return self._register(ov, sock, finish_sendfile)
+
def accept_pipe(self, pipe):
self._register_with_iocp(pipe)
ov = _overlapped.Overlapped(NULL)