Skip to content

Commit

Permalink
allow Async queue for pub/sub + docs
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Dec 14, 2024
1 parent c4bc044 commit b496109
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 38 deletions.
47 changes: 30 additions & 17 deletions fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38011,6 +38011,8 @@ typedef struct {
void (*on_unsubscribe)(void *udata);
/** The opaque udata value is ignored and made available to the callbacks. */
void *udata;
/** The queue to which the callbacks should be routed. May be NULL. */
fio_queue_s *queue;
/** Replay cached messages (if any) since supplied time in milliseconds. */
uint64_t replay_since;
/**
Expand Down Expand Up @@ -38469,6 +38471,7 @@ typedef struct fio_subscription_s {
uint64_t replay_since;
fio_io_s *io;
fio_channel_s *channel;
fio_queue_s *queue;
void (*on_message)(fio_msg_s *msg);
void (*on_unsubscribe)(void *udata);
void *udata;
Expand All @@ -38478,7 +38481,8 @@ typedef struct fio_subscription_s {
* Reference counting: `fio_subscription_dup(sb)` / `fio_subscription_free(sb)`
*/
FIO_SFUNC void fio___pubsub_subscription_on_destroy(fio_subscription_s *sub);
#define FIO_REF_NAME fio_subscription
#define FIO_REF_NAME fio___subscription
#define FIO_REF_TYPE fio_subscription_s
#define FIO_REF_DESTROY(obj) fio___pubsub_subscription_on_destroy(&(obj))
#define FIO_REF_CONSTRUCTOR_ONLY 1
#define FIO___RECURSIVE_INCLUDE 1
Expand Down Expand Up @@ -39045,7 +39049,7 @@ FIO_IFUNC void fio___pubsub_unsubscribe_task(void *sub_, void *ignr_) {
sub->channel = NULL;

no_channel:
fio_subscription_free(sub);
fio___subscription_free(sub);
return;
(void)ignr_;
}
Expand All @@ -39071,10 +39075,11 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
} uptr;
if (args.channel.len > 0xFFFFUL)
goto sub_error;
s = fio_subscription_new();
s = fio___subscription_new();
if (!s)
goto sub_error;

if (!args.queue)
args.queue = fio_io_queue();
*s = (fio_subscription_s){
.replay_since = args.replay_since,
.io = args.io,
Expand All @@ -39084,6 +39089,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
: fio___subscription_mock_cb)),
.on_unsubscribe = args.on_unsubscribe,
.udata = args.udata,
.queue = args.queue,
};
args.is_pattern = !!args.is_pattern; /* make sure this is either 1 or zero */
uptr.ls = &s->node;
Expand Down Expand Up @@ -39141,7 +39147,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
fio_bstr_free(uptr.str->buf);
s->node = FIO_LIST_INIT(s->node);
s->history = FIO_LIST_INIT(s->history);
fio_subscription_free(s);
fio___subscription_free(s);
FIO_LOG_WARNING(
"(%d) master-only subscription attempt on a non-master process: %.*s",
fio_io_pid(),
Expand Down Expand Up @@ -39211,6 +39217,11 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) {
Pub/Sub Message Distribution (local process)
***************************************************************************** */

static void fio___subscription_after_on_message_task(void *s, void *m) {
fio___subscription_free((fio_subscription_s *)s);
fio___pubsub_message_free((fio___pubsub_message_s *)m);
}

/* performs the subscription callback */
FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
fio_subscription_s *s = (fio_subscription_s *)s_;
Expand All @@ -39230,11 +39241,13 @@ FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
s->udata = container.msg.udata;
if (container.flag)
goto reschedule;
fio_subscription_free(s);
fio___pubsub_message_free(m);
fio_queue_push(fio_io_queue(),
fio___subscription_after_on_message_task,
s,
m);
return;
reschedule:
fio_queue_push(fio_io_queue(), fio___subscription_on_message_task, s_, m_);
fio_queue_push(s->queue, fio___subscription_on_message_task, s_, m_);
}

