Read Basic Value And Domain Data
For brevity, in |
In this guide, you will go through the basics of the information provided in the Common Behavior of Readers on the example of a Stream Reader. You will learn what options you have with creating and configuring a Reader and how to properly issue read calls and provide read buffers. In the end, you’ll also get the option to continue on a more specific guide depending on the features you’re interested in.
Creating the Reader
For starters, let’s explore how you’d create a Stream Reader where you read the value data as Float64
and domain as Int
.
-
Cpp
// These calls all create the same reader
auto reader = StreamReader(signal);
auto reader = StreamReader<double, Int>(signal);
auto reader = StreamReader(signal, SampleType::Float64, SampleType::Int64);
By default, if no explicit read sample-types are provided, they’re assumed to be:
|
Setting up the Reader this way will let you read the Signal’s data as Float64
even if the samples it produces are in a different format.
In the background, the Reader will attempt to convert the samples to Float64
if possible otherwise it will invalidate the Reader.
How to resolve the invalid state is explained later in this guide.
For the purposes of a Reader, a conversion exists if it can be performed with an assignment cast. E.g.: The following expression must be valid in C++
Type1 a{}; Type2 b = (Type2) a; |
Reading with Signal’s data-type
A reader can also be constructed without knowing the Signal’s sample-types in advance by using SampleType::Undefined
.
This is useful if you don’t want any conversions to be performed or just as a helper method that spares you the time to manually query the Signal’s sample-types. You can choose to use the automatic sample-type deduction for either value or domain or both.
-
Cpp
// Use the Signal's sample-types for both value and domain
auto reader = StreamReader(signal, SampleType::Undefined, SampleType::Undefined);
// ony for value
auto reader = StreamReader(signal, SampleType::Undefined, SampleType::Int64);
// or ony for domain
auto reader = StreamReader(signal, SampleType::Float64, SampleType::Undefined);
In case you choose to use the automatic deduction, you must take extra care to check the actual types before reading and provide correct buffers to the reader read calls, otherwise the results are undefined and will probably cause a crash. |
Reading data
Once you’ve successfully created and configured the Reader, you can now actually start to read the data from the Signal. You do this by issuing read calls but at first, it is best to check how many samples are actually available and decide on how many you wish to read. After that, you need to properly prepare the call parameters and set up sample-buffers.
The example below shows how to check for and read at first 5
value samples and then another 5
with the associated domain values.
The count / size parameter needs to be set before the call to a desired maximum count and will be modified with the actual amount read after. |
The type of the allocated memory buffer must match with the type the Reader is configured to read. There are no run-time checks to enforce this. If the buffer is bigger than the read amount, the rest of the buffer is not modified. |
-
Cpp
auto reader = StreamReader<double, Int>(signal);
// Should return 0
auto available = reader.getAvailableCount();
//
// Signal produces 8 samples
//
// Should return 8
available = reader.getAvailableCount();
SizeT readCount{5};
double values[5]{};
reader.read(values, &readCount);
std::cout << "Read " << readCount << " values" << std::endl;
for(double value : values)
{
std::cout << value << std::endl;
}
readCount = 5;
double newValues[5];
Int newDomain[5];
reader.readWithDomain(newValues, newDomain, &readCount);
// `readCount` should now be `3`
std::cout << "Read another " << readCount << " value and domain samples" << std::endl;
for (SizeT i = 0; i < readCount; ++i)
{
std::cout << newValues[i] << ", " << newDomain[i] << std::endl;
}
As you can see in the example on the second read, the Stream Reader didn’t wait for the full 5
samples and returned only the currently available ones.
The count
parameter in the read calls should always be the maximum number of samples the reader should read, and the sample-buffers must be big enough a contiguous block to fit at least this number of samples.
The Reader makes no checks if this is actually the case and assumes the user provided a buffer of proper size.
If this isn’t the case, it will write past the end and will probably cause stack or heap corruption resulting in an Access Violation
or Segmentation Fault
.
Handling Signal changes
The Signal stores the information about itself and its data in a Data Descriptor.
Each time any of the Signal information changes, it creates an Event Packet with the id of "DATA_DESCRIPTOR_CHANGED"
.
The user can react to these changes by installing a callback as shown below.
The event contains two Data Descriptors, for value and domain, each of which can be null
if unchanged but not both.
The Reader first forwards the descriptors to their respective internal data-readers to update their information and check if the data can still be converted to the requested sample-types.
If all these internal checks pass, the user callback is called (if installed) with the event’s descriptors to check if the change is still permissible to the user otherwise the Reader is invalidated.
-
Cpp
// Signal value sample-type is `Float64`
auto reader = StreamReader<double, Int>(signal);
// Signal produces 2 samples { 1.1, 2.2 }
//
// The value sample-type of the `signal` changes from `Float64` to `Int32`
//
// Signal produces 2 samples { 3, 4 }
// The call succeeds because `Int32` is convertible to `Float64`
// and results in `4` samples { 1.1, 2.2, 3.0, 4.0 }
SizeT count{5};
double values[5]{};
reader.read(values, &count);
// Instal a custom callback that invalidates the reader if the new value sample-type is `Int64`
reader.setOnDescriptorChanged([](const DataDescriptorPtr& valueDescriptor,
const DataDescriptorPtr& /*domainDescriptor*/)
{
// If the value descriptor has changed
if (valueDescriptor.assigned())
{
// and the new sample type is `Int64`
if (valueDescriptor.getSampleType() == SampleType::Int64)
{
return false;
}
}
return true;
});
//
// The value sample-type of the `signal` changes from `Int32` to `Int64`
//
// Signal produces 2 samples { 5, 6 }
try
{
count = {2};
double newValues[2]{};
// Fails even if the new sample-type is convertible to `double` because
// the user callback invalidated the reader.
reader.read(newValues, &count);
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
Reader invalidation and reuse
Once the Reader falls into invalid state, it can’t be used to read data anymore and all attempts will result in an OPENDAQ_ERR_INVALID_DATA
error code or the associated exception.
The only way to resolve this is to pass the Reader to a new Reader instance with valid sample-types and settings.
This enables the new reader to reuse the Connection from the invalidated one and as such, provides the ability to losslessly continue reading.
-
Cpp
// Signal value sample-type is `Float64`
auto reader = StreamReader<double, Int>(signal);
// Instal a custom callback that invalidates the reader if the new value sample-type is `Int64`
reader.setOnDescriptorChanged([](const DataDescriptorPtr& valueDescriptor,
const DataDescriptorPtr& /*domainDescriptor*/)
{
// If the value descriptor has changed
if (valueDescriptor.assigned())
{
// and the new sample type is `Int64`
if (valueDescriptor.getSampleType() == SampleType::Int16)
{
return false;
}
}
return true;
});
//
// The value sample-type of the `signal` changes from `Float64` to `Int16`
//
//
// Signal produces 2 samples { 1, 2 }
//
try
{
// Fails even if the new sample-type is convertible to `double` because
// the user callback invalidated the reader.
SizeT count{5};
double values[5]{};
reader.read(values, &count);
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
// Clear the user callback
reader.setOnDescriptorChanged(nullptr);
// This will reuse the Reader's configuration and Connection but change read type
// from to `Float64` to `Int64` and clear the `invalid` state.
auto newReader = StreamReaderFromExisting<Int, Int>(reader);
SizeT count{5};
Int values[5]{};
newReader.read(values, &count); // count = 2, values = { 1, 2 }
You can also reuse a valid Reader, for example, if you want to change the read sample-type or change any other configuration that is immutable after creating a Reader. This will make the old reader invalid.
-
Cpp
auto reader = StreamReader<Int, Int>(signal);
// Signal produces 5 samples { 1, 2, 3, 4, 5 }
auto packet1 = createPacketForSignal(signal, 5);
auto data1 = static_cast<Int*>(packet1.getData());
data1[0] = 1;
data1[1] = 2;
data1[2] = 3;
data1[3] = 4;
data1[4] = 5;
signal.sendPacket(packet1);
SizeT count{2};
Int values[2]{};
reader.read(values, &count); // count = 2, values = { 1, 2 }
// Reuse the reader
auto newReader = StreamReaderFromExisting<double, Int>(reader);
// new reader successfully continues on from previous reader's position
count = 2;
double newValues[2]{};
newReader.read(newValues, &count); // count = 2, values = { 3, 4 }
try
{
// The old reader has been invalidated when re-used by a new one
count = 2;
Int oldValues[2]{};
reader.read(oldValues, &count);
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
Full listing
The following is a self-contained file with all the above examples of Reader basics. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.
-
Cpp
#include <opendaq/context_factory.h>
#include <opendaq/data_rule_factory.h>
#include <opendaq/packet_factory.h>
#include <opendaq/reader_exceptions.h>
#include <opendaq/reader_factory.h>
#include <opendaq/scheduler_factory.h>
#include <opendaq/signal_factory.h>
#include <cassert>
#include <iostream>
using namespace daq;
SignalConfigPtr setupExampleSignal();
SignalPtr setupExampleDomain(const SignalPtr& value);
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset = 0);
daq::DataDescriptorPtr setupDescriptor(daq::SampleType type, const daq::DataRulePtr& rule = nullptr);
/*
* Example 1: These calls all create the same reader
*/
void example1(const SignalConfigPtr& signal)
{
auto reader1 = StreamReader(signal);
auto reader2 = StreamReader<double, Int>(signal);
auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Int64);
// For value
assert(reader1.getValueReadType() == SampleType::Float64);
assert(reader2.getValueReadType() == SampleType::Float64);
assert(reader3.getValueReadType() == SampleType::Float64);
// For domain
assert(reader1.getDomainReadType() == SampleType::Int64);
assert(reader2.getDomainReadType() == SampleType::Int64);
assert(reader3.getDomainReadType() == SampleType::Int64);
}
/*
* Example 2: Creating a reader with the Signal’s sample-type
*/
void example2(const SignalConfigPtr& signal)
{
// Use the Signal's sample-types for both value and domain
auto reader1 = StreamReader(signal, SampleType::Undefined, SampleType::Undefined);
assert(reader1.getValueReadType() == SampleType::Float64);
assert(reader1.getDomainReadType() == SampleType::Int64);
// ony for value
auto reader2 = StreamReader(signal, SampleType::Undefined, SampleType::Int64);
assert(reader2.getValueReadType() == SampleType::Float64);
assert(reader2.getDomainReadType() == SampleType::Int64);
// or ony for domain
auto reader3 = StreamReader(signal, SampleType::Float64, SampleType::Undefined);
assert(reader3.getValueReadType() == SampleType::Float64);
assert(reader3.getDomainReadType() == SampleType::Int64);
}
/*
* Reading basic value and domain data
*/
void example3(const SignalConfigPtr& signal)
{
auto reader = StreamReader<double, Int>(signal);
// Should return 0
auto available = reader.getAvailableCount();
assert(available == 0u);
//
// Signal produces 8 samples
//
auto packet1 = createPacketForSignal(signal, 8);
signal.sendPacket(packet1);
// Should return 8
available = reader.getAvailableCount();
assert(available == 8u);
SizeT readCount{5};
double values[5]{};
reader.read(values, &readCount);
std::cout << "Read " << readCount << " values" << std::endl;
for (double value : values)
{
std::cout << value << std::endl;
}
readCount = 5;
double newValues[5];
Int newDomain[5];
reader.readWithDomain(newValues, newDomain, &readCount);
// `readCount` should now be `3`
std::cout << "Read another " << readCount << " value and domain samples" << std::endl;
for (SizeT i = 0; i < readCount; ++i)
{
std::cout << newValues[i] << ", " << newDomain[i] << std::endl;
}
}
/*
* Example 4: Handling Signal changes
*/
void example4(const SignalConfigPtr& signal)
{
// Signal value sample-type is `Float64`
signal.setDescriptor(setupDescriptor(SampleType::Float64));
auto reader = StreamReader<double, Int>(signal);
// Signal produces 2 samples { 1.1, 2.2 }
auto packet1 = createPacketForSignal(signal, 2);
auto data1 = static_cast<double*>(packet1.getData());
data1[0] = 1.1;
data1[1] = 2.2;
signal.sendPacket(packet1);
//
// The value sample-type of the `signal` changes from `Float64` to `Int32`
//
signal.setDescriptor(setupDescriptor(SampleType::Int32));
// Signal produces 2 samples { 3, 4 }
auto packet2 = createPacketForSignal(signal, 2);
auto data2 = static_cast<std::int32_t*>(packet2.getData());
data2[0] = 3;
data2[1] = 4;
signal.sendPacket(packet2);
// The call succeeds because `Int32` is convertible to `Float64`
// and results in `4` samples { 1.1, 2.2, 3.0, 4.0 }
SizeT count{5};
double values[5]{};
reader.read(values, &count);
assert(count == 4u);
assert(values[0] == 1.1);
assert(values[1] == 2.2);
assert(values[2] == 3.0);
assert(values[3] == 4.0);
// Instal a custom callback that invalidates the reader if the new value sample-type is `Int64`
reader.setOnDescriptorChanged([](const DataDescriptorPtr& valueDescriptor,
const DataDescriptorPtr& /*domainDescriptor*/)
{
// If the value descriptor has changed
if (valueDescriptor.assigned())
{
// and the new sample type is `Int64`
if (valueDescriptor.getSampleType() == SampleType::Int64)
{
return false;
}
}
return true;
});
//
// The value sample-type of the `signal` changes from `Int32` to `Int64`
//
signal.setDescriptor(setupDescriptor(SampleType::Int64));
// Signal produces 2 samples { 5, 6 }
auto packet3 = createPacketForSignal(signal, 2);
auto data3 = static_cast<std::int64_t*>(packet3.getData());
data3[0] = 3;
data3[1] = 4;
signal.sendPacket(packet3);
bool failed{true};
try
{
count = {2};
double newValues[2]{};
// Fails even if the new sample-type is convertible to `double` because
// the user callback invalidated the reader.
reader.read(newValues, &count);
failed = false;
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
assert(failed);
}
/*
* Example 5: Reader invalidation
*/
void example5(const SignalConfigPtr& signal)
{
// Signal value sample-type is `Float64`
signal.setDescriptor(setupDescriptor(SampleType::Float64));
auto reader = StreamReader<double, Int>(signal);
// Instal a custom callback that invalidates the reader if the new value sample-type is `Int64`
reader.setOnDescriptorChanged([](const DataDescriptorPtr& valueDescriptor,
const DataDescriptorPtr& /*domainDescriptor*/)
{
// If the value descriptor has changed
if (valueDescriptor.assigned())
{
// and the new sample type is `Int64`
if (valueDescriptor.getSampleType() == SampleType::Int16)
{
return false;
}
}
return true;
});
//
// The value sample-type of the `signal` changes from `Float64` to `Int16`
//
signal.setDescriptor(setupDescriptor(SampleType::Int16));
//
// Signal produces 2 samples { 1, 2 }
//
auto packet = createPacketForSignal(signal, 2);
auto data = static_cast<std::int16_t*>(packet.getData());
data[0] = 1;
data[1] = 2;
signal.sendPacket(packet);
bool failed{true};
try
{
SizeT count{5};
double values[5]{};
reader.read(values, &count);
failed = false;
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
assert(failed);
// Clear the user callback
reader.setOnDescriptorChanged(nullptr);
// This will reuse the Reader's configuration and Connection but change read type
// from to `Float64` to `Int64` and clear the `invalid` state.
auto newReader = StreamReaderFromExisting<Int, Int>(reader);
SizeT count{5};
Int values[5]{};
newReader.read(values, &count); // count = 2, values = { 1, 2 }
assert(count == 2u);
assert(values[0] == 1);
assert(values[1] == 2);
}
/*
* Example 6: Reader reuse
*/
void example6(const SignalConfigPtr& signal)
{
signal.setDescriptor(setupDescriptor(SampleType::Int64));
auto reader = StreamReader<Int, Int>(signal);
// Signal produces 5 samples { 1, 2, 3, 4, 5 }
auto packet1 = createPacketForSignal(signal, 5);
auto data1 = static_cast<Int*>(packet1.getData());
data1[0] = 1;
data1[1] = 2;
data1[2] = 3;
data1[3] = 4;
data1[4] = 5;
signal.sendPacket(packet1);
SizeT count{2};
Int values[2]{};
reader.read(values, &count); // count = 2, values = { 1, 2 }
assert(count == 2u);
assert(values[0] == 1);
assert(values[1] == 2);
// Reuse the reader
auto newReader = StreamReaderFromExisting<double, Int>(reader);
// new reader successfully continues on from previous reader's position
count = 2;
double newValues[2]{};
newReader.read(newValues, &count); // count = 2, values = { 3, 4 }
assert(count == 2u);
assert(newValues[0] == 3);
assert(newValues[1] == 4);
bool failed{true};
try
{
// The old reader has been invalidated when reused by a new one
count = 2;
Int oldValues[2]{};
reader.read(oldValues, &count);
failed = false;
}
catch (const InvalidDataException& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
assert(failed);
}
/*
* ENTRY POINT
*/
int main(int /*argc*/, const char* /*argv*/ [])
{
SignalConfigPtr signal = setupExampleSignal();
signal.setDomainSignal(setupExampleDomain(signal));
example1(signal);
example2(signal);
example3(signal);
example4(signal);
example5(signal);
example6(signal);
return 0;
}
/*
* Set up the Signal with Float64 data
*/
SignalConfigPtr setupExampleSignal()
{
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
auto signal = Signal(context, nullptr, "example signal");
signal.setDescriptor(setupDescriptor(SampleType::Float64));
return signal;
}
SignalPtr setupExampleDomain(const SignalPtr& value)
{
auto domain = Signal(value.getContext(), nullptr, "domain signal");
domain.setDescriptor(setupDescriptor(daq::SampleType::Int64, daq::LinearDataRule(1, 0)));
return domain;
}
DataDescriptorPtr setupDescriptor(daq::SampleType type, const daq::DataRulePtr& rule)
{
// Set-up the data descriptor with the provided Sample-Type
const auto dataDescriptor = daq::DataDescriptorBuilder().setSampleType(type);
// For the Domain we provide a Linear Rule to generate time-stamps
if (rule.assigned())
dataDescriptor.setRule(rule);
return dataDescriptor.build();
}
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset)
{
// Create a data packet where the values are generated via the +1 rule starting at 0
auto domainPacket = daq::DataPacket(
signal.getDomainSignal().getDescriptor(),
numSamples,
offset // offset from 0 to start the sample generation at
);
return daq::DataPacketWithDomain(
domainPacket,
signal.getDescriptor(),
numSamples
);
}