summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2011-05-09 15:04:27 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2011-05-09 15:04:27 (GMT)
commit87cf220972c9cb400ddcd577962883dcc5dca51a (patch)
tree3f1ab5b64ae538a2ced622637cc7e4112b1c6ffd
parentdf77e3d4a07223ebfe049e66d4d8a8c0b4315e04 (diff)
downloadcpython-87cf220972c9cb400ddcd577962883dcc5dca51a.zip
cpython-87cf220972c9cb400ddcd577962883dcc5dca51a.tar.gz
cpython-87cf220972c9cb400ddcd577962883dcc5dca51a.tar.bz2
Issue #11743: Rewrite multiprocessing connection classes in pure Python.
-rw-r--r--Lib/multiprocessing/connection.py315
-rw-r--r--Lib/multiprocessing/forking.py5
-rw-r--r--Lib/multiprocessing/reduction.py13
-rw-r--r--Lib/test/test_multiprocessing.py12
-rw-r--r--Misc/NEWS2
-rw-r--r--Modules/_multiprocessing/connection.h527
-rw-r--r--Modules/_multiprocessing/multiprocessing.c23
-rw-r--r--Modules/_multiprocessing/multiprocessing.h24
-rw-r--r--Modules/_multiprocessing/pipe_connection.c149
-rw-r--r--Modules/_multiprocessing/socket_connection.c202
-rw-r--r--Modules/_multiprocessing/win32_functions.c166
-rw-r--r--PC/VC6/_multiprocessing.dsp8
-rw-r--r--PC/VS8.0/_multiprocessing.vcproj12
-rw-r--r--PCbuild/_multiprocessing.vcproj12
-rw-r--r--setup.py3
15 files changed, 490 insertions, 983 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index d6627e5..afd580b 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -34,19 +34,27 @@
__all__ = [ 'Client', 'Listener', 'Pipe' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+try:
+ from _multiprocessing import win32
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ win32 = None
#
#
@@ -111,6 +119,281 @@ def address_type(address):
raise ValueError('address type of %r unrecognized' % address)
#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+ self._send_bytes(memoryview(buf))
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ if timeout < 0.0:
+ timeout = None
+ return self._poll(timeout)
+
+
+if win32:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ """
+
+ def _close(self):
+ win32.CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ nwritten = win32.WriteFile(self._handle, buf)
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = io.BytesIO()
+ bufsize = 512
+ if maxsize is not None:
+ bufsize = min(bufsize, maxsize)
+ try:
+ firstchunk, complete = win32.ReadFile(self._handle, bufsize)
+ except IOError as e:
+ if e.errno == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ raise
+ lenfirstchunk = len(firstchunk)
+ buf.write(firstchunk)
+ if complete:
+ return buf
+ navail, nleft = win32.PeekNamedPipe(self._handle)
+ if maxsize is not None and lenfirstchunk + nleft > maxsize:
+ return None
+ lastchunk, complete = win32.ReadFile(self._handle, nleft)
+ assert complete
+ buf.write(lastchunk)
+ return buf
+
+ def _poll(self, timeout):
+ navail, nleft = win32.PeekNamedPipe(self._handle)
+ if navail > 0:
+ return True
+ elif timeout == 0.0:
+ return False
+ # Setup a polling loop (translated straight from old
+ # pipe_connection.c)
+ if timeout < 0.0:
+ deadline = None
+ else:
+ deadline = time.time() + timeout
+ delay = 0.001
+ max_delay = 0.02
+ while True:
+ time.sleep(delay)
+ navail, nleft = win32.PeekNamedPipe(self._handle)
+ if navail > 0:
+ return True
+ if deadline and time.time() > deadline:
+ return False
+ if delay < max_delay:
+ delay += 0.001
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if win32:
+ def _close(self):
+ win32.closesocket(self._handle)
+ _write = win32.send
+ _read = win32.recv
+ else:
+ def _close(self):
+ os.close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ remaining = size
+ while remaining > 0:
+ chunk = read(self._handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ # For wire compatibility with 3.2 and lower
+ n = len(buf)
+ self._send(struct.pack("=i", len(buf)))
+ # The condition is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ if n > 0:
+ self._send(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("=i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ r = select.select([self._handle], [], [], timeout)[0]
+ return bool(r)
+
+
+#
# Public functions
#
@@ -186,21 +469,19 @@ if sys.platform != 'win32':
'''
if duplex:
s1, s2 = socket.socketpair()
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
+ c1 = Connection(os.dup(s1.fileno()))
+ c2 = Connection(os.dup(s2.fileno()))
s1.close()
s2.close()
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
-
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
@@ -234,8 +515,8 @@ else:
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
raise
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -266,7 +547,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
s.close()
return conn
@@ -298,7 +579,7 @@ def SocketClient(address):
raise
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
return conn
#
@@ -345,7 +626,7 @@ if sys.platform == 'win32':
except WindowsError as e:
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
raise
- return _multiprocessing.PipeConnection(handle)
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
@@ -377,7 +658,7 @@ if sys.platform == 'win32':
win32.SetNamedPipeHandleState(
h, win32.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -451,3 +732,7 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+
+# Late import because of circular import
+from multiprocessing.forking import duplicate, close
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index cc7c326..3d95557 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -183,7 +183,7 @@ else:
import time
from pickle import dump, load, HIGHEST_PROTOCOL
- from _multiprocessing import win32, Connection, PipeConnection
+ from _multiprocessing import win32
from .util import Finalize
def dump(obj, file, protocol=None):
@@ -411,6 +411,9 @@ else:
# Make (Pipe)Connection picklable
#
+ # Late import because of circular import
+ from .connection import Connection, PipeConnection
+
def reduce_connection(conn):
if not Popen.thread_is_spawning():
raise RuntimeError(
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc..b32c725 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -44,7 +44,7 @@ import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.connection import Client, Listener, Connection
#
@@ -159,7 +159,7 @@ def rebuild_handle(pickled_data):
return new_handle
#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
+# Register `Connection` with `ForkingPickler`
#
def reduce_connection(conn):
@@ -168,11 +168,11 @@ def reduce_connection(conn):
def rebuild_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
+ return Connection(
handle, readable=readable, writable=writable
)
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ForkingPickler.register(Connection, reduce_connection)
#
# Register `socket.socket` with `ForkingPickler`
@@ -201,6 +201,7 @@ ForkingPickler.register(socket.socket, reduce_socket)
#
if sys.platform == 'win32':
+ from multiprocessing.connection import PipeConnection
def reduce_pipe_connection(conn):
rh = reduce_handle(conn.fileno())
@@ -208,8 +209,8 @@ if sys.platform == 'win32':
def rebuild_pipe_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
+ return PipeConnection(
handle, readable=readable, writable=writable
)
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+ ForkingPickler.register(PipeConnection, reduce_pipe_connection)
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index a7f0391..0c05ff6 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1915,9 +1915,15 @@ class TestInvalidHandle(unittest.TestCase):
@unittest.skipIf(WIN32, "skipped on Windows")
def test_invalid_handles(self):
- conn = _multiprocessing.Connection(44977608)
- self.assertRaises(IOError, conn.poll)
- self.assertRaises(IOError, _multiprocessing.Connection, -1)
+ conn = multiprocessing.connection.Connection(44977608)
+ try:
+ self.assertRaises((ValueError, IOError), conn.poll)
+ finally:
+ # Hack private attribute _handle to avoid printing an error
+ # in conn.__del__
+ conn._handle = None
+ self.assertRaises((ValueError, IOError),
+ multiprocessing.connection.Connection, -1)
#
# Functions used to create test cases from the base ones in this module
diff --git a/Misc/NEWS b/Misc/NEWS
index 36a6cb5..18a2a55 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -140,6 +140,8 @@ Core and Builtins
Library
-------
+- Issue #11743: Rewrite multiprocessing connection classes in pure Python.
+
- Issue #11164: Stop trying to use _xmlplus in the xml module.
- Issue #11888: Add log2 function to math module. Patch written by Mark
diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h
deleted file mode 100644
index 002d5aa..0000000
--- a/Modules/_multiprocessing/connection.h
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Definition of a `Connection` type.
- * Used by `socket_connection.c` and `pipe_connection.c`.
- *
- * connection.h
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#ifndef CONNECTION_H
-#define CONNECTION_H
-
-/*
- * Read/write flags
- */
-
-#define READABLE 1
-#define WRITABLE 2
-
-#define CHECK_READABLE(self) \
- if (!(self->flags & READABLE)) { \
- PyErr_SetString(PyExc_IOError, "connection is write-only"); \
- return NULL; \
- }
-
-#define CHECK_WRITABLE(self) \
- if (!(self->flags & WRITABLE)) { \
- PyErr_SetString(PyExc_IOError, "connection is read-only"); \
- return NULL; \
- }
-
-/*
- * Allocation and deallocation
- */
-
-static PyObject *
-connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
-{
- ConnectionObject *self;
- HANDLE handle;
- BOOL readable = TRUE, writable = TRUE;
-
- static char *kwlist[] = {"handle", "readable", "writable", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
- &handle, &readable, &writable))
- return NULL;
-
- if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
- PyErr_Format(PyExc_IOError, "invalid handle %zd",
- (Py_ssize_t)handle);
- return NULL;
- }
-
- if (!readable && !writable) {
- PyErr_SetString(PyExc_ValueError,
- "either readable or writable must be true");
- return NULL;
- }
-
- self = PyObject_New(ConnectionObject, type);
- if (self == NULL)
- return NULL;
-
- self->weakreflist = NULL;
- self->handle = handle;
- self->flags = 0;
-
- if (readable)
- self->flags |= READABLE;
- if (writable)
- self->flags |= WRITABLE;
- assert(self->flags >= 1 && self->flags <= 3);
-
- return (PyObject*)self;
-}
-
-static void
-connection_dealloc(ConnectionObject* self)
-{
- if (self->weakreflist != NULL)
- PyObject_ClearWeakRefs((PyObject*)self);
-
- if (self->handle != INVALID_HANDLE_VALUE) {
- Py_BEGIN_ALLOW_THREADS
- CLOSE(self->handle);
- Py_END_ALLOW_THREADS
- }
- PyObject_Del(self);
-}
-
-/*
- * Functions for transferring buffers
- */
-
-static PyObject *
-connection_sendbytes(ConnectionObject *self, PyObject *args)
-{
- Py_buffer pbuffer;
- char *buffer;
- Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
- int res;
-
- if (!PyArg_ParseTuple(args, F_RBUFFER "*|" F_PY_SSIZE_T F_PY_SSIZE_T,
- &pbuffer, &offset, &size))
- return NULL;
- buffer = pbuffer.buf;
- length = pbuffer.len;
-
- CHECK_WRITABLE(self); /* XXX release buffer in case of failure */
-
- if (offset < 0) {
- PyBuffer_Release(&pbuffer);
- PyErr_SetString(PyExc_ValueError, "offset is negative");
- return NULL;
- }
- if (length < offset) {
- PyBuffer_Release(&pbuffer);
- PyErr_SetString(PyExc_ValueError, "buffer length < offset");
- return NULL;
- }
-
- if (size == PY_SSIZE_T_MIN) {
- size = length - offset;
- } else {
- if (size < 0) {
- PyBuffer_Release(&pbuffer);
- PyErr_SetString(PyExc_ValueError, "size is negative");
- return NULL;
- }
- if (offset + size > length) {
- PyBuffer_Release(&pbuffer);
- PyErr_SetString(PyExc_ValueError,
- "buffer length < offset + size");
- return NULL;
- }
- }
-
- res = conn_send_string(self, buffer + offset, size);
-
- PyBuffer_Release(&pbuffer);
- if (res < 0) {
- if (PyErr_Occurred())
- return NULL;
- else
- return mp_SetError(PyExc_IOError, res);
- }
-
- Py_RETURN_NONE;
-}
-
-static PyObject *
-connection_recvbytes(ConnectionObject *self, PyObject *args)
-{
- char *freeme = NULL;
- Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
- PyObject *result = NULL;
-
- if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
- return NULL;
-
- CHECK_READABLE(self);
-
- if (maxlength < 0) {
- PyErr_SetString(PyExc_ValueError, "maxlength < 0");
- return NULL;
- }
-
- res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
- &freeme, maxlength);
-
- if (res < 0) {
- if (res == MP_BAD_MESSAGE_LENGTH) {
- if ((self->flags & WRITABLE) == 0) {
- Py_BEGIN_ALLOW_THREADS
- CLOSE(self->handle);
- Py_END_ALLOW_THREADS
- self->handle = INVALID_HANDLE_VALUE;
- } else {
- self->flags = WRITABLE;
- }
- }
- mp_SetError(PyExc_IOError, res);
- } else {
- if (freeme == NULL) {
- result = PyBytes_FromStringAndSize(self->buffer, res);
- } else {
- result = PyBytes_FromStringAndSize(freeme, res);
- PyMem_Free(freeme);
- }
- }
-
- return result;
-}
-
-static PyObject *
-connection_recvbytes_into(ConnectionObject *self, PyObject *args)
-{
- char *freeme = NULL, *buffer = NULL;
- Py_ssize_t res, length, offset = 0;
- PyObject *result = NULL;
- Py_buffer pbuf;
-
- CHECK_READABLE(self);
-
- if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T,
- &pbuf, &offset))
- return NULL;
-
- buffer = pbuf.buf;
- length = pbuf.len;
-
- if (offset < 0) {
- PyErr_SetString(PyExc_ValueError, "negative offset");
- goto _error;
- }
-
- if (offset > length) {
- PyErr_SetString(PyExc_ValueError, "offset too large");
- goto _error;
- }
-
- res = conn_recv_string(self, buffer+offset, length-offset,
- &freeme, PY_SSIZE_T_MAX);
-
- if (res < 0) {
- if (res == MP_BAD_MESSAGE_LENGTH) {
- if ((self->flags & WRITABLE) == 0) {
- Py_BEGIN_ALLOW_THREADS
- CLOSE(self->handle);
- Py_END_ALLOW_THREADS
- self->handle = INVALID_HANDLE_VALUE;
- } else {
- self->flags = WRITABLE;
- }
- }
- mp_SetError(PyExc_IOError, res);
- } else {
- if (freeme == NULL) {
- result = PyInt_FromSsize_t(res);
- } else {
- result = PyObject_CallFunction(BufferTooShort,
- F_RBUFFER "#",
- freeme, res);
- PyMem_Free(freeme);
- if (result) {
- PyErr_SetObject(BufferTooShort, result);
- Py_DECREF(result);
- }
- goto _error;
- }
- }
-
-_cleanup:
- PyBuffer_Release(&pbuf);
- return result;
-
-_error:
- result = NULL;
- goto _cleanup;
-}
-
-/*
- * Functions for transferring objects
- */
-
-static PyObject *
-connection_send_obj(ConnectionObject *self, PyObject *obj)
-{
- char *buffer;
- int res;
- Py_ssize_t length;
- PyObject *pickled_string = NULL;
-
- CHECK_WRITABLE(self);
-
- pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
- pickle_protocol, NULL);
- if (!pickled_string)
- goto failure;
-
- if (PyBytes_AsStringAndSize(pickled_string, &buffer, &length) < 0)
- goto failure;
-
- res = conn_send_string(self, buffer, (int)length);
-
- if (res < 0) {
- mp_SetError(PyExc_IOError, res);
- goto failure;
- }
-
- Py_XDECREF(pickled_string);
- Py_RETURN_NONE;
-
- failure:
- Py_XDECREF(pickled_string);
- return NULL;
-}
-
-static PyObject *
-connection_recv_obj(ConnectionObject *self)
-{
- char *freeme = NULL;
- Py_ssize_t res;
- PyObject *temp = NULL, *result = NULL;
-
- CHECK_READABLE(self);
-
- res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
- &freeme, PY_SSIZE_T_MAX);
-
- if (res < 0) {
- if (res == MP_BAD_MESSAGE_LENGTH) {
- if ((self->flags & WRITABLE) == 0) {
- Py_BEGIN_ALLOW_THREADS
- CLOSE(self->handle);
- Py_END_ALLOW_THREADS
- self->handle = INVALID_HANDLE_VALUE;
- } else {
- self->flags = WRITABLE;
- }
- }
- mp_SetError(PyExc_IOError, res);
- } else {
- if (freeme == NULL) {
- temp = PyBytes_FromStringAndSize(self->buffer, res);
- } else {
- temp = PyBytes_FromStringAndSize(freeme, res);
- PyMem_Free(freeme);
- }
- }
-
- if (temp)
- result = PyObject_CallFunctionObjArgs(pickle_loads,
- temp, NULL);
- Py_XDECREF(temp);
- return result;
-}
-
-/*
- * Other functions
- */
-
-static PyObject *
-connection_poll(ConnectionObject *self, PyObject *args)
-{
- PyObject *timeout_obj = NULL;
- double timeout = 0.0;
- int res;
-
- CHECK_READABLE(self);
-
- if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
- return NULL;
-
- if (timeout_obj == NULL) {
- timeout = 0.0;
- } else if (timeout_obj == Py_None) {
- timeout = -1.0; /* block forever */
- } else {
- timeout = PyFloat_AsDouble(timeout_obj);
- if (PyErr_Occurred())
- return NULL;
- if (timeout < 0.0)
- timeout = 0.0;
- }
-
- Py_BEGIN_ALLOW_THREADS
- res = conn_poll(self, timeout, _save);
- Py_END_ALLOW_THREADS
-
- switch (res) {
- case TRUE:
- Py_RETURN_TRUE;
- case FALSE:
- Py_RETURN_FALSE;
- default:
- return mp_SetError(PyExc_IOError, res);
- }
-}
-
-static PyObject *
-connection_fileno(ConnectionObject* self)
-{
- if (self->handle == INVALID_HANDLE_VALUE) {
- PyErr_SetString(PyExc_IOError, "handle is invalid");
- return NULL;
- }
- return PyInt_FromLong((long)self->handle);
-}
-
-static PyObject *
-connection_close(ConnectionObject *self)
-{
- if (self->handle != INVALID_HANDLE_VALUE) {
- Py_BEGIN_ALLOW_THREADS
- CLOSE(self->handle);
- Py_END_ALLOW_THREADS
- self->handle = INVALID_HANDLE_VALUE;
- }
-
- Py_RETURN_NONE;
-}
-
-static PyObject *
-connection_repr(ConnectionObject *self)
-{
- static char *conn_type[] = {"read-only", "write-only", "read-write"};
-
- assert(self->flags >= 1 && self->flags <= 3);
- return FROM_FORMAT("<%s %s, handle %zd>",
- conn_type[self->flags - 1],
- CONNECTION_NAME, (Py_ssize_t)self->handle);
-}
-
-/*
- * Getters and setters
- */
-
-static PyObject *
-connection_closed(ConnectionObject *self, void *closure)
-{
- return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
-}
-
-static PyObject *
-connection_readable(ConnectionObject *self, void *closure)
-{
- return PyBool_FromLong((long)(self->flags & READABLE));
-}
-
-static PyObject *
-connection_writable(ConnectionObject *self, void *closure)
-{
- return PyBool_FromLong((long)(self->flags & WRITABLE));
-}
-
-/*
- * Tables
- */
-
-static PyMethodDef connection_methods[] = {
- {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
- "send the byte data from a readable buffer-like object"},
- {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
- "receive byte data as a string"},
- {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
- "receive byte data into a writeable buffer-like object\n"
- "returns the number of bytes read"},
-
- {"send", (PyCFunction)connection_send_obj, METH_O,
- "send a (picklable) object"},
- {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
- "receive a (picklable) object"},
-
- {"poll", (PyCFunction)connection_poll, METH_VARARGS,
- "whether there is any input available to be read"},
- {"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
- "file descriptor or handle of the connection"},
- {"close", (PyCFunction)connection_close, METH_NOARGS,
- "close the connection"},
-
- {NULL} /* Sentinel */
-};
-
-static PyGetSetDef connection_getset[] = {
- {"closed", (getter)connection_closed, NULL,
- "True if the connection is closed", NULL},
- {"readable", (getter)connection_readable, NULL,
- "True if the connection is readable", NULL},
- {"writable", (getter)connection_writable, NULL,
- "True if the connection is writable", NULL},
- {NULL}
-};
-
-/*
- * Connection type
- */
-
-PyDoc_STRVAR(connection_doc,
- "Connection type whose constructor signature is\n\n"
- " Connection(handle, readable=True, writable=True).\n\n"
- "The constructor does *not* duplicate the handle.");
-
-PyTypeObject CONNECTION_TYPE = {
- PyVarObject_HEAD_INIT(NULL, 0)
- /* tp_name */ "_multiprocessing." CONNECTION_NAME,
- /* tp_basicsize */ sizeof(ConnectionObject),
- /* tp_itemsize */ 0,
- /* tp_dealloc */ (destructor)connection_dealloc,
- /* tp_print */ 0,
- /* tp_getattr */ 0,
- /* tp_setattr */ 0,
- /* tp_reserved */ 0,
- /* tp_repr */ (reprfunc)connection_repr,
- /* tp_as_number */ 0,
- /* tp_as_sequence */ 0,
- /* tp_as_mapping */ 0,
- /* tp_hash */ 0,
- /* tp_call */ 0,
- /* tp_str */ 0,
- /* tp_getattro */ 0,
- /* tp_setattro */ 0,
- /* tp_as_buffer */ 0,
- /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
- Py_TPFLAGS_HAVE_WEAKREFS,
- /* tp_doc */ connection_doc,
- /* tp_traverse */ 0,
- /* tp_clear */ 0,
- /* tp_richcompare */ 0,
- /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
- /* tp_iter */ 0,
- /* tp_iternext */ 0,
- /* tp_methods */ connection_methods,
- /* tp_members */ 0,
- /* tp_getset */ connection_getset,
- /* tp_base */ 0,
- /* tp_dict */ 0,
- /* tp_descr_get */ 0,
- /* tp_descr_set */ 0,
- /* tp_dictoffset */ 0,
- /* tp_init */ 0,
- /* tp_alloc */ 0,
- /* tp_new */ connection_new,
-};
-
-#endif /* CONNECTION_H */
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
index 7c4f52d..5685517 100644
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -49,16 +49,6 @@ mp_SetError(PyObject *Type, int num)
case MP_MEMORY_ERROR:
PyErr_NoMemory();
break;
- case MP_END_OF_FILE:
- PyErr_SetNone(PyExc_EOFError);
- break;
- case MP_EARLY_END_OF_FILE:
- PyErr_SetString(PyExc_IOError,
- "got end of file during message");
- break;
- case MP_BAD_MESSAGE_LENGTH:
- PyErr_SetString(PyExc_IOError, "bad message length");
- break;
case MP_EXCEPTION_HAS_BEEN_SET:
break;
default:
@@ -257,12 +247,6 @@ PyInit__multiprocessing(void)
BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
Py_XDECREF(temp);
- /* Add connection type to module */
- if (PyType_Ready(&ConnectionType) < 0)
- return NULL;
- Py_INCREF(&ConnectionType);
- PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
-
#if defined(MS_WINDOWS) || \
(defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED))
/* Add SemLock type to module */
@@ -286,13 +270,6 @@ PyInit__multiprocessing(void)
#endif
#ifdef MS_WINDOWS
- /* Add PipeConnection to module */
- if (PyType_Ready(&PipeConnectionType) < 0)
- return NULL;
- Py_INCREF(&PipeConnectionType);
- PyModule_AddObject(module, "PipeConnection",
- (PyObject*)&PipeConnectionType);
-
/* Initialize win32 class and add to multiprocessing */
temp = create_win32_namespace();
if (!temp)
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
index b9917c3..02796c1 100644
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -118,11 +118,8 @@
#define MP_SUCCESS (0)
#define MP_STANDARD_ERROR (-1)
#define MP_MEMORY_ERROR (-1001)
-#define MP_END_OF_FILE (-1002)
-#define MP_EARLY_END_OF_FILE (-1003)
-#define MP_BAD_MESSAGE_LENGTH (-1004)
-#define MP_SOCKET_ERROR (-1005)
-#define MP_EXCEPTION_HAS_BEEN_SET (-1006)
+#define MP_SOCKET_ERROR (-1002)
+#define MP_EXCEPTION_HAS_BEEN_SET (-1003)
PyObject *mp_SetError(PyObject *Type, int num);
@@ -135,7 +132,6 @@ extern PyObject *pickle_loads;
extern PyObject *pickle_protocol;
extern PyObject *BufferTooShort;
extern PyTypeObject SemLockType;
-extern PyTypeObject ConnectionType;
extern PyTypeObject PipeConnectionType;
extern HANDLE sigint_event;
@@ -162,25 +158,9 @@ extern HANDLE sigint_event;
#endif
/*
- * Connection definition
- */
-
-#define CONNECTION_BUFFER_SIZE 1024
-
-typedef struct {
- PyObject_HEAD
- HANDLE handle;
- int flags;
- PyObject *weakreflist;
- char buffer[CONNECTION_BUFFER_SIZE];
-} ConnectionObject;
-
-/*
* Miscellaneous
*/
-#define MAX_MESSAGE_LENGTH 0x7fffffff
-
#ifndef MIN
# define MIN(x, y) ((x) < (y) ? x : y)
# define MAX(x, y) ((x) > (y) ? x : y)
diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c
deleted file mode 100644
index 05dde0c..0000000
--- a/Modules/_multiprocessing/pipe_connection.c
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * A type which wraps a pipe handle in message oriented mode
- *
- * pipe_connection.c
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#include "multiprocessing.h"
-
-#define CLOSE(h) CloseHandle(h)
-
-/*
- * Send string to the pipe; assumes in message oriented mode
- */
-
-static Py_ssize_t
-conn_send_string(ConnectionObject *conn, char *string, size_t length)
-{
- DWORD amount_written;
- BOOL ret;
-
- Py_BEGIN_ALLOW_THREADS
- ret = WriteFile(conn->handle, string, length, &amount_written, NULL);
- Py_END_ALLOW_THREADS
-
- if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) {
- PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
- return MP_STANDARD_ERROR;
- }
-
- return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
-}
-
-/*
- * Attempts to read into buffer, or if buffer too small into *newbuffer.
- *
- * Returns number of bytes read. Assumes in message oriented mode.
- */
-
-static Py_ssize_t
-conn_recv_string(ConnectionObject *conn, char *buffer,
- size_t buflength, char **newbuffer, size_t maxlength)
-{
- DWORD left, length, full_length, err;
- BOOL ret;
- *newbuffer = NULL;
-
- Py_BEGIN_ALLOW_THREADS
- ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
- &length, NULL);
- Py_END_ALLOW_THREADS
- if (ret)
- return length;
-
- err = GetLastError();
- if (err != ERROR_MORE_DATA) {
- if (err == ERROR_BROKEN_PIPE)
- return MP_END_OF_FILE;
- return MP_STANDARD_ERROR;
- }
-
- if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
- return MP_STANDARD_ERROR;
-
- full_length = length + left;
- if (full_length > maxlength)
- return MP_BAD_MESSAGE_LENGTH;
-
- *newbuffer = PyMem_Malloc(full_length);
- if (*newbuffer == NULL)
- return MP_MEMORY_ERROR;
-
- memcpy(*newbuffer, buffer, length);
-
- Py_BEGIN_ALLOW_THREADS
- ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
- Py_END_ALLOW_THREADS
- if (ret) {
- assert(length == left);
- return full_length;
- } else {
- PyMem_Free(*newbuffer);
- return MP_STANDARD_ERROR;
- }
-}
-
-/*
- * Check whether any data is available for reading
- */
-
-static int
-conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
-{
- DWORD bytes, deadline, delay;
- int difference, res;
- BOOL block = FALSE;
-
- if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
- return MP_STANDARD_ERROR;
-
- if (timeout == 0.0)
- return bytes > 0;
-
- if (timeout < 0.0)
- block = TRUE;
- else
- /* XXX does not check for overflow */
- deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
-
- Sleep(0);
-
- for (delay = 1 ; ; delay += 1) {
- if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
- return MP_STANDARD_ERROR;
- else if (bytes > 0)
- return TRUE;
-
- if (!block) {
- difference = deadline - GetTickCount();
- if (difference < 0)
- return FALSE;
- if ((int)delay > difference)
- delay = difference;
- }
-
- if (delay > 20)
- delay = 20;
-
- Sleep(delay);
-
- /* check for signals */
- Py_BLOCK_THREADS
- res = PyErr_CheckSignals();
- Py_UNBLOCK_THREADS
-
- if (res)
- return MP_EXCEPTION_HAS_BEEN_SET;
- }
-}
-
-/*
- * "connection.h" defines the PipeConnection type using the definitions above
- */
-
-#define CONNECTION_NAME "PipeConnection"
-#define CONNECTION_TYPE PipeConnectionType
-
-#include "connection.h"
diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c
deleted file mode 100644
index 7ebf338..0000000
--- a/Modules/_multiprocessing/socket_connection.c
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * A type which wraps a socket
- *
- * socket_connection.c
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#include "multiprocessing.h"
-
-#ifdef MS_WINDOWS
-# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
-# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
-# define CLOSE(h) closesocket((SOCKET)h)
-#else
-# define WRITE(h, buffer, length) write(h, buffer, length)
-# define READ(h, buffer, length) read(h, buffer, length)
-# define CLOSE(h) close(h)
-#endif
-
-/*
- * Send string to file descriptor
- */
-
-static Py_ssize_t
-_conn_sendall(HANDLE h, char *string, size_t length)
-{
- char *p = string;
- Py_ssize_t res;
-
- while (length > 0) {
- res = WRITE(h, p, length);
- if (res < 0)
- return MP_SOCKET_ERROR;
- length -= res;
- p += res;
- }
-
- return MP_SUCCESS;
-}
-
-/*
- * Receive string of exact length from file descriptor
- */
-
-static Py_ssize_t
-_conn_recvall(HANDLE h, char *buffer, size_t length)
-{
- size_t remaining = length;
- Py_ssize_t temp;
- char *p = buffer;
-
- while (remaining > 0) {
- temp = READ(h, p, remaining);
- if (temp <= 0) {
- if (temp == 0)
- return remaining == length ?
- MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
- else
- return temp;
- }
- remaining -= temp;
- p += temp;
- }
-
- return MP_SUCCESS;
-}
-
-/*
- * Send a string prepended by the string length in network byte order
- */
-
-static Py_ssize_t
-conn_send_string(ConnectionObject *conn, char *string, size_t length)
-{
- Py_ssize_t res;
- /* The "header" of the message is a 32 bit unsigned number (in
- network order) which specifies the length of the "body". If
- the message is shorter than about 16kb then it is quicker to
- combine the "header" and the "body" of the message and send
- them at once. */
- if (length < (16*1024)) {
- char *message;
-
- message = PyMem_Malloc(length+4);
- if (message == NULL)
- return MP_MEMORY_ERROR;
-
- *(UINT32*)message = htonl((UINT32)length);
- memcpy(message+4, string, length);
- Py_BEGIN_ALLOW_THREADS
- res = _conn_sendall(conn->handle, message, length+4);
- Py_END_ALLOW_THREADS
- PyMem_Free(message);
- } else {
- UINT32 lenbuff;
-
- if (length > MAX_MESSAGE_LENGTH)
- return MP_BAD_MESSAGE_LENGTH;
-
- lenbuff = htonl((UINT32)length);
- Py_BEGIN_ALLOW_THREADS
- res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
- _conn_sendall(conn->handle, string, length);
- Py_END_ALLOW_THREADS
- }
- return res;
-}
-
-/*
- * Attempts to read into buffer, or failing that into *newbuffer
- *
- * Returns number of bytes read.
- */
-
-static Py_ssize_t
-conn_recv_string(ConnectionObject *conn, char *buffer,
- size_t buflength, char **newbuffer, size_t maxlength)
-{
- int res;
- UINT32 ulength;
-
- *newbuffer = NULL;
-
- Py_BEGIN_ALLOW_THREADS
- res = _conn_recvall(conn->handle, (char*)&ulength, 4);
- Py_END_ALLOW_THREADS
- if (res < 0)
- return res;
-
- ulength = ntohl(ulength);
- if (ulength > maxlength)
- return MP_BAD_MESSAGE_LENGTH;
-
- if (ulength <= buflength) {
- Py_BEGIN_ALLOW_THREADS
- res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
- Py_END_ALLOW_THREADS
- return res < 0 ? res : ulength;
- } else {
- *newbuffer = PyMem_Malloc((size_t)ulength);
- if (*newbuffer == NULL)
- return MP_MEMORY_ERROR;
- Py_BEGIN_ALLOW_THREADS
- res = _conn_recvall(conn->handle, *newbuffer, (size_t)ulength);
- Py_END_ALLOW_THREADS
- return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength;
- }
-}
-
-/*
- * Check whether any data is available for reading -- neg timeout blocks
- */
-
-static int
-conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
-{
- int res;
- fd_set rfds;
-
- /*
- * Verify the handle, issue 3321. Not required for windows.
- */
- #ifndef MS_WINDOWS
- if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) {
- Py_BLOCK_THREADS
- PyErr_SetString(PyExc_IOError, "handle out of range in select()");
- Py_UNBLOCK_THREADS
- return MP_EXCEPTION_HAS_BEEN_SET;
- }
- #endif
-
- FD_ZERO(&rfds);
- FD_SET((SOCKET)conn->handle, &rfds);
-
- if (timeout < 0.0) {
- res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
- } else {
- struct timeval tv;
- tv.tv_sec = (long)timeout;
- tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
- res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
- }
-
- if (res < 0) {
- return MP_SOCKET_ERROR;
- } else if (FD_ISSET(conn->handle, &rfds)) {
- return TRUE;
- } else {
- assert(res == 0);
- return FALSE;
- }
-}
-
-/*
- * "connection.h" defines the Connection type using defs above
- */
-
-#define CONNECTION_NAME "Connection"
-#define CONNECTION_TYPE ConnectionType
-
-#include "connection.h"
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
index 452d608..12dc0cd 100644
--- a/Modules/_multiprocessing/win32_functions.c
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -215,6 +215,164 @@ win32_WaitNamedPipe(PyObject *self, PyObject *args)
Py_RETURN_NONE;
}
+static PyObject *
+win32_closesocket(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ int ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE ":closesocket" , &handle))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = closesocket((SOCKET) handle);
+ Py_END_ALLOW_THREADS
+
+ if (ret)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_recv(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ int size, nread;
+ PyObject *buf;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "i:recv" , &handle, &size))
+ return NULL;
+
+ buf = PyBytes_FromStringAndSize(NULL, size);
+ if (!buf)
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ nread = recv((SOCKET) handle, PyBytes_AS_STRING(buf), size, 0);
+ Py_END_ALLOW_THREADS
+
+ if (nread < 0) {
+ Py_DECREF(buf);
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ }
+ _PyBytes_Resize(&buf, nread);
+ return buf;
+}
+
+static PyObject *
+win32_send(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ Py_buffer buf;
+ int ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "y*:send" , &handle, &buf))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = send((SOCKET) handle, buf.buf, buf.len, 0);
+ Py_END_ALLOW_THREADS
+
+ PyBuffer_Release(&buf);
+ if (ret < 0)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ return PyLong_FromLong(ret);
+}
+
+static PyObject *
+win32_WriteFile(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ Py_buffer buf;
+ int written;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WriteFile(handle, buf.buf, buf.len, &written, NULL);
+ Py_END_ALLOW_THREADS
+
+ PyBuffer_Release(&buf);
+ if (!ret)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ return PyLong_FromLong(written);
+}
+
+static PyObject *
+win32_ReadFile(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ int size;
+ DWORD nread;
+ PyObject *buf;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size))
+ return NULL;
+
+ buf = PyBytes_FromStringAndSize(NULL, size);
+ if (!buf)
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL);
+ Py_END_ALLOW_THREADS
+
+ if (!ret && GetLastError() != ERROR_MORE_DATA) {
+ Py_DECREF(buf);
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ }
+ if (_PyBytes_Resize(&buf, nread))
+ return NULL;
+ return Py_BuildValue("NN", buf, PyBool_FromLong(ret));
+}
+
+static PyObject *
+win32_PeekNamedPipe(PyObject *self, PyObject *args)
+{
+ HANDLE handle;
+ int size = 0;
+ PyObject *buf = NULL;
+ DWORD nread, navail, nleft;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "|i:PeekNamedPipe" , &handle, &size))
+ return NULL;
+
+ if (size < 0) {
+ PyErr_SetString(PyExc_ValueError, "negative size");
+ return NULL;
+ }
+
+ if (size) {
+ buf = PyBytes_FromStringAndSize(NULL, size);
+ if (!buf)
+ return NULL;
+ Py_BEGIN_ALLOW_THREADS
+ ret = PeekNamedPipe(handle, PyBytes_AS_STRING(buf), size, &nread,
+ &navail, &nleft);
+ Py_END_ALLOW_THREADS
+ if (!ret) {
+ Py_DECREF(buf);
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ }
+ if (_PyBytes_Resize(&buf, nread))
+ return NULL;
+ return Py_BuildValue("Nii", buf, navail, nleft);
+ }
+ else {
+ Py_BEGIN_ALLOW_THREADS
+ ret = PeekNamedPipe(handle, NULL, 0, NULL, &navail, &nleft);
+ Py_END_ALLOW_THREADS
+ if (!ret) {
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ }
+ return Py_BuildValue("ii", navail, nleft);
+ }
+}
+
static PyMethodDef win32_methods[] = {
WIN32_FUNCTION(CloseHandle),
WIN32_FUNCTION(GetLastError),
@@ -223,8 +381,14 @@ static PyMethodDef win32_methods[] = {
WIN32_FUNCTION(ConnectNamedPipe),
WIN32_FUNCTION(CreateFile),
WIN32_FUNCTION(CreateNamedPipe),
+ WIN32_FUNCTION(ReadFile),
+ WIN32_FUNCTION(PeekNamedPipe),
WIN32_FUNCTION(SetNamedPipeHandleState),
WIN32_FUNCTION(WaitNamedPipe),
+ WIN32_FUNCTION(WriteFile),
+ WIN32_FUNCTION(closesocket),
+ WIN32_FUNCTION(recv),
+ WIN32_FUNCTION(send),
{NULL}
};
@@ -244,6 +408,8 @@ create_win32_namespace(void)
Py_INCREF(&Win32Type);
WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
+ WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
+ WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
diff --git a/PC/VC6/_multiprocessing.dsp b/PC/VC6/_multiprocessing.dsp
index 2dcaf83..e1b8470 100644
--- a/PC/VC6/_multiprocessing.dsp
+++ b/PC/VC6/_multiprocessing.dsp
@@ -97,18 +97,10 @@ SOURCE=..\..\Modules\_multiprocessing\multiprocessing.c
# End Source File
# Begin Source File
-SOURCE=..\..\Modules\_multiprocessing\pipe_connection.c
-# End Source File
-# Begin Source File
-
SOURCE=..\..\Modules\_multiprocessing\semaphore.c
# End Source File
# Begin Source File
-SOURCE=..\..\Modules\_multiprocessing\socket_connection.c
-# End Source File
-# Begin Source File
-
SOURCE=..\..\Modules\_multiprocessing\win32_functions.c
# End Source File
# End Target
diff --git a/PC/VS8.0/_multiprocessing.vcproj b/PC/VS8.0/_multiprocessing.vcproj
index c2bbec4..8f1cafc 100644
--- a/PC/VS8.0/_multiprocessing.vcproj
+++ b/PC/VS8.0/_multiprocessing.vcproj
@@ -522,10 +522,6 @@
RelativePath="..\..\Modules\_multiprocessing\multiprocessing.h"
>
</File>
- <File
- RelativePath="..\..\Modules\_multiprocessing\connection.h"
- >
- </File>
</Filter>
<Filter
Name="Source Files"
@@ -535,18 +531,10 @@
>
</File>
<File
- RelativePath="..\..\Modules\_multiprocessing\pipe_connection.c"
- >
- </File>
- <File
RelativePath="..\..\Modules\_multiprocessing\semaphore.c"
>
</File>
<File
- RelativePath="..\..\Modules\_multiprocessing\socket_connection.c"
- >
- </File>
- <File
RelativePath="..\..\Modules\_multiprocessing\win32_functions.c"
>
</File>
diff --git a/PCbuild/_multiprocessing.vcproj b/PCbuild/_multiprocessing.vcproj
index 5d6337d..e9cd3a8 100644
--- a/PCbuild/_multiprocessing.vcproj
+++ b/PCbuild/_multiprocessing.vcproj
@@ -522,10 +522,6 @@
RelativePath="..\Modules\_multiprocessing\multiprocessing.h"
>
</File>
- <File
- RelativePath="..\Modules\_multiprocessing\connection.h"
- >
- </File>
</Filter>
<Filter
Name="Source Files"
@@ -535,18 +531,10 @@
>
</File>
<File
- RelativePath="..\Modules\_multiprocessing\pipe_connection.c"
- >
- </File>
- <File
RelativePath="..\Modules\_multiprocessing\semaphore.c"
>
</File>
<File
- RelativePath="..\Modules\_multiprocessing\socket_connection.c"
- >
- </File>
- <File
RelativePath="..\Modules\_multiprocessing\win32_functions.c"
>
</File>
diff --git a/setup.py b/setup.py
index cbaf1ab..9d46425 100644
--- a/setup.py
+++ b/setup.py
@@ -1353,14 +1353,11 @@ class PyBuildExt(build_ext):
if platform == 'win32':
multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c',
'_multiprocessing/semaphore.c',
- '_multiprocessing/pipe_connection.c',
- '_multiprocessing/socket_connection.c',
'_multiprocessing/win32_functions.c'
]
else:
multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c',
- '_multiprocessing/socket_connection.c'
]
if (sysconfig.get_config_var('HAVE_SEM_OPEN') and not
sysconfig.get_config_var('POSIX_SEMAPHORES_NOT_ENABLED')):