From e5ef45b8f519a9be9965590e1a0a587ff584c180 Mon Sep 17 00:00:00 2001 From: Davin Potts Date: Fri, 1 Feb 2019 22:52:23 -0600 Subject: bpo-35813: Added shared_memory submodule of multiprocessing. (#11664) Added shared_memory submodule to multiprocessing in time for first alpha with cross-platform tests soon to follow. --- Lib/multiprocessing/shared_memory.py | 573 ++++++++++++++++ .../2019-01-23-22-44-37.bpo-35813.Yobj-Y.rst | 2 + Modules/_multiprocessing/posixshmem.c | 724 +++++++++++++++++++++ setup.py | 11 + 4 files changed, 1310 insertions(+) create mode 100644 Lib/multiprocessing/shared_memory.py create mode 100644 Misc/NEWS.d/next/Library/2019-01-23-22-44-37.bpo-35813.Yobj-Y.rst create mode 100644 Modules/_multiprocessing/posixshmem.c diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py new file mode 100644 index 0000000..11eac4b --- /dev/null +++ b/Lib/multiprocessing/shared_memory.py @@ -0,0 +1,573 @@ +"Provides shared memory for direct access across processes." + + +__all__ = [ 'SharedMemory', 'PosixSharedMemory', 'WindowsNamedSharedMemory', + 'ShareableList', 'shareable_wrap', + 'SharedMemoryServer', 'SharedMemoryManager', 'SharedMemoryTracker' ] + + +from functools import reduce +import mmap +from .managers import DictProxy, SyncManager, Server +from . import util +import os +import random +import struct +import sys +try: + from _posixshmem import _PosixSharedMemory, Error, ExistentialError, O_CREX +except ImportError as ie: + if os.name != "nt": + # On Windows, posixshmem is not required to be available. + raise ie + else: + _PosixSharedMemory = object + class ExistentialError(BaseException): pass + class Error(BaseException): pass + O_CREX = -1 + + +class WindowsNamedSharedMemory: + + def __init__(self, name, flags=None, mode=None, size=None, read_only=False): + if name is None: + name = f'wnsm_{os.getpid()}_{random.randrange(100000)}' + + self._mmap = mmap.mmap(-1, size, tagname=name) + self.buf = memoryview(self._mmap) + self.name = name + self.size = size + + def __repr__(self): + return f'{self.__class__.__name__}({self.name!r}, size={self.size})' + + def close(self): + self.buf.release() + self._mmap.close() + + def unlink(self): + """Windows ensures that destruction of the last reference to this + named shared memory block will result in the release of this memory.""" + pass + + +class PosixSharedMemory(_PosixSharedMemory): + + def __init__(self, name, flags=None, mode=None, size=None, read_only=False): + if name and (flags is None): + _PosixSharedMemory.__init__(self, name) + else: + if name is None: + name = f'psm_{os.getpid()}_{random.randrange(100000)}' + _PosixSharedMemory.__init__(self, name, flags=O_CREX, size=size) + + self._mmap = mmap.mmap(self.fd, self.size) + self.buf = memoryview(self._mmap) + + def __repr__(self): + return f'{self.__class__.__name__}({self.name!r}, size={self.size})' + + def close(self): + self.buf.release() + self._mmap.close() + self.close_fd() + + +class SharedMemory: + + def __new__(cls, *args, **kwargs): + if os.name == 'nt': + cls = WindowsNamedSharedMemory + else: + cls = PosixSharedMemory + return cls(*args, **kwargs) + + +def shareable_wrap( + existing_obj=None, + shmem_name=None, + cls=None, + shape=(0,), + strides=None, + dtype=None, + format=None, + **kwargs +): + augmented_kwargs = dict(kwargs) + extras = dict(shape=shape, strides=strides, dtype=dtype, format=format) + for key, value in extras.items(): + if value is not None: + augmented_kwargs[key] = value + + if existing_obj is not None: + existing_type = getattr( + existing_obj, + "_proxied_type", + type(existing_obj) + ) + + #agg = existing_obj.itemsize + #size = [ agg := i * agg for i in existing_obj.shape ][-1] + # TODO: replace use of reduce below with above 2 lines once available + size = reduce( + lambda x, y: x * y, + existing_obj.shape, + existing_obj.itemsize + ) + + else: + assert shmem_name is not None + existing_type = cls + size = 1 + + shm = SharedMemory(shmem_name, size=size) + + class CustomShareableProxy(existing_type): + + def __init__(self, *args, buffer=None, **kwargs): + # If copy method called, prevent recursion from replacing _shm. + if not hasattr(self, "_shm"): + self._shm = shm + self._proxied_type = existing_type + else: + # _proxied_type only used in pickling. + assert hasattr(self, "_proxied_type") + try: + existing_type.__init__(self, *args, **kwargs) + except: + pass + + def __repr__(self): + if not hasattr(self, "_shm"): + return existing_type.__repr__(self) + formatted_pairs = ( + "%s=%r" % kv for kv in self._build_state(self).items() + ) + return f"{self.__class__.__name__}({', '.join(formatted_pairs)})" + + #def __getstate__(self): + # if not hasattr(self, "_shm"): + # return existing_type.__getstate__(self) + # state = self._build_state(self) + # return state + + #def __setstate__(self, state): + # self.__init__(**state) + + def __reduce__(self): + return ( + shareable_wrap, + ( + None, + self._shm.name, + self._proxied_type, + self.shape, + self.strides, + self.dtype.str if hasattr(self, "dtype") else None, + getattr(self, "format", None), + ), + ) + + def copy(self): + dupe = existing_type.copy(self) + if not hasattr(dupe, "_shm"): + dupe = shareable_wrap(dupe) + return dupe + + @staticmethod + def _build_state(existing_obj, generics_only=False): + state = { + "shape": existing_obj.shape, + "strides": existing_obj.strides, + } + try: + state["dtype"] = existing_obj.dtype + except AttributeError: + try: + state["format"] = existing_obj.format + except AttributeError: + pass + if not generics_only: + try: + state["shmem_name"] = existing_obj._shm.name + state["cls"] = existing_type + except AttributeError: + pass + return state + + proxy_type = type( + f"{existing_type.__name__}Shareable", + CustomShareableProxy.__bases__, + dict(CustomShareableProxy.__dict__), + ) + + if existing_obj is not None: + try: + proxy_obj = proxy_type( + buffer=shm.buf, + **proxy_type._build_state(existing_obj) + ) + except Exception: + proxy_obj = proxy_type( + buffer=shm.buf, + **proxy_type._build_state(existing_obj, True) + ) + + mveo = memoryview(existing_obj) + proxy_obj._shm.buf[:mveo.nbytes] = mveo.tobytes() + + else: + proxy_obj = proxy_type(buffer=shm.buf, **augmented_kwargs) + + return proxy_obj + + +encoding = "utf8" + +class ShareableList: + """Pattern for a mutable list-like object shareable via a shared + memory block. It differs from the built-in list type in that these + lists can not change their overall length (i.e. no append, insert, + etc.) + + Because values are packed into a memoryview as bytes, the struct + packing format for any storable value must require no more than 8 + characters to describe its format.""" + + # TODO: Adjust for discovered word size of machine. + types_mapping = { + int: "q", + float: "d", + bool: "xxxxxxx?", + str: "%ds", + bytes: "%ds", + None.__class__: "xxxxxx?x", + } + alignment = 8 + back_transform_codes = { + 0: lambda value: value, # int, float, bool + 1: lambda value: value.rstrip(b'\x00').decode(encoding), # str + 2: lambda value: value.rstrip(b'\x00'), # bytes + 3: lambda _value: None, # None + } + + @staticmethod + def _extract_recreation_code(value): + """Used in concert with back_transform_codes to convert values + into the appropriate Python objects when retrieving them from + the list as well as when storing them.""" + if not isinstance(value, (str, bytes, None.__class__)): + return 0 + elif isinstance(value, str): + return 1 + elif isinstance(value, bytes): + return 2 + else: + return 3 # NoneType + + def __init__(self, iterable=None, name=None): + if iterable is not None: + _formats = [ + self.types_mapping[type(item)] + if not isinstance(item, (str, bytes)) + else self.types_mapping[type(item)] % ( + self.alignment * (len(item) // self.alignment + 1), + ) + for item in iterable + ] + self._list_len = len(_formats) + assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len + self._allocated_bytes = tuple( + self.alignment if fmt[-1] != "s" else int(fmt[:-1]) + for fmt in _formats + ) + _back_transform_codes = [ + self._extract_recreation_code(item) for item in iterable + ] + requested_size = struct.calcsize( + "q" + self._format_size_metainfo + "".join(_formats) + ) + + else: + requested_size = 1 # Some platforms require > 0. + + self.shm = SharedMemory(name, size=requested_size) + + if iterable is not None: + _enc = encoding + struct.pack_into( + "q" + self._format_size_metainfo, + self.shm.buf, + 0, + self._list_len, + *(self._allocated_bytes) + ) + struct.pack_into( + "".join(_formats), + self.shm.buf, + self._offset_data_start, + *(v.encode(_enc) if isinstance(v, str) else v for v in iterable) + ) + struct.pack_into( + self._format_packing_metainfo, + self.shm.buf, + self._offset_packing_formats, + *(v.encode(_enc) for v in _formats) + ) + struct.pack_into( + self._format_back_transform_codes, + self.shm.buf, + self._offset_back_transform_codes, + *(_back_transform_codes) + ) + + else: + self._list_len = len(self) # Obtains size from offset 0 in buffer. + self._allocated_bytes = struct.unpack_from( + self._format_size_metainfo, + self.shm.buf, + 1 * 8 + ) + + def _get_packing_format(self, position): + "Gets the packing format for a single value stored in the list." + position = position if position >= 0 else position + self._list_len + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + v = struct.unpack_from( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8 + )[0] + fmt = v.rstrip(b'\x00') + fmt_as_str = fmt.decode(encoding) + + return fmt_as_str + + def _get_back_transform(self, position): + "Gets the back transformation function for a single value." + + position = position if position >= 0 else position + self._list_len + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + transform_code = struct.unpack_from( + "b", + self.shm.buf, + self._offset_back_transform_codes + position + )[0] + transform_function = self.back_transform_codes[transform_code] + + return transform_function + + def _set_packing_format_and_transform(self, position, fmt_as_str, value): + """Sets the packing format and back transformation code for a + single value in the list at the specified position.""" + + position = position if position >= 0 else position + self._list_len + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + struct.pack_into( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8, + fmt_as_str.encode(encoding) + ) + + transform_code = self._extract_recreation_code(value) + struct.pack_into( + "b", + self.shm.buf, + self._offset_back_transform_codes + position, + transform_code + ) + + def __getitem__(self, position): + try: + offset = self._offset_data_start \ + + sum(self._allocated_bytes[:position]) + (v,) = struct.unpack_from( + self._get_packing_format(position), + self.shm.buf, + offset + ) + except IndexError: + raise IndexError("index out of range") + + back_transform = self._get_back_transform(position) + v = back_transform(v) + + return v + + def __setitem__(self, position, value): + try: + offset = self._offset_data_start \ + + sum(self._allocated_bytes[:position]) + current_format = self._get_packing_format(position) + except IndexError: + raise IndexError("assignment index out of range") + + if not isinstance(value, (str, bytes)): + new_format = self.types_mapping[type(value)] + else: + if len(value) > self._allocated_bytes[position]: + raise ValueError("exceeds available storage for existing str") + if current_format[-1] == "s": + new_format = current_format + else: + new_format = self.types_mapping[str] % ( + self._allocated_bytes[position], + ) + + self._set_packing_format_and_transform( + position, + new_format, + value + ) + value = value.encode(encoding) if isinstance(value, str) else value + struct.pack_into(new_format, self.shm.buf, offset, value) + + def __len__(self): + return struct.unpack_from("q", self.shm.buf, 0)[0] + + @property + def format(self): + "The struct packing format used by all currently stored values." + return "".join(self._get_packing_format(i) for i in range(self._list_len)) + + @property + def _format_size_metainfo(self): + "The struct packing format used for metainfo on storage sizes." + return f"{self._list_len}q" + + @property + def _format_packing_metainfo(self): + "The struct packing format used for the values' packing formats." + return "8s" * self._list_len + + @property + def _format_back_transform_codes(self): + "The struct packing format used for the values' back transforms." + return "b" * self._list_len + + @property + def _offset_data_start(self): + return (self._list_len + 1) * 8 # 8 bytes per "q" + + @property + def _offset_packing_formats(self): + return self._offset_data_start + sum(self._allocated_bytes) + + @property + def _offset_back_transform_codes(self): + return self._offset_packing_formats + self._list_len * 8 + + @classmethod + def copy(cls, self): + "L.copy() -> ShareableList -- a shallow copy of L." + + return cls(self) + + def count(self, value): + "L.count(value) -> integer -- return number of occurrences of value." + + return sum(value == entry for entry in self) + + def index(self, value): + """L.index(value) -> integer -- return first index of value. + Raises ValueError if the value is not present.""" + + for position, entry in enumerate(self): + if value == entry: + return position + else: + raise ValueError(f"{value!r} not in this container") + + +class SharedMemoryTracker: + "Manages one or more shared memory segments." + + def __init__(self, name, segment_names=[]): + self.shared_memory_context_name = name + self.segment_names = segment_names + + def register_segment(self, segment): + util.debug(f"Registering segment {segment.name!r} in pid {os.getpid()}") + self.segment_names.append(segment.name) + + def destroy_segment(self, segment_name): + util.debug(f"Destroying segment {segment_name!r} in pid {os.getpid()}") + self.segment_names.remove(segment_name) + segment = SharedMemory(segment_name, size=1) + segment.close() + segment.unlink() + + def unlink(self): + for segment_name in self.segment_names[:]: + self.destroy_segment(segment_name) + + def __del__(self): + util.debug(f"Called {self.__class__.__name__}.__del__ in {os.getpid()}") + self.unlink() + + def __getstate__(self): + return (self.shared_memory_context_name, self.segment_names) + + def __setstate__(self, state): + self.__init__(*state) + + def wrap(self, obj_exposing_buffer_protocol): + wrapped_obj = shareable_wrap(obj_exposing_buffer_protocol) + self.register_segment(wrapped_obj._shm) + return wrapped_obj + + +class SharedMemoryServer(Server): + def __init__(self, *args, **kwargs): + Server.__init__(self, *args, **kwargs) + self.shared_memory_context = \ + SharedMemoryTracker(f"shmm_{self.address}_{os.getpid()}") + util.debug(f"SharedMemoryServer started by pid {os.getpid()}") + + def create(self, c, typeid, *args, **kwargs): + # Unless set up as a shared proxy, don't make shared_memory_context + # a standard part of kwargs. This makes things easier for supplying + # simple functions. + if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): + kwargs['shared_memory_context'] = self.shared_memory_context + return Server.create(self, c, typeid, *args, **kwargs) + + def shutdown(self, c): + self.shared_memory_context.unlink() + return Server.shutdown(self, c) + + +class SharedMemoryManager(SyncManager): + """Like SyncManager but uses SharedMemoryServer instead of Server. + + TODO: Consider relocate/merge into managers submodule.""" + + _Server = SharedMemoryServer + + def __init__(self, *args, **kwargs): + SyncManager.__init__(self, *args, **kwargs) + util.debug(f"{self.__class__.__name__} created by pid {os.getpid()}") + + def __del__(self): + util.debug(f"{self.__class__.__name__} told die by pid {os.getpid()}") + pass + + def get_server(self): + 'Better than monkeypatching for now; merge into Server ultimately' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started server") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("Manager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return _Server(self._registry, self._address, + self._authkey, self._serializer) diff --git a/Misc/NEWS.d/next/Library/2019-01-23-22-44-37.bpo-35813.Yobj-Y.rst b/Misc/NEWS.d/next/Library/2019-01-23-22-44-37.bpo-35813.Yobj-Y.rst new file mode 100644 index 0000000..714a24c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-01-23-22-44-37.bpo-35813.Yobj-Y.rst @@ -0,0 +1,2 @@ +Shared memory submodule added to multiprocessing to avoid need for +serialization between processes diff --git a/Modules/_multiprocessing/posixshmem.c b/Modules/_multiprocessing/posixshmem.c new file mode 100644 index 0000000..7dd29f4 --- /dev/null +++ b/Modules/_multiprocessing/posixshmem.c @@ -0,0 +1,724 @@ +/* +posixshmem - A Python module for accessing POSIX 1003.1b-1993 shared memory. + +Copyright (c) 2012, Philip Semanchuk +Copyright (c) 2018, 2019, Davin Potts +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of posixshmem nor the names of its contributors may + be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY ITS CONTRIBUTORS ''AS IS'' AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL Philip Semanchuk BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#define PY_SSIZE_T_CLEAN + +#include +#include "structmember.h" + +#include +#include +#include +#include +#include + +// For shared memory stuff +#include +#include + +/* SEM_FAILED is defined as an int in Apple's headers, and this makes the +compiler complain when I compare it to a pointer. Python faced the same +problem (issue 9586) and I copied their solution here. +ref: http://bugs.python.org/issue9586 + +Note that in /Developer/SDKs/MacOSX10.4u.sdk/usr/include/sys/semaphore.h, +SEM_FAILED is #defined as -1 and that's apparently the definition used by +Python when building. In /usr/include/sys/semaphore.h, it's defined +as ((sem_t *)-1). +*/ +#ifdef __APPLE__ + #undef SEM_FAILED + #define SEM_FAILED ((sem_t *)-1) +#endif + +/* POSIX says that a mode_t "shall be an integer type". To avoid the need +for a specific get_mode function for each type, I'll just stuff the mode into +a long and mention it in the Xxx_members list for each type. +ref: http://www.opengroup.org/onlinepubs/000095399/basedefs/sys/types.h.html +*/ + +typedef struct { + PyObject_HEAD + char *name; + long mode; + int fd; +} SharedMemory; + + +// FreeBSD (and perhaps other BSDs) limit names to 14 characters. In the +// code below, strings of this length are allocated on the stack, so +// increase this gently or change that code to use malloc(). +#define MAX_SAFE_NAME_LENGTH 14 + + +/* Struct to contain an IPC object name which can be None */ +typedef struct { + int is_none; + char *name; +} NoneableName; + + +/* + Exceptions for this module +*/ + +static PyObject *pBaseException; +static PyObject *pPermissionsException; +static PyObject *pExistentialException; + + +#ifdef POSIX_IPC_DEBUG +#define DPRINTF(fmt, args...) fprintf(stderr, "+++ " fmt, ## args) +#else +#define DPRINTF(fmt, args...) +#endif + +static char * +bytes_to_c_string(PyObject* o, int lock) { +/* Convert a bytes object to a char *. Optionally lock the buffer if it is a + bytes array. + This code swiped directly from Python 3.1's posixmodule.c by Philip S. + The name there is bytes2str(). +*/ + if (PyBytes_Check(o)) + return PyBytes_AsString(o); + else if (PyByteArray_Check(o)) { + if (lock && PyObject_GetBuffer(o, NULL, 0) < 0) + /* On a bytearray, this should not fail. */ + PyErr_BadInternalCall(); + return PyByteArray_AsString(o); + } else { + /* The FS converter should have verified that this + is either bytes or bytearray. */ + Py_FatalError("bad object passed to bytes2str"); + /* not reached. */ + return ""; + } +} + +static void +release_bytes(PyObject* o) + /* Release the lock, decref the object. + This code swiped directly from Python 3.1's posixmodule.c by Philip S. + */ +{ + if (PyByteArray_Check(o)) + o->ob_type->tp_as_buffer->bf_releasebuffer(NULL, 0); + Py_DECREF(o); +} + + +static int +random_in_range(int min, int max) { + // returns a random int N such that min <= N <= max + int diff = (max - min) + 1; + + // ref: http://www.c-faq.com/lib/randrange.html + return ((int)((double)rand() / ((double)RAND_MAX + 1) * diff)) + min; +} + + +static +int create_random_name(char *name) { + // The random name is always lowercase so that this code will work + // on case-insensitive file systems. It always starts with a forward + // slash. + int length; + char *alphabet = "abcdefghijklmnopqrstuvwxyz"; + int i; + + // Generate a random length for the name. I subtract 1 from the + // MAX_SAFE_NAME_LENGTH in order to allow for the name's leading "/". + length = random_in_range(6, MAX_SAFE_NAME_LENGTH - 1); + + name[0] = '/'; + name[length] = '\0'; + i = length; + while (--i) + name[i] = alphabet[random_in_range(0, 25)]; + + return length; +} + + +static int +convert_name_param(PyObject *py_name_param, void *checked_name) { + /* Verifies that the py_name_param is either None or a string. + If it's a string, checked_name->name points to a PyMalloc-ed buffer + holding a NULL-terminated C version of the string when this function + concludes. The caller is responsible for releasing the buffer. + */ + int rc = 0; + NoneableName *p_name = (NoneableName *)checked_name; + PyObject *py_name_as_bytes = NULL; + char *p_name_as_c_string = NULL; + + DPRINTF("inside convert_name_param\n"); + DPRINTF("PyBytes_Check() = %d \n", PyBytes_Check(py_name_param)); + DPRINTF("PyString_Check() = %d \n", PyString_Check(py_name_param)); + DPRINTF("PyUnicode_Check() = %d \n", PyUnicode_Check(py_name_param)); + + p_name->is_none = 0; + + // The name can be None or a Python string + if (py_name_param == Py_None) { + DPRINTF("name is None\n"); + rc = 1; + p_name->is_none = 1; + } + else if (PyUnicode_Check(py_name_param) || PyBytes_Check(py_name_param)) { + DPRINTF("name is Unicode or bytes\n"); + // The caller passed me a Unicode string or a byte array; I need a + // char *. Getting from one to the other takes a couple steps. + + if (PyUnicode_Check(py_name_param)) { + DPRINTF("name is Unicode\n"); + // PyUnicode_FSConverter() converts the Unicode object into a + // bytes or a bytearray object. (Why can't it be one or the other?) + PyUnicode_FSConverter(py_name_param, &py_name_as_bytes); + } + else { + DPRINTF("name is bytes\n"); + // Make a copy of the name param. + py_name_as_bytes = PyBytes_FromObject(py_name_param); + } + + // bytes_to_c_string() returns a pointer to the buffer. + p_name_as_c_string = bytes_to_c_string(py_name_as_bytes, 0); + + // PyMalloc memory and copy the user-supplied name to it. + p_name->name = (char *)PyMem_Malloc(strlen(p_name_as_c_string) + 1); + if (p_name->name) { + rc = 1; + strcpy(p_name->name, p_name_as_c_string); + } + else + PyErr_SetString(PyExc_MemoryError, "Out of memory"); + + // The bytes version of the name isn't useful to me, and per the + // documentation for PyUnicode_FSConverter(), I am responsible for + // releasing it when I'm done. + release_bytes(py_name_as_bytes); + } + else + PyErr_SetString(PyExc_TypeError, "Name must be None or a string"); + + return rc; +} + + + +/* ===== Begin Shared Memory implementation functions ===== */ + +static PyObject * +shm_str(SharedMemory *self) { + return PyUnicode_FromString(self->name ? self->name : "(no name)"); +} + +static PyObject * +shm_repr(SharedMemory *self) { + char mode[32]; + + sprintf(mode, "0%o", (int)(self->mode)); + + return PyUnicode_FromFormat("_posixshmem.SharedMemory(\"%s\", mode=%s)", + self->name, mode); +} + +static PyObject * +my_shm_unlink(const char *name) { + DPRINTF("unlinking shm name %s\n", name); + if (-1 == shm_unlink(name)) { + switch (errno) { + case EACCES: + PyErr_SetString(pPermissionsException, "Permission denied"); + break; + + case ENOENT: + PyErr_SetString(pExistentialException, + "No shared memory exists with the specified name"); + break; + + case ENAMETOOLONG: + PyErr_SetString(PyExc_ValueError, "The name is too long"); + break; + + default: + PyErr_SetFromErrno(PyExc_OSError); + break; + } + + goto error_return; + } + + Py_RETURN_NONE; + + error_return: + return NULL; +} + + +static PyObject * +SharedMemory_new(PyTypeObject *type, PyObject *args, PyObject *kwlist) { + SharedMemory *self; + + self = (SharedMemory *)type->tp_alloc(type, 0); + + return (PyObject *)self; +} + + +static int +SharedMemory_init(SharedMemory *self, PyObject *args, PyObject *keywords) { + NoneableName name; + char temp_name[MAX_SAFE_NAME_LENGTH + 1]; + unsigned int flags = 0; + unsigned long size = 0; + int read_only = 0; + static char *keyword_list[ ] = {"name", "flags", "mode", "size", "read_only", NULL}; + + // First things first -- initialize the self struct. + self->name = NULL; + self->fd = 0; + self->mode = 0600; + + if (!PyArg_ParseTupleAndKeywords(args, keywords, "O&|Iiki", keyword_list, + &convert_name_param, &name, &flags, + &(self->mode), &size, &read_only)) + goto error_return; + + if ( !(flags & O_CREAT) && (flags & O_EXCL) ) { + PyErr_SetString(PyExc_ValueError, + "O_EXCL must be combined with O_CREAT"); + goto error_return; + } + + if (name.is_none && ((flags & O_EXCL) != O_EXCL)) { + PyErr_SetString(PyExc_ValueError, + "Name can only be None if O_EXCL is set"); + goto error_return; + } + + flags |= (read_only ? O_RDONLY : O_RDWR); + + if (name.is_none) { + // (name == None) ==> generate a name for the caller + do { + errno = 0; + create_random_name(temp_name); + + DPRINTF("calling shm_open, name=%s, flags=0x%x, mode=0%o\n", + temp_name, flags, (int)self->mode); + self->fd = shm_open(temp_name, flags, (mode_t)self->mode); + + } while ( (-1 == self->fd) && (EEXIST == errno) ); + + // PyMalloc memory and copy the randomly-generated name to it. + self->name = (char *)PyMem_Malloc(strlen(temp_name) + 1); + if (self->name) + strcpy(self->name, temp_name); + else { + PyErr_SetString(PyExc_MemoryError, "Out of memory"); + goto error_return; + } + } + else { + // (name != None) ==> use name supplied by the caller. It was + // already converted to C by convert_name_param(). + self->name = name.name; + + DPRINTF("calling shm_open, name=%s, flags=0x%x, mode=0%o\n", + self->name, flags, (int)self->mode); + self->fd = shm_open(self->name, flags, (mode_t)self->mode); + } + + DPRINTF("shm fd = %d\n", self->fd); + + if (-1 == self->fd) { + self->fd = 0; + switch (errno) { + case EACCES: + PyErr_Format(pPermissionsException, + "No permission to %s this segment", + (flags & O_TRUNC) ? "truncate" : "access" + ); + break; + + case EEXIST: + PyErr_SetString(pExistentialException, + "Shared memory with the specified name already exists"); + break; + + case ENOENT: + PyErr_SetString(pExistentialException, + "No shared memory exists with the specified name"); + break; + + case EINVAL: + PyErr_SetString(PyExc_ValueError, "Invalid parameter(s)"); + break; + + case EMFILE: + PyErr_SetString(PyExc_OSError, + "This process already has the maximum number of files open"); + break; + + case ENFILE: + PyErr_SetString(PyExc_OSError, + "The system limit on the total number of open files has been reached"); + break; + + case ENAMETOOLONG: + PyErr_SetString(PyExc_ValueError, + "The name is too long"); + break; + + default: + PyErr_SetFromErrno(PyExc_OSError); + break; + } + + goto error_return; + } + else { + if (size) { + DPRINTF("calling ftruncate, fd = %d, size = %ld\n", self->fd, size); + if (-1 == ftruncate(self->fd, (off_t)size)) { + // The code below will raise a Python error. Since that error + // is raised during __init__(), it will look to the caller + // as if object creation failed entirely. Here I clean up + // the system object I just created. + close(self->fd); + shm_unlink(self->name); + + // ftruncate can return a ton of different errors, but most + // are not relevant or are extremely unlikely. + switch (errno) { + case EINVAL: + PyErr_SetString(PyExc_ValueError, + "The size is invalid or the memory is read-only"); + break; + + case EFBIG: + PyErr_SetString(PyExc_ValueError, + "The size is too large"); + break; + + case EROFS: + case EACCES: + PyErr_SetString(pPermissionsException, + "The memory is read-only"); + break; + + default: + PyErr_SetFromErrno(PyExc_OSError); + break; + } + + goto error_return; + } + } + } + + return 0; + + error_return: + return -1; +} + + +static void SharedMemory_dealloc(SharedMemory *self) { + DPRINTF("dealloc\n"); + PyMem_Free(self->name); + self->name = NULL; + + Py_TYPE(self)->tp_free((PyObject*)self); +} + + +PyObject * +SharedMemory_getsize(SharedMemory *self, void *closure) { + struct stat fileinfo; + off_t size = -1; + + if (0 == fstat(self->fd, &fileinfo)) + size = fileinfo.st_size; + else { + switch (errno) { + case EBADF: + case EINVAL: + PyErr_SetString(pExistentialException, + "The segment does not exist"); + break; + + default: + PyErr_SetFromErrno(PyExc_OSError); + break; + } + + goto error_return; + } + + return Py_BuildValue("k", (unsigned long)size); + + error_return: + return NULL; +} + + +PyObject * +SharedMemory_close_fd(SharedMemory *self) { + if (self->fd) { + if (-1 == close(self->fd)) { + switch (errno) { + case EBADF: + PyErr_SetString(PyExc_ValueError, + "The file descriptor is invalid"); + break; + + default: + PyErr_SetFromErrno(PyExc_OSError); + break; + } + + goto error_return; + } + } + + Py_RETURN_NONE; + + error_return: + return NULL; +} + + +PyObject * +SharedMemory_unlink(SharedMemory *self) { + return my_shm_unlink(self->name); +} + + +/* ===== End Shared Memory functions ===== */ + + +/* + * + * Shared memory meta stuff for describing myself to Python + * + */ + + +static PyMemberDef SharedMemory_members[] = { + { "name", + T_STRING, + offsetof(SharedMemory, name), + READONLY, + "The name specified in the constructor" + }, + { "fd", + T_INT, + offsetof(SharedMemory, fd), + READONLY, + "Shared memory segment file descriptor" + }, + { "mode", + T_LONG, + offsetof(SharedMemory, mode), + READONLY, + "The mode specified in the constructor" + }, + {NULL} /* Sentinel */ +}; + + +static PyMethodDef SharedMemory_methods[] = { + { "close_fd", + (PyCFunction)SharedMemory_close_fd, + METH_NOARGS, + "Closes the file descriptor associated with the shared memory." + }, + { "unlink", + (PyCFunction)SharedMemory_unlink, + METH_NOARGS, + "Unlink (remove) the shared memory." + }, + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + + +static PyGetSetDef SharedMemory_getseters[] = { + // size is read-only + { "size", + (getter)SharedMemory_getsize, + (setter)NULL, + "size", + NULL + }, + {NULL} /* Sentinel */ +}; + + +static PyTypeObject SharedMemoryType = { + PyVarObject_HEAD_INIT(NULL, 0) + "_posixshmem._PosixSharedMemory", // tp_name + sizeof(SharedMemory), // tp_basicsize + 0, // tp_itemsize + (destructor) SharedMemory_dealloc, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + (reprfunc) shm_repr, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + (reprfunc) shm_str, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + // tp_flags + "POSIX shared memory object", // tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + SharedMemory_methods, // tp_methods + SharedMemory_members, // tp_members + SharedMemory_getseters, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + (initproc) SharedMemory_init, // tp_init + 0, // tp_alloc + (newfunc) SharedMemory_new, // tp_new + 0, // tp_free + 0, // tp_is_gc + 0 // tp_bases +}; + + +/* + * + * Module-level functions & meta stuff + * + */ + +static PyObject * +posixshmem_unlink_shared_memory(PyObject *self, PyObject *args) { + const char *name; + + if (!PyArg_ParseTuple(args, "s", &name)) + return NULL; + else + return my_shm_unlink(name); +} + + +static PyMethodDef module_methods[ ] = { + { "unlink_shared_memory", + (PyCFunction)posixshmem_unlink_shared_memory, + METH_VARARGS, + "Unlink shared memory" + }, + {NULL} /* Sentinel */ +}; + + +static struct PyModuleDef this_module = { + PyModuleDef_HEAD_INIT, // m_base + "_posixshmem", // m_name + "POSIX shared memory module", // m_doc + -1, // m_size (space allocated for module globals) + module_methods, // m_methods + NULL, // m_reload + NULL, // m_traverse + NULL, // m_clear + NULL // m_free +}; + +/* Module init function */ +PyMODINIT_FUNC +PyInit__posixshmem(void) { + PyObject *module; + PyObject *module_dict; + + // I call this in case I'm asked to create any random names. + srand((unsigned int)time(NULL)); + + module = PyModule_Create(&this_module); + + if (!module) + goto error_return; + + if (PyType_Ready(&SharedMemoryType) < 0) + goto error_return; + + Py_INCREF(&SharedMemoryType); + PyModule_AddObject(module, "_PosixSharedMemory", (PyObject *)&SharedMemoryType); + + + PyModule_AddStringConstant(module, "__copyright__", "Copyright 2012 Philip Semanchuk, 2018-2019 Davin Potts"); + + PyModule_AddIntConstant(module, "O_CREAT", O_CREAT); + PyModule_AddIntConstant(module, "O_EXCL", O_EXCL); + PyModule_AddIntConstant(module, "O_CREX", O_CREAT | O_EXCL); + PyModule_AddIntConstant(module, "O_TRUNC", O_TRUNC); + + if (!(module_dict = PyModule_GetDict(module))) + goto error_return; + + // Exceptions + if (!(pBaseException = PyErr_NewException("_posixshmem.Error", NULL, NULL))) + goto error_return; + else + PyDict_SetItemString(module_dict, "Error", pBaseException); + + if (!(pPermissionsException = PyErr_NewException("_posixshmem.PermissionsError", pBaseException, NULL))) + goto error_return; + else + PyDict_SetItemString(module_dict, "PermissionsError", pPermissionsException); + + if (!(pExistentialException = PyErr_NewException("_posixshmem.ExistentialError", pBaseException, NULL))) + goto error_return; + else + PyDict_SetItemString(module_dict, "ExistentialError", pExistentialException); + + return module; + + error_return: + return NULL; +} diff --git a/setup.py b/setup.py index 44a563b..d54bbe0 100644 --- a/setup.py +++ b/setup.py @@ -1592,6 +1592,17 @@ class PyBuildExt(build_ext): if (sysconfig.get_config_var('HAVE_SEM_OPEN') and not sysconfig.get_config_var('POSIX_SEMAPHORES_NOT_ENABLED')): multiprocessing_srcs.append('_multiprocessing/semaphore.c') + if (self.compiler.find_library_file(lib_dirs, 'rt') or + host_platform != 'cygwin'): + posixshmem_srcs = [ '_multiprocessing/posixshmem.c', + ] + libs = [] + if self.compiler.find_library_file(lib_dirs, 'rt'): + libs.append('rt') + exts.append( Extension('_posixshmem', posixshmem_srcs, + define_macros={}, + libraries=libs, + include_dirs=["Modules/_multiprocessing"])) exts.append ( Extension('_multiprocessing', multiprocessing_srcs, define_macros=list(macros.items()), -- cgit v0.12