Configure Streaming

Server side Configuration

Typically, in addition to publishing its own structure (Signals, Channels, Function Blocks, etc.), an openDAQ™ Device also publishes information about Streaming protocols it supports. This information includes the Streaming protocol ID (e.g. "OpenDAQLTStreaming" or "OpenDAQNativeStreaming") and a range of optional parameters (e.g. port number). Upon initiating a Streaming server on the Device, it automatically generates and prepares the mentioned piece of information, also known as a Server capability, for publication alongside the Device’s structural details. To enable the publication of this information, the server responsible for the transfer of structural information should be added last to the openDAQ™ Instance, following the addition of all Streaming servers.

The example below demonstrates the correct sequence for adding various servers to the openDAQ™ Instance.

  • Cpp

#include <chrono>
#include <thread>
#include <opendaq/opendaq.h>

using namespace daq;

int main()
{
    using namespace std::chrono_literals;

    const InstancePtr instance = Instance();

    instance.setRootDevice("daqref://device1");

    // Creates and registers a Server capability with the ID `OpenDAQLTStreaming` and the default port number 7414
    instance.addServer("OpenDAQLTStreaming", nullptr);

    // Creates and registers a Server capability with the ID `OpenDAQNativeStreaming` and the default port number 7420
    instance.addServer("OpenDAQNativeStreaming", nullptr);

    // As the Streaming servers were added first, the registered Server capabilities are published over OPC UA
    instance.addServer("OpenDAQOPCUA", nullptr);

    while(true)
        std::this_thread::sleep_for(100ms);

    return 0;
}

Configure Streaming for structure-enabled Device automatically

Most openDAQ™ Devices support structural information transferring via the configuration protocol. These are openDAQ™ OPC UA-compatible Devices running an OPC UA server, as well as Devices that are compatible with the openDAQ™ Native configuration protocol. Regardless of the protocol type, the configuration protocol not only transfers the Device’s structural details but also the set of Streaming Server capabilities available for the Device.

Each Streaming Server capability is identified by the Streaming protocol ID, that is transformed into a prefix within the connection string formed by considering all parameters in the Streaming Server capability along with the known Device’s IP address. This connection string enables the delegation of Streaming instantiation to the appropriate data Streaming Module. As a result, when connecting an openDAQ™ Device, a streaming connection can be established automatically using the published streaming connection details. When establishing a connection to a gateway Devices, which contains nested Devices, the default behavior is to ignore direct Streaming connections for these nested Devices, and establishing the minimum number of Streaming connections possible.

However, users can customize the rules for automatically establishing the Streaming connections. This is done by passing the Device Configuration Property object as a second parameter to the addDevice call. The configuration mechanism allows filtering available streaming protocols by enabling or disabling specific ones. Protocol listed first as enabled is given higher priority, determining its selection as the active Streaming source for all of the Device’s Signals.

Furthermore, there’s an option to specify a Streaming path heuristic, particularly useful for multiple nested Devices connected in a tree-structured manner (as illustrated in the diagram). The allowed heuristics include:

  • "Minimize-connections" mode (ID 0) - is used to establish the fewest Streaming connections possible at the cost of routing Signals' data through gateway Devices, increasing the hop count (default mode).

  • "Minimize-hops" mode (ID 1) - is used to attempt streaming data directly from nested Devices to minimize the amount of hops between Devices the data must make.

  • "Not connected" mode (ID 2) - with this set the information about supported streaming protocols published by the Device is not used to automatically establish Streaming connections.

  • Cpp

#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Add property to allow multiple Streaming protocols with native protocol having the first priority
    auto prioritizedStreamingProtocols = List<IString>("OpenDAQLTStreaming", "OpenDAQNativeStreaming");
    deviceConfig.addProperty(ListProperty("PrioritizedStreamingProtocols", prioritizedStreamingProtocols));

    // Set property to disregard direct Streaming connections for nested Devices,
    // and establish the minimum number of streaming connections possible.
    const auto streamingConnectionHeuristicProp =  SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "NotConnected"),
                                                                    0);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Find and connect to a Device hosting an OPC UA TMS server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        if (capability.getProtocolName() == "OpenDAQOPCUA")
        {
            device = instance.addDevice(capability.getConnectionString(), deviceConfig);
            break;
        }
    }

    if (!device.assigned())
        std::cerr << "No relevant Device found!" << std::endl;
    else
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}

