Skip to content

Latest commit

 

History

History
742 lines (646 loc) · 37.9 KB

150-MigrateInputOutputBlocks.md

File metadata and controls

742 lines (646 loc) · 37.9 KB

Migrating input and output blocks to the version 2 API

The previous version 1 API for writing custom input and output blocks was deprecated in version 10.7 and has been removed in version 10.17. To continue to work, you must update any blocks using the old API to the version 2 API as shown in this migration guide.

The following actions and events are part of the deprecated version 1 API:

  • BlockBase.listensForInput
  • BlockBase.sendsSyncOutput
  • BlockBase.sendsAsyncOutput
  • apama.analyticsbuilder.cumulocity.inventory.InputHelper
  • apama.analyticsbuilder.cumulocity.inventory.OutputHelper

The version 2 API simplifies the workflow for input and output blocks and allows using multiple devices within a single model. The following events are available as part of the version 2 API. For generic input and output blocks:

  • apama.analyticsbuilder.InputParams
  • apama.analyticsbuilder.InputHandler
  • apama.analyticsbuilder.OutputParams
  • apama.analyticsbuilder.OutputHandler

For Cumulocity IoT-specific input and output blocks:

  • apama.analyticsbuilder.cumulocity.CumulocityInputParams
  • apama.analyticsbuilder.cumulocity.CumulocityInputHandler
  • apama.analyticsbuilder.cumulocity.CumulocityOutputParams
  • apama.analyticsbuilder.cumulocity.CumulocityOutputHandler

Note: The version 2 API is also called "handler API" in the documentation.

New functionalities in the handler API

This section lists some of the new functionalities provided by the handler API.

  • A simpler mechanism for scheduling input events with or without source timestamps.
  • Forwarding output values to models consuming them even when models are on different workers.
  • Automatic handling of input events dropped because of being too old to process and notifying the framework about them.
  • Support for tagging of time-synchronous output events through callback actions before sending them to external receivers.
  • Simpler API for consuming and producing multiple inputs and outputs streams.
  • Automatic profiling of output events generated by output blocks.

Extra functionalities are provided by API for input and output blocks consuming or producing to Cumulocity IoT devices:

  • Automatic deduction of partition value to use when declaring inputs and outputs.
  • Automatic deduction of partition value to use when scheduling input events to be processed.
  • Automatic tagging of time-synchronous output events through callback actions before sending them to external receivers.
  • Utility actions to get device information.

Migrating input blocks

An input block is a block that receives data from an external source (to the model) and makes it available to other blocks within the model. See Input and output blocks for more details.

An input block listens for some events and optionally partitions data based on a field and filters on other event fields. It is advisable to pass this information to the Analytics Builder runtime to ensure that the model dependencies can be identified and the runtime can determine the appropriate execution order.

Note: For migrating input blocks that were using the Cumulocity IoT helper API, see the section Migrating custom Cumulocity IoT input blocks.

There are usually two aspects of input blocks: declaring the input stream, and scheduling the input event for processing.

Declaring an input stream

Using the version 1 API, to declare an input stream, the $base.listensForInput() action is called by providing the name of the event type being listened for, the fields of the events, and the partition value for the stream. This action returns the inputId of the stream which is then used to create the timer for the input event.

The example below declares an input block named MyInputBlock that consumes events of type MyEvent that listens for the partition value specified by the block parameter source and filters the events further on the type specified by the block parameter type.

event MyInputBlock {
    /** BlockBase Object. */
    BlockBase $base;
    /** The parameters for the block. */
    MyInputBlock_$Parameters $parameters;
    action $validate() {
        // Declare an input stream. The returned inputId (of the stream) is used to 
        // create the timer for the input event.
        string inputId := $base.listensForInput(MyEvent.getName(), 
                            {"type":<any>$parameters.type}, $parameters.source);
        
    }
}

Using the handler API, the input stream is declared by calling the $base.consumesInput action providing an InputParams object describing the input stream. The action returns an InputHandler object which is used to schedule the input events for processing.

For example, using the handler API, the previous declaration would look like the following:

event MyInputBlock {
    /**BlockBase object.*/
    BlockBase $base;
    /** The parameters for the block. */
    MyInputBlock_$Parameters $parameters;
    /** The input handler for scheduling input events. */
    InputHandler inputHandler;
    action $validate() {
        // Create an InputParams object to declare that the input block consumes events of type 
        // MyEvent, listens for the partition value specified by the block parameter `source` and 
        // filter the events further for the type specified by the block parameter `type`.
        InputParams inputParams := InputParams.forEventType(MyEvent.getName()).
                                    withPartition($parameters.source).
                                    withFields({"type":<any>$parameters.type});

        // Declaring input streams. Saved inputHandler object for scheduling input events for processing.
        inputHandler := $base.consumesInput(inputParams);
    }
}

Scheduling input

Typically, an input block will listen for events using an on all <EventType> listener, started in the $init action. On matching an event, the block will extract a timestamp from the event, and create a timer for that value.

Using the version 1 API, an input block explicitly calculates the time after which the timer should be triggered and has to call $base.createTimer and explicitly needs to calculate the time when either the IgnoreTimestamp parameter is enabled or input events do not have timestamp information. Also, the input block needs to report to the framework about dropped events.

An example of using the version 1 API for scheduling input events is shown below.

action $init() {

	// Listen for events of type MyEvent and filter them by type.
	on all MyEvent(source = $parameters.source, type = $parameters.type) as e {
		try{
			// Try to find the next available time to schedule when ignoreTimestamp is
			// enabled.
			Value value := new Value;
			value.value := e.value;
			value.properties["source"] = e.source;
			value.properties["type"] = e.type;

			if ignoreTimestamp {
				// Either use the model time or use the next possible time after the timeValue.
				timeToScheduleInput := float.max(timeToScheduleInput, $base.getModelTime())
										.nextAfter(float.INFINITY);
				value.timestamp := timeToScheduleInput;
			} else { 
				// Use timestamp present in the event.
				timeToScheduleInput := e.time;
				value.timestamp := e.time;
			}
			// Creating time params.
			TimerParams tp := TimerParams.absolute(timeToScheduleInput)
								.withPayload(value)
								.withPartition(e.source);
			// Create timer to process input event.
			$base.createTimerWith(tp);
		}
		catch(Exception exp) {
			// Exception handling and reporting about dropped events to the framework.
			$base.droppedEvent(e, e.time);
		}
	}
}

action $timerTriggered(Activation $activation, any $payload) {
    $setOutput_value($activation, $payload);
}

When using the handler API, the input block need not worry about calculating the time to schedule an input value to process, nor identifying events to drop. To schedule an input event, call the schedule action on the InputHandler object providing the input value, the source timestamp of the value, and the partition to which the input value belongs. The framework will drop the event if necessary, reporting this.

An example of using the handler API to schedule an input event is shown below.

