diff --git a/examples/example_config.cpp b/examples/example_config.cpp index 5d7db36..2c92166 100644 --- a/examples/example_config.cpp +++ b/examples/example_config.cpp @@ -19,6 +19,8 @@ class ExampleActorConfig : public myframe::Actor { auto conf = GetConfig(); LOG(INFO) << GetActorName() << " pending queue size " << GetMailbox()->GetPendingQueueSize(); + LOG(INFO) << GetActorName() << " run queue size " + << GetMailbox()->GetRunQueueSize(); LOG(INFO) << GetActorName() << " conf " << conf->toStyledString(); return 0; } diff --git a/examples/example_config.json b/examples/example_config.json index 44194cd..7ebeeac 100644 --- a/examples/example_config.json +++ b/examples/example_config.json @@ -6,6 +6,8 @@ { "instance_name":"#1", "instance_config":{ + "pending_queue_size":-1, + "run_queue_size":-1, "key1":"hello", "key2":"world" } diff --git a/launcher/conf/sys.json b/launcher/conf/sys.json index 31ba1bb..a620858 100644 --- a/launcher/conf/sys.json +++ b/launcher/conf/sys.json @@ -1,5 +1,6 @@ { "default_pending_queue_size":-1, + "default_run_queue_size":-1, "thread_poll_size":4, "conn_event_size":2, "warning_msg_size":10 diff --git a/launcher/launcher.cpp b/launcher/launcher.cpp index a0b0dbb..b0d1395 100644 --- a/launcher/launcher.cpp +++ b/launcher/launcher.cpp @@ -78,7 +78,8 @@ int main(int argc, char** argv) { module_args.GetThreadPoolSize(), module_args.GetConnEventSize(), module_args.GetWarningMsgSize(), - module_args.GetDefaultPendingQueueSize())) { + module_args.GetDefaultPendingQueueSize(), + module_args.GetDefaultRunQueueSize())) { LOG(ERROR) << "Init failed"; return -1; } diff --git a/launcher/module_argument.cpp b/launcher/module_argument.cpp index 7e1227b..5b2b042 100644 --- a/launcher/module_argument.cpp +++ b/launcher/module_argument.cpp @@ -135,6 +135,10 @@ bool ModuleArgument::ParseSysConf(const std::string& sys_conf) { && root["default_pending_queue_size"].isInt()) { default_pending_queue_size_ = root["default_pending_queue_size"].asInt(); } + if (root.isMember("default_run_queue_size") + && root["default_run_queue_size"].isInt()) { + default_run_queue_size_ = root["default_run_queue_size"].asInt(); + } return true; } diff --git a/launcher/module_argument.h b/launcher/module_argument.h index 95c0732..d46c96c 100644 --- a/launcher/module_argument.h +++ b/launcher/module_argument.h @@ -31,6 +31,9 @@ class ModuleArgument final { inline int GetDefaultPendingQueueSize() const { return default_pending_queue_size_; } + inline int GetDefaultRunQueueSize() const { + return default_run_queue_size_; + } private: bool ParseSysConf(const std::string&); @@ -39,6 +42,7 @@ class ModuleArgument final { int conn_event_size_{2}; int warning_msg_size_{10}; int default_pending_queue_size_{-1}; + int default_run_queue_size_{-1}; std::string log_dir_; std::string lib_dir_; std::string conf_dir_; diff --git a/myframe/actor_context.cpp b/myframe/actor_context.cpp index d4ddf9d..d879f8a 100644 --- a/myframe/actor_context.cpp +++ b/myframe/actor_context.cpp @@ -26,11 +26,16 @@ ActorContext::ActorContext( actor_->SetContext(this); mailbox_.SetAddr(actor_->GetActorName()); int pending_queue_size = app->GetDefaultPendingQueueSize(); + int run_queue_size = app->GetDefaultRunQueueSize(); auto cfg = actor_->GetConfig(); if (cfg->isMember("pending_queue_size")) { pending_queue_size = cfg->get("pending_queue_size", -1).asInt(); } + if (cfg->isMember("run_queue_size")) { + run_queue_size = cfg->get("run_queue_size", -1).asInt(); + } mailbox_.SetPendingQueueSize(pending_queue_size); + mailbox_.SetRunQueueSize(run_queue_size); LOG(INFO) << mailbox_.Addr() << " context create"; } diff --git a/myframe/actor_context_manager.h b/myframe/actor_context_manager.h index 7c2f0db..6ad50cf 100644 --- a/myframe/actor_context_manager.h +++ b/myframe/actor_context_manager.h @@ -39,11 +39,12 @@ class ActorContextManager final { std::vector GetAllActorAddr(); bool HasActor(const std::string& name); + /* 将有消息的actor放入链表 */ + void PushContext(std::shared_ptr ctx); + private: /* 获得actor名对应的actor */ std::shared_ptr GetContext(const std::string& actor_name); - /* 将有消息的actor放入链表 */ - void PushContext(std::shared_ptr ctx); void PrintWaitQueue(); /// 当前注册actor数量 diff --git a/myframe/app.cpp b/myframe/app.cpp index 0943f66..3aa1266 100644 --- a/myframe/app.cpp +++ b/myframe/app.cpp @@ -64,7 +64,8 @@ bool App::Init( int thread_pool_size, int event_conn_size, int warning_msg_size, - int default_pending_queue_size) { + int default_pending_queue_size, + int default_run_queue_size) { if (state_.load() != kUninitialized) { return true; } @@ -73,6 +74,7 @@ bool App::Init( lib_dir_ = lib_dir; warning_msg_size_.store(warning_msg_size); default_pending_queue_size_ = default_pending_queue_size; + default_run_queue_size_ = default_run_queue_size; ret &= poller_->Init(); ret &= worker_ctx_mgr_->Init(warning_msg_size); ret &= ev_conn_mgr_->Init(event_conn_size); @@ -558,6 +560,10 @@ void App::CheckStopWorkers() { worker_ctx_mgr_->PopFrontIdleWorker(); auto common_idle_worker = worker_ctx->GetWorker(); common_idle_worker->SetActorContext(actor_ctx); + // 接收队列不空,重新加入等待执行队列 + if (!actor_mailbox->RecvEmpty()) { + actor_ctx_mgr_->PushContext(std::move(actor_ctx)); + } worker_ctx->GetCmdChannel()->SendToOwner(CmdChannel::Cmd::kRun); } else { LOG(ERROR) << actor_ctx->GetActor()->GetActorName() << " has no msg"; @@ -821,4 +827,8 @@ int App::GetDefaultPendingQueueSize() const { return default_pending_queue_size_; } +int App::GetDefaultRunQueueSize() const { + return default_run_queue_size_; +} + } // namespace myframe diff --git a/myframe/app.h b/myframe/app.h index 44bb36b..c6f3a9f 100644 --- a/myframe/app.h +++ b/myframe/app.h @@ -60,7 +60,8 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { int thread_pool_size = 4, int event_conn_size = 2, int warning_msg_size = 10, - int default_pending_queue_size = -1); + int default_pending_queue_size = -1, + int default_run_queue_size = -1); int LoadServiceFromDir(const std::string& path); @@ -93,6 +94,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { void Quit(); int GetDefaultPendingQueueSize() const; + int GetDefaultRunQueueSize() const; private: bool CreateActorContext( @@ -143,6 +145,7 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this { std::string node_addr_; /// int default_pending_queue_size_{-1}; + int default_run_queue_size_{-1}; std::atomic warning_msg_size_{10}; std::atomic state_{kUninitialized}; std::recursive_mutex local_mtx_; diff --git a/myframe/mailbox.cpp b/myframe/mailbox.cpp index 9c61a9c..5a5bd9b 100644 --- a/myframe/mailbox.cpp +++ b/myframe/mailbox.cpp @@ -90,6 +90,16 @@ const std::shared_ptr Mailbox::PopRecv() { } void Mailbox::MoveToRun() { + if (run_queue_size_ > 0) { + auto it = recv_.begin(); + for (size_t i = 0; + i < static_cast(run_queue_size_) && it != recv_.end(); + ++i) { + ++it; + } + run_.splice(run_.begin(), recv_, recv_.begin(), it); + return; + } run_.splice(run_.end(), recv_); } @@ -118,6 +128,14 @@ int Mailbox::GetPendingQueueSize() const { return pending_queue_size_; } +void Mailbox::SetRunQueueSize(int sz) { + run_queue_size_ = sz; +} + +int Mailbox::GetRunQueueSize() const { + return run_queue_size_; +} + std::list>* Mailbox::GetRecvList() { return &recv_; } diff --git a/myframe/mailbox.h b/myframe/mailbox.h index d5273ef..deaa581 100644 --- a/myframe/mailbox.h +++ b/myframe/mailbox.h @@ -46,12 +46,14 @@ class MYFRAME_EXPORT Mailbox final { bool RunEmpty() const; int RunSize() const; const std::shared_ptr PopRun(); - void SetPendingQueueSize(int sz); - int GetPendingQueueSize() const; + void SetRunQueueSize(int sz); + int GetRunQueueSize() const; /// 收件箱 int RecvSize() const; bool RecvEmpty() const; + void SetPendingQueueSize(int sz); + int GetPendingQueueSize() const; private: /// 收件箱 @@ -70,6 +72,7 @@ class MYFRAME_EXPORT Mailbox final { std::list> send_; std::list> run_; int pending_queue_size_{-1}; + int run_queue_size_{-1}; }; MYFRAME_EXPORT std::ostream& operator<<(