diff --git a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp index 633b1dabe..1b8dcfcc0 100644 --- a/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp +++ b/python/mrc/_pymrc/include/pymrc/asyncio_runnable.hpp @@ -291,8 +291,8 @@ void AsyncioRunnable::run(mrc::runnable::Context& ctx) scheduler->run_until_complete(this->main_task(scheduler)); // Need to drop the output edges - mrc::node::SourceProperties::release_edge_connection(); - mrc::node::SinkProperties::release_edge_connection(); + mrc::node::SourceProperties::release_edge_connection(); + mrc::node::SinkProperties::release_edge_connection(); } template diff --git a/python/mrc/_pymrc/tests/CMakeLists.txt b/python/mrc/_pymrc/tests/CMakeLists.txt index f40e20d72..02186de90 100644 --- a/python/mrc/_pymrc/tests/CMakeLists.txt +++ b/python/mrc/_pymrc/tests/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(coro) # Keep all source files sorted!!! add_executable(test_pymrc + test_asyncio_runnable.cpp test_codable_pyobject.cpp test_executor.cpp test_main.cpp diff --git a/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp new file mode 100644 index 000000000..5202875b4 --- /dev/null +++ b/python/mrc/_pymrc/tests/test_asyncio_runnable.cpp @@ -0,0 +1,116 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "test_pymrc.hpp" + +#include "pymrc/asyncio_runnable.hpp" +#include "pymrc/executor.hpp" +#include "pymrc/pipeline.hpp" +#include "pymrc/port_builders.hpp" +#include "pymrc/types.hpp" + +#include "mrc/node/rx_node.hpp" +#include "mrc/node/rx_sink.hpp" +#include "mrc/node/rx_sink_base.hpp" +#include "mrc/node/rx_source.hpp" +#include "mrc/node/rx_source_base.hpp" +#include "mrc/options/options.hpp" +#include "mrc/options/topology.hpp" +#include "mrc/segment/builder.hpp" +#include "mrc/segment/object.hpp" +#include "mrc/types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include // IWYU pragma: keep +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace py = pybind11; +namespace pymrc = mrc::pymrc; +using namespace std::string_literals; +using namespace py::literals; + +PYMRC_TEST_CLASS(AsyncioRunnable); + +class MyAsyncioRunnable : public pymrc::AsyncioRunnable +{ + mrc::coroutines::AsyncGenerator on_data(int&& value) override + { + co_yield value; + co_yield value * 2; + }; +}; + +TEST_F(TestAsyncioRunnable, Execute) +{ + std::atomic counter = 0; + pymrc::Pipeline p; + + pybind11::module_::import("mrc.core.coro"); + + auto init = [&counter](mrc::segment::IBuilder& seg) { + auto src = seg.make_source("src", [](rxcpp::subscriber& s) { + if (s.is_subscribed()) + { + s.on_next(5); + s.on_next(10); + s.on_next(7); + } + + s.on_completed(); + }); + + auto internal = seg.construct_object("internal"); + + auto sink = seg.make_sink("sink", [&counter](unsigned int x) { + counter.fetch_add(x, std::memory_order_relaxed); + }); + + seg.make_edge(src, internal); + seg.make_edge(internal, sink); + }; + + p.make_segment("seg1"s, init); + + auto options = std::make_shared(); + options->topology().user_cpuset("0"); + + pymrc::Executor exec{options}; + exec.register_pipeline(p); + + exec.start(); + exec.join(); + + EXPECT_EQ(counter, 66); +}