Fix AsyncioConnection race conditions causing EBADF errors (#614) #697
dkropachev wants to merge 6 commits into master
Fix four race conditions in AsyncioConnection that cause "[Errno 9] Bad file descriptor" errors during node restarts, especially with TLS:

1. close() now waits for _close() to complete when called from outside the event loop thread, eliminating the window where is_closed=True but the socket fd is still open.
2. handle_read() sets last_error on server EOF so factory() detects dead connections instead of returning them to callers.
3. push() rejects data when the connection is already closed, preventing writes from being enqueued to a shutting-down connection.
4. handle_write() treats BrokenPipeError/ConnectionResetError as clean peer disconnections instead of defuncting, and both I/O handlers skip defunct() if the connection is already shutting down.
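The first item can be illustrated in isolation. The following is a minimal sketch, not the driver's code: `LoopOwnedResource`, `start_loop`, and the `socket_open` flag are hypothetical names, and the 0.05 s sleep stands in for real cleanup work. It shows a `close()` that schedules `_close()` on the loop and blocks only when called from another thread.

```python
import asyncio
import concurrent.futures
import logging
import threading

log = logging.getLogger(__name__)


class LoopOwnedResource:
    """Hypothetical stand-in for a connection cleaned up on a loop thread."""

    def __init__(self, loop, loop_thread):
        self._loop = loop
        self._loop_thread = loop_thread
        self.is_closed = False
        self.socket_open = True  # stands in for the real socket fd

    async def _close(self):
        # Runs on the event loop thread; the real fd would be released here.
        await asyncio.sleep(0.05)
        self.socket_open = False

    def close(self, timeout=5):
        """Return True once cleanup has finished, False if it is still pending."""
        if self.is_closed:
            return not self.socket_open
        self.is_closed = True
        future = asyncio.run_coroutine_threadsafe(self._close(), self._loop)
        if threading.current_thread() is self._loop_thread:
            # Waiting here would block the loop that has to run _close().
            return False
        try:
            future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            log.warning("close() timed out after %s s; cleanup still pending",
                        timeout)
            return False
        return True


def start_loop():
    """Run a fresh event loop in a daemon thread and return both."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread
```

Returning a boolean is a choice made for this sketch only; it lets a caller tell a completed cleanup apart from a timeout instead of treating both the same way.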
ProactorEventLoop does not support remove_reader/remove_writer (raises NotImplementedError). Wrap these calls so the socket is always closed regardless, and use try/finally to ensure connected_event is always set even if cleanup fails.
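A minimal sketch of that cleanup shape, under stated assumptions: `FakeProactorLoop` and `close_socket` are hypothetical names, the fake loop only mimics the `NotImplementedError` behaviour described above, and only that exception is swallowed here.

```python
import socket
import threading


class FakeProactorLoop:
    """Hypothetical stand-in for a loop without reader/writer support."""

    def remove_reader(self, fd):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError


def close_socket(loop, sock, connected_event):
    """Unregister the fd if the loop supports it, then always close and signal."""
    try:
        fd = sock.fileno()
        for remove in (loop.remove_writer, loop.remove_reader):
            try:
                remove(fd)
            except NotImplementedError:
                # The loop has no fd watchers, so there is nothing to remove.
                pass
        sock.close()
    finally:
        # Waiters must be released even if the cleanup above raised.
        connected_event.set()


a, b = socket.socketpair()
event = threading.Event()
close_socket(FakeProactorLoop(), a, event)
b.close()
```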
On Windows, ProactorEventLoop may raise plain OSError with winerror=10054 (WSAECONNRESET) or 10053 (WSAECONNABORTED) instead of ConnectionResetError. Use ConnectionError base class plus winerror check for robust cross-platform detection.
macOS raises OSError(57, "Socket is not connected") (ENOTCONN) when writing to a peer-disconnected socket. Unlike BrokenPipeError or ConnectionResetError, this is a plain OSError — not a ConnectionError subclass — so it was not caught by the isinstance(err, ConnectionError) check. Add errno-based detection for ENOTCONN, ESHUTDOWN, ECONNRESET, and ECONNABORTED to handle these platform-specific disconnect errors cleanly instead of defuncting the connection.
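The detection rules from the two commit messages above can be combined into one predicate. This is a sketch under assumptions: `is_peer_disconnect` and the two module-level sets are hypothetical names, and the PR's actual helper may be structured differently.

```python
import errno

# errno values the commit message lists as peer-disconnect signals.
_DISCONNECT_ERRNOS = frozenset(
    (errno.ENOTCONN, errno.ESHUTDOWN, errno.ECONNRESET, errno.ECONNABORTED)
)
# Winsock codes named in the Windows commit: WSAECONNABORTED, WSAECONNRESET.
_DISCONNECT_WINERRORS = frozenset((10053, 10054))


def is_peer_disconnect(err):
    """Return True if err looks like the peer going away, on any platform."""
    if isinstance(err, ConnectionError):
        # Covers BrokenPipeError, ConnectionResetError, ConnectionAbortedError.
        return True
    if not isinstance(err, OSError):
        return False
    if err.errno in _DISCONNECT_ERRNOS:
        return True
    # winerror only exists on Windows; getattr keeps this portable.
    return getattr(err, "winerror", None) in _DISCONNECT_WINERRORS
```

For example, `is_peer_disconnect(OSError(errno.ENOTCONN, "Socket is not connected"))` is true even though that exception is a plain `OSError`, while an `OSError` carrying `EBADF` is not treated as a disconnect.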
- Guard SocketPairAsyncioConnection definition with ASYNCIO_AVAILABLE check to avoid subclassing NoneType when asyncio is unavailable
- Add debug logging in close() except block instead of bare pass
- Add inline comments to except blocks in _close() for remove_reader/remove_writer
    # Copyright DataStax, Inc.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
Why do you keep putting the DataStax copyright on new files?
    # When called from outside the event loop thread, wait for _close()
    # to complete so the socket is actually closed when close() returns.
    # This prevents a race window where is_closed=True but the socket fd
    # is still open, which causes EBADF in pending I/O operations.
I don't get it, and it's not explained anywhere: why would this situation cause EBADF? How exactly does that happen?
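For context on the error code itself: on POSIX systems, EBADF is what a system call reports when it is handed a descriptor number that is no longer open. The sketch below shows only that general mechanism and makes no claim about the exact sequence inside the driver; `errno_after_close` is a hypothetical helper, and it assumes a POSIX platform where socket descriptors are ordinary file descriptors.

```python
import errno
import os
import socket


def errno_after_close():
    """Use a descriptor number after its socket was closed; return the errno."""
    a, b = socket.socketpair()
    fd = a.fileno()   # raw descriptor number captured while the socket is open
    a.close()         # the descriptor is released here
    try:
        os.fstat(fd)  # a syscall on the stale number fails
    except OSError as exc:
        return exc.errno
    finally:
        b.close()
    return None
```

Any code path that holds on to the descriptor number, or to a registration keyed by it, can run into this once another path has closed the socket.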
    if threading.current_thread() != self._loop_thread:
        try:
            future.result(timeout=5)
        except Exception:
            pass
If you just pass on timeout, then nothing has changed. If timeout occurs, we will return from this function, which is exactly what the original code did.
    if self.is_closed or self.is_defunct:
        raise ConnectionShutdown(
            "Connection to %s is already closed" % (self.endpoint,))
Why do we need to check that in push? push only puts data in the asyncio queue, it has nothing to do with actually putting that data in the socket, and thus no way to cause the error.
cassandra/io/asyncioreactor.py
    # Peer disconnected — do a clean close instead of defunct
    if isinstance(err, (BrokenPipeError, ConnectionResetError)):
        log.debug("Connection %s closed by peer during write: %s",
                  self, err)
        self.close()
        return
Could you explain how close is "cleaner" than defunct, and what the difference between them is in general? I see that defunct does call close internally, so it's not obvious to me.
    try:
        self._loop.remove_writer(self._socket.fileno())
    except (NotImplementedError, OSError):
        pass
    try:
        self._loop.remove_reader(self._socket.fileno())
    except (NotImplementedError, OSError):
Commit: Fix _close() on Windows ProactorEventLoop
If the issue is with ProactorEventLoop, why do you also ignore OSError?
Also, what do those remove_writer / remove_reader calls do? Specifically, why is it ok to just not use them on Windows? Won't it lead to other issues, leaks etc?
    finally:
        if not self.is_defunct:
            msg = "Connection to %s was closed" % self.endpoint
            if self.last_error:
                msg += ": %s" % (self.last_error,)
            self.error_all_requests(ConnectionShutdown(msg))
            # don't leave in-progress operations hanging
            self.connected_event.set()
Commit: Fix _close() on Windows ProactorEventLoop
You added try/except around remove_writer / remove_reader, but I see that you also put this part inside a finally block. Why? Is there some other exception, not mentioned in the commit message, that you are trying to guard against here?
Commit: "Address code quality review comments"
This commit should not exist. Its changes should instead be integrated into commits that it is fixing.
Summary
- Make close() wait for _close() completion from non-event-loop threads, eliminating the window where is_closed=True but the socket fd is still open (root cause of EBADF)
- Set last_error on server EOF in handle_read() so factory() detects dead connections instead of returning them
- Add an is_closed/is_defunct guard in push() to reject writes on closed connections
- Treat BrokenPipeError/ConnectionResetError in handle_write() as clean peer disconnections instead of defuncting, and skip defunct() in both I/O handlers if the connection is already shutting down

Test plan
- Tests in tests/unit/io/test_asyncio_race_614.py cover all four race conditions using real socket pairs
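The test file itself is not shown in this conversation. As an illustration of what a socket-pair based check can look like, here is a minimal sketch that assumes `socket.socketpair()` and observes the "server EOF" condition from the client side; `peer_closed` is a hypothetical helper, not a function from the PR.

```python
import socket


def peer_closed(sock):
    """Return True if a blocking recv() sees EOF (empty bytes) from the peer."""
    return sock.recv(1) == b""


# One end plays the server and goes away; the other end observes EOF.
client, server = socket.socketpair()
server.close()
eof_seen = peer_closed(client)
client.close()
```

Because both ends are real sockets, this kind of test exercises actual descriptor state rather than mocked return values.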