# HG changeset patch # User Richard Oudkerk # Date 1372700728 -3600 # Node ID bc34fe4a0d58a047509798acb0b4b2a21ce1e375 # Parent 26ef5d5d5c3ea76ab411f2984d507aadce0ce8d7 Issue #17097: Make multiprocessing ignore EINTR. diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -270,7 +270,14 @@ class SocketListener(object): self._unlink = None def accept(self): - s, self._last_accepted = self._socket.accept() + while True: + try: + s, self._last_accepted = self._socket.accept() + except socket.error as e: + if e.args[0] != errno.EINTR: + raise + else: + break s.setblocking(True) fd = duplicate(s.fileno()) conn = _multiprocessing.Connection(fd) diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -2461,12 +2461,80 @@ class TestForkAwareThreadLock(unittest.T self.assertLessEqual(new_size, old_size) # +# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc +# + +class TestIgnoreEINTR(unittest.TestCase): + + @classmethod + def _test_ignore(cls, conn): + def handler(signum, frame): + pass + signal.signal(signal.SIGUSR1, handler) + conn.send('ready') + x = conn.recv() + conn.send(x) + conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block + + @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') + def test_ignore(self): + conn, child_conn = multiprocessing.Pipe() + try: + p = multiprocessing.Process(target=self._test_ignore, + args=(child_conn,)) + p.daemon = True + p.start() + child_conn.close() + self.assertEqual(conn.recv(), 'ready') + time.sleep(0.1) + os.kill(p.pid, signal.SIGUSR1) + time.sleep(0.1) + conn.send(1234) + self.assertEqual(conn.recv(), 1234) + time.sleep(0.1) + os.kill(p.pid, signal.SIGUSR1) + self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) + time.sleep(0.1) + p.join() + finally: + conn.close() + + @classmethod + def _test_ignore_listener(cls, conn): + def handler(signum, frame): + pass + signal.signal(signal.SIGUSR1, handler) + l = multiprocessing.connection.Listener() + conn.send(l.address) + a = l.accept() + a.send('welcome') + + @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') + def test_ignore_listener(self): + conn, child_conn = multiprocessing.Pipe() + try: + p = multiprocessing.Process(target=self._test_ignore_listener, + args=(child_conn,)) + p.daemon = True + p.start() + child_conn.close() + address = conn.recv() + time.sleep(0.1) + os.kill(p.pid, signal.SIGUSR1) + time.sleep(0.1) + client = multiprocessing.connection.Client(address) + self.assertEqual(client.recv(), 'welcome') + p.join() + finally: + conn.close() + +# # # testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, - TestFlags, TestForkAwareThreadLock] + TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR] # # diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c --- a/Modules/_multiprocessing/socket_connection.c +++ b/Modules/_multiprocessing/socket_connection.c @@ -23,6 +23,21 @@ #endif /* + * Wrapper for PyErr_CheckSignals() which can be called without the GIL + */ + +static int +check_signals(void) +{ + PyGILState_STATE state; + int res; + state = PyGILState_Ensure(); + res = PyErr_CheckSignals(); + PyGILState_Release(state); + return res; +} + +/* * Send string to file descriptor */ @@ -34,8 +49,14 @@ static Py_ssize_t while (length > 0) { res = WRITE(h, p, length); - if (res < 0) + if (res < 0) { + if (errno == EINTR) { + if (check_signals() < 0) + return MP_EXCEPTION_HAS_BEEN_SET; + continue; + } return MP_SOCKET_ERROR; + } length -= res; p += res; } @@ -56,12 +77,16 @@ static Py_ssize_t while (remaining > 0) { temp = READ(h, p, remaining); - if (temp <= 0) { - if (temp == 0) - return remaining == length ? - MP_END_OF_FILE : MP_EARLY_END_OF_FILE; - else - return temp; + if (temp < 0) { + if (errno == EINTR) { + if (check_signals() < 0) + return MP_EXCEPTION_HAS_BEEN_SET; + continue; + } + return temp; + } + else if (temp == 0) { + return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE; } remaining -= temp; p += temp; @@ -171,9 +196,16 @@ conn_poll(ConnectionObject *conn, double p.revents = 0; if (timeout < 0) { - res = poll(&p, 1, -1); + do { + res = poll(&p, 1, -1); + } while (res < 0 && errno == EINTR); } else { res = poll(&p, 1, (int)(timeout * 1000 + 0.5)); + if (res < 0 && errno == EINTR) { + /* We were interrupted by a signal. Just indicate a + timeout even though we are early. */ + return FALSE; + } } if (res < 0) { @@ -209,12 +241,19 @@ conn_poll(ConnectionObject *conn, double FD_SET((SOCKET)conn->handle, &rfds); if (timeout < 0.0) { - res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); + do { + res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); + } while (res < 0 && errno == EINTR); } else { struct timeval tv; tv.tv_sec = (long)timeout; tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); + if (res < 0 && errno == EINTR) { + /* We were interrupted by a signal. Just indicate a + timeout even though we are early. */ + return FALSE; + } } if (res < 0) {