Add Streaming for structure-enabled Device manually

The additional Streaming connections for the Device can be instantiated manually at any time after the Device is connected.

  • Cpp

#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Set property to disable automatic Streaming connection
    const auto streamingConnectionHeuristicProp =  SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "NotConnected"),
                                                                    2);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Connect to a Device hosting an OPC UA TMS server using connection string
    DevicePtr device = instance.addDevice("daq.opcua://127.0.0.1", deviceConfig);

    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
        return 0;
    }
    else
    {
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;
    }

    // Connect to a Native Streaming protocol using connection string
    StreamingPtr streaming = device.addStreaming("daq.ns://127.0.0.1");

    // Get all Device's Signals recursively
    const auto deviceSignals = device.getSignals(search::Recursive(search::Any()));

    // Associate Device's Signals with Streaming
    streaming.addSignals(deviceSignals);

    return 0;
}

Connecting to Streaming protocol based Pseudo-Devices

Pseudo-Devices belong to a category of openDAQ™ Devices whose implementation solely relies on the Streaming protocol. Such Devices offer a flat list of Signals without detailed structural information. These Devices are created using the Module responsible for establishing the corresponding Streaming connection. The Device connection string serves to route and delegate Device object instantiation to the relevant Module. This connection string is identical to the Streaming connection string used for Streaming connection instantiation, with the exception that the prefix indicating the Streaming protocol type might be replaced with the prefix representing the appropriate Device type. Following this prefix, the same set of parameters unique to each Streaming protocol type is appended.

For example, the prefix "daq.ns" in the Device connection string aligns with the Native Streaming protocol, which is identified by the same prefix "daq.ns" in the Streaming connection string. Similarly, the Device connection string prefix "daq.lt" corresponds to the Websocket Streaming protocol, recognized by the Streaming connection string prefix "daq.lt".

  • Cpp

#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Find and connect to a Device hosting an Native Streaming server
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        for (const auto& capability : deviceInfo.getServerCapabilities())
        {
            if (capability.getProtocolName() == "OpenDAQNativeStreaming")
            {
                device = instance.addDevice(capability.getConnectionString());
                break;
            }
        }
    }

    if (!device.assigned())
        std::cerr << "No relevant Device found!" << std::endl;
    else
        // Output the name of the added Device
        std::cout << device.getInfo().getName() << std::endl;

    return 0;
}

Configure Streaming per Signal

Once the Device is connected, the Streaming sources of its Signals can be examined and modified for each Signal individually at any given time.

The Streaming sources are identified by a connection string that includes the protocol prefix, indicating the protocol type ID, and parameters based on the protocol type (IP address, port number etc.). To manipulate the Streaming sources of particular Signal the MirroredSignalConfig object is used, it provides ability to:

  • retrieve a list of streaming sources available for signal by using getStreamingSources call,

  • get the currently active streaming source by using getActiveStreamingSource call,

  • change the active streaming source for a signal by using setActiveStreamingSource call,

  • enable or disable data streaming for signal by using setStreamed call,

  • check if streaming is enabled or disabled for signal by using getStreamed call.

  • Cpp

#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;

int main()
{
    // ...

    // Get the first Signal of connected Device
    MirroredSignalConfigPtr signal = device.getSignalsRecursive()[0];

    // Find and output the Streaming sources available for Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "Signal supports " << signal.getStreamingSources().getCount() << " streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Output the active Streaming source of Signal
    std::cout << "Active streaming source of signal: " << signal.getActiveStreamingSource() << std::endl;

    // Output the Streaming status for the Signal to verify that streaming is enabled
    std::cout << "Streaming enabled status for signal is: " << (signal.getStreamed() ? "true" : "false") << std::endl;

    // Change the active Streaming source of Signal
    signal.setActiveStreamingSource(nativeStreamingSource);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}

Full listing

The following is a fully working example of configuring Streaming and reading Signal data using different Streaming sources.

The full example code listing
  • Cpp

#include <opendaq/opendaq.h>
#include <chrono>
#include <iostream>
#include <thread>

using namespace daq;

