diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-05-09 15:04:27 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-05-09 15:04:27 (GMT) |
commit | 87cf220972c9cb400ddcd577962883dcc5dca51a (patch) | |
tree | 3f1ab5b64ae538a2ced622637cc7e4112b1c6ffd | |
parent | df77e3d4a07223ebfe049e66d4d8a8c0b4315e04 (diff) | |
download | cpython-87cf220972c9cb400ddcd577962883dcc5dca51a.zip cpython-87cf220972c9cb400ddcd577962883dcc5dca51a.tar.gz cpython-87cf220972c9cb400ddcd577962883dcc5dca51a.tar.bz2 |
Issue #11743: Rewrite multiprocessing connection classes in pure Python.
-rw-r--r-- | Lib/multiprocessing/connection.py | 315 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 13 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 12 | ||||
-rw-r--r-- | Misc/NEWS | 2 | ||||
-rw-r--r-- | Modules/_multiprocessing/connection.h | 527 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.c | 23 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.h | 24 | ||||
-rw-r--r-- | Modules/_multiprocessing/pipe_connection.c | 149 | ||||
-rw-r--r-- | Modules/_multiprocessing/socket_connection.c | 202 | ||||
-rw-r--r-- | Modules/_multiprocessing/win32_functions.c | 166 | ||||
-rw-r--r-- | PC/VC6/_multiprocessing.dsp | 8 | ||||
-rw-r--r-- | PC/VS8.0/_multiprocessing.vcproj | 12 | ||||
-rw-r--r-- | PCbuild/_multiprocessing.vcproj | 12 | ||||
-rw-r--r-- | setup.py | 3 |
15 files changed, 490 insertions, 983 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6627e5..afd580b 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -34,19 +34,27 @@ __all__ = [ 'Client', 'Listener', 'Pipe' ] +import io import os import sys +import pickle +import select import socket +import struct import errno import time import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError +from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - +try: + from _multiprocessing import win32 +except ImportError: + if sys.platform == 'win32': + raise + win32 = None # # @@ -111,6 +119,281 @@ def address_type(address): raise ValueError('address type of %r unrecognized' % address) # +# Connection classes +# + +class _ConnectionBase: + _handle = None + + def __init__(self, handle, readable=True, writable=True): + handle = handle.__index__() + if handle < 0: + raise ValueError("invalid handle") + if not readable and not writable: + raise ValueError( + "at least one of `readable` and `writable` must be True") + self._handle = handle + self._readable = readable + self._writable = writable + + def __del__(self): + if self._handle is not None: + self._close() + + def _check_closed(self): + if self._handle is None: + raise IOError("handle is closed") + + def _check_readable(self): + if not self._readable: + raise IOError("connection is write-only") + + def _check_writable(self): + if not self._writable: + raise IOError("connection is read-only") + + def _bad_message_length(self): + if self._writable: + self._readable = False + else: + self.close() + raise IOError("bad message length") + + @property + def closed(self): + """True if the connection is closed""" + return self._handle is None + + @property + def readable(self): + """True if the connection is readable""" + return self._readable + + @property + def writable(self): + """True if the connection is writable""" + return self._writable + + def fileno(self): + """File descriptor or handle of the connection""" + self._check_closed() + return self._handle + + def close(self): + """Close the connection""" + if self._handle is not None: + try: + self._close() + finally: + self._handle = None + + def send_bytes(self, buf, offset=0, size=None): + """Send the bytes data from a bytes-like object""" + self._check_closed() + self._check_writable() + m = memoryview(buf) + # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) + if m.itemsize > 1: + m = memoryview(bytes(m)) + n = len(m) + if offset < 0: + raise ValueError("offset is negative") + if n < offset: + raise ValueError("buffer length < offset") + if size is None: + size = n - offset + elif size < 0: + raise ValueError("size is negative") + elif offset + size > n: + raise ValueError("buffer length < offset + size") + self._send_bytes(m[offset:offset + size]) + + def send(self, obj): + """Send a (picklable) object""" + self._check_closed() + self._check_writable() + buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) + self._send_bytes(memoryview(buf)) + + def recv_bytes(self, maxlength=None): + """ + Receive bytes data as a bytes object. + """ + self._check_closed() + self._check_readable() + if maxlength is not None and maxlength < 0: + raise ValueError("negative maxlength") + buf = self._recv_bytes(maxlength) + if buf is None: + self._bad_message_length() + return buf.getvalue() + + def recv_bytes_into(self, buf, offset=0): + """ + Receive bytes data into a writeable buffer-like object. + Return the number of bytes read. + """ + self._check_closed() + self._check_readable() + with memoryview(buf) as m: + # Get bytesize of arbitrary buffer + itemsize = m.itemsize + bytesize = itemsize * len(m) + if offset < 0: + raise ValueError("negative offset") + elif offset > bytesize: + raise ValueError("offset too large") + result = self._recv_bytes() + size = result.tell() + if bytesize < offset + size: + raise BufferTooShort(result.getvalue()) + # Message can fit in dest + result.seek(0) + result.readinto(m[offset // itemsize : + (offset + size) // itemsize]) + return size + + def recv(self): + """Receive a (picklable) object""" + self._check_closed() + self._check_readable() + buf = self._recv_bytes() + return pickle.loads(buf.getbuffer()) + + def poll(self, timeout=0.0): + """Whether there is any input available to be read""" + self._check_closed() + self._check_readable() + if timeout < 0.0: + timeout = None + return self._poll(timeout) + + +if win32: + + class PipeConnection(_ConnectionBase): + """ + Connection class based on a Windows named pipe. + """ + + def _close(self): + win32.CloseHandle(self._handle) + + def _send_bytes(self, buf): + nwritten = win32.WriteFile(self._handle, buf) + assert nwritten == len(buf) + + def _recv_bytes(self, maxsize=None): + buf = io.BytesIO() + bufsize = 512 + if maxsize is not None: + bufsize = min(bufsize, maxsize) + try: + firstchunk, complete = win32.ReadFile(self._handle, bufsize) + except IOError as e: + if e.errno == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + if complete: + return buf + navail, nleft = win32.PeekNamedPipe(self._handle) + if maxsize is not None and lenfirstchunk + nleft > maxsize: + return None + lastchunk, complete = win32.ReadFile(self._handle, nleft) + assert complete + buf.write(lastchunk) + return buf + + def _poll(self, timeout): + navail, nleft = win32.PeekNamedPipe(self._handle) + if navail > 0: + return True + elif timeout == 0.0: + return False + # Setup a polling loop (translated straight from old + # pipe_connection.c) + if timeout < 0.0: + deadline = None + else: + deadline = time.time() + timeout + delay = 0.001 + max_delay = 0.02 + while True: + time.sleep(delay) + navail, nleft = win32.PeekNamedPipe(self._handle) + if navail > 0: + return True + if deadline and time.time() > deadline: + return False + if delay < max_delay: + delay += 0.001 + + +class Connection(_ConnectionBase): + """ + Connection class based on an arbitrary file descriptor (Unix only), or + a socket handle (Windows). + """ + + if win32: + def _close(self): + win32.closesocket(self._handle) + _write = win32.send + _read = win32.recv + else: + def _close(self): + os.close(self._handle) + _write = os.write + _read = os.read + + def _send(self, buf, write=_write): + remaining = len(buf) + while True: + n = write(self._handle, buf) + remaining -= n + if remaining == 0: + break + buf = buf[n:] + + def _recv(self, size, read=_read): + buf = io.BytesIO() + remaining = size + while remaining > 0: + chunk = read(self._handle, remaining) + n = len(chunk) + if n == 0: + if remaining == size: + raise EOFError + else: + raise IOError("got end of file during message") + buf.write(chunk) + remaining -= n + return buf + + def _send_bytes(self, buf): + # For wire compatibility with 3.2 and lower + n = len(buf) + self._send(struct.pack("=i", len(buf))) + # The condition is necessary to avoid "broken pipe" errors + # when sending a 0-length buffer if the other end closed the pipe. + if n > 0: + self._send(buf) + + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) + size, = struct.unpack("=i", buf.getvalue()) + if maxsize is not None and size > maxsize: + return None + return self._recv(size) + + def _poll(self, timeout): + r = select.select([self._handle], [], [], timeout)[0] + return bool(r) + + +# # Public functions # @@ -186,21 +469,19 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) + c1 = Connection(os.dup(s1.fileno())) + c2 = Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) + c1 = Connection(fd1, writable=False) + c2 = Connection(fd2, readable=False) return c1, c2 else: - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe @@ -234,8 +515,8 @@ else: if e.args[0] != win32.ERROR_PIPE_CONNECTED: raise - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + c1 = PipeConnection(h1, writable=duplex) + c2 = PipeConnection(h2, readable=duplex) return c1, c2 @@ -266,7 +547,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) s.close() return conn @@ -298,7 +579,7 @@ def SocketClient(address): raise fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) return conn # @@ -345,7 +626,7 @@ if sys.platform == 'win32': except WindowsError as e: if e.args[0] != win32.ERROR_PIPE_CONNECTED: raise - return _multiprocessing.PipeConnection(handle) + return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): @@ -377,7 +658,7 @@ if sys.platform == 'win32': win32.SetNamedPipeHandleState( h, win32.PIPE_READMODE_MESSAGE, None, None ) - return _multiprocessing.PipeConnection(h) + return PipeConnection(h) # # Authentication stuff @@ -451,3 +732,7 @@ def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + + +# Late import because of circular import +from multiprocessing.forking import duplicate, close diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index cc7c326..3d95557 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -183,7 +183,7 @@ else: import time from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection + from _multiprocessing import win32 from .util import Finalize def dump(obj, file, protocol=None): @@ -411,6 +411,9 @@ else: # Make (Pipe)Connection picklable # + # Late import because of circular import + from .connection import Connection, PipeConnection + def reduce_connection(conn): if not Popen.thread_is_spawning(): raise RuntimeError( diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc..b32c725 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -44,7 +44,7 @@ import _multiprocessing from multiprocessing import current_process from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.connection import Client, Listener, Connection # @@ -159,7 +159,7 @@ def rebuild_handle(pickled_data): return new_handle # -# Register `_multiprocessing.Connection` with `ForkingPickler` +# Register `Connection` with `ForkingPickler` # def reduce_connection(conn): @@ -168,11 +168,11 @@ def reduce_connection(conn): def rebuild_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( + return Connection( handle, readable=readable, writable=writable ) -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) +ForkingPickler.register(Connection, reduce_connection) # # Register `socket.socket` with `ForkingPickler` @@ -201,6 +201,7 @@ ForkingPickler.register(socket.socket, reduce_socket) # if sys.platform == 'win32': + from multiprocessing.connection import PipeConnection def reduce_pipe_connection(conn): rh = reduce_handle(conn.fileno()) @@ -208,8 +209,8 @@ if sys.platform == 'win32': def rebuild_pipe_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( + return PipeConnection( handle, readable=readable, writable=writable ) - ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) + ForkingPickler.register(PipeConnection, reduce_pipe_connection) diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index a7f0391..0c05ff6 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1915,9 +1915,15 @@ class TestInvalidHandle(unittest.TestCase): @unittest.skipIf(WIN32, "skipped on Windows") def test_invalid_handles(self): - conn = _multiprocessing.Connection(44977608) - self.assertRaises(IOError, conn.poll) - self.assertRaises(IOError, _multiprocessing.Connection, -1) + conn = multiprocessing.connection.Connection(44977608) + try: + self.assertRaises((ValueError, IOError), conn.poll) + finally: + # Hack private attribute _handle to avoid printing an error + # in conn.__del__ + conn._handle = None + self.assertRaises((ValueError, IOError), + multiprocessing.connection.Connection, -1) # # Functions used to create test cases from the base ones in this module @@ -140,6 +140,8 @@ Core and Builtins Library ------- +- Issue #11743: Rewrite multiprocessing connection classes in pure Python. + - Issue #11164: Stop trying to use _xmlplus in the xml module. - Issue #11888: Add log2 function to math module. Patch written by Mark diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h deleted file mode 100644 index 002d5aa..0000000 --- a/Modules/_multiprocessing/connection.h +++ /dev/null @@ -1,527 +0,0 @@ -/* - * Definition of a `Connection` type. - * Used by `socket_connection.c` and `pipe_connection.c`. - * - * connection.h - * - * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt - */ - -#ifndef CONNECTION_H -#define CONNECTION_H - -/* - * Read/write flags - */ - -#define READABLE 1 -#define WRITABLE 2 - -#define CHECK_READABLE(self) \ - if (!(self->flags & READABLE)) { \ - PyErr_SetString(PyExc_IOError, "connection is write-only"); \ - return NULL; \ - } - -#define CHECK_WRITABLE(self) \ - if (!(self->flags & WRITABLE)) { \ - PyErr_SetString(PyExc_IOError, "connection is read-only"); \ - return NULL; \ - } - -/* - * Allocation and deallocation - */ - -static PyObject * -connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds) -{ - ConnectionObject *self; - HANDLE handle; - BOOL readable = TRUE, writable = TRUE; - - static char *kwlist[] = {"handle", "readable", "writable", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist, - &handle, &readable, &writable)) - return NULL; - - if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) { - PyErr_Format(PyExc_IOError, "invalid handle %zd", - (Py_ssize_t)handle); - return NULL; - } - - if (!readable && !writable) { - PyErr_SetString(PyExc_ValueError, - "either readable or writable must be true"); - return NULL; - } - - self = PyObject_New(ConnectionObject, type); - if (self == NULL) - return NULL; - - self->weakreflist = NULL; - self->handle = handle; - self->flags = 0; - - if (readable) - self->flags |= READABLE; - if (writable) - self->flags |= WRITABLE; - assert(self->flags >= 1 && self->flags <= 3); - - return (PyObject*)self; -} - -static void -connection_dealloc(ConnectionObject* self) -{ - if (self->weakreflist != NULL) - PyObject_ClearWeakRefs((PyObject*)self); - - if (self->handle != INVALID_HANDLE_VALUE) { - Py_BEGIN_ALLOW_THREADS - CLOSE(self->handle); - Py_END_ALLOW_THREADS - } - PyObject_Del(self); -} - -/* - * Functions for transferring buffers - */ - -static PyObject * -connection_sendbytes(ConnectionObject *self, PyObject *args) -{ - Py_buffer pbuffer; - char *buffer; - Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN; - int res; - - if (!PyArg_ParseTuple(args, F_RBUFFER "*|" F_PY_SSIZE_T F_PY_SSIZE_T, - &pbuffer, &offset, &size)) - return NULL; - buffer = pbuffer.buf; - length = pbuffer.len; - - CHECK_WRITABLE(self); /* XXX release buffer in case of failure */ - - if (offset < 0) { - PyBuffer_Release(&pbuffer); - PyErr_SetString(PyExc_ValueError, "offset is negative"); - return NULL; - } - if (length < offset) { - PyBuffer_Release(&pbuffer); - PyErr_SetString(PyExc_ValueError, "buffer length < offset"); - return NULL; - } - - if (size == PY_SSIZE_T_MIN) { - size = length - offset; - } else { - if (size < 0) { - PyBuffer_Release(&pbuffer); - PyErr_SetString(PyExc_ValueError, "size is negative"); - return NULL; - } - if (offset + size > length) { - PyBuffer_Release(&pbuffer); - PyErr_SetString(PyExc_ValueError, - "buffer length < offset + size"); - return NULL; - } - } - - res = conn_send_string(self, buffer + offset, size); - - PyBuffer_Release(&pbuffer); - if (res < 0) { - if (PyErr_Occurred()) - return NULL; - else - return mp_SetError(PyExc_IOError, res); - } - - Py_RETURN_NONE; -} - -static PyObject * -connection_recvbytes(ConnectionObject *self, PyObject *args) -{ - char *freeme = NULL; - Py_ssize_t res, maxlength = PY_SSIZE_T_MAX; - PyObject *result = NULL; - - if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength)) - return NULL; - - CHECK_READABLE(self); - - if (maxlength < 0) { - PyErr_SetString(PyExc_ValueError, "maxlength < 0"); - return NULL; - } - - res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, - &freeme, maxlength); - - if (res < 0) { - if (res == MP_BAD_MESSAGE_LENGTH) { - if ((self->flags & WRITABLE) == 0) { - Py_BEGIN_ALLOW_THREADS - CLOSE(self->handle); - Py_END_ALLOW_THREADS - self->handle = INVALID_HANDLE_VALUE; - } else { - self->flags = WRITABLE; - } - } - mp_SetError(PyExc_IOError, res); - } else { - if (freeme == NULL) { - result = PyBytes_FromStringAndSize(self->buffer, res); - } else { - result = PyBytes_FromStringAndSize(freeme, res); - PyMem_Free(freeme); - } - } - - return result; -} - -static PyObject * -connection_recvbytes_into(ConnectionObject *self, PyObject *args) -{ - char *freeme = NULL, *buffer = NULL; - Py_ssize_t res, length, offset = 0; - PyObject *result = NULL; - Py_buffer pbuf; - - CHECK_READABLE(self); - - if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T, - &pbuf, &offset)) - return NULL; - - buffer = pbuf.buf; - length = pbuf.len; - - if (offset < 0) { - PyErr_SetString(PyExc_ValueError, "negative offset"); - goto _error; - } - - if (offset > length) { - PyErr_SetString(PyExc_ValueError, "offset too large"); - goto _error; - } - - res = conn_recv_string(self, buffer+offset, length-offset, - &freeme, PY_SSIZE_T_MAX); - - if (res < 0) { - if (res == MP_BAD_MESSAGE_LENGTH) { - if ((self->flags & WRITABLE) == 0) { - Py_BEGIN_ALLOW_THREADS - CLOSE(self->handle); - Py_END_ALLOW_THREADS - self->handle = INVALID_HANDLE_VALUE; - } else { - self->flags = WRITABLE; - } - } - mp_SetError(PyExc_IOError, res); - } else { - if (freeme == NULL) { - result = PyInt_FromSsize_t(res); - } else { - result = PyObject_CallFunction(BufferTooShort, - F_RBUFFER "#", - freeme, res); - PyMem_Free(freeme); - if (result) { - PyErr_SetObject(BufferTooShort, result); - Py_DECREF(result); - } - goto _error; - } - } - -_cleanup: - PyBuffer_Release(&pbuf); - return result; - -_error: - result = NULL; - goto _cleanup; -} - -/* - * Functions for transferring objects - */ - -static PyObject * -connection_send_obj(ConnectionObject *self, PyObject *obj) -{ - char *buffer; - int res; - Py_ssize_t length; - PyObject *pickled_string = NULL; - - CHECK_WRITABLE(self); - - pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj, - pickle_protocol, NULL); - if (!pickled_string) - goto failure; - - if (PyBytes_AsStringAndSize(pickled_string, &buffer, &length) < 0) - goto failure; - - res = conn_send_string(self, buffer, (int)length); - - if (res < 0) { - mp_SetError(PyExc_IOError, res); - goto failure; - } - - Py_XDECREF(pickled_string); - Py_RETURN_NONE; - - failure: - Py_XDECREF(pickled_string); - return NULL; -} - -static PyObject * -connection_recv_obj(ConnectionObject *self) -{ - char *freeme = NULL; - Py_ssize_t res; - PyObject *temp = NULL, *result = NULL; - - CHECK_READABLE(self); - - res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, - &freeme, PY_SSIZE_T_MAX); - - if (res < 0) { - if (res == MP_BAD_MESSAGE_LENGTH) { - if ((self->flags & WRITABLE) == 0) { - Py_BEGIN_ALLOW_THREADS - CLOSE(self->handle); - Py_END_ALLOW_THREADS - self->handle = INVALID_HANDLE_VALUE; - } else { - self->flags = WRITABLE; - } - } - mp_SetError(PyExc_IOError, res); - } else { - if (freeme == NULL) { - temp = PyBytes_FromStringAndSize(self->buffer, res); - } else { - temp = PyBytes_FromStringAndSize(freeme, res); - PyMem_Free(freeme); - } - } - - if (temp) - result = PyObject_CallFunctionObjArgs(pickle_loads, - temp, NULL); - Py_XDECREF(temp); - return result; -} - -/* - * Other functions - */ - -static PyObject * -connection_poll(ConnectionObject *self, PyObject *args) -{ - PyObject *timeout_obj = NULL; - double timeout = 0.0; - int res; - - CHECK_READABLE(self); - - if (!PyArg_ParseTuple(args, "|O", &timeout_obj)) - return NULL; - - if (timeout_obj == NULL) { - timeout = 0.0; - } else if (timeout_obj == Py_None) { - timeout = -1.0; /* block forever */ - } else { - timeout = PyFloat_AsDouble(timeout_obj); - if (PyErr_Occurred()) - return NULL; - if (timeout < 0.0) - timeout = 0.0; - } - - Py_BEGIN_ALLOW_THREADS - res = conn_poll(self, timeout, _save); - Py_END_ALLOW_THREADS - - switch (res) { - case TRUE: - Py_RETURN_TRUE; - case FALSE: - Py_RETURN_FALSE; - default: - return mp_SetError(PyExc_IOError, res); - } -} - -static PyObject * -connection_fileno(ConnectionObject* self) -{ - if (self->handle == INVALID_HANDLE_VALUE) { - PyErr_SetString(PyExc_IOError, "handle is invalid"); - return NULL; - } - return PyInt_FromLong((long)self->handle); -} - -static PyObject * -connection_close(ConnectionObject *self) -{ - if (self->handle != INVALID_HANDLE_VALUE) { - Py_BEGIN_ALLOW_THREADS - CLOSE(self->handle); - Py_END_ALLOW_THREADS - self->handle = INVALID_HANDLE_VALUE; - } - - Py_RETURN_NONE; -} - -static PyObject * -connection_repr(ConnectionObject *self) -{ - static char *conn_type[] = {"read-only", "write-only", "read-write"}; - - assert(self->flags >= 1 && self->flags <= 3); - return FROM_FORMAT("<%s %s, handle %zd>", - conn_type[self->flags - 1], - CONNECTION_NAME, (Py_ssize_t)self->handle); -} - -/* - * Getters and setters - */ - -static PyObject * -connection_closed(ConnectionObject *self, void *closure) -{ - return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE)); -} - -static PyObject * -connection_readable(ConnectionObject *self, void *closure) -{ - return PyBool_FromLong((long)(self->flags & READABLE)); -} - -static PyObject * -connection_writable(ConnectionObject *self, void *closure) -{ - return PyBool_FromLong((long)(self->flags & WRITABLE)); -} - -/* - * Tables - */ - -static PyMethodDef connection_methods[] = { - {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS, - "send the byte data from a readable buffer-like object"}, - {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS, - "receive byte data as a string"}, - {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS, - "receive byte data into a writeable buffer-like object\n" - "returns the number of bytes read"}, - - {"send", (PyCFunction)connection_send_obj, METH_O, - "send a (picklable) object"}, - {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS, - "receive a (picklable) object"}, - - {"poll", (PyCFunction)connection_poll, METH_VARARGS, - "whether there is any input available to be read"}, - {"fileno", (PyCFunction)connection_fileno, METH_NOARGS, - "file descriptor or handle of the connection"}, - {"close", (PyCFunction)connection_close, METH_NOARGS, - "close the connection"}, - - {NULL} /* Sentinel */ -}; - -static PyGetSetDef connection_getset[] = { - {"closed", (getter)connection_closed, NULL, - "True if the connection is closed", NULL}, - {"readable", (getter)connection_readable, NULL, - "True if the connection is readable", NULL}, - {"writable", (getter)connection_writable, NULL, - "True if the connection is writable", NULL}, - {NULL} -}; - -/* - * Connection type - */ - -PyDoc_STRVAR(connection_doc, - "Connection type whose constructor signature is\n\n" - " Connection(handle, readable=True, writable=True).\n\n" - "The constructor does *not* duplicate the handle."); - -PyTypeObject CONNECTION_TYPE = { - PyVarObject_HEAD_INIT(NULL, 0) - /* tp_name */ "_multiprocessing." CONNECTION_NAME, - /* tp_basicsize */ sizeof(ConnectionObject), - /* tp_itemsize */ 0, - /* tp_dealloc */ (destructor)connection_dealloc, - /* tp_print */ 0, - /* tp_getattr */ 0, - /* tp_setattr */ 0, - /* tp_reserved */ 0, - /* tp_repr */ (reprfunc)connection_repr, - /* tp_as_number */ 0, - /* tp_as_sequence */ 0, - /* tp_as_mapping */ 0, - /* tp_hash */ 0, - /* tp_call */ 0, - /* tp_str */ 0, - /* tp_getattro */ 0, - /* tp_setattro */ 0, - /* tp_as_buffer */ 0, - /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | - Py_TPFLAGS_HAVE_WEAKREFS, - /* tp_doc */ connection_doc, - /* tp_traverse */ 0, - /* tp_clear */ 0, - /* tp_richcompare */ 0, - /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist), - /* tp_iter */ 0, - /* tp_iternext */ 0, - /* tp_methods */ connection_methods, - /* tp_members */ 0, - /* tp_getset */ connection_getset, - /* tp_base */ 0, - /* tp_dict */ 0, - /* tp_descr_get */ 0, - /* tp_descr_set */ 0, - /* tp_dictoffset */ 0, - /* tp_init */ 0, - /* tp_alloc */ 0, - /* tp_new */ connection_new, -}; - -#endif /* CONNECTION_H */ diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c index 7c4f52d..5685517 100644 --- a/Modules/_multiprocessing/multiprocessing.c +++ b/Modules/_multiprocessing/multiprocessing.c @@ -49,16 +49,6 @@ mp_SetError(PyObject *Type, int num) case MP_MEMORY_ERROR: PyErr_NoMemory(); break; - case MP_END_OF_FILE: - PyErr_SetNone(PyExc_EOFError); - break; - case MP_EARLY_END_OF_FILE: - PyErr_SetString(PyExc_IOError, - "got end of file during message"); - break; - case MP_BAD_MESSAGE_LENGTH: - PyErr_SetString(PyExc_IOError, "bad message length"); - break; case MP_EXCEPTION_HAS_BEEN_SET: break; default: @@ -257,12 +247,6 @@ PyInit__multiprocessing(void) BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort"); Py_XDECREF(temp); - /* Add connection type to module */ - if (PyType_Ready(&ConnectionType) < 0) - return NULL; - Py_INCREF(&ConnectionType); - PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType); - #if defined(MS_WINDOWS) || \ (defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)) /* Add SemLock type to module */ @@ -286,13 +270,6 @@ PyInit__multiprocessing(void) #endif #ifdef MS_WINDOWS - /* Add PipeConnection to module */ - if (PyType_Ready(&PipeConnectionType) < 0) - return NULL; - Py_INCREF(&PipeConnectionType); - PyModule_AddObject(module, "PipeConnection", - (PyObject*)&PipeConnectionType); - /* Initialize win32 class and add to multiprocessing */ temp = create_win32_namespace(); if (!temp) diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h index b9917c3..02796c1 100644 --- a/Modules/_multiprocessing/multiprocessing.h +++ b/Modules/_multiprocessing/multiprocessing.h @@ -118,11 +118,8 @@ #define MP_SUCCESS (0) #define MP_STANDARD_ERROR (-1) #define MP_MEMORY_ERROR (-1001) -#define MP_END_OF_FILE (-1002) -#define MP_EARLY_END_OF_FILE (-1003) -#define MP_BAD_MESSAGE_LENGTH (-1004) -#define MP_SOCKET_ERROR (-1005) -#define MP_EXCEPTION_HAS_BEEN_SET (-1006) +#define MP_SOCKET_ERROR (-1002) +#define MP_EXCEPTION_HAS_BEEN_SET (-1003) PyObject *mp_SetError(PyObject *Type, int num); @@ -135,7 +132,6 @@ extern PyObject *pickle_loads; extern PyObject *pickle_protocol; extern PyObject *BufferTooShort; extern PyTypeObject SemLockType; -extern PyTypeObject ConnectionType; extern PyTypeObject PipeConnectionType; extern HANDLE sigint_event; @@ -162,25 +158,9 @@ extern HANDLE sigint_event; #endif /* - * Connection definition - */ - -#define CONNECTION_BUFFER_SIZE 1024 - -typedef struct { - PyObject_HEAD - HANDLE handle; - int flags; - PyObject *weakreflist; - char buffer[CONNECTION_BUFFER_SIZE]; -} ConnectionObject; - -/* * Miscellaneous */ -#define MAX_MESSAGE_LENGTH 0x7fffffff - #ifndef MIN # define MIN(x, y) ((x) < (y) ? x : y) # define MAX(x, y) ((x) > (y) ? x : y) diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c deleted file mode 100644 index 05dde0c..0000000 --- a/Modules/_multiprocessing/pipe_connection.c +++ /dev/null @@ -1,149 +0,0 @@ -/* - * A type which wraps a pipe handle in message oriented mode - * - * pipe_connection.c - * - * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt - */ - -#include "multiprocessing.h" - -#define CLOSE(h) CloseHandle(h) - -/* - * Send string to the pipe; assumes in message oriented mode - */ - -static Py_ssize_t -conn_send_string(ConnectionObject *conn, char *string, size_t length) -{ - DWORD amount_written; - BOOL ret; - - Py_BEGIN_ALLOW_THREADS - ret = WriteFile(conn->handle, string, length, &amount_written, NULL); - Py_END_ALLOW_THREADS - - if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) { - PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length); - return MP_STANDARD_ERROR; - } - - return ret ? MP_SUCCESS : MP_STANDARD_ERROR; -} - -/* - * Attempts to read into buffer, or if buffer too small into *newbuffer. - * - * Returns number of bytes read. Assumes in message oriented mode. - */ - -static Py_ssize_t -conn_recv_string(ConnectionObject *conn, char *buffer, - size_t buflength, char **newbuffer, size_t maxlength) -{ - DWORD left, length, full_length, err; - BOOL ret; - *newbuffer = NULL; - - Py_BEGIN_ALLOW_THREADS - ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength), - &length, NULL); - Py_END_ALLOW_THREADS - if (ret) - return length; - - err = GetLastError(); - if (err != ERROR_MORE_DATA) { - if (err == ERROR_BROKEN_PIPE) - return MP_END_OF_FILE; - return MP_STANDARD_ERROR; - } - - if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left)) - return MP_STANDARD_ERROR; - - full_length = length + left; - if (full_length > maxlength) - return MP_BAD_MESSAGE_LENGTH; - - *newbuffer = PyMem_Malloc(full_length); - if (*newbuffer == NULL) - return MP_MEMORY_ERROR; - - memcpy(*newbuffer, buffer, length); - - Py_BEGIN_ALLOW_THREADS - ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL); - Py_END_ALLOW_THREADS - if (ret) { - assert(length == left); - return full_length; - } else { - PyMem_Free(*newbuffer); - return MP_STANDARD_ERROR; - } -} - -/* - * Check whether any data is available for reading - */ - -static int -conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) -{ - DWORD bytes, deadline, delay; - int difference, res; - BOOL block = FALSE; - - if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) - return MP_STANDARD_ERROR; - - if (timeout == 0.0) - return bytes > 0; - - if (timeout < 0.0) - block = TRUE; - else - /* XXX does not check for overflow */ - deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5); - - Sleep(0); - - for (delay = 1 ; ; delay += 1) { - if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) - return MP_STANDARD_ERROR; - else if (bytes > 0) - return TRUE; - - if (!block) { - difference = deadline - GetTickCount(); - if (difference < 0) - return FALSE; - if ((int)delay > difference) - delay = difference; - } - - if (delay > 20) - delay = 20; - - Sleep(delay); - - /* check for signals */ - Py_BLOCK_THREADS - res = PyErr_CheckSignals(); - Py_UNBLOCK_THREADS - - if (res) - return MP_EXCEPTION_HAS_BEEN_SET; - } -} - -/* - * "connection.h" defines the PipeConnection type using the definitions above - */ - -#define CONNECTION_NAME "PipeConnection" -#define CONNECTION_TYPE PipeConnectionType - -#include "connection.h" diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c deleted file mode 100644 index 7ebf338..0000000 --- a/Modules/_multiprocessing/socket_connection.c +++ /dev/null @@ -1,202 +0,0 @@ -/* - * A type which wraps a socket - * - * socket_connection.c - * - * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt - */ - -#include "multiprocessing.h" - -#ifdef MS_WINDOWS -# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0) -# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0) -# define CLOSE(h) closesocket((SOCKET)h) -#else -# define WRITE(h, buffer, length) write(h, buffer, length) -# define READ(h, buffer, length) read(h, buffer, length) -# define CLOSE(h) close(h) -#endif - -/* - * Send string to file descriptor - */ - -static Py_ssize_t -_conn_sendall(HANDLE h, char *string, size_t length) -{ - char *p = string; - Py_ssize_t res; - - while (length > 0) { - res = WRITE(h, p, length); - if (res < 0) - return MP_SOCKET_ERROR; - length -= res; - p += res; - } - - return MP_SUCCESS; -} - -/* - * Receive string of exact length from file descriptor - */ - -static Py_ssize_t -_conn_recvall(HANDLE h, char *buffer, size_t length) -{ - size_t remaining = length; - Py_ssize_t temp; - char *p = buffer; - - while (remaining > 0) { - temp = READ(h, p, remaining); - if (temp <= 0) { - if (temp == 0) - return remaining == length ? - MP_END_OF_FILE : MP_EARLY_END_OF_FILE; - else - return temp; - } - remaining -= temp; - p += temp; - } - - return MP_SUCCESS; -} - -/* - * Send a string prepended by the string length in network byte order - */ - -static Py_ssize_t -conn_send_string(ConnectionObject *conn, char *string, size_t length) -{ - Py_ssize_t res; - /* The "header" of the message is a 32 bit unsigned number (in - network order) which specifies the length of the "body". If - the message is shorter than about 16kb then it is quicker to - combine the "header" and the "body" of the message and send - them at once. */ - if (length < (16*1024)) { - char *message; - - message = PyMem_Malloc(length+4); - if (message == NULL) - return MP_MEMORY_ERROR; - - *(UINT32*)message = htonl((UINT32)length); - memcpy(message+4, string, length); - Py_BEGIN_ALLOW_THREADS - res = _conn_sendall(conn->handle, message, length+4); - Py_END_ALLOW_THREADS - PyMem_Free(message); - } else { - UINT32 lenbuff; - - if (length > MAX_MESSAGE_LENGTH) - return MP_BAD_MESSAGE_LENGTH; - - lenbuff = htonl((UINT32)length); - Py_BEGIN_ALLOW_THREADS - res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) || - _conn_sendall(conn->handle, string, length); - Py_END_ALLOW_THREADS - } - return res; -} - -/* - * Attempts to read into buffer, or failing that into *newbuffer - * - * Returns number of bytes read. - */ - -static Py_ssize_t -conn_recv_string(ConnectionObject *conn, char *buffer, - size_t buflength, char **newbuffer, size_t maxlength) -{ - int res; - UINT32 ulength; - - *newbuffer = NULL; - - Py_BEGIN_ALLOW_THREADS - res = _conn_recvall(conn->handle, (char*)&ulength, 4); - Py_END_ALLOW_THREADS - if (res < 0) - return res; - - ulength = ntohl(ulength); - if (ulength > maxlength) - return MP_BAD_MESSAGE_LENGTH; - - if (ulength <= buflength) { - Py_BEGIN_ALLOW_THREADS - res = _conn_recvall(conn->handle, buffer, (size_t)ulength); - Py_END_ALLOW_THREADS - return res < 0 ? res : ulength; - } else { - *newbuffer = PyMem_Malloc((size_t)ulength); - if (*newbuffer == NULL) - return MP_MEMORY_ERROR; - Py_BEGIN_ALLOW_THREADS - res = _conn_recvall(conn->handle, *newbuffer, (size_t)ulength); - Py_END_ALLOW_THREADS - return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength; - } -} - -/* - * Check whether any data is available for reading -- neg timeout blocks - */ - -static int -conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) -{ - int res; - fd_set rfds; - - /* - * Verify the handle, issue 3321. Not required for windows. - */ - #ifndef MS_WINDOWS - if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) { - Py_BLOCK_THREADS - PyErr_SetString(PyExc_IOError, "handle out of range in select()"); - Py_UNBLOCK_THREADS - return MP_EXCEPTION_HAS_BEEN_SET; - } - #endif - - FD_ZERO(&rfds); - FD_SET((SOCKET)conn->handle, &rfds); - - if (timeout < 0.0) { - res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); - } else { - struct timeval tv; - tv.tv_sec = (long)timeout; - tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); - res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); - } - - if (res < 0) { - return MP_SOCKET_ERROR; - } else if (FD_ISSET(conn->handle, &rfds)) { - return TRUE; - } else { - assert(res == 0); - return FALSE; - } -} - -/* - * "connection.h" defines the Connection type using defs above - */ - -#define CONNECTION_NAME "Connection" -#define CONNECTION_TYPE ConnectionType - -#include "connection.h" diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c index 452d608..12dc0cd 100644 --- a/Modules/_multiprocessing/win32_functions.c +++ b/Modules/_multiprocessing/win32_functions.c @@ -215,6 +215,164 @@ win32_WaitNamedPipe(PyObject *self, PyObject *args) Py_RETURN_NONE; } +static PyObject * +win32_closesocket(PyObject *self, PyObject *args) +{ + HANDLE handle; + int ret; + + if (!PyArg_ParseTuple(args, F_HANDLE ":closesocket" , &handle)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = closesocket((SOCKET) handle); + Py_END_ALLOW_THREADS + + if (ret) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + Py_RETURN_NONE; +} + +static PyObject * +win32_recv(PyObject *self, PyObject *args) +{ + HANDLE handle; + int size, nread; + PyObject *buf; + + if (!PyArg_ParseTuple(args, F_HANDLE "i:recv" , &handle, &size)) + return NULL; + + buf = PyBytes_FromStringAndSize(NULL, size); + if (!buf) + return NULL; + + Py_BEGIN_ALLOW_THREADS + nread = recv((SOCKET) handle, PyBytes_AS_STRING(buf), size, 0); + Py_END_ALLOW_THREADS + + if (nread < 0) { + Py_DECREF(buf); + return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + } + _PyBytes_Resize(&buf, nread); + return buf; +} + +static PyObject * +win32_send(PyObject *self, PyObject *args) +{ + HANDLE handle; + Py_buffer buf; + int ret; + + if (!PyArg_ParseTuple(args, F_HANDLE "y*:send" , &handle, &buf)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = send((SOCKET) handle, buf.buf, buf.len, 0); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&buf); + if (ret < 0) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + return PyLong_FromLong(ret); +} + +static PyObject * +win32_WriteFile(PyObject *self, PyObject *args) +{ + HANDLE handle; + Py_buffer buf; + int written; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = WriteFile(handle, buf.buf, buf.len, &written, NULL); + Py_END_ALLOW_THREADS + + PyBuffer_Release(&buf); + if (!ret) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + return PyLong_FromLong(written); +} + +static PyObject * +win32_ReadFile(PyObject *self, PyObject *args) +{ + HANDLE handle; + int size; + DWORD nread; + PyObject *buf; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size)) + return NULL; + + buf = PyBytes_FromStringAndSize(NULL, size); + if (!buf) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL); + Py_END_ALLOW_THREADS + + if (!ret && GetLastError() != ERROR_MORE_DATA) { + Py_DECREF(buf); + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + } + if (_PyBytes_Resize(&buf, nread)) + return NULL; + return Py_BuildValue("NN", buf, PyBool_FromLong(ret)); +} + +static PyObject * +win32_PeekNamedPipe(PyObject *self, PyObject *args) +{ + HANDLE handle; + int size = 0; + PyObject *buf = NULL; + DWORD nread, navail, nleft; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE "|i:PeekNamedPipe" , &handle, &size)) + return NULL; + + if (size < 0) { + PyErr_SetString(PyExc_ValueError, "negative size"); + return NULL; + } + + if (size) { + buf = PyBytes_FromStringAndSize(NULL, size); + if (!buf) + return NULL; + Py_BEGIN_ALLOW_THREADS + ret = PeekNamedPipe(handle, PyBytes_AS_STRING(buf), size, &nread, + &navail, &nleft); + Py_END_ALLOW_THREADS + if (!ret) { + Py_DECREF(buf); + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + } + if (_PyBytes_Resize(&buf, nread)) + return NULL; + return Py_BuildValue("Nii", buf, navail, nleft); + } + else { + Py_BEGIN_ALLOW_THREADS + ret = PeekNamedPipe(handle, NULL, 0, NULL, &navail, &nleft); + Py_END_ALLOW_THREADS + if (!ret) { + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + } + return Py_BuildValue("ii", navail, nleft); + } +} + static PyMethodDef win32_methods[] = { WIN32_FUNCTION(CloseHandle), WIN32_FUNCTION(GetLastError), @@ -223,8 +381,14 @@ static PyMethodDef win32_methods[] = { WIN32_FUNCTION(ConnectNamedPipe), WIN32_FUNCTION(CreateFile), WIN32_FUNCTION(CreateNamedPipe), + WIN32_FUNCTION(ReadFile), + WIN32_FUNCTION(PeekNamedPipe), WIN32_FUNCTION(SetNamedPipeHandleState), WIN32_FUNCTION(WaitNamedPipe), + WIN32_FUNCTION(WriteFile), + WIN32_FUNCTION(closesocket), + WIN32_FUNCTION(recv), + WIN32_FUNCTION(send), {NULL} }; @@ -244,6 +408,8 @@ create_win32_namespace(void) Py_INCREF(&Win32Type); WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); + WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); + WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); diff --git a/PC/VC6/_multiprocessing.dsp b/PC/VC6/_multiprocessing.dsp index 2dcaf83..e1b8470 100644 --- a/PC/VC6/_multiprocessing.dsp +++ b/PC/VC6/_multiprocessing.dsp @@ -97,18 +97,10 @@ SOURCE=..\..\Modules\_multiprocessing\multiprocessing.c # End Source File
# Begin Source File
-SOURCE=..\..\Modules\_multiprocessing\pipe_connection.c
-# End Source File
-# Begin Source File
-
SOURCE=..\..\Modules\_multiprocessing\semaphore.c
# End Source File
# Begin Source File
-SOURCE=..\..\Modules\_multiprocessing\socket_connection.c
-# End Source File
-# Begin Source File
-
SOURCE=..\..\Modules\_multiprocessing\win32_functions.c
# End Source File
# End Target
diff --git a/PC/VS8.0/_multiprocessing.vcproj b/PC/VS8.0/_multiprocessing.vcproj index c2bbec4..8f1cafc 100644 --- a/PC/VS8.0/_multiprocessing.vcproj +++ b/PC/VS8.0/_multiprocessing.vcproj @@ -522,10 +522,6 @@ RelativePath="..\..\Modules\_multiprocessing\multiprocessing.h"
>
</File>
- <File
- RelativePath="..\..\Modules\_multiprocessing\connection.h"
- >
- </File>
</Filter>
<Filter
Name="Source Files"
@@ -535,18 +531,10 @@ >
</File>
<File
- RelativePath="..\..\Modules\_multiprocessing\pipe_connection.c"
- >
- </File>
- <File
RelativePath="..\..\Modules\_multiprocessing\semaphore.c"
>
</File>
<File
- RelativePath="..\..\Modules\_multiprocessing\socket_connection.c"
- >
- </File>
- <File
RelativePath="..\..\Modules\_multiprocessing\win32_functions.c"
>
</File>
diff --git a/PCbuild/_multiprocessing.vcproj b/PCbuild/_multiprocessing.vcproj index 5d6337d..e9cd3a8 100644 --- a/PCbuild/_multiprocessing.vcproj +++ b/PCbuild/_multiprocessing.vcproj @@ -522,10 +522,6 @@ RelativePath="..\Modules\_multiprocessing\multiprocessing.h"
>
</File>
- <File
- RelativePath="..\Modules\_multiprocessing\connection.h"
- >
- </File>
</Filter>
<Filter
Name="Source Files"
@@ -535,18 +531,10 @@ >
</File>
<File
- RelativePath="..\Modules\_multiprocessing\pipe_connection.c"
- >
- </File>
- <File
RelativePath="..\Modules\_multiprocessing\semaphore.c"
>
</File>
<File
- RelativePath="..\Modules\_multiprocessing\socket_connection.c"
- >
- </File>
- <File
RelativePath="..\Modules\_multiprocessing\win32_functions.c"
>
</File>
@@ -1353,14 +1353,11 @@ class PyBuildExt(build_ext): if platform == 'win32': multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c', '_multiprocessing/semaphore.c', - '_multiprocessing/pipe_connection.c', - '_multiprocessing/socket_connection.c', '_multiprocessing/win32_functions.c' ] else: multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c', - '_multiprocessing/socket_connection.c' ] if (sysconfig.get_config_var('HAVE_SEM_OPEN') and not sysconfig.get_config_var('POSIX_SEMAPHORES_NOT_ENABLED')): |