action $init() {

	// Listen for events of type MyEvent and filter them by type.
	on all MyEvent(source = $parameters.source, type = $parameters.type) as e {
		// If ignoreTimestamp is enabled, use an empty timestamp
		// so that the framework will calculate the next timestamp to process the event.
		optional<float> timeValue := new optional<float>;

		// If ignoreTimestamp is disabled, use the event time.
		if not ignoreTimestamp {
			timeValue := e.time;
		}
		// Schedule events to be processed as per their timestamp.
		// Pass the input event as payload, it will be received back in 
		// $timerTriggered action as $payload parameter.
		TimerHandle handle := inputHandler.schedule(e, timeValue, e.source);
	}

	action $timerTriggered(Activation $activation, any $payload) {
		MyEvent e := <MyEvent> $payload;
		Value value := new Value;
		value.value := e.value;
		value.timestamp := $activation.timestamp;
		value.properties["source"] = e.source;
		value.properties["type"] = e.type;
		$setOutput_value($activation, value);
	}

Multiple inputs

Using the version 1 API, for blocks that use multiple inputs, BlockBase.listensForInput returns an input identifier that can be added to a TimerParams with the withInputId method. This allows the framework to identify which input a block is using, and is required if the block has more than one input. This can be stored on the block object itself.

When using the handler API, create a handler for each input, and use the respective handler to schedule input events.

Migrating output blocks

An output block is a block that sends data to an external (to the model) source. See Input and output blocks for more details.

Output blocks should construct the event to send, typically using block inputs and parameters. The event should be routed if synchronous (using the route statement) and sent (using the send statement) to the appropriate channel.

Note: For migrating output blocks that were using the Cumulocity IoT helper API, see the section Migrating custom Cumulocity IoT output blocks.

There are usually two aspects of output blocks: declaring the output stream, and sending events to external sources/receivers.

Declaring output streams

Using the version 1 API, to declare an output event, if the output is time-synchronous, the block has to call the BlockBase.sendsSyncOutput action with the same parameters as an input block would use, or if the output is time-asynchronous, the block has to call the BlockBase.sendsAsyncOutput action.

For example, as shown below, the output block declares that it produces a time-synchronous output of event type MyEvent and partitions on the source field and filters on source and type as follows.

event MyOutputBlock {
    /* Block base object. */
    BlockBase $base;
    MyOutputBlock_$Parameters $parameters;
    /* The model name to use for tagging. */
    string modelName;
    
    action $validate(dictionary<string, any> $modelScopeParameters) {
        // Extract the model name for tagging.
        modelName := $modelScopeParameters[ABConstants.MODEL_NAME_IDENTIFIER].valueToString();
        // Declare an output stream by declaring that the output block sends time-synchronous 
        // events of type MyEvent. Also specify the partition value to which it is sending and 
        // the fields output events have.
        $base.sendsSyncOutput(MyEvent.getName(),{"type": <any>$parameters.type}, $parameters.source);
    }
}

Using the new handler API, the block has to call BlockBase.producesOutput from the $validate action which takes an OutputParams object as a parameter and returns an OutputHandler object. This API also facilitates the tagging of events before sending to a channel via a callback mechanism, so that the user need not worry about tagging while sending output to external receivers.

event MyOutputBlock {
    BlockBase $base;
    MyOutputBlock_$Parameters $parameters;
    /** The output handler object for sending output events. */
    OutputHandler outputHandler;
    /* The model name to use for tagging. */
    string modelName;
	
    action $validate(dictionary<string, any> $modelScopeParameters) {
        // Extract the model name for tagging.
        modelName := $modelScopeParameters[ABConstants.MODEL_NAME_IDENTIFIER].valueToString();
        // Create an OutputParams object to declare that the output block produces time-synchronous 
        // events of type MyEvent. Also specify the partition value to which it is sending and 
        // the fields output events have.
        OutputParams params := OutputParams.forSyncEventType(MyEvent.getName(), 
                                                {"type":<any>$parameters.type}, tagOutputEvent)
                                            .withPartitionValue($parameters.source);
        // Now declare the output and save the output handler object.
        outputHandler := $base.producesOutput(params);
    }
    
    /** The callback action to add a tag to the output event.*/
    action tagOutputEvent(MyEvent e) {
        // Add the model name to the ouput event so that input blocks can identify and ignore it.
        e.params[ABConstants.MODEL_NAME_IDENTIFIER] := modelName;
    }
}

For asynchronous output, you must declare this in the OutputParams object by using the forAsyncEventType action instead and you do not need to supply a tagger action.

OutputParams params := OutputParams.forAsyncEventType(MyEvent.getName()).withPartitionValue($parameters.source);

Sending output

When using the version 1 API, an output block is responsible for routing events if the output is time-synchronous, tagging the event before sending it to the external source.

action $process(Activation $activation, string $input_source, string $input_type, float $input_value) {
    /* Creating an event to send to Cumulocity IoT.*/
    MyEvent m := MyEvent($input_source, $input_type, 0.0, new dictionary<string, any>);
    // Routing the event so that any other models can consume it.
    route m;
    // Tagging the event.
    m.params[apama.analyticsbuilder.ABConstants.MODEL_NAME_IDENTIFIER] := modelName;
    // Sending event to the output channel
    send m to m.SEND_CHANNEL;
    // Notify the framework about generated output for profiling purpose.
    $base.profile(BlockBase.PROFILE_OUTPUT);
}

To send output using the handler API, call the sendoutput action on the handler objects which get saved in the $validate action.

action $process(Activation $activation, string $input_source, string $input_type, float $input_value) {
    /* Creating an event to send to Cumulocity IoT.*/
    MyEvent m := MyEvent($input_source, $input_type, 0.0, new dictionary<string, any>);
    // Ask the framework to send the output to the output channel.
    // If output is synchronous, then it is tagged before sending it to the channel.
    outputHandler.sendOutput(m, MyEvent.SEND_CHANNEL, $activation);
}

Migrating custom Cumulocity IoT blocks

The following section is specific to blocks that input or output data associated with a Cumulocity IoT device or device group. To reduce the code duplication across Cumulocity IoT input/output blocks, a set of helper events is provided. See Cumulocity IoT-specific helpers for more details.

Migrating custom Cumulocity IoT input blocks

The handler API provides the CumulocityInputHandler event to deal with Cumulocity IoT input blocks.

The version 1 API which uses InputHelper dynamically populates variables using reflection. The variables that are set by the InputHelper API are devices to hold all the devices for which the block will be listening, isGroup to declare a device or device group and isBroadcastDevice to declare whether the device is a broadcast device.

event AlarmInput {
    BlockBase $base;
    /** The parameters for the block. */
    AlarmInput_$Parameters $parameters;
    /** All the devices for which the block will be listening. @private */
    sequence<string> devices;
    /** Whether it's a device or deviceGroup. @private */
    boolean isGroup;
    /** Flag to identify if the given device identifier is a broadcast device or not.*/
    boolean isBroadcastDevice;
}

Using the handler API, the input block need not worry about declaring these variables. Instead, declare a CumulocityInputHandler object which gets initialized while declaring inputs.

event AlarmInput {
    BlockBase $base;
    /**The parameters for the block. */
    AlarmInput_$Parameters $parameters;
    /** The input handler for scheduling input events. */
    CumulocityInputHandler inputHandler;
}

Declaring input streams

Using the version 1 API, the input block has to do a lookup for the device using the InventoryLookup event, which returns a Promise of the result of looking up an object in the inventory and populates the variables devices, isBroadcastDevice and isGroup. The block has to chain the Promise result to declare the input stream using the InputHelper.declareInput action.

For example, as shown below, the input block declares that it consumes events of type Alarm and filters the alarms further for the type specified by the block parameter alarmType.

/**
 * Validates that the device exists in the inventory.
 * See - "Asynchronous validations" in the Block SDK documentation for more details.
 * @return a <tt>Promise</tt> object. Validation of the model will be suspended until the Promise is fulfilled.
 */
action $validate(dictionary<string, any> $modelScopeParameters) returns Promise {
	// Create an `InputHelper` object by calling the `forBlock` static action
	InputHelper ihelper := InputHelper.forBlock(self, $modelScopeParameters);
	// Call the `setInput` method on the `InputHelper` object
	ihelper.setInput($parameters.deviceId, Alarm.getName(), {"type":<any>$parameters.alarmType});
	// Chain a call to process the result of InventoryLookup.lookup and return the `Promise`
	return InventoryLookup.lookup($parameters.deviceId).andThen(ihelper.declareInput);
    
}

Declaring an input block using the handler API is simplified. Create a CumulocityInputParams object and call declare on it, providing a callback action to pass a CumulocityInputHandler object to the block. The framework internally handles the lookup of the device in the inventory. This is an asynchronous operation that returns a Promise.

/**
 * Validates that the device exists in the inventory.
 * See - "Asynchronous validations" in the Block SDK documentation for more details.
 */
action $validate(dictionary<string, any> $modelScopeParameters) returns Promise {
	// Create a CumulocityInputParams object to declare that the input block consumes events of 
	// type Alarm and filter the alarms further for the type specified by the block parameter alarmType.
	CumulocityInputParams params := CumulocityInputParams
										.create($parameters.deviceId, self, Alarm.getName())
										.withFields({"type":<any>$parameters.alarmType});
	// Declare the input stream and provide the callback action to save the input handler. 
	// Return the Promise returned from the declare call.
	return params.declare(inputHandlerCreated);
}

/** Call back action to receive and save the handler object.*/
action inputHandlerCreated(CumulocityInputHandler inputHandler) {
    self.inputHandler := inputHandler;
}

Scheduling input

Using the version 1 API, the input block used to be responsible for calculating the time to schedule input events, handling of the broadcast device type and reporting dropped events.

Using the version 1 API, scheduling input for a custom Cumulocity IoT block is as follows:

/**
 * Method starts listening for alarms from Cumulocity IoT.
 */
action $init() {
	string id;
	// Devices set by reflection
	for id in devices {
		on all Alarm(type = $parameters.alarmType, source = id) as alm {
			try {
				// Create payload to create timer params
				Value value := new Value;
				// ... fill value object
				
				// If ignoreTimestamp enabled, calculating the next time to schedule input.
				float timerDelay;
				if ignoreTimestamp {
					timeValue := float.max(timeValue, $base.getModelTime()).nextAfter(float.INFINITY);
					timerDelay := timeValue;
					value.timestamp := timeValue;
				}
				else { // Calculate the time to schedule the input to process.
					timerDelay := alm.time;
					value.timestamp := alm.time;
				}
				// Creating timer params.
				TimerParams tp := TimerParams.absolute(timerDelay).withPayload(value);
				// Updating timer params with partition.
				if isGroup {
					tp := tp.withPartition(alm.source);
				} else if isBroadcastDevice { 
					// Updating timer params with Broadcast partition.
					tp := tp.withPartition(Partition_Broadcast(alm.source));
				}
				// Scheduling the input event.
				$base.createTimerWith(tp);
			} catch (Exception e) {
				// Notifying about the dropped event.
				$base.droppedEvent(alm, alm.time);
			}
		}
	}
}

Using the new handler API, scheduling an input event is done by calling the schedule action on the Cumulocity IoT handler object. The handler will determine the correct partition value and will check for you if the event should be dropped. Events dropped by the framework when schedule is called do not need to be reported using the droppedEvent action.

/**
 * Method starts listening for alarms from Cumulocity IoT.
 */
action $init() {

	// provide all the necessary info required to set up the listeners
	inputHandler.createListeners(Alarm.getName(), {"type" : <any>$parameters.alarmType}, handleAlarm);	
}
/**
 * Callback received from the CumulocityInputHandler when an Alarm event is received
 * @param a The incoming Alarm event.
 */
action handleAlarm(any a) {
	Alarm alm := <Alarm> a;
	
	// If ignoreTimestamp is enabled, use an empty timestamp
	// so that the framework will calculate the next timestamp to process the event.
	optional<float> timeValue := new optional<float>;

	// If ignoreTimestamp is disabled, use the event time.
	if not ignoreTimestamp {
		timeValue := alm.time;
	}

	// Schedule events to be processed as per the timeValue, passing the input event
	// as payload, it will be received back in the $timerTriggered action as the $payload parameter.
	// The inputHandler is the one that got saved from the callback of
	// the CumulocityInputParams.declare action.
	TimerHandle handle := inputHandler.schedule(alm, timeValue);
}

For a complete example, see the DeviceLocationInput.mon sample.

Multiple inputs

Using the version 1 API, if a block deals with multiple inputs, for each input, create an object of an event <BlockName>_SubInput event with a BlockBase base field and fields that the InputHelper sets (as below), and add an instance of this for each input to the block.

/** This is a per-input object passed to InputHelper for each of the two inputs that the block has.*/
event DualMeasurementIO_SubInput {
    BlockBase base; // Required by InputHelper; it will use a 'base' member if there is no '$base' member.
    /** All the devices for which the block will be listening. @private */
    sequence<string> devices; // Set by reflection in InputHelper
    /**    Whether the device is a  device group. @private */
    boolean isGroup; // Set by reflection in InputHelper
    /** Flag to identify if the given device identifier is a broadcast device or not.*/
    boolean isBroadcastDevice; // Set by reflection in InputHelper
    /** To create the timer for the input event. */
    string inputId; // Set by reflection in InputHelper
}

event DualMeasurementIO {
    /** Subinput for the numerator */
    DualMeasurementIO_SubInput numeratorInput;
    /** Subinput for the denominator */
    DualMeasurementIO_SubInput denominatorInput;
    
    action $validate(dictionary<string, any> $modelScopeParameters) returns Promise {
        numeratorInput.base := $base;
        denominatorInput.base := $base;
        // Declare subinput, that is declareNumeratorInput.
        // chain a call to declareNumerator to make it uniform across the inputs and outputs
        return Promise.resolve(new any).andThen(declareNumeratorInput);
    }
    
    // While declaring numeratorInput, chain a call to declare denominatorInput.
    action declareNumeratorInput(any dummy) returns any {
        InputHelper ihelper := InputHelper.forBlock(numeratorInput, modelScopeParameters);
        ihelper.setInput($parameters.deviceId, Measurement.getName(), 
                    {"fragment":<any>$parameters.numeratorFragment, "series":SERIES});
        // Returning a promise from a promise-chained callback will wait on that promise.
        return InventoryLookup.lookup($parameters.deviceId)
                    .andThen(ihelper.declareInput)
                    .andThen(declareDenominatorInput);             
    }
    
    action declareDenominatorInput(any dummy) returns any {
        InputHelper ihelper := InputHelper.forBlock(denominatorInput, modelScopeParameters);
        ihelper.setInput($parameters.deviceId, Measurement.getName(), 
                    {"fragment":<any>$parameters.denominatorFragment, "series":SERIES});

        return InventoryLookup.lookup($parameters.deviceId).andThen(ihelper.declareInput);
    }
}

Instead, using the new handler API, just create an input handler object for each input and schedule corresponding input values. See the DualMeasurementIO.mon sample for a complete example.

/** The input handler to schedule the numerator input events for processing at the specified time. */
CumulocityInputHandler numeratorInputHandler;
/** The input handler to schedule the denominator input events for processing at the specified time. */
CumulocityInputHandler denominatorInputHandler;

action $validate() returns Promise {
    // Create parameters for numerator and denominator inputs
    CumulocityInputParams numeratorParams := CumulocityInputParams
                    .create($parameters.deviceId, self, Measurement.getName())
                    .withFields({"fragment":<any>$parameters.numeratorFragment, "series":SERIES});
    CumulocityInputParams denominatorParams := CumulocityInputParams
                    .create($parameters.deviceId, self, Measurement.getName())
                    .withFields({"fragment":<any>$parameters.denominatorFragment, "series":SERIES});
    
    // Declare all inputs and combine all Promise objects into a single Promise object and return.
    return PromiseJoiner.joinAll([
        numeratorParams.declare(saveNumeratorInputHandler),
        denominatorParams.declare(saveDenominatorInputHandler)]);
}

/** Save the input handler for the numerator inputs. */
action saveNumeratorInputHandler(CumulocityInputHandler inputHandler) {
    numeratorInputHandler := inputHandler;
}
/** Save the input handler for the denominator inputs. */
action saveDenominatorInputHandler(CumulocityInputHandler inputHandler) {
    denominatorInputHandler := inputHandler;
}

Migrating custom Cumulocity IoT output blocks

The handler API provides the CumulocityOutputHandler event to deal with Cumulocity IoT blocks.

The version 1 API dynamically populates variables by reflection in the framework, namely deviceId currentDevice and isBroadcastDevice. Using the new handler API, the output block need not worry about declaring these variables.

For example, using the version 1 API , output block fields are declared as follows.

event CreateAlarm {
    /** BlockBase object. */
    BlockBase $base;
    /** The parameters for the block. */
    CreateAlarm_$Parameters $parameters;
    /** Sends to the 'current' device from the activation's partition. */
    boolean currentDevice; // Framework sets this by reflection. 
    /** Flag to identify if the given device identifier is a broadcast device or not. */
    boolean isBroadcastDevice; // Framework sets this by reflection.
    /** The current deviceId, if currentDevice is false */
    string deviceId; // Framework sets this by reflection.
    /** The text to actually use - either params.alarmText or the model name.*/
    string alarmText;
    /* Model name to be tagged. */
    string modelName;
}

Using the handler API, declare a CumulocityOutputHandler object which will get initialized while declaring the output streams.

event CreateAlarm {
    /** BlockBase object. */
    BlockBase $base;
    /** The parameters for the block. */
    CreateAlarm_$Parameters $parameters;
    /** The text to actually use - either params.alarmText or the model name.*/
    string alarmText;
    /** The handler object which is responsible to send actual output to a channel*/
    CumulocityOutputHandler outputHandler;
}

Declaring output

Using the version 1 API, the output block has to look up the device using the InventoryLookup event, which returns a Promise of a result of looking up an object in the inventory and populates the variable isBroadcastDevice. The block has to chain the Promise result to declare the output stream using the OutputHelper.declareOutput action.

For example, as shown below, the output block declares that it produces time-synchronous events of type Alarm of the type specified by the block parameter alarmType.

action $validate(dictionary<string, any> $modelScopeParameters) returns Promise {
    // Extract the model name from $modelScopeParameters.
    modelName := $modelScopeParameters[ABConstants.MODEL_NAME_IDENTIFIER].valueToString();
    // Check if output device is current device or the specified device. 
    // This is not required when using the OutputHandler API.
    switch($parameters.deviceId as dev) {
        case string: {
            deviceId := dev;
        }
        default: {
            currentDevice := true;
        }
    }
    // Creating an OutputHelper object by calling the forBlock static action.
    OutputHelper helper := OutputHelper.forBlock(self);
    // Set output params saying output block is declaring that
    // it produces events of type 'Alarm' and gets filtered on type of alarm.
    helper.setSyncOutput($parameters.deviceId, Alarm.getName(), {"type":<any>$parameters.alarmType});
    // Declare the output streams saying output block is producing output
    // of type 'Alarm'
    return InventoryLookup.lookup($parameters.deviceId).andThen(helper.declareOutput);
}

Declaring an output block using the handler API is simplified. Create a CumulocityOutputParams object by calling the forSyncEventType static action if the output is time-synchronous, or the forAsyncEventType action if the output is time-asynchronous. Then declare the output by calling the declare action on the CumulocityOutputParams object which takes a callback action as a parameter to pass a CumulocityOutputHandler object to the block. This is an asynchronous operation that returns a Promise.

action $validate() returns Promise {
    // Creating an CumulocityOutputParams object
    CumulocityOutputParams params := CumulocityOutputParams.forSyncEventType($parameters.deviceId, 
                                            self, Alarm.getName(), {"type":<any>$parameters.alarmType});
    // Declare the output stream and provide callback action to save the output handler object.
    // Return the Promise returned from the declare call.
    return params.declare(handlerCreated);
}
/** Callback action to receive and save CumulocityOutputHandler object.
 * @param cumulocityOutputHandler the handler object
 */
action handlerCreated(CumulocityOutputHandler cumulocityOutputHandler) {
    outputHandler := cumulocityOutputHandler;
}

Sending output

Using the version 1 API, the output block has to determine to which device it has to send the output. Also, it is responsible for explicitly routing (using the route statement), tagging, and sending the event to the output channel.

An example of using the version 1 API to send an output is shown below.

action $process(Activation $activation, boolean $input_createAlarm) {
    if $input_createAlarm {
        /* Creating event to send to Cumulocity IoT*/
        Alarm al := Alarm("", $parameters.alarmType, deviceId, $activation.timestamp, alarmText, "ACTIVE",
                        $parameters.alarmSeverity, 1, new dictionary<string,any>);
        /** Get the current device for which the output would be sent*/
        switch ($activation.partition as part) {
            case Partition_Broadcast: {
                if not isBroadcastDevice { return; }
            }
            case string: {
                if currentDevice { al.source := part; }
            }
            default: {
                if currentDevice { return; }
            }
        }
        // Route the event for comsumption by other models.
        route al;
        // Tag the event before sending to external receivers
        al.params[apama.analyticsbuilder.ABConstants.MODEL_NAME_IDENTIFIER] := modelName;
        // Send the event to the output channel
        send al to Alarm.CHANNEL;
        // Notify the framework of the profiling purpose.
        $base.profile(BlockBase.PROFILE_OUTPUT);
    }
}

To send an output using the handler API, the block should call deviceToOutput on the CumulocityOutputHandler object to get the device to which the output needs to be sent and then call sendoutput action to send the output to the external source/receivers and any models consuming the output values.

An example of using the handler API to send an output is shown below.

/**
 * Create and send an alarm to the device.
 * @param $activation The current activation.
 * @param $input_createAlarm Creates an alarm when a signal is received.
 */
action $process(Activation $activation, boolean $input_createAlarm) {
    if $input_createAlarm {
        /* Get the current device to which the output will be sent.*/
        ifpresent outputHandler.deviceToOutput($activation) as device {
            /* Creating an event to send to Cumulocity IoT.*/
            Alarm alarm := Alarm("", $parameters.alarmType, device, 
                            $activation.timestamp, alarmText, "ACTIVE",
                            $parameters.alarmSeverity, 1, new dictionary<string,any>);

            // Ask the framework to send the output to the output channel.
            // If output is synchronous, then it is tagged before sending it to the channel.
            outputHandler.sendOutput(alarm, Alarm.CHANNEL, $activation);
        }
    }
}

For a complete example, see the CreateEvent.mon sample.

Multiple outputs

To handle multiple outputs in a block using the version 1 API, for each output, it needs to create an object of <BlockName>_SubOutput event which has the BlockBase base field and fields that the OutputHelper sets and add an instance of this for each output to the block as follows:

/** This is a per-output object passed to OutputHelper for each of the two outputs that the block has.*/
event DualMeasurementIO_SubOutput {
    BlockBase base; // Required by OutputHelper; it will use a 'base' member if there is no '$base' member.
    boolean currentDevice;
    boolean isBroadcastDevice; // Set by reflection in OutputHelper
    /** The current deviceId, if currentDevice is false */
    string deviceId;
}
event DualMeasurementIO {
    /** Suboutput for the ratio */
    DualMeasurementIO_SubOutput ratioOutput;
    /** Suboutput for the inverse ratio output */
    DualMeasurementIO_SubOutput inverseOutput;

    action $validate(dictionary<string, any> $modelScopeParameters) returns Promise {
        ratioOutput.base := $base;
        inverseOutput.base := $base;
        return Promise.resolve(new any).andThen(declareRatioOutput);
    }
    // While declaring RatioOutput, chain a call to declare InverseOutput
    action declareRatioOutput(any dummy) returns any {
        OutputHelper helper := OutputHelper.forBlock(ratioOutput);
        helper.setSyncOutput($parameters.deviceId, Measurement.getName(), 
                    {"fragment":<any>$parameters.ratioFragment, "series":SERIES});
        return InventoryLookup.lookup($parameters.deviceId)
                .andThen(helper.declareOutput)
                .andThen(declareInverseOutput);
    }
    action declareInverseOutput(any dummy) returns any {
        OutputHelper helper := OutputHelper.forBlock(inverseOutput);
        helper.setSyncOutput($parameters.deviceId, Measurement.getName(), 
                    {"fragment":<any>$parameters.inverseFragment, "series":SERIES});
        return InventoryLookup.lookup($parameters.deviceId)
                .andThen(helper.declareOutput);
    }
}

To handle multiple outputs in a block using the handler API, for each output, it needs to create an handler object and use the respective object to send outputs as follows:

/** The output handler to send the ratio output events. */
CumulocityOutputHandler ratioOutputHandler;
/** The output handler to send the inverse output events. */
CumulocityOutputHandler inverseOutputHandler;

action $validate() returns Promise {
    // Create parameters for ratio and inverse outputs
    CumulocityOutputParams ratioParams := CumulocityOutputParams.forSyncEventType(
                                            $parameters.deviceId, self, Measurement.getName(), 
                                            {"fragment":<any>$parameters.ratioFragment, "series":SERIES});
    CumulocityOutputParams inverseParams := CumulocityOutputParams.forSyncEventType(
                                            $parameters.deviceId, self, Measurement.getName(), 
                                           {"fragment":<any>$parameters.inverseFragment, "series":SERIES});
    
    // Declare all outputs and combine all Promise objects into a single Promise object and return.
    return PromiseJoiner.joinAll([
        ratioParams.declare(saveRatioOutputHandler),
        inverseParams.declare(saveInverseOutputHandler)]);
}

See the DualMeasurementIO.mon sample for a complete example on multiple outputs in a block.

< Prev: Asynchronous validations | Contents | Next: Update Cumulocity IoT input blocks to receive from assets >