void readSamples(const MirroredSignalConfigPtr signal)
{
    using namespace std::chrono_literals;
    StreamReaderPtr reader = StreamReader<double, uint64_t>(signal);

    // Get the resolution and origin
    DataDescriptorPtr descriptor = signal.getDomainSignal().getDescriptor();
    RatioPtr resolution = descriptor.getTickResolution();
    StringPtr origin = descriptor.getOrigin();
    StringPtr unitSymbol = descriptor.getUnit().getSymbol();

    std::cout << "\nReading signal: " << signal.getName() << "; active Streaming source: " << signal.getActiveStreamingSource()
              << std::endl;
    std::cout << "Origin: " << origin << std::endl;

    // Allocate buffer for reading double samples
    double samples[100];
    uint64_t domainSamples[100];
    for (int i = 0; i < 40; ++i)
    {
        std::this_thread::sleep_for(25ms);

        // Read up to 100 samples every 25 ms, storing the amount read into `count`
        SizeT count = 100;
        reader.readWithDomain(samples, domainSamples, &count);
        if (count > 0)
        {
            Float domainValue = (Int) domainSamples[count - 1] * resolution;
            std::cout << "Value: " << samples[count - 1] << ", Domain: " << domainValue << unitSymbol << std::endl;
        }
    }
}

int main(int /*argc*/, const char* /*argv*/[])
{
    // Create a new Instance that we will use for all the interactions with the SDK
    InstancePtr instance = Instance();

    // Create an empty Property object
    PropertyObjectPtr deviceConfig = PropertyObject();

    // Add property to allow multiple Streaming protocols with native protocol having the first priority
    auto prioritizedStreamingProtocols = List<IString>("OpenDAQLTStreaming", "OpenDAQNativeStreaming");
    deviceConfig.addProperty(ListProperty("PrioritizedStreamingProtocols", prioritizedStreamingProtocols));

    // Set property to ignore streaming sources of nested Devices
    const auto streamingConnectionHeuristicProp =  SelectionProperty("StreamingConnectionHeuristic",
                                                                    List<IString>("MinConnections",
                                                                                  "MinHops",
                                                                                  "Fallbacks",
                                                                                  "NotConnected"),
                                                                    0);
    deviceConfig.addProperty(streamingConnectionHeuristicProp);

    // Find and connect to a Device using the device info connection string
    const auto availableDevices = instance.getAvailableDevices();
    DevicePtr device;
    for (const auto& deviceInfo : availableDevices)
    {
        if (deviceInfo.getConnectionString().toView().find("daq://") != std::string::npos)
        {
            device = instance.addDevice(deviceInfo.getConnectionString(), deviceConfig);
            break;
        }
    }

    // Exit if no Device is found
    if (!device.assigned())
    {
        std::cerr << "No relevant Device found!" << std::endl;
        return 0;
    }

    // Output the name of the added Device
    std::cout << device.getInfo().getName() << std::endl;

    // Find the AI Signal
    auto signals = device.getSignalsRecursive();

    ChannelPtr channel;
    MirroredSignalConfigPtr signal;
    for (const auto& sig : signals)
    {
        auto name = sig.getDescriptor().getName();

        if (name.toView().find("AI") != std::string_view::npos)
        {
            signal = sig;
            channel = signal.getParent().getParent();
            break;
        }
    }

    if (!signal.assigned())
    {
        std::cerr << "No AI signal found!" << std::endl;
        return 1;
    }

    // Find and output the Streaming sources of Signal
    StringPtr nativeStreamingSource;
    StringPtr websocketStreamingSource;
    std::cout << "AI signal has " << signal.getStreamingSources().getCount() << " Streaming sources:" << std::endl;
    for (const auto& source : signal.getStreamingSources())
    {
        std::cout << source << std::endl;
        if (source.toView().find("daq.ns://") != std::string::npos)
            nativeStreamingSource = source;
        if (source.toView().find("daq.lt://") != std::string::npos)
            websocketStreamingSource = source;
    }

    // Check the active Streaming source of Signal
    if (signal.getActiveStreamingSource() != websocketStreamingSource)
    {
        std::cerr << "Wrong active Streaming source of AI signal" << std::endl;
        return 1;
    }
    // Output samples using Reader with Streaming LT
    readSamples(signal);

    // Change the active Streaming source of Signal
    signal.setActiveStreamingSource(nativeStreamingSource);
    // Output samples using Reader with native Streaming
    readSamples(signal);

    std::cout << "Press \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}