Skip to content
8 changes: 7 additions & 1 deletion lib/http/HttpClient_Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ namespace MAT_NS_BEGIN {
auto curlOperation = std::make_shared<CurlHttpOperation>(curlRequest->m_method, curlRequest->m_url, callback, requestHeaders, curlRequest->m_body, false, HTTP_CONN_TIMEOUT, m_sslVerify, sslCaInfo);
curlRequest->SetOperation(curlOperation);

// The lifetime of curlOperation is guarnteed by the call to result.wait() in the d'tor.
// The lifetime of curlOperation across the async Send is guaranteed by
// ~CurlHttpOperation. After this function returns, the only remaining
// shared_ptr is the one held by the owning CurlHttpRequest. When that
// request is destroyed from another thread, the destructor waits for the
// async result; if the callback below leads to the request being
// destroyed on the async thread itself (OnHttpResponse ->
// EventsUploadContext::clear()), the destructor defers the join instead.
curlOperation->SendAsync([this, callback, requestId](CurlHttpOperation& operation) {
this->EraseRequest(requestId);

Expand Down
70 changes: 67 additions & 3 deletions lib/http/HttpClient_Curl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <numeric>
#include <future>
#include <atomic>
#include <thread>
#include <new>

#include <poll.h>
#include <curl/curl.h>
Expand Down Expand Up @@ -173,11 +175,58 @@ class CurlHttpOperation {
*/
virtual ~CurlHttpOperation()
{
// Given the request has not been aborted we should wait for completion here
// This guarantees the lifetime of this request.
// libstdc++'s std::future<>::~future implicitly joins the async thread
// during destruction. If this destructor runs ON that same async thread
// (the async callback led to the owning CurlHttpRequest being destroyed
// on that thread, e.g. OnHttpResponse -> EventsUploadContext::clear()),
// that join is a self-join and throws std::system_error("Resource
// deadlock avoided"); since it originates in this noexcept destructor it
// aborts the process.
//
// Distinguish the two cases by the thread id published when the async
// task started:
// * self-join -> Send() has returned (we are running inside its
// callback), but the async task itself has not yet
// returned (this destructor is executing inside it),
// so defer the future's join to a detached helper
// thread rather than joining on this (the async)
// thread; the helper's join completes once the task
// returns after this destructor unwinds.
// * cross-thread -> the async Send() may still be running, so wait()
// to keep the curl handle, response buffer and the
// by-reference request body alive until it finishes.
if (result.valid())
{
result.wait();
if (m_asyncThreadIdSet.load(std::memory_order_acquire) &&
std::this_thread::get_id() == m_asyncThreadId)
{
// Heap-allocate first so a rare std::thread spawn failure leaks
// the already-finished future rather than joining it on this
// async thread (EDEADLK) or letting std::system_error escape
// this noexcept destructor.
std::future<long>* pending = new (std::nothrow) std::future<long>(std::move(result));
if (pending == nullptr)
{
// Out of memory: `result` is still valid and would self-join
// (EDEADLK) when destroyed on this async thread at the end of
// the destructor, and there is no allocation-free way to move
// it off-thread. Abort as a last resort rather than fall
// through to a guaranteed EDEADLK abort.
std::abort();
}
try
{
std::thread([pending]() { delete pending; }).detach();
}
catch (...)
{
// Thread exhaustion: intentionally leak *pending.
}
}
else
{
result.wait();
}
}
DispatchEvent(OnDestroy);
res = CURLE_OK;
Expand Down Expand Up @@ -318,7 +367,13 @@ class CurlHttpOperation {
}

std::future<long> & SendAsync(std::function<void(CurlHttpOperation &)> callback = nullptr) {
// Reset the publication flag before launching so self-join detection
// stays correct even if this operation were ever reused (today each
// CurlHttpOperation is single-use: one SendAsync call per request).
m_asyncThreadIdSet.store(false, std::memory_order_release);
result = std::async(std::launch::async, [this, callback] {
m_asyncThreadId = std::this_thread::get_id();
m_asyncThreadIdSet.store(true, std::memory_order_release);
long result = Send();
if (callback!=nullptr)
callback(*this);
Expand Down Expand Up @@ -437,6 +492,15 @@ class CurlHttpOperation {

CURL *curl; // Local curl instance
CURLcode res = CURLE_OK; // Curl result OR HTTP status code if successful

// Id of the thread running the async Send() task, published via the
// atomic<bool> flag below (release/acquire). ~CurlHttpOperation uses these
// to detect a self-join (destruction from within the async callback) and
// avoid the EDEADLK that joining the future would raise. A plain thread::id
// plus an atomic<bool> flag is used instead of std::atomic<std::thread::id>,
// which is not guaranteed to be supported across standard libraries.
std::thread::id m_asyncThreadId{};
std::atomic<bool> m_asyncThreadIdSet{ false };

IHttpResponseCallback* m_callback = nullptr;

Expand Down
Loading