Skip to content

Commit

Permalink
refactor: use event queue for more stability
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Aug 28, 2024
1 parent d253ef7 commit 2fbd0a8
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 64 deletions.
155 changes: 93 additions & 62 deletions cpp/react-native-jsi-udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,27 @@ UdpManager::UdpManager(Runtime &jsiRuntime, RunOnJS runOnJS) : _runtime(jsiRunti
global.setProperty(jsiRuntime, "dgc_IP_DROP_MEMBERSHIP", static_cast<int>(IP_DROP_MEMBERSHIP));
global.setProperty(jsiRuntime, "dgc_IP_TTL", static_cast<int>(IP_TTL));
}

eventThread = thread(&UdpManager::receiveEvent, this);
}

UdpManager::~UdpManager() {
_invalidate = true;
runOnJS = nullptr;
cond.notify_all();
invalidate();
eventThread.join();
}

void UdpManager::invalidate() {
for (auto &entry : running) {
running[entry.first] = false;
if (entry.second) ::close(entry.first);
}
running.clear();
for (auto &entry : workers) {
if (entry.second.joinable()) entry.second.detach();
if (entry.second.joinable()) entry.second.join();
}
workers.clear();
}

JSI_HOST_FUNCTION(UdpManager::create) {
Expand All @@ -218,7 +229,7 @@ JSI_HOST_FUNCTION(UdpManager::create) {

struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100000; // 100ms
tv.tv_usec = 10;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

return fd;
Expand All @@ -236,61 +247,9 @@ JSI_HOST_FUNCTION(UdpManager::startWorker) {
workers[fd].join();
}

eventHandlers[fd] = move(handler);
running[fd] = true;
workers[fd] = thread(
&UdpManager::workerLoop,
this,
fd,
[this, handler](Event event) {
if (_invalidate || runOnJS == nullptr) return;
runOnJS([this, handler, event]() {
auto eventObj = Object(_runtime);
eventObj.setProperty(
_runtime,
"type",
String::createFromAscii(
_runtime,
event.type == MESSAGE ? "message" : event.type == ERROR ? "error" : "close"
)
);
if (event.type == MESSAGE) {
auto ArrayBuffer = _runtime.global().getPropertyAsFunction(_runtime, "ArrayBuffer");
auto arrayBufferObj = ArrayBuffer
.callAsConstructor(_runtime, static_cast<int>(event.data.size()))
.getObject(_runtime);
auto arrayBuffer = arrayBufferObj.getArrayBuffer(_runtime);
memcpy(arrayBuffer.data(_runtime), event.data.c_str(), event.data.size());
eventObj.setProperty(
_runtime,
"data",
move(arrayBuffer)
);
eventObj.setProperty(
_runtime,
"family",
String::createFromAscii(_runtime, event.family == AF_INET ? "IPv4" : "IPv6")
);
eventObj.setProperty(
_runtime,
"address",
String::createFromAscii(_runtime, event.address)
);
eventObj.setProperty(
_runtime,
"port",
static_cast<int>(event.port)
);
} else if (event.type == ERROR) {
auto Error = _runtime.global().getPropertyAsFunction(_runtime, "Error");
auto errorObj = Error
.callAsConstructor(_runtime, String::createFromAscii(_runtime, event.data))
.getObject(_runtime);
eventObj.setProperty(_runtime, "error", errorObj);
}
handler->call(_runtime, eventObj);
});
}
);
workers[fd] = thread(&UdpManager::workerLoop, this, fd);

return Value::undefined();
}
Expand Down Expand Up @@ -539,25 +498,95 @@ JSI_HOST_FUNCTION(UdpManager::getSockName) {
return result;
}

void UdpManager::workerLoop(int fd, function<void(Event)> emitEvent) {
void UdpManager::receiveEvent() {
while (!_invalidate) {
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [this] { return _invalidate || !events.empty(); });
if (_invalidate || runOnJS == nullptr) {
break;
}
auto event = events.front();
events.pop();
lock.unlock();
if (eventHandlers.count(event.fd) > 0) {
runOnJS([this, &event]() {
auto handler = eventHandlers[event.fd];
auto eventObj = Object(_runtime);
eventObj.setProperty(
_runtime,
"type",
String::createFromAscii(
_runtime,
event.type == MESSAGE ? "message" : event.type == ERROR ? "error" : "close"
)
);
if (event.type == MESSAGE) {
auto ArrayBuffer = _runtime.global().getPropertyAsFunction(_runtime, "ArrayBuffer");
auto arrayBufferObj = ArrayBuffer
.callAsConstructor(_runtime, static_cast<int>(event.data.size()))
.getObject(_runtime);
auto arrayBuffer = arrayBufferObj.getArrayBuffer(_runtime);
memcpy(arrayBuffer.data(_runtime), event.data.c_str(), event.data.size());
eventObj.setProperty(
_runtime,
"data",
move(arrayBuffer)
);
eventObj.setProperty(
_runtime,
"family",
String::createFromAscii(_runtime, event.family == AF_INET ? "IPv4" : "IPv6")
);
eventObj.setProperty(
_runtime,
"address",
String::createFromAscii(_runtime, event.address)
);
eventObj.setProperty(
_runtime,
"port",
static_cast<int>(event.port)
);
} else if (event.type == ERROR) {
auto Error = _runtime.global().getPropertyAsFunction(_runtime, "Error");
auto errorObj = Error
.callAsConstructor(_runtime, String::createFromAscii(_runtime, event.data))
.getObject(_runtime);
eventObj.setProperty(_runtime, "error", errorObj);
}
handler->call(_runtime, eventObj);
});
}
}
}

void UdpManager::sendEvent(Event event) {
if (_invalidate) return;
std::lock_guard<std::mutex> lock(mutex);
events.push(event);
cond.notify_one();
}

void UdpManager::workerLoop(int fd) {
auto buffer = new char[MAX_PACK_SIZE];

while (running.count(fd) > 0 && running.at(fd)) {
while (!_invalidate && running.count(fd) > 0 && running.at(fd)) {
struct sockaddr_in in_addr;
socklen_t in_len = sizeof(in_addr);

auto recvn = ::recvfrom(fd, buffer, MAX_PACK_SIZE, 0, (struct sockaddr *)&in_addr, &in_len);

if (recvn < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
emitEvent({ ERROR, error_name(errno), 0, "", 0 });
sendEvent({ fd, ERROR, error_name(errno), 0, "", 0 });
break;
} else {
continue;
}
}

emitEvent({
sendEvent({
fd,
MESSAGE,
string(buffer, recvn),
in_addr.sin_family,
Expand All @@ -567,8 +596,10 @@ void UdpManager::workerLoop(int fd, function<void(Event)> emitEvent) {
}

::close(fd);
if (!_invalidate) {
sendEvent({ fd, CLOSE, "", 0, "", 0 });
}
delete[] buffer;
emitEvent({ CLOSE, "", 0, "", 0 });
}

} // namespace jsiudp
20 changes: 18 additions & 2 deletions cpp/react-native-jsi-udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include <map>
#include <functional>
#include <tuple>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <atomic>
#include "helper.h"

#if __APPLE__
Expand Down Expand Up @@ -37,6 +41,7 @@ namespace jsiudp {
};

struct Event {
int fd;
EventType type;
std::string data;
int family;
Expand All @@ -49,12 +54,16 @@ namespace jsiudp {
UdpManager(facebook::jsi::Runtime &jsiRuntime, RunOnJS runOnJS);
~UdpManager();

void invalidate();

protected:
facebook::jsi::Runtime &_runtime;
RunOnJS runOnJS;
std::map<int, std::thread> workers;
std::map<int, bool> running;
bool _invalidate = false;
std::atomic<bool> _invalidate = false;
std::thread eventThread;
std::map<int, std::shared_ptr<facebook::jsi::Function>> eventHandlers;

JSI_HOST_FUNCTION(create);
JSI_HOST_FUNCTION(send);
Expand All @@ -65,6 +74,13 @@ namespace jsiudp {
JSI_HOST_FUNCTION(close);
JSI_HOST_FUNCTION(getSockName);

void workerLoop(int fd, std::function<void(Event)> handler);
void workerLoop(int fd);
void sendEvent(Event event);
void receiveEvent();

private:
std::condition_variable cond;
std::mutex mutex;
std::queue<Event> events;
};
}

0 comments on commit 2fbd0a8

Please sign in to comment.