You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
6.7 KiB
216 lines
6.7 KiB
|
|
# HG changeset patch |
|
# User Richard Oudkerk <shibturn@gmail.com> |
|
# Date 1372700728 -3600 |
|
# Node ID bc34fe4a0d58a047509798acb0b4b2a21ce1e375 |
|
# Parent 26ef5d5d5c3ea76ab411f2984d507aadce0ce8d7 |
|
Issue #17097: Make multiprocessing ignore EINTR. |
|
|
|
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py |
|
--- a/Lib/multiprocessing/connection.py |
|
+++ b/Lib/multiprocessing/connection.py |
|
@@ -270,7 +270,14 @@ class SocketListener(object): |
|
self._unlink = None |
|
|
|
def accept(self): |
|
- s, self._last_accepted = self._socket.accept() |
|
+ while True: |
|
+ try: |
|
+ s, self._last_accepted = self._socket.accept() |
|
+ except socket.error as e: |
|
+ if e.args[0] != errno.EINTR: |
|
+ raise |
|
+ else: |
|
+ break |
|
s.setblocking(True) |
|
fd = duplicate(s.fileno()) |
|
conn = _multiprocessing.Connection(fd) |
|
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py |
|
--- a/Lib/test/test_multiprocessing.py |
|
+++ b/Lib/test/test_multiprocessing.py |
|
@@ -2461,12 +2461,80 @@ class TestForkAwareThreadLock(unittest.T |
|
self.assertLessEqual(new_size, old_size) |
|
|
|
# |
|
+# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc |
|
+# |
|
+ |
|
+class TestIgnoreEINTR(unittest.TestCase): |
|
+ |
|
+ @classmethod |
|
+ def _test_ignore(cls, conn): |
|
+ def handler(signum, frame): |
|
+ pass |
|
+ signal.signal(signal.SIGUSR1, handler) |
|
+ conn.send('ready') |
|
+ x = conn.recv() |
|
+ conn.send(x) |
|
+ conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block |
|
+ |
|
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') |
|
+ def test_ignore(self): |
|
+ conn, child_conn = multiprocessing.Pipe() |
|
+ try: |
|
+ p = multiprocessing.Process(target=self._test_ignore, |
|
+ args=(child_conn,)) |
|
+ p.daemon = True |
|
+ p.start() |
|
+ child_conn.close() |
|
+ self.assertEqual(conn.recv(), 'ready') |
|
+ time.sleep(0.1) |
|
+ os.kill(p.pid, signal.SIGUSR1) |
|
+ time.sleep(0.1) |
|
+ conn.send(1234) |
|
+ self.assertEqual(conn.recv(), 1234) |
|
+ time.sleep(0.1) |
|
+ os.kill(p.pid, signal.SIGUSR1) |
|
+ self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) |
|
+ time.sleep(0.1) |
|
+ p.join() |
|
+ finally: |
|
+ conn.close() |
|
+ |
|
+ @classmethod |
|
+ def _test_ignore_listener(cls, conn): |
|
+ def handler(signum, frame): |
|
+ pass |
|
+ signal.signal(signal.SIGUSR1, handler) |
|
+ l = multiprocessing.connection.Listener() |
|
+ conn.send(l.address) |
|
+ a = l.accept() |
|
+ a.send('welcome') |
|
+ |
|
+ @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') |
|
+ def test_ignore_listener(self): |
|
+ conn, child_conn = multiprocessing.Pipe() |
|
+ try: |
|
+ p = multiprocessing.Process(target=self._test_ignore_listener, |
|
+ args=(child_conn,)) |
|
+ p.daemon = True |
|
+ p.start() |
|
+ child_conn.close() |
|
+ address = conn.recv() |
|
+ time.sleep(0.1) |
|
+ os.kill(p.pid, signal.SIGUSR1) |
|
+ time.sleep(0.1) |
|
+ client = multiprocessing.connection.Client(address) |
|
+ self.assertEqual(client.recv(), 'welcome') |
|
+ p.join() |
|
+ finally: |
|
+ conn.close() |
|
+ |
|
+# |
|
# |
|
# |
|
|
|
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers, |
|
TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb, |
|
- TestFlags, TestForkAwareThreadLock] |
|
+ TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR] |
|
|
|
# |
|
# |
|
diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c |
|
--- a/Modules/_multiprocessing/socket_connection.c |
|
+++ b/Modules/_multiprocessing/socket_connection.c |
|
@@ -23,6 +23,21 @@ |
|
#endif |
|
|
|
/* |
|
+ * Wrapper for PyErr_CheckSignals() which can be called without the GIL |
|
+ */ |
|
+ |
|
+static int |
|
+check_signals(void) |
|
+{ |
|
+ PyGILState_STATE state; |
|
+ int res; |
|
+ state = PyGILState_Ensure(); |
|
+ res = PyErr_CheckSignals(); |
|
+ PyGILState_Release(state); |
|
+ return res; |
|
+} |
|
+ |
|
+/* |
|
* Send string to file descriptor |
|
*/ |
|
|
|
@@ -34,8 +49,14 @@ static Py_ssize_t |
|
|
|
while (length > 0) { |
|
res = WRITE(h, p, length); |
|
- if (res < 0) |
|
+ if (res < 0) { |
|
+ if (errno == EINTR) { |
|
+ if (check_signals() < 0) |
|
+ return MP_EXCEPTION_HAS_BEEN_SET; |
|
+ continue; |
|
+ } |
|
return MP_SOCKET_ERROR; |
|
+ } |
|
length -= res; |
|
p += res; |
|
} |
|
@@ -56,12 +77,16 @@ static Py_ssize_t |
|
|
|
while (remaining > 0) { |
|
temp = READ(h, p, remaining); |
|
- if (temp <= 0) { |
|
- if (temp == 0) |
|
- return remaining == length ? |
|
- MP_END_OF_FILE : MP_EARLY_END_OF_FILE; |
|
- else |
|
- return temp; |
|
+ if (temp < 0) { |
|
+ if (errno == EINTR) { |
|
+ if (check_signals() < 0) |
|
+ return MP_EXCEPTION_HAS_BEEN_SET; |
|
+ continue; |
|
+ } |
|
+ return temp; |
|
+ } |
|
+ else if (temp == 0) { |
|
+ return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE; |
|
} |
|
remaining -= temp; |
|
p += temp; |
|
@@ -171,9 +196,16 @@ conn_poll(ConnectionObject *conn, double |
|
p.revents = 0; |
|
|
|
if (timeout < 0) { |
|
- res = poll(&p, 1, -1); |
|
+ do { |
|
+ res = poll(&p, 1, -1); |
|
+ } while (res < 0 && errno == EINTR); |
|
} else { |
|
res = poll(&p, 1, (int)(timeout * 1000 + 0.5)); |
|
+ if (res < 0 && errno == EINTR) { |
|
+ /* We were interrupted by a signal. Just indicate a |
|
+ timeout even though we are early. */ |
|
+ return FALSE; |
|
+ } |
|
} |
|
|
|
if (res < 0) { |
|
@@ -209,12 +241,19 @@ conn_poll(ConnectionObject *conn, double |
|
FD_SET((SOCKET)conn->handle, &rfds); |
|
|
|
if (timeout < 0.0) { |
|
- res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); |
|
+ do { |
|
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); |
|
+ } while (res < 0 && errno == EINTR); |
|
} else { |
|
struct timeval tv; |
|
tv.tv_sec = (long)timeout; |
|
tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); |
|
res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); |
|
+ if (res < 0 && errno == EINTR) { |
|
+ /* We were interrupted by a signal. Just indicate a |
|
+ timeout even though we are early. */ |
|
+ return FALSE; |
|
+ } |
|
} |
|
|
|
if (res < 0) { |
|
|
|
|