Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QC-1038] fix Race condition in Service Discovery #2024

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Framework/include/QualityControl/ServiceDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

typedef void CURL;

Expand Down Expand Up @@ -65,6 +67,9 @@ class ServiceDiscovery
const std::string mId; ///< Instance (service) ID
std::string mHealthUrl; ///< hostname of health check endpoint
size_t mHealthPort; ///< Port of the health check endpoint
std::mutex mHealthPortMutex; ///< Mutex for the port
std::condition_variable mHealthPortCV; ///< Condition varaible for the port
std::atomic<bool> mHealthPortAssigned; ///< Port of the health check is ready.
std::thread mHealthThread; ///< Health check thread
std::atomic<bool> mThreadRunning; ///< Health check thread running flag

Expand Down
18 changes: 15 additions & 3 deletions Framework/src/ServiceDiscovery.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& na
: curlHandle(initCurl(), &ServiceDiscovery::deleteCurl), mConsulUrl(url), mName(name), mId(id), mHealthUrl(healthEndUrl)
{
mHealthUrl = mHealthUrl.empty() ? boost::asio::ip::host_name() : mHealthUrl;
mHealthPortAssigned = false;

mHealthThread = std::thread([=] {
#ifdef __linux__
Expand Down Expand Up @@ -96,6 +97,11 @@ bool ServiceDiscovery::_register(const std::string& objects)
check.put("Name", "Health check " + mId);
check.put("Interval", "5s");
check.put("DeregisterCriticalServiceAfter", "1m");
// Wait until port is set by the thread
{
std::unique_lock<std::mutex> lk(mHealthPortMutex);
mHealthPortCV.wait(lk, [this] { return mHealthPortAssigned == true; });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstood how condition_variable works, but won't it block the second time you call _register? I am not sure if it requires the other thread to call notify_one() for each wait.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I wrote a little reproducer and it seems it my concern is not valid, it seems that notify_one() unblocks the main thread forever, as long as the predicate is provided in wait.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as you noted, as long as the predicate is true, it will let the main thread continue.

}
check.put("TCP", mHealthUrl + ":" + std::to_string(mHealthPort));
checks.push_back(std::make_pair("", check));

Expand Down Expand Up @@ -151,12 +157,18 @@ void ServiceDiscovery::runHealthServer()
ILOG(Debug, Trace) << "ServiceDiscovery::runHealthServer - cound not bind to " << port << ENDM;
continue; // try the next one
}
// if we reach this point it means that we could bind
mHealthPort = port;
ILOG(Debug, Devel) << "ServiceDiscovery selected port: " << mHealthPort << ENDM;
break;
}

// assign the port and unblock the main thread (we got a port or we failed)
{
ILOG(Debug, Devel) << "ServiceDiscovery selected port: " << mHealthPort << ENDM;
std::lock_guard<std::mutex> lk(mHealthPortMutex);
mHealthPort = port;
mHealthPortAssigned = true;
}
mHealthPortCV.notify_one();

if (cycle == rangeLength) {
ILOG(Error, Support) << "Could not find a free port for the ServiceDiscovery, aborting the ServiceDiscovery health check" << ENDM;
return;
Expand Down
Loading