Configure Streaming
Server-side Configuration
Typically, in addition to publishing its own structure (Signals, Channels, Function Blocks, etc.), an openDAQ™
Device also publishes information about the Streaming protocols it supports.
This information includes the Streaming protocol ID (e.g. "opendaq_lt_streaming" or "opendaq_native_streaming")
and a range of optional parameters (e.g. port number). When a Streaming server is started on the Device,
it automatically generates this piece of information, known as a Server capability, and prepares it
for publication alongside the Device’s structural details. For this information to be published,
the server responsible for transferring structural information must be added to the openDAQ™ Instance last,
after all Streaming servers have been added.
The example below demonstrates the correct sequence for adding various servers to the openDAQ™ Instance.
Cpp
#include <chrono>
#include <thread>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    using namespace std::chrono_literals;

    const InstancePtr instance = Instance();
    instance.setRootDevice("daqref://device1");

    // Creates and registers a Server capability with the ID `opendaq_lt_streaming` and the default port number 7414
    instance.addServer("openDAQ LT Streaming", nullptr);

    // Creates and registers a Server capability with the ID `opendaq_native_streaming` and the default port number 7420
    instance.addServer("openDAQ Native Streaming", nullptr);

    // As the Streaming servers were added first, the registered Server capabilities are published over OPC UA
    instance.addServer("openDAQ OpcUa", nullptr);

    // Keep the application (and its servers) running
    while (true)
        std::this_thread::sleep_for(100ms);

    return 0;
}
Configure Streaming for structure-enabled Device
Most openDAQ™ Devices support the transfer of structural information. An example of such a Device is an
openDAQ™ OPC UA-compatible Device that is running a compatible OPC UA server. Clients connect to these
Devices using a connection string with the "daq.opcua://" prefix, which is recognized by the
openDAQ™ opcua_client_module. This module not only manages the Device’s structural details but also
handles the set of Streaming Server capabilities received from the Device.
Each Streaming Server capability is identified by its Streaming protocol ID. This ID is translated into the prefix of a connection string, which is formed from all the parameters in the Streaming Server capability together with the Device’s known IP address. The connection string allows Streaming instantiation to be delegated to the appropriate data Streaming Module. As a result, when a client connects to an openDAQ™ Device, a Streaming connection can be established automatically using the published Streaming connection details.
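The way a connection string is derived from a Server capability can be sketched in plain C++ without the SDK. This is a minimal illustration, not the module's actual implementation: the `StreamingCapability` struct and both function names are hypothetical, the `prefix://address:port` layout is an assumption, and the two prefixes are the ones used elsewhere in this guide.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for the data carried by a Streaming Server capability.
struct StreamingCapability
{
    std::string protocolId;  // e.g. "opendaq_native_streaming"
    int port;                // e.g. 7420
};

// Maps a Streaming protocol ID to its connection-string prefix.
// Returns std::nullopt for protocol IDs this sketch does not know.
std::optional<std::string> prefixForProtocol(const std::string& protocolId)
{
    static const std::map<std::string, std::string> prefixes = {
        {"opendaq_native_streaming", "daq.ns"},
        {"opendaq_lt_streaming", "daq.lt"},
    };

    const auto it = prefixes.find(protocolId);
    if (it == prefixes.end())
        return std::nullopt;
    return it->second;
}

// Combines the prefix, the known Device address, and the capability's port
// into a connection string (assumed layout: prefix://address:port).
std::optional<std::string> buildStreamingConnectionString(const StreamingCapability& capability,
                                                          const std::string& deviceAddress)
{
    const auto prefix = prefixForProtocol(capability.protocolId);
    if (!prefix)
        return std::nullopt;
    return *prefix + "://" + deviceAddress + ":" + std::to_string(capability.port);
}
```

With these definitions, a capability `{"opendaq_native_streaming", 7420}` and the address `192.168.0.10` yield `daq.ns://192.168.0.10:7420`, while an unrecognized protocol ID yields no connection string at all.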
The Device Configuration Property object provides a mechanism for filtering the available Streaming protocols by enabling or disabling specific ones. It also allows setting the "primary" Streaming protocol, whose connection is initially used as the active Streaming source for all of the Signals.
Furthermore, there’s an option to specify a Streaming path heuristic, particularly useful for multiple nested Devices connected in a tree-structured manner (as illustrated in the diagram). The allowed heuristics include:
- "Minimize-connections" mode (ID 0): establishes the fewest Streaming connections possible, at the cost of routing Signals' data through gateway Devices and increasing the hop count.
- "Minimize-hops" mode (ID 1): attempts to stream data directly from nested Devices, minimizing the number of hops the data must make between Devices.
- "Fallback" mode (ID 2): establishes all possible Streaming connections.
- "Not connected" mode (ID 3): the information about supported Streaming protocols published by the Device is not used to establish Streaming connections automatically.
Currently, openDAQ™ structure-enabled Devices that operate through the Native Configuration protocol support only the Native Streaming protocol for transmitting acquired data.
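The interplay of the allowed protocols, the primary protocol, and the heuristic IDs listed above can be modeled with a small standalone sketch. Everything here is illustrative: `StreamingConfig`, `filterProtocols`, and `pickInitialProtocol` are hypothetical names rather than SDK types, and falling back to the first usable protocol when the primary one is unavailable is an assumption of this sketch, not documented behavior.

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <vector>

// Heuristic IDs as listed above; the enumerator names are illustrative.
enum class StreamingHeuristic : int
{
    MinimizeConnections = 0,
    MinimizeHops = 1,
    Fallback = 2,
    NotConnected = 3
};

// Hypothetical, simplified view of the relevant Device Configuration values.
struct StreamingConfig
{
    std::vector<std::string> allowedProtocols;
    std::string primaryProtocol;
    StreamingHeuristic heuristic = StreamingHeuristic::MinimizeConnections;  // arbitrary default for this sketch
};

// Keeps only the published protocol IDs that the configuration allows,
// preserving the published order. Returns nothing in "Not connected" mode.
std::vector<std::string> filterProtocols(const StreamingConfig& config,
                                         const std::vector<std::string>& published)
{
    std::vector<std::string> usable;
    if (config.heuristic == StreamingHeuristic::NotConnected)
        return usable;

    for (const auto& id : published)
    {
        const bool allowed = std::find(config.allowedProtocols.begin(), config.allowedProtocols.end(), id)
                             != config.allowedProtocols.end();
        if (allowed)
            usable.push_back(id);
    }
    return usable;
}

// Picks the protocol that becomes the initially active Streaming source:
// the primary one if it is usable, otherwise the first usable one (assumed fallback).
std::optional<std::string> pickInitialProtocol(const StreamingConfig& config,
                                               const std::vector<std::string>& usable)
{
    if (usable.empty())
        return std::nullopt;
    if (std::find(usable.begin(), usable.end(), config.primaryProtocol) != usable.end())
        return config.primaryProtocol;
    return usable.front();
}
```

In this model, a Device that publishes both protocols to a client that allows both and names `opendaq_lt_streaming` as primary ends up with LT streaming as the initial source; switching the heuristic to `NotConnected` leaves no usable protocol.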
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Get the default Configuration Property object for OPC UA enabled Device type
    PropertyObjectPtr deviceConfig = instance.getAvailableDeviceTypes().get("opendaq_opcua_config").createDefaultConfig();

    // Allow multiple Streaming protocols by Device Configuration
    deviceConfig.setPropertyValue("AllowedStreamingProtocols", List<IString>("opendaq_native_streaming", "opendaq_lt_streaming"));

    // Set Streaming LT protocol as primary by Device Configuration
    deviceConfig.setPropertyValue("PrimaryStreamingProtocol", "opendaq_lt_streaming");

    // Disregard direct Streaming connections for nested Devices,
    // establish the minimum number of streaming connections possible.
    deviceConfig.setPropertyValue("StreamingConnectionHeuristic", 0);

    // Find and connect to a Device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ OpcUa")
            {
                device = instance.addDevice(capability.getConnectionString(), deviceConfig);
                break;
            }
        }

        // The `break` above only leaves the inner loop; stop searching once a Device was added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
    }
    else
    {
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;
    }

    return 0;
}
Connecting to Streaming protocol based Pseudo-Devices
Pseudo-Devices are a category of openDAQ™ Devices whose implementation relies solely on a Streaming protocol. Such Devices offer a flat list of Signals without detailed structural information. They are created by the Module responsible for establishing the corresponding Streaming connection. The Device connection string is used to route and delegate Device object instantiation to the relevant Module. It is identical to the Streaming connection string used to instantiate the Streaming connection, except that the prefix indicating the Streaming protocol type may be replaced with the prefix representing the appropriate Device type. The prefix is followed by the same set of parameters, which are specific to each Streaming protocol type.
For example, the prefix "daq.ns" in the Device connection string corresponds to the Native Streaming protocol,
which is identified by the same prefix "daq.ns" in the Streaming connection string. Similarly, the Device
connection string prefix "daq.lt" corresponds to the Websocket Streaming protocol, which is identified
by the Streaming connection string prefix "daq.lt".
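The prefix-based routing described above can be illustrated with a short standalone sketch. The helper names `connectionPrefix` and `protocolForDeviceString` are hypothetical and not part of the SDK; the sketch only assumes that the prefix is everything before the `://` separator and uses the two prefixes named in this section.

```cpp
#include <string>
#include <string_view>

// Returns the part of a connection string before "://",
// or an empty view if the separator is missing.
std::string_view connectionPrefix(std::string_view connectionString)
{
    const auto pos = connectionString.find("://");
    if (pos == std::string_view::npos)
        return {};
    return connectionString.substr(0, pos);
}

// Names the Streaming protocol behind a pseudo-Device connection string,
// based on the two prefixes mentioned above.
std::string protocolForDeviceString(std::string_view connectionString)
{
    const auto prefix = connectionPrefix(connectionString);
    if (prefix == "daq.ns")
        return "Native Streaming";
    if (prefix == "daq.lt")
        return "Websocket Streaming";
    return "unknown";
}
```

A string such as `daq.lt://192.168.0.10:7414` is therefore attributed to Websocket Streaming, while a `daq.opcua://` string is not treated as a pseudo-Device by this sketch.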
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Find and connect to a Device hosting a Native Streaming server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ Native Streaming")
            {
                device = instance.addDevice(capability.getConnectionString());
                break;
            }
        }

        // The `break` above only leaves the inner loop; stop searching once a Device was added
        if (device.assigned())
            break;
    }

    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
    }
    else
    {
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;
    }

    return 0;
}
Configure Streaming per Signal
Once the Device is connected, the Streaming sources of its Signals can be examined and modified for each Signal individually at any given time.
The Streaming sources are identified by a connection string that includes the protocol prefix, indicating
the protocol type ID, and parameters based on the protocol type (IP address, port number etc.).
To manipulate the Streaming sources of a particular Signal, the MirroredSignalConfig object is used.
It provides the ability to:
- retrieve the list of Streaming sources available for the Signal by calling getStreamingSources,
- get the currently active Streaming source by calling getActiveStreamingSource,
- change the active Streaming source of the Signal by calling setActiveStreamingSource,
- enable or disable data streaming for the Signal by calling setStreamed,
- check whether streaming is enabled for the Signal by calling getStreamed.
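To make the relationship between these five operations concrete without a connected Device, the following sketch models them with an in-memory stand-in. `MockMirroredSignal` is a hypothetical class and not the openDAQ MirroredSignalConfig implementation; in particular, starting with the first source active, starting with streaming enabled, and rejecting a source that is not in the available list are assumptions of this sketch.

```cpp
#include <algorithm>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in; it is NOT the openDAQ MirroredSignalConfig class.
class MockMirroredSignal
{
public:
    // The first available source (if any) starts out as the active one (assumption).
    explicit MockMirroredSignal(std::vector<std::string> available)
        : sources(std::move(available))
    {
        if (!sources.empty())
            active = sources.front();
    }

    const std::vector<std::string>& getStreamingSources() const { return sources; }
    const std::string& getActiveStreamingSource() const { return active; }

    // Assumption: only a source from the available list can become active.
    void setActiveStreamingSource(const std::string& source)
    {
        if (std::find(sources.begin(), sources.end(), source) == sources.end())
            throw std::invalid_argument("unknown streaming source: " + source);
        active = source;
    }

    void setStreamed(bool enabled) { streamed = enabled; }
    bool getStreamed() const { return streamed; }

private:
    std::vector<std::string> sources;
    std::string active;
    bool streamed = true;  // assumed default for this sketch
};
```

The point of the model is that the active source is always one of the listed sources, and that enabling or disabling streaming is independent of which source is active.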
Cpp
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // ...

    // Get the first Signal of connected Device
    MirroredSignalConfigPtr signal = device.getSignalsRecursive()[0];

    // Find and output the Streaming sources available for Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "Signal supports " << signal.getStreamingSources().getCount() << " streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string_view::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string_view::npos)
            websocketStreamingSource = source;
    }

    // Output the active Streaming source of Signal
    std::cout << "Active streaming source of signal: " << signal.getActiveStreamingSource() << std::endl;

    // Output the Streaming status for the Signal to verify that streaming is enabled
    std::cout << "Streaming enabled status for signal is: " << (signal.getStreamed() ? "true" : "false") << std::endl;

    // Change the active Streaming source of Signal, if a native Streaming source was found
    if (nativeStreamingSource.assigned())
        signal.setActiveStreamingSource(nativeStreamingSource);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}
Full listing
The following is a fully working example of configuring Streaming and reading Signal data using different Streaming sources.
Cpp
#include <opendaq/opendaq.h>
#include <chrono>
#include <iostream>
#include <thread>

using namespace daq;

void readSamples(const MirroredSignalConfigPtr signal)
{
    using namespace std::chrono_literals;

    StreamReaderPtr reader = StreamReader<double, uint64_t>(signal);

    // Get the resolution, origin, and unit symbol of the domain Signal
    DataDescriptorPtr descriptor = signal.getDomainSignal().getDescriptor();
    RatioPtr resolution = descriptor.getTickResolution();
    StringPtr origin = descriptor.getOrigin();
    StringPtr unitSymbol = descriptor.getUnit().getSymbol();

    std::cout << "\nReading signal: " << signal.getName() << "; active Streaming source: " << signal.getActiveStreamingSource()
              << std::endl;
    std::cout << "Origin: " << origin << std::endl;

    // Allocate buffers for reading double samples and their domain values
    double samples[100];
    uint64_t domainSamples[100];

    for (int i = 0; i < 40; ++i)
    {
        std::this_thread::sleep_for(25ms);

        // Read up to 100 samples every 25 ms, storing the amount read into `count`
        SizeT count = 100;
        reader.readWithDomain(samples, domainSamples, &count);
        if (count > 0)
        {
            // Convert the last domain tick to the domain unit and print it with the last value
            Float domainValue = (Int) domainSamples[count - 1] * resolution;
            std::cout << "Value: " << samples[count - 1] << ", Domain: " << domainValue << unitSymbol << std::endl;
        }
    }
}
int main(int /*argc*/, const char* /*argv*/[])
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Get the default Configuration Property object for OPC UA enabled Device type
    PropertyObjectPtr deviceConfig = instance.getAvailableDeviceTypes().get("opendaq_opcua_config").createDefaultConfig();

    // Allow multiple Streaming protocols by Device Configuration
    deviceConfig.setPropertyValue("AllowedStreamingProtocols", List<IString>("opendaq_native_streaming", "opendaq_lt_streaming"));

    // Set Streaming LT protocol as primary by Device Configuration
    deviceConfig.setPropertyValue("PrimaryStreamingProtocol", "opendaq_lt_streaming");

    // Find and connect to a Device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "openDAQ OpcUa")
            {
                device = instance.addDevice(capability.getConnectionString(), deviceConfig);
                break;
            }
        }

        // The `break` above only leaves the inner loop; stop searching once a Device was added
        if (device.assigned())
            break;
    }

    // Exit if no Device is found
    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
        return 1;
    }

    // Output the name of the added Device
    std::cout << device.getInfo().getName() << std::endl;

    // Find the AI Signal
    auto signals = device.getSignalsRecursive();
    ChannelPtr channel;
    MirroredSignalConfigPtr signal;
    for (const auto& sig : signals)
    {
        auto name = sig.getDescriptor().getName();
        if (name.toView().find("AI") != std::string_view::npos)
        {
            signal = sig;
            channel = signal.getParent().getParent();
            break;
        }
    }

    if (!signal.assigned())
    {
        std::cerr << "No AI signal found!" << std::endl;
        return 1;
    }

    // Find and output the Streaming sources of Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "AI signal has " << signal.getStreamingSources().getCount() << " Streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string_view::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string_view::npos)
            websocketStreamingSource = source;
    }

    // Check the active Streaming source of Signal
    if (signal.getActiveStreamingSource() != websocketStreamingSource)
    {
        std::cerr << "Wrong active Streaming source of AI signal" << std::endl;
        return 1;
    }

    // Output samples using Reader with Streaming LT
    readSamples(signal);

    // Exit if the Signal has no native Streaming source to switch to
    if (!nativeStreamingSource.assigned())
    {
        std::cerr << "No native Streaming source found for AI signal" << std::endl;
        return 1;
    }

    // Change the active Streaming source of Signal
    signal.setActiveStreamingSource(nativeStreamingSource);

    // Output samples using Reader with native Streaming
    readSamples(signal);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}