Read basic value and Domain data

For brevity, the C++ examples assume that all code is in namespace daq or that the namespace has been imported via using namespace daq;, and they omit the otherwise necessary header includes until the final complete listing.

This guide walks through the basics described in Common Behavior of Readers, using a Stream Reader as the example. You will learn what options you have when creating and configuring a Reader, how to properly issue read calls, and how to provide read buffers. At the end, you can continue with a more specific guide depending on the features you’re interested in.

Creating the Reader

For starters, let’s explore how you’d create a Stream Reader where you read the value data as double and Domain as Int (signed 64-bit integer).

Creating a default Stream Reader
  • Cpp

// These calls all create the same Reader
auto reader1 = StreamReader(signal);
auto reader2 = StreamReader<double, Int>(signal);
auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Int64);

  • Python

# These calls all create the same Reader
reader = opendaq.StreamReader(signal)
reader = opendaq.StreamReader(
    signal, value_type=opendaq.SampleType.Float64, domain_type=opendaq.SampleType.Int64)

The Reader constructor accepts either a Signal or an Input Port, so you can use whichever fits your setup.

By default, if no explicit read Sample Types are provided, they’re assumed to be:

  • SampleType::Float64 / daq::Float / double for value.

  • SampleType::Int64 / daq::Int / std::int64_t for Domain.

Setting up the Reader this way lets you read the Signal’s data as Float64 even if the samples it produces are in a different format. In the background, the Reader attempts to convert the samples to Float64. If no conversion is possible, the Reader is invalidated. How to resolve the invalid state is explained later in this guide.

For the purposes of a Reader, a conversion exists if it can be performed with an assignment cast.

For example, the following must be valid C++:
Type1 a{};
Type2 b = (Type2) a;
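
To make the idea of a per-sample cast concrete, here is a minimal language-neutral sketch in Python. It is not the Reader's implementation: the `convert_samples` helper is hypothetical, and typecodes from Python's standard `array` module stand in for Sample Types (`"i"` for a 32-bit-style integer, `"q"` for a 64-bit integer, `"d"` for a 64-bit float).

```python
from array import array

# Typecodes from Python's `array` module, used here as stand-ins for Sample Types.
FLOAT_CODES = {"f", "d"}  # single and double precision floating point


def convert_samples(samples: array, target_code: str) -> array:
    """Cast every sample to the target element type, one sample at a time."""
    # Assumption: a plain cast per element, analogous to `Type2 b = (Type2) a;`.
    cast = float if target_code in FLOAT_CODES else int
    return array(target_code, [cast(sample) for sample in samples])


int_samples = array("i", [3, 4])
print(convert_samples(int_samples, "d"))  # array('d', [3.0, 4.0])
```

Note that a cast in the other direction (float to integer) is also "convertible" in this sense, but it truncates the fractional part, so a valid conversion is not necessarily a lossless one.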

Reading with Signal’s Sample Type

A reader can also be constructed without knowing the Signal’s Sample Types in advance by using SampleType::Undefined. This is useful if you don’t want any conversions to be performed or just as a helper method that spares you the time to manually query the Signal’s Sample Types. You can choose to use the automatic Sample Type deduction for either value or Domain or both.

Creating a reader with the Signal’s Sample Type
  • Cpp

// Use the Signal's Sample Types for both value and Domain
auto reader1 = StreamReader(signal, SampleType::Undefined, SampleType::Undefined);

// Only for value
auto reader2 = StreamReader(signal, SampleType::Undefined, SampleType::Int64);

// Or only for Domain
auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Undefined);

  • Python

# Use the Signal's Sample Types for both value and Domain
reader = opendaq.StreamReader(signal, value_type=opendaq.SampleType.Undefined, domain_type=opendaq.SampleType.Undefined)
# Only for value
reader = opendaq.StreamReader(signal, value_type=opendaq.SampleType.Undefined, domain_type=opendaq.SampleType.Int64)
# Or only for Domain
reader = opendaq.StreamReader(signal, value_type=opendaq.SampleType.Float64, domain_type=opendaq.SampleType.Undefined)

If you use the automatic deduction, you must check the actual types before reading and provide correctly typed buffers to the read calls; otherwise the behavior is undefined and will probably cause a crash.
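
The "check the type first, then allocate the buffer" step can be sketched as follows. This is only a toy model: the `TYPECODES` table and the `make_buffer` helper are hypothetical, and the Sample Type names are plain strings. In C++ the corresponding step would be to query `getValueReadType()` and `getDomainReadType()` (both used in the full listing) before declaring the buffers.

```python
from array import array

# Hypothetical mapping from Sample Type names to `array` typecodes.
TYPECODES = {
    "Float32": "f",
    "Float64": "d",
    "Int32": "i",
    "Int64": "q",
}


def make_buffer(read_type: str, count: int) -> array:
    """Allocate a zero-filled buffer matching the reader's resolved read type."""
    if read_type not in TYPECODES:
        # Refuse to guess: a mistyped buffer is exactly the failure to avoid.
        raise ValueError(f"no buffer type known for Sample Type {read_type!r}")
    return array(TYPECODES[read_type], [0] * count)


# The read type is looked up at run time instead of being hard-coded.
values = make_buffer("Float64", 5)
domain = make_buffer("Int64", 5)
```

Failing loudly on an unknown type is a deliberate choice here, since the Reader itself performs no such check on the buffers it is given.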

Reading data

Once you’ve created and configured the Reader, you can start reading data from the Signal by issuing read calls. First, though, it is best to check how many samples are actually available and decide how many you wish to read. After that, you need to prepare the call parameters and set up the sample-buffers.

The example below shows how to check for available samples, read the first 5 value samples, and then read another 5 together with the associated Domain values.

The count / size parameter must be set before the call to the desired maximum count. After the call, it holds the number of samples actually read.
The type of the allocated memory buffer must match the type the Reader is configured to read. There are no run-time checks to enforce this. If the buffer is larger than the amount read, the rest of the buffer is not modified.
Reading the first 5 value samples, then another 5 with the associated Domain
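
The two rules above (the count is a maximum going in and the actual amount coming out, and the unused tail of the buffer is left untouched) can be modeled with a small sketch. The `read_into` function and the `pending` queue are hypothetical stand-ins, not openDAQ API. Unlike the real Reader, this toy version does check the buffer size.

```python
from collections import deque


def read_into(buffer: list, pending: deque, max_count: int) -> int:
    """Copy up to `max_count` samples into `buffer`; return how many were copied."""
    if max_count > len(buffer):
        # The real Reader makes no such check; this guard exists only in the model.
        raise ValueError("buffer is smaller than the requested maximum count")
    count = min(max_count, len(pending))
    for i in range(count):
        buffer[i] = pending.popleft()
    return count  # elements from index `count` onward were not modified


pending = deque([1.0, 2.0, 3.0])  # only 3 samples are available
buffer = [-1.0] * 5               # sentinel values make the untouched tail visible
count = read_into(buffer, pending, 5)
print(count, buffer)  # 3 [1.0, 2.0, 3.0, -1.0, -1.0]
```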
  • Cpp

auto reader = StreamReader<double, Int>(signal);

// Should return 0
auto available = reader.getAvailableCount();

//
// Signal produces 8 samples
//

// Should return 8
available = reader.getAvailableCount();

SizeT readCount{5};
double values[5]{};
reader.read(values, &readCount);

std::cout << "Read " << readCount << " values" << std::endl;
for (double value : values)
{
    std::cout << value << std::endl;
}

readCount = 5;
double newValues[5];
Int newDomain[5];
reader.readWithDomain(newValues, newDomain, &readCount);

// `readCount` should now be 3
std::cout << "Read another " << readCount << " value and Domain samples" << std::endl;
for (SizeT i = 0; i < readCount; ++i)
{
    std::cout << newValues[i] << ", " << newDomain[i] << std::endl;
}

  • Python

reader = opendaq.StreamReader(signal)

# Should be 0
available = reader.available_count

# Signal produces 8 samples

# Should be 8
available = reader.available_count

values = reader.read(5)
print(f'Read {len(values)} samples:')
print(values)

to_read = 5
values, domain = reader.read_with_domain(to_read)

# Should be 3
to_read = len(values)
print(f'Read another {to_read} samples with domain:')
for value, domain_value in zip(values, domain):
    print(domain_value, value)

As the second read in the example shows, the Stream Reader didn’t wait for the full 5 samples and returned only the ones currently available. The count parameter in the read calls should always be the maximum number of samples the Reader should read, and each sample-buffer must be a contiguous block large enough to fit at least that many samples. The Reader does not check this and assumes the user provided a buffer of the proper size. If the buffer is too small, the Reader will write past its end, which will probably corrupt the stack or heap and result in an Access Violation or Segmentation Fault.
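
Because a read may return fewer samples than requested, code that needs an exact number of samples has to accumulate them over several calls. The sketch below shows one way to do that. The `read_exactly` helper is hypothetical, and `read` is assumed to be any callable that takes a maximum count and returns a list of at most that many samples.

```python
from typing import Callable, List


def read_exactly(read: Callable[[int], List[float]], total: int,
                 max_attempts: int = 100) -> List[float]:
    """Call `read` repeatedly until `total` samples are collected or attempts run out."""
    collected: List[float] = []
    for _ in range(max_attempts):
        remaining = total - len(collected)
        if remaining == 0:
            break
        # Ask only for what is still missing, never for more.
        collected.extend(read(remaining))
    return collected  # may hold fewer than `total` if attempts ran out


# Stand-in for a reader: 8 samples arrive first, nothing on the next call, then 3 more.
batches = [[0, 1, 2, 3, 4, 5, 6, 7], [], [8, 9, 10]]
queue: List[float] = []


def fake_read(max_count: int) -> List[float]:
    if batches:
        queue.extend(batches.pop(0))
    out = queue[:max_count]
    del queue[:max_count]
    return out


print(read_exactly(fake_read, 10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

A real loop would typically wait between attempts instead of polling immediately. The attempt limit keeps the sketch from spinning forever if the samples never arrive.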

Handling Signal changes

The Signal stores the information about itself and its data in a Data Descriptor. Each time any of the Signal information changes, the Signal creates an Event Packet with the id "DATA_DESCRIPTOR_CHANGED". The user can react to these changes by checking the Reader Status returned from a read call, as shown below.

The event contains two Data Descriptors, one for value and one for Domain. Either can be null if unchanged, but not both. The Reader first forwards the descriptors to their respective internal data-readers, which update their information and check whether the data can still be converted to the requested Sample Types. It then returns a Reader Status containing the Event Packet and the status of the data conversion.

Reacting to a Data Descriptor changed event
  • Cpp

// Signal Sample Type value is `Float64`

auto reader = StreamReader<double, Int>(signal);

// Signal produces 2 samples { 1.1, 2.2 }

//
// The value Sample Type of the `signal` changes from `Float64` to `Int32`
//

// Signal produces 2 samples { 3, 4 }

// If Descriptor has changed, Reader will return Reader status with that event
// Call succeeds and results in 2 samples { 1.1, 2.2 }
SizeT count{5};
double values[5]{};
auto status = reader.read(values, &count);
assert(status.getReadStatus() == ReadStatus::Event);

// The subsequent call succeeds because `Int32` is convertible to `Float64`
// and results in 2 samples { 3.0, 4.0 }
reader.read(values, &count);

//
// The value Sample Type of the `signal` changes from `Int32` to `Int64`
//

// Signal produces 2 samples { 5, 6 }

// Reader reads 0 values and returns status with new Event Packet
SizeT newCount{2};
double newValues[2]{};
auto newStatus = reader.read(newValues, &newCount);
assert(newCount == 0u);
assert(newStatus.getReadStatus() == ReadStatus::Event);
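
The behavior in this example (a read stops at a descriptor-changed event and reports it through the status) can be modeled with a small queue-based sketch. This is a toy model under stated assumptions, not the Reader's implementation: `DataPacket`, `DescriptorChanged`, and `read` are hypothetical, and the strings "Ok" and "Event" stand in for the read status.

```python
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class DataPacket:
    samples: List[float]


@dataclass
class DescriptorChanged:
    new_sample_type: str


def read(queue: deque, max_count: int) -> Tuple[List[float], str]:
    """Read up to `max_count` samples, stopping early at a descriptor change."""
    samples: List[float] = []
    while queue and len(samples) < max_count:
        packet = queue[0]
        if isinstance(packet, DescriptorChanged):
            queue.popleft()
            return samples, "Event"  # samples read before the event are kept
        take = max_count - len(samples)
        samples.extend(float(s) for s in packet.samples[:take])
        del packet.samples[:take]
        if not packet.samples:
            queue.popleft()
    return samples, "Ok"


queue = deque([DataPacket([1.1, 2.2]), DescriptorChanged("Int32"), DataPacket([3, 4])])
print(read(queue, 5))  # ([1.1, 2.2], 'Event')
print(read(queue, 5))  # ([3.0, 4.0], 'Ok')
```

If the event is the first item in the queue, the same function returns zero samples together with the "Event" status, which matches the last read in the C++ example.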

Reader invalidation and reuse

Once the Reader falls into an invalid state, it can’t be used to read data anymore, and every attempt to do so returns a Reader status whose getValid is false. The only way to resolve this is to pass the Reader to a new Reader instance with valid Sample Types and settings. The new Reader then reuses the Connection from the invalidated one, which makes it possible to continue reading without losing any data. You can also reuse a valid Reader, for example, if you want to change the read Sample Type or any other configuration that is immutable after a Reader is created. Doing so invalidates the old Reader.

Reusing a Reader
  • Cpp

auto reader = StreamReader<Int, Int>(signal);

// Signal produces 5 samples { 1, 2, 3, 4, 5 }

SizeT count{2};
Int values[2]{};
reader.read(values, &count);  // count = 2, values = { 1, 2 }

// Reuse the Reader
auto newReader = StreamReaderFromExisting<double, Int>(reader);

// New Reader successfully continues on from previous Reader's position
count = 2;
double newValues[2]{};
newReader.read(newValues, &count);  // count = 2, newValues = { 3.0, 4.0 }

// The old Reader has been invalidated when reused by a new one
count = 2;
Int oldValues[2]{};
auto status = reader.read(oldValues, &count);
assert(status.getValid() == false);
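
The hand-over described above can be pictured as the new Reader taking ownership of the old Reader's Connection. The sketch below is a toy model only: the `ToyReader` class, its `from_existing` constructor, and the plain `deque` standing in for the Connection are all hypothetical.

```python
from collections import deque


class ToyReader:
    """Toy reader that owns a queue of pending samples (its 'connection')."""

    def __init__(self, connection: deque, convert=float):
        self._connection = connection
        self._convert = convert
        self.valid = True

    @classmethod
    def from_existing(cls, old: "ToyReader", convert=float) -> "ToyReader":
        """Take over the old reader's connection and invalidate the old reader."""
        new = cls(old._connection, convert)
        old._connection = None
        old.valid = False
        return new

    def read(self, max_count: int) -> list:
        if not self.valid:
            return []  # an invalidated reader never returns data
        count = min(max_count, len(self._connection))
        return [self._convert(self._connection.popleft()) for _ in range(count)]


reader = ToyReader(deque([1, 2, 3, 4, 5]), convert=int)
print(reader.read(2))      # [1, 2]

new_reader = ToyReader.from_existing(reader, convert=float)
print(new_reader.read(2))  # [3.0, 4.0]
print(reader.valid)        # False
```

Because both readers share the same queue, no samples are dropped or duplicated at the hand-over point. The new reader simply continues where the old one stopped.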

Full listing

The following is a self-contained file with all the above examples of Reader basics. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.

Full listing
  • Cpp

#include <opendaq/context_factory.h>
#include <opendaq/data_rule_factory.h>
#include <opendaq/packet_factory.h>
#include <opendaq/reader_exceptions.h>
#include <opendaq/reader_factory.h>
#include <opendaq/scheduler_factory.h>
#include <opendaq/signal_factory.h>

#include <cassert>
#include <iostream>

using namespace daq;

SignalConfigPtr setupExampleSignal();
SignalPtr setupExampleDomain(const SignalPtr& value);
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset = 0);
DataDescriptorPtr setupDescriptor(SampleType type, const DataRulePtr& rule = nullptr);

/*
 * Example 1: These calls all create the same Reader
 */
void example1(const SignalConfigPtr& signal)
{
    auto reader1 = StreamReader(signal);
    auto reader2 = StreamReader<double, Int>(signal);
    auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Int64);

    // For value
    assert(reader1.getValueReadType() == SampleType::Float64);
    assert(reader2.getValueReadType() == SampleType::Float64);
    assert(reader3.getValueReadType() == SampleType::Float64);

    // For Domain
    assert(reader1.getDomainReadType() == SampleType::Int64);
    assert(reader2.getDomainReadType() == SampleType::Int64);
    assert(reader3.getDomainReadType() == SampleType::Int64);
}

/*
 * Example 2: Creating a Reader with the Signal’s Sample Type
 */
void example2(const SignalConfigPtr& signal)
{
    // Use the Signal's Sample Types for both value and Domain
    auto reader1 = StreamReader(signal, SampleType::Undefined, SampleType::Undefined);
    assert(reader1.getValueReadType() == SampleType::Float64);
    assert(reader1.getDomainReadType() == SampleType::Int64);

    // Only for value
    auto reader2 = StreamReader(signal, SampleType::Undefined, SampleType::Int64);
    assert(reader2.getValueReadType() == SampleType::Float64);
    assert(reader2.getDomainReadType() == SampleType::Int64);

    // Or only for Domain
    auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Undefined);
    assert(reader3.getValueReadType() == SampleType::Float64);
    assert(reader3.getDomainReadType() == SampleType::Int64);
}

/*
 * Example 3: Reading basic value and Domain data
 */
void example3(const SignalConfigPtr& signal)
{
    auto reader = StreamReader<double, Int>(signal);

    // Should return 0
    [[maybe_unused]] auto available = reader.getAvailableCount();
    assert(available == 0u);

    //
    // Signal produces 8 samples
    //
    auto packet1 = createPacketForSignal(signal, 8);
    signal.sendPacket(packet1);

    // Should return 8
    available = reader.getAvailableCount();
    assert(available == 8u);

    SizeT readCount{5};
    double values[5]{};
    reader.read(values, &readCount);

    std::cout << "Read " << readCount << " values" << std::endl;
    for (double value : values)
    {
        std::cout << value << std::endl;
    }

    readCount = 5;
    double newValues[5];
    Int newDomain[5];
    reader.readWithDomain(newValues, newDomain, &readCount);

    // `readCount` should now be 3
    std::cout << "Read another " << readCount << " value and Domain samples" << std::endl;
    for (SizeT i = 0; i < readCount; ++i)
    {
        std::cout << newValues[i] << ", " << newDomain[i] << std::endl;
    }
}

/*
 * Example 4: Handling Signal changes
 */
void example4(const SignalConfigPtr& signal)
{
    // Signal Sample Type value is `Float64`
    signal.setDescriptor(setupDescriptor(SampleType::Float64));

    auto reader = StreamReader<double, Int>(signal);

    // Signal produces 2 samples { 1.1, 2.2 }
    auto packet1 = createPacketForSignal(signal, 2);
    auto data1 = static_cast<double*>(packet1.getData());
    data1[0] = 1.1;
    data1[1] = 2.2;

    signal.sendPacket(packet1);

    //
    // The value Sample Type of the `signal` changes from `Float64` to `Int32`
    //
    signal.setDescriptor(setupDescriptor(SampleType::Int32));

    // Signal produces 2 samples { 3, 4 }
    auto packet2 = createPacketForSignal(signal, 2);
    auto data2 = static_cast<std::int32_t*>(packet2.getData());
    data2[0] = 3;
    data2[1] = 4;

    signal.sendPacket(packet2);

    // If Descriptor has changed, Reader will return Reader status with that event
    // Call succeeds and results in 2 samples { 1.1, 2.2 }
    SizeT count{5};
    double values[5]{};
    auto status = reader.read(values, &count);
    assert(status.getReadStatus() == ReadStatus::Event);

    assert(count == 2u);
    assert(values[0] == 1.1);
    assert(values[1] == 2.2);

    // The subsequent call succeeds because `Int32` is convertible to `Float64`
    // and results in 2 samples { 3.0, 4.0 }
    reader.read(values, &count);
    assert(count == 2u);
    assert(values[0] == 3.0);
    assert(values[1] == 4.0);

    //
    // The value Sample Type of the `signal` changes from `Int32` to `Int64`
    //
    signal.setDescriptor(setupDescriptor(SampleType::Int64));

    // Signal produces 2 samples { 5, 6 }
    auto packet3 = createPacketForSignal(signal, 2);
    auto data3 = static_cast<std::int64_t*>(packet3.getData());
    data3[0] = 5;
    data3[1] = 6;
    signal.sendPacket(packet3);

    // Reader reads 0 values and returns status with new Event Packet
    SizeT newCount{2};
    double newValues[2]{};
    auto newStatus = reader.read(newValues, &newCount);
    assert(newCount == 0u);
    assert(newStatus.getReadStatus() == ReadStatus::Event);
}

/*
 * Example 5: Reader reuse
 */
void example5(const SignalConfigPtr& signal)
{
    signal.setDescriptor(setupDescriptor(SampleType::Int64));

    auto reader = StreamReader<Int, Int>(signal);

    // Signal produces 5 samples { 1, 2, 3, 4, 5 }
    auto packet1 = createPacketForSignal(signal, 5);
    auto data1 = static_cast<Int*>(packet1.getData());
    data1[0] = 1;
    data1[1] = 2;
    data1[2] = 3;
    data1[3] = 4;
    data1[4] = 5;

    signal.sendPacket(packet1);

    SizeT count{2};
    Int values[2]{};
    reader.read(values, &count);  // count = 2, values = { 1, 2 }

    assert(count == 2u);
    assert(values[0] == 1);
    assert(values[1] == 2);

    // Reuse the Reader
    auto newReader = StreamReaderFromExisting<double, Int>(reader);

    // New Reader successfully continues on from previous Reader's position
    count = 2;
    double newValues[2]{};
    newReader.read(newValues, &count);  // count = 2, newValues = { 3.0, 4.0 }

    assert(count == 2u);
    assert(newValues[0] == 3);
    assert(newValues[1] == 4);

    // The old Reader has been invalidated when reused by a new one
    count = 2;
    Int oldValues[2]{};
    auto status = reader.read(oldValues, &count);
    assert(status.getValid() == false);
}

/*
 * ENTRY POINT
 */
int main(int /*argc*/, const char* /*argv*/[])
{
    SignalConfigPtr signal = setupExampleSignal();
    signal.setDomainSignal(setupExampleDomain(signal));

    example1(signal);
    example2(signal);
    example3(signal);
    example4(signal);
    example5(signal);

    return 0;
}

/*
 * Set up the Signal with Float64 data
 */
SignalConfigPtr setupExampleSignal()
{
    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr, nullptr);

    auto signal = Signal(context, nullptr, "example signal");
    signal.setDescriptor(setupDescriptor(SampleType::Float64));

    return signal;
}

SignalPtr setupExampleDomain(const SignalPtr& value)
{
    auto domain = Signal(value.getContext(), nullptr, "domain signal");
    domain.setDescriptor(setupDescriptor(SampleType::Int64, LinearDataRule(1, 0)));

    return domain;
}

DataDescriptorPtr setupDescriptor(SampleType type, const DataRulePtr& rule)
{
    // Set up the data descriptor with the provided Sample Type
    const auto dataDescriptor = DataDescriptorBuilder().setSampleType(type);

    // For the Domain, we provide a Linear Rule to generate time-stamps
    if (rule.assigned())
        dataDescriptor.setRule(rule);

    return dataDescriptor.build();
}

DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset)
{
    // Create a data packet where the values are generated via the +1 rule starting at 0
    auto domainPacket = DataPacket(signal.getDomainSignal().getDescriptor(),
                                   numSamples,
                                   offset  // offset from 0 to start the sample generation at
    );

    return DataPacketWithDomain(domainPacket, signal.getDescriptor(), numSamples);
}