diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2018-02-25 16:32:14 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-25 16:32:14 (GMT) |
commit | a19fb3c6aaa7632410d1d9dcb395d7101d124da4 (patch) | |
tree | 476902dc75526cc8bb22c41cf213416384c36805 /Lib/asyncio | |
parent | 5fb632e83136399bad9427ee23ec8b771695290a (diff) | |
download | cpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.zip cpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.tar.gz cpython-a19fb3c6aaa7632410d1d9dcb395d7101d124da4.tar.bz2 |
bpo-32622: Native sendfile on windows (#5565)
* Support sendfile on Windows Proactor event loop naively.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/proactor_events.py | 70 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 22 |
2 files changed, 91 insertions, 1 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 10ca6f8..b675c82 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -6,11 +6,14 @@ proactor is only implemented on Windows with IOCP. __all__ = 'BaseProactorEventLoop', +import io +import os import socket import warnings from . import base_events from . import constants +from . import events from . import futures from . import protocols from . import sslproto @@ -107,6 +110,11 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, self._force_close(exc) def _force_close(self, exc): + if self._empty_waiter is not None: + if exc is None: + self._empty_waiter.set_result(None) + else: + self._empty_waiter.set_exception(exc) if self._closing: return self._closing = True @@ -327,6 +335,10 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, _start_tls_compatible = True + def __init__(self, *args, **kw): + super().__init__(*args, **kw) + self._empty_waiter = None + def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): raise TypeError( @@ -334,6 +346,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, f"not {type(data).__name__}") if self._eof_written: raise RuntimeError('write_eof() already called') + if self._empty_waiter is not None: + raise RuntimeError('unable to write; sendfile is in progress') if not data: return @@ -393,6 +407,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, self._maybe_pause_protocol() else: self._write_fut.add_done_callback(self._loop_writing) + if self._empty_waiter is not None and self._write_fut is None: + self._empty_waiter.set_result(None) except ConnectionResetError as exc: self._force_close(exc) except OSError as exc: @@ -407,6 +423,17 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, def abort(self): self._force_close(None) + def _make_empty_waiter(self): + if self._empty_waiter is not None: + raise RuntimeError("Empty waiter is already set") + self._empty_waiter = self._loop.create_future() + if self._write_fut is None: + self._empty_waiter.set_result(None) + return self._empty_waiter + + def _reset_empty_waiter(self): + self._empty_waiter = None + class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): def __init__(self, *args, **kw): @@ -447,7 +474,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport, transports.Transport): """Transport for connected sockets.""" - _sendfile_compatible = constants._SendfileMode.FALLBACK + _sendfile_compatible = constants._SendfileMode.TRY_NATIVE def _set_extra(self, sock): self._extra['socket'] = sock @@ -556,6 +583,47 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): async def sock_accept(self, sock): return await self._proactor.accept(sock) + async def _sock_sendfile_native(self, sock, file, offset, count): + try: + fileno = file.fileno() + except (AttributeError, io.UnsupportedOperation) as err: + raise events.SendfileNotAvailableError("not a regular file") + try: + fsize = os.fstat(fileno).st_size + except OSError as err: + raise events.SendfileNotAvailableError("not a regular file") + blocksize = count if count else fsize + if not blocksize: + return 0 # empty file + + blocksize = min(blocksize, 0xffff_ffff) + end_pos = min(offset + count, fsize) if count else fsize + offset = min(offset, fsize) + total_sent = 0 + try: + while True: + blocksize = min(end_pos - offset, blocksize) + if blocksize <= 0: + return total_sent + await self._proactor.sendfile(sock, file, offset, blocksize) + offset += blocksize + total_sent += blocksize + finally: + if total_sent > 0: + file.seek(offset) + + async def _sendfile_native(self, transp, file, offset, count): + resume_reading = transp.is_reading() + transp.pause_reading() + await transp._make_empty_waiter() + try: + return await self.sock_sendfile(transp._sock, file, offset, count, + fallback=False) + finally: + transp._reset_empty_waiter() + if resume_reading: + transp.resume_reading() + def _close_self_pipe(self): if self._self_reading_future is not None: self._self_reading_future.cancel() diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index f91fcdd..d22edec 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -4,6 +4,7 @@ import _overlapped import _winapi import errno import math +import msvcrt import socket import struct import weakref @@ -527,6 +528,27 @@ class IocpProactor: return self._register(ov, conn, finish_connect) + def sendfile(self, sock, file, offset, count): + self._register_with_iocp(sock) + ov = _overlapped.Overlapped(NULL) + offset_low = offset & 0xffff_ffff + offset_high = (offset >> 32) & 0xffff_ffff + ov.TransmitFile(sock.fileno(), + msvcrt.get_osfhandle(file.fileno()), + offset_low, offset_high, + count, 0, 0) + + def finish_sendfile(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + return self._register(ov, sock, finish_sendfile) + def accept_pipe(self, pipe): self._register_with_iocp(pipe) ov = _overlapped.Overlapped(NULL) |