This example illustrates how to create a simple pipeline with a single source, node, and sink connected together in C++. Each of the nodes in the segment ("node" here refers to either a source, sink or an object that is both a source and sink) is responsible for a simple task:
- Source: Creates 3 int values in the sequence 1, 2, 3 and completes
- Node: Transforms each int to a float by multiplying it by 2.5
- Sink: Prints any received float value and counts the number of items it receives
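The division of work between the three stages can be sketched without MRC at all. The following is a minimal, single-threaded illustration using plain lambdas; `PipelineResult` and `run_simple_pipeline` are hypothetical names introduced here and are not part of the MRC API.

```cpp
#include <iostream>
#include <vector>

// Hypothetical container for what the sink observed; not an MRC type.
struct PipelineResult
{
    std::vector<float> received;  // values seen by the sink, in arrival order
    int counter = 0;              // number of items the sink received
};

// Runs the source -> node -> sink flow with ordinary function calls.
PipelineResult run_simple_pipeline()
{
    PipelineResult result;

    // Sink: prints each float and counts it.
    auto sink = [&result](float value) {
        result.counter++;
        result.received.push_back(value);
        std::cout << "sink: " << value << std::endl;
    };

    // Node: converts an int to a float by multiplying by 2.5, then forwards it.
    auto node = [&sink](int value) { sink(2.5f * static_cast<float>(value)); };

    // Source: emits 1, 2, 3 and then finishes.
    for (int i = 1; i <= 3; ++i)
    {
        node(i);
    }

    return result;
}
```

Calling `run_simple_pipeline()` from a `main` function pushes each value through the node and into the sink, leaving `counter` at 3. The real pipeline differs in that each stage is a separate object connected by edges rather than a direct function call.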
Each of the objects in the Segment is created using the segment::IBuilder::make_XXX(NAME, ...) function, where XXX is replaced with either source, sink or node.
Once each object is created, they can be linked together using segment::IBuilder::make_edge(SOURCE, SINK). There are a few rules when making edges:
- Objects deriving from mrc::node::SourceProperties can only appear in the left-hand argument of make_edge()
- Objects deriving from mrc::node::SinkProperties can only appear in the right-hand argument of make_edge()
- Node objects can appear on either side since they derive from both mrc::node::SourceProperties and mrc::node::SinkProperties
- Sources can only be connected to one downstream sink
  - To use multiple downstream sinks with broadcast or round-robin functionality, see the guide on operators
- Sinks can accept multiple upstream sources
  - It's possible to connect two sources to a single sink. The sink will process the messages in the order that they are received
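The left-hand and right-hand rules above amount to a check on which role each object plays. As a rough sketch only, the idea can be expressed with two empty tag types and a compile-time predicate. `SourceRole`, `SinkRole`, `MySource`, `MySink`, `MyNode` and `can_make_edge` are hypothetical stand-ins invented for this illustration; the real mrc::node::SourceProperties and mrc::node::SinkProperties classes do considerably more, and the one-downstream-sink rule is not modeled here.

```cpp
#include <type_traits>

// Hypothetical role tags standing in for the MRC property base classes.
struct SourceRole {};
struct SinkRole {};

struct MySource : SourceRole {};
struct MySink : SinkRole {};
struct MyNode : SourceRole, SinkRole {};  // a node plays both roles

// True when Upstream may appear on the left and Downstream on the right.
template <typename Upstream, typename Downstream>
constexpr bool can_make_edge()
{
    return std::is_base_of<SourceRole, Upstream>::value && std::is_base_of<SinkRole, Downstream>::value;
}

static_assert(can_make_edge<MySource, MyNode>(), "source -> node is allowed");
static_assert(can_make_edge<MyNode, MySink>(), "node -> sink is allowed");
static_assert(can_make_edge<MySource, MySink>(), "source -> sink is allowed");
static_assert(!can_make_edge<MySink, MySource>(), "a sink cannot be upstream of a source");
```

Because a node derives from both tags, it satisfies the predicate in either position, which mirrors the third rule in the list.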
The first "node" that we will be creating is a source. Source objects have no upstream dependencies and are responsible for producing data to be consumed by downstream objects.
To create a source, call make_source<T> on the segment::IBuilder object, passing a name and a lambda of type std::function<void(rxcpp::subscriber<T>)>, where T is the type of object that the source will be producing. Please see https://reactivex.io/RxCpp/ and navigate to rxcpp::subscriber<T, Observer> for more detail on the lambda type. In our example, we are creating integers so the source object looks like:
auto source = s.make_source<int>("int_source", [](rxcpp::subscriber<int> s) {
    s.on_next(1);
    s.on_next(2);
    s.on_next(3);
    s.on_completed();
});
We pass a lambda to make_source because this function is only called once the source is started. Inside our lambda, the source generates objects and passes them to downstream objects by calling s.on_next(). Once finished, a source indicates that no more values will be generated by calling either s.on_completed() if it completed successfully, or s.on_error() if it completed with an error. It is undefined behavior to call s.on_next or any other function on s after on_completed or on_error has been called.
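The deferred execution described above can be illustrated with a small stand-in. This is a sketch under stated assumptions: `MiniSubscriber`, `MiniSource` and `lambda_ran_before_start` are hypothetical types and functions written for this example and do not reflect how rxcpp::subscriber or MRC sources are implemented. It only shows that storing a lambda and invoking it later keeps the source from producing anything until it is started.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for rxcpp::subscriber<int>.
struct MiniSubscriber
{
    std::vector<int> values;  // everything passed to on_next
    bool completed = false;   // set once on_completed is called

    void on_next(int v) { values.push_back(v); }
    void on_completed() { completed = true; }
};

// Hypothetical source that stores the lambda and only runs it when started.
class MiniSource
{
  public:
    explicit MiniSource(std::function<void(MiniSubscriber&)> fn) : m_fn(std::move(fn)) {}

    // Nothing is produced until start() is called.
    MiniSubscriber start() const
    {
        MiniSubscriber s;
        m_fn(s);
        return s;
    }

  private:
    std::function<void(MiniSubscriber&)> m_fn;
};

// Builds the example source, records whether the lambda ran during construction,
// then starts the source and hands back what it emitted.
bool lambda_ran_before_start(MiniSubscriber& out)
{
    bool ran = false;

    MiniSource source([&ran](MiniSubscriber& s) {
        ran = true;
        s.on_next(1);
        s.on_next(2);
        s.on_next(3);
        s.on_completed();
    });

    const bool ran_before_start = ran;  // still false: only the lambda was stored
    out = source.start();               // the lambda executes here
    return ran_before_start;
}
```

In this sketch the lambda body runs only inside `start()`, so constructing the source has no side effects, which is the same reason make_source takes a callable rather than a list of values.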
Node objects work as both a source and a sink and are responsible for connecting the upstream source to any downstream sink. Because the upstream source and downstream sink can be different types, make_node<T, R> accepts two template arguments: T, the type received from the upstream source, and R, the type passed to the downstream sink. To make building nodes easier, MRC supports Reactive operators in the make_node function. Please see https://reactivex.io/RxCpp/ and navigate to Operators for more details on Reactive operators. These Reactive operators can be used to chain multiple steps together and simplify node creation.
For our example, we will only be using a single Reactive operator: map. To build the node, we need to call make_node and provide the source type, sink type, node name and node operators:
auto node = s.make_node<int, float>("int_to_float", rxcpp::operators::map([](const int& data) {
    // Multiply the input value, returning a float
    return float(2.5f * data);
}));
In our example, the map operator takes a lambda that will be called for each value that passes through the node. The result returned by this lambda will be passed on to the next downstream node. Our lambda is very simple, only multiplying the input value by 2.5 and returning a float.
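To make the behavior of map concrete, here is a minimal sketch that applies the same lambda to a batch of values using only the standard library. `map_values` and `int_to_float` are hypothetical helpers written for this illustration; they operate on a std::vector rather than a live stream and are not how rxcpp::operators::map is implemented.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Hypothetical helper: calls fn once per input value and collects the results.
// The output element type is whatever fn returns.
template <typename In, typename Fn>
auto map_values(const std::vector<In>& input, Fn fn) -> std::vector<decltype(fn(input.front()))>
{
    std::vector<decltype(fn(input.front()))> output;
    output.reserve(input.size());
    std::transform(input.begin(), input.end(), std::back_inserter(output), fn);
    return output;
}

// Applies the lambda from the node example to every value.
std::vector<float> int_to_float(const std::vector<int>& input)
{
    return map_values(input, [](const int& data) {
        // Multiply the input value, returning a float
        return float(2.5f * data);
    });
}
```

Passing `{1, 2, 3}` to `int_to_float` yields the three values the sink prints in the example run: 2.5, 5 and 7.5.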
The final "node" we will be making is a sink object. Sinks are the opposite of sources and are terminators in the pipeline graph. They only accept upstream connections and do not provide the ability to pass data on. To make a sink, we need to provide the sink type, sink name, and an rxcpp::observer, as shown here:
auto sink = s.make_sink<float>("float_sink", rxcpp::make_observer_dynamic<float>([&counter](float data) {
    counter++;
    std::cout << "sink: " << data << std::endl;
}));
We build up the rxcpp::observer using rxcpp::make_observer_dynamic, which allows us to create the observer using only lambda functions. Each observer has 3 lambdas: an on_next, an on_error and an on_completed. These are the functions that get called by the respective calls in the source object.
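The relationship between the three observer callbacks and the calls made by a source can be sketched as follows. `MiniObserver`, `emit_all` and `record_events` are hypothetical names introduced for this example and are a simplification, not the rxcpp::observer type. The sketch assumes a source delivers zero or more values followed by exactly one terminal call.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified observer holding the three callbacks.
struct MiniObserver
{
    std::function<void(float)> on_next;
    std::function<void(std::exception_ptr)> on_error;
    std::function<void()> on_completed;
};

// Drives an observer the way a source would: values first, then one terminal call.
void emit_all(const std::vector<float>& values, bool fail, const MiniObserver& observer)
{
    for (float v : values)
    {
        observer.on_next(v);
    }

    if (fail)
    {
        observer.on_error(std::make_exception_ptr(std::runtime_error("source failed")));
    }
    else
    {
        observer.on_completed();
    }
}

// Records which callback fired for each event, in order.
std::vector<std::string> record_events(const std::vector<float>& values, bool fail)
{
    std::vector<std::string> log;

    MiniObserver observer{[&log](float) { log.push_back("next"); },
                          [&log](std::exception_ptr) { log.push_back("error"); },
                          [&log]() { log.push_back("completed"); }};

    emit_all(values, fail, observer);
    return log;
}
```

With two values and no failure, the log reads next, next, completed. With `fail` set, the final entry is error instead, and completed never appears.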
In our sink, we are simply printing the values passed to the sink, and also incrementing a counter for each message received. Using lambda captures like this is the best way to get data out of the sink.
To connect sources and sinks together, call s.make_edge(source, sink), passing the source and sink objects. The source and sink types are checked for compatibility at compile time, and incompatible types result in a compilation error.
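One way such a compile-time check could work is sketched below. This is an assumption-laden illustration: `TypedSource`, `TypedSink`, `edge_types_match` and `make_edge_checked` are hypothetical, and the sketch assumes compatibility means an exact match of value types, which may be stricter than what MRC actually enforces.

```cpp
#include <type_traits>

// Hypothetical typed endpoints; real MRC objects carry far more than a value type.
template <typename T>
struct TypedSource
{
    using value_type = T;
};

template <typename T>
struct TypedSink
{
    using value_type = T;
};

// Assumption for this sketch: two endpoints are compatible only if the types match exactly.
template <typename SourceT, typename SinkT>
constexpr bool edge_types_match()
{
    return std::is_same<SourceT, SinkT>::value;
}

// Hypothetical make_edge that rejects mismatched value types while compiling.
template <typename SourceT, typename SinkT>
bool make_edge_checked(const TypedSource<SourceT>&, const TypedSink<SinkT>&)
{
    static_assert(edge_types_match<SourceT, SinkT>(), "source and sink value types must match");
    return true;
}
```

Under this sketch, `make_edge_checked(TypedSource<float>{}, TypedSink<float>{})` compiles, while pairing a `TypedSource<int>` with a `TypedSink<float>` stops the build at the static_assert instead of failing at run time.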
We can see this simple pipeline in action by running the example in the build folder:
$ ${QSG_BUILD_DIR}/docs/quickstart/cpp/ex00_simple_pipeline/ex00_simple_pipeline.x
mrc pipeline starting...
sink: 2.5
sink: 5
sink: 7.5
mrc pipeline complete: counter should be 3; counter=3
Where ${QSG_BUILD_DIR} is the output location of the CMake build.
We can see that the sink function was called 3 times, once for each value emitted by the source. What happens if you change the number of on_next statements in the source object?