Skip to content

Commit

Permalink
Restore mkpipe()
Browse files Browse the repository at this point in the history
  • Loading branch information
cpq committed Dec 21, 2023
1 parent df3c216 commit 6c22e3e
Show file tree
Hide file tree
Showing 7 changed files with 1,443 additions and 1,294 deletions.
101 changes: 34 additions & 67 deletions examples/multi-threaded/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,15 @@
// Multithreading example.
// For each incoming request, we spawn a separate thread, that sleeps for
// some time to simulate long processing time, produces an output and
// hands over that output to the request handler function.
//
// We pass POST body to the worker thread, and respond with a calculated CRC
// sends that output to the parent connection.

#include "mongoose.h"

#define LISTENING_ADDR "http://localhost:8000"

struct thread_data {
struct mg_queue queue; // Worker -> Connection queue
struct mg_str body; // Copy of message body
};

// These two helper UDP connections are used to wake up mongoose thread
static struct mg_connection *s_wakeup_server;
static struct mg_connection *s_wakeup_client;
#define WAKEUP_URL "udp://127.0.0.1:40111"

static void start_thread(void *(*f)(void *), void *p) {
#ifdef _WIN32
#define usleep(x) Sleep((x) / 1000)
_beginthread((void(__cdecl *)(void *)) f, 0, p);
#else
#define closesocket(x) close(x)
#include <pthread.h>
pthread_t thread_id = (pthread_t) 0;
pthread_attr_t attr;
Expand All @@ -37,72 +23,53 @@ static void start_thread(void *(*f)(void *), void *p) {
#endif
}

static void *worker_thread(void *param) {
struct thread_data *d = (struct thread_data *) param;
char buf[100]; // On-stack buffer for the message queue

mg_queue_init(&d->queue, buf, sizeof(buf)); // Init queue
usleep(1 * 1000 * 1000); // Simulate long execution time

// Send a response to the connection
if (d->body.len == 0) {
mg_queue_printf(&d->queue, "Send me POST data");
} else {
uint32_t crc = mg_crc32(0, d->body.ptr, d->body.len);
mg_queue_printf(&d->queue, "crc32: %#x", crc);
free((char *) d->body.ptr);
}

// Wake up Mongoose thread by sending something to one of its connections
mg_send(s_wakeup_client, "hi", 2);
struct thread_data {
struct mg_mgr *mgr;
unsigned long conn_id; // Parent connection ID
struct mg_str message; // Original HTTP request
};

// Wait until connection reads our message, then it is safe to quit
while (d->queue.tail != d->queue.head) usleep(1000);
MG_INFO(("done, cleaning up..."));
free(d);
static void *thread_function(void *param) {
struct thread_data *p = (struct thread_data *) param;
sleep(2); // Simulate long execution
mg_wakeup(p->mgr, p->conn_id, "hi!", 3); // Respond to parent
free((void *) p->message.ptr); // Free all resources that were
free(p); // passed to us
return NULL;
}

static void wfn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_READ) {
c->recv.len = 0; // Discard received data
}
(void) ev_data, (void) fn_data;
}

// HTTP request callback
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_HTTP_MSG) {
// Received HTTP request. Allocate thread data and spawn a worker thread
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
struct thread_data *d = (struct thread_data *) calloc(1, sizeof(*d));
d->body = mg_strdup(hm->body); // Pass received body to the worker
start_thread(worker_thread, d); // Start a thread
*(void **) c->data = d; // Memorise data pointer in c->data
} else if (ev == MG_EV_POLL) {
// Poll event. Delivered to us every mg_mgr_poll interval or faster
struct thread_data *d = *(struct thread_data **) c->data;
size_t len;
char *buf;
// Check if we have a message from the worker
if (d != NULL && (len = mg_queue_next(&d->queue, &buf)) > 0) {
// Got message from worker. Send a response and cleanup
mg_http_reply(c, 200, "", "%.*s\n", (int) len, buf);
mg_queue_del(&d->queue, len); // Delete message
*(void **) c->data = NULL; // Forget about thread data
if (mg_http_match_uri(hm, "/fast")) {
// Single-threaded code path, for performance comparison
// The /fast URI responds immediately
mg_http_reply(c, 200, "Host: foo.com\r\n", "hi\n");
} else {
// Multithreading code path
struct thread_data *data = calloc(1, sizeof(*data)); // Worker owns it
data->message = mg_strdup(hm->message); // Pass message
data->conn_id = c->id;
data->mgr = c->mgr;
start_thread(thread_function, data); // Start thread and pass data
}
} else if (ev == MG_EV_WAKEUP) {
struct mg_str *data = (struct mg_str *) ev_data;
mg_http_reply(c, 200, "", "Result: %.*s\n", data->len, data->ptr);
}
(void) fn_data;
}

int main(void) {
struct mg_mgr mgr;
mg_mgr_init(&mgr);
mg_mgr_init(&mgr); // Initialise event manager
mg_log_set(MG_LL_DEBUG); // Set debug log level
mg_http_listen(&mgr, LISTENING_ADDR, fn, NULL);
s_wakeup_server = mg_listen(&mgr, WAKEUP_URL, wfn, NULL);
s_wakeup_client = mg_connect(&mgr, WAKEUP_URL, wfn, NULL);
for (;;) mg_mgr_poll(&mgr, 5000); // Event loop. Use 5s poll interval
mg_mgr_free(&mgr); // Cleanup
mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener
mg_wakeup(&mgr, 0, "", 0); // Initialise wakeup socket pair
for (;;) { // Event loop
mg_mgr_poll(&mgr, 1000);
}
mg_mgr_free(&mgr);
return 0;
}
Loading

0 comments on commit 6c22e3e

Please sign in to comment.