/* returns the internal message object. */
Expand All @@ -39259,18 +39272,18 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io && m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand All @@ -39279,17 +39292,17 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand Down Expand Up @@ -44963,7 +44976,7 @@ typedef struct fio_http_settings_s {
*/
size_t max_body_size;
/**
* The maximum websocket message size/buffer (in bytes) for Websocket
* The maximum WebSocket message size/buffer (in bytes) for Websocket
* connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes.
*/
size_t ws_max_msg_size;
Expand Down
6 changes: 4 additions & 2 deletions fio-stl.md
Original file line number Diff line number Diff line change
Expand Up @@ -11033,6 +11033,8 @@ typedef struct {
void (*on_unsubscribe)(void *udata);
/** The opaque udata value is ignored and made available to the callbacks. */
void *udata;
/** The queue to which the callbacks should be routed. May be NULL. */
fio_queue_s *queue;
/**
* OPTIONAL: subscription handle return value - should be NULL when using
* automatic memory management with the IO or global environment.
Expand Down Expand Up @@ -11095,7 +11097,7 @@ int fio_unsubscribe(subscribe_args_s args);

Cancels an existing subscriptions.

Accepts the same arguments as [`fio_subscribe`](fio_subscribe), except the `udata` and callback details are ignored (no need to provide `udata` or callback details).
Accepts the same arguments as [`fio_subscribe`](fio_subscribe), except the `udata`, and callback details are ignored (no need to provide `udata` or callback details).

If a `subscription_handle_ptr` was provided it should contain the value of the subscription handle returned.

Expand Down Expand Up @@ -11474,7 +11476,7 @@ typedef struct fio_http_settings_s {
*/
size_t max_body_size;
/**
* The maximum websocket message size/buffer (in bytes) for Websocket
* The maximum WebSocket message size/buffer (in bytes) for Websocket
* connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes.
*/
size_t ws_max_msg_size;
Expand Down
45 changes: 29 additions & 16 deletions fio-stl/420 pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ typedef struct {
void (*on_unsubscribe)(void *udata);
/** The opaque udata value is ignored and made available to the callbacks. */
void *udata;
/** The queue to which the callbacks should be routed. May be NULL. */
fio_queue_s *queue;
/** Replay cached messages (if any) since supplied time in milliseconds. */
uint64_t replay_since;
/**
Expand Down Expand Up @@ -543,6 +545,7 @@ typedef struct fio_subscription_s {
uint64_t replay_since;
fio_io_s *io;
fio_channel_s *channel;
fio_queue_s *queue;
void (*on_message)(fio_msg_s *msg);
void (*on_unsubscribe)(void *udata);
void *udata;
Expand All @@ -552,7 +555,8 @@ typedef struct fio_subscription_s {
* Reference counting: `fio_subscription_dup(sb)` / `fio_subscription_free(sb)`
*/
FIO_SFUNC void fio___pubsub_subscription_on_destroy(fio_subscription_s *sub);
#define FIO_REF_NAME fio_subscription
#define FIO_REF_NAME fio___subscription
#define FIO_REF_TYPE fio_subscription_s
#define FIO_REF_DESTROY(obj) fio___pubsub_subscription_on_destroy(&(obj))
#define FIO_REF_CONSTRUCTOR_ONLY 1
#define FIO___RECURSIVE_INCLUDE 1
Expand Down Expand Up @@ -1119,7 +1123,7 @@ FIO_IFUNC void fio___pubsub_unsubscribe_task(void *sub_, void *ignr_) {
sub->channel = NULL;

no_channel:
fio_subscription_free(sub);
fio___subscription_free(sub);
return;
(void)ignr_;
}
Expand All @@ -1145,10 +1149,11 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
} uptr;
if (args.channel.len > 0xFFFFUL)
goto sub_error;
s = fio_subscription_new();
s = fio___subscription_new();
if (!s)
goto sub_error;

if (!args.queue)
args.queue = fio_io_queue();
*s = (fio_subscription_s){
.replay_since = args.replay_since,
.io = args.io,
Expand All @@ -1158,6 +1163,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
: fio___subscription_mock_cb)),
.on_unsubscribe = args.on_unsubscribe,
.udata = args.udata,
.queue = args.queue,
};
args.is_pattern = !!args.is_pattern; /* make sure this is either 1 or zero */
uptr.ls = &s->node;
Expand Down Expand Up @@ -1215,7 +1221,7 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
fio_bstr_free(uptr.str->buf);
s->node = FIO_LIST_INIT(s->node);
s->history = FIO_LIST_INIT(s->history);
fio_subscription_free(s);
fio___subscription_free(s);
FIO_LOG_WARNING(
"(%d) master-only subscription attempt on a non-master process: %.*s",
fio_io_pid(),
Expand Down Expand Up @@ -1285,6 +1291,11 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) {
Pub/Sub Message Distribution (local process)
***************************************************************************** */

static void fio___subscription_after_on_message_task(void *s, void *m) {
fio___subscription_free((fio_subscription_s *)s);
fio___pubsub_message_free((fio___pubsub_message_s *)m);
}

/* performs the subscription callback */
FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
fio_subscription_s *s = (fio_subscription_s *)s_;
Expand All @@ -1304,11 +1315,13 @@ FIO_IFUNC void fio___subscription_on_message_task(void *s_, void *m_) {
s->udata = container.msg.udata;
if (container.flag)
goto reschedule;
fio_subscription_free(s);
fio___pubsub_message_free(m);
fio_queue_push(fio_io_queue(),
fio___subscription_after_on_message_task,
s,
m);
return;
reschedule:
fio_queue_push(fio_io_queue(), fio___subscription_on_message_task, s_, m_);
fio_queue_push(s->queue, fio___subscription_on_message_task, s_, m_);
}

/* returns the internal message object. */
Expand All @@ -1333,18 +1346,18 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io && m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.io != s->io)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand All @@ -1353,17 +1366,17 @@ FIO_SFUNC void fio___pubsub_channel_deliver_task(void *ch_, void *m_) {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
if (m->data.published >= s->replay_since)
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
} else {
FIO_LIST_EACH(fio_subscription_s, node, head, s) {
fio_queue_push(
fio_io_queue(),
s->queue,
(void (*)(void *, void *))fio___subscription_on_message_task,
fio_subscription_dup(s),
fio___subscription_dup(s),
fio___pubsub_message_dup(m));
}
}
Expand Down
4 changes: 3 additions & 1 deletion fio-stl/420 pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ typedef struct {
void (*on_unsubscribe)(void *udata);
/** The opaque udata value is ignored and made available to the callbacks. */
void *udata;
/** The queue to which the callbacks should be routed. May be NULL. */
fio_queue_s *queue;
/**
* OPTIONAL: subscription handle return value - should be NULL when using
* automatic memory management with the IO or global environment.
Expand Down Expand Up @@ -132,7 +134,7 @@ int fio_unsubscribe(subscribe_args_s args);
Cancels an existing subscriptions.
Accepts the same arguments as [`fio_subscribe`](fio_subscribe), except the `udata` and callback details are ignored (no need to provide `udata` or callback details).
Accepts the same arguments as [`fio_subscribe`](fio_subscribe), except the `udata`, and callback details are ignored (no need to provide `udata` or callback details).
If a `subscription_handle_ptr` was provided it should contain the value of the subscription handle returned.
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/439 http.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ typedef struct fio_http_settings_s {
*/
size_t max_body_size;
/**
* The maximum websocket message size/buffer (in bytes) for Websocket
* The maximum WebSocket message size/buffer (in bytes) for Websocket
* connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes.
*/
size_t ws_max_msg_size;
Expand Down
2 changes: 1 addition & 1 deletion fio-stl/439 http.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ typedef struct fio_http_settings_s {
*/
size_t max_body_size;
/**
* The maximum websocket message size/buffer (in bytes) for Websocket
* The maximum WebSocket message size/buffer (in bytes) for Websocket
* connections. Defaults to FIO_HTTP_DEFAULT_WS_MAX_MSG_SIZE bytes.
*/
size_t ws_max_msg_size;
Expand Down

0 comments on commit b496109

Please sign in to comment.