Read Multiple Signals Aligned
This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read Basic Value And Domain Data how-to guide. For brevity, in |
When you start reading more than one Signal at once, you immediately run into a problem: the next sample in each of the Signals does not necessarily represent the same point in the Domain (usually the time domain). This happens because Signals can have different Origins and can produce data with different Tick Resolutions, different Units, and Data Rules that advance at different rates (usually called the sample rate).
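To make the mismatch concrete, here is a minimal stand-alone sketch that computes the absolute time of a sample from its index. It does not use the openDAQ API: the `DomainInfo` struct and the `sampleTimeMs` function are hypothetical names introduced only for illustration, and the origin is simplified to a millisecond count relative to some shared reference instead of an ISO-8601 string.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified description of a linear time domain.
// This is NOT an openDAQ type; it only mirrors the concepts in the text.
struct DomainInfo
{
    std::int64_t originMs;       // origin, in ms relative to a shared reference
    std::int64_t resolutionNum;  // tick resolution numerator (seconds per tick = num / den)
    std::int64_t resolutionDen;  // tick resolution denominator
    std::int64_t delta;          // linear rule: ticks between consecutive samples
    std::int64_t offset;         // linear rule: tick value of the first sample
};

// Absolute time (in ms) of the sample at position `index`.
std::int64_t sampleTimeMs(const DomainInfo& d, std::int64_t index)
{
    assert(d.resolutionDen > 0);
    const std::int64_t tick = d.offset + d.delta * index;
    return d.originMs + tick * 1000 * d.resolutionNum / d.resolutionDen;
}
```

With the same resolution and rule but origins that are 1123 ms apart, sample 0 of the later signal falls on the same instant as sample 1123 of the earlier one, so reading "the next sample" from both does not yield matching points in time.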
This is quite a complex problem, even more so when you consider that the Signals being read can change completely at any time during a measurement. In practice, however, most Signals don't change at all, or at least not radically, and almost all of them are in the time domain. Reading can therefore cover a large portion of Signals even when restricted to a subset of allowed configurations.
With this in mind, we limit ourselves to Signals that have:
- A domain Signal assigned
- Implicit domain where:
  - Domain ticks represent time in seconds
  - Domain origin is specified in ISO-8601 format
- A combination of Tick Resolution and Data Rule that result in the same sample rate
  - Inter-sample offsets are ignored
- Values that can be converted to a common data-type
Some of the above limitations might be lifted in the future.
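As an illustration of the sample-rate precondition, the following sketch checks whether several combinations of Tick Resolution and linear-rule delta result in the same sample rate. It is a stand-alone approximation, not the check the Reader actually performs: `RateInfo` and `sameSampleRate` are hypothetical names, and the rate is assumed to be `den / (num * delta)` samples per second.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical summary of what determines a signal's sample rate.
struct RateInfo
{
    std::int64_t num;    // tick resolution numerator (seconds per tick = num / den)
    std::int64_t den;    // tick resolution denominator
    std::int64_t delta;  // linear rule: ticks between consecutive samples
};

// Returns true if all entries describe the same sample rate.
// rate = den / (num * delta); rates are compared by cross-multiplication
// so that no floating-point rounding is involved.
bool sameSampleRate(const std::vector<RateInfo>& signals)
{
    assert(!signals.empty());
    const RateInfo& first = signals.front();
    for (const RateInfo& s : signals)
    {
        if (first.den * s.num * s.delta != s.den * first.num * first.delta)
            return false;
    }
    return true;
}
```

For example, a resolution of 1/1000 with delta 1 and a resolution of 1/2000 with delta 2 both describe 1000 samples per second and would pass this check, while 1/1000 with delta 2 would not.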
Using the Multi-Reader
Using the Multi Reader is largely the same as using any other Reader. It is essentially a Stream Reader for multiple Signals at once, except that it performs additional checks to keep the Signals compatible and synchronized to the same domain point.
-
Cpp
// Each of these calls creates the same reader; use only one of them
auto reader = MultiReader(signals);
auto reader = MultiReader<double, Int>(signals);
auto reader = MultiReader(signals, SampleType::Float64, SampleType::Int64);
On creation, the Reader first checks the preconditions listed above and throws an error if they aren't met. Once constructed, the Signals are not yet synchronized. Synchronization happens on the first read call, or on any call that needs to determine the number of samples ready. At that point the Reader tries to align all Signals to the first common domain point, which becomes the new start from which the Reader provides data.
It does this by dropping samples until the domain value is equal to or greater than the chosen start value.
Until the queues of all Signals contain enough samples to reach the synchronized start domain value, the count parameter will be 0, even though in the background the Reader actually reads as much as possible within the timeout.
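The dropping step can be pictured with a small stand-alone sketch. It is only a model of the behavior described above, not the Reader's implementation: `alignToStart` is a hypothetical function, and each queue is assumed to hold the domain values of the samples already received, expressed on a common scale.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <limits>
#include <vector>

// Drops queued domain values that lie before `start` and returns how many
// aligned samples every signal can provide afterwards.
// The result is 0 as long as at least one signal has not reached `start`.
std::size_t alignToStart(std::vector<std::deque<std::int64_t>>& queues, std::int64_t start)
{
    assert(!queues.empty());
    std::size_t available = std::numeric_limits<std::size_t>::max();
    for (auto& queue : queues)
    {
        // Discard everything before the common start point
        while (!queue.empty() && queue.front() < start)
            queue.pop_front();

        // Only as many samples as the shortest queue holds can be read aligned
        available = std::min<std::size_t>(available, queue.size());
    }
    return available;
}
```

In this model, a signal whose queued samples all lie before the start point ends up with an empty queue, which is why the reported count stays at 0 until more data arrives.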
Aligning Signals start
The basic example we will follow uses 3 Signals with:
- Value type of SampleType::Float64
- Domain signal with
  - Tick resolution of 1/1000
  - Linear rule with
    - Delta: 1
    - Offset: 0
- But different origins of:
  - "2022-09-27T00:02:03+00:00"
  - "2022-09-27T00:02:04+00:00"
  - "2022-09-27T00:02:04.123+00:00"
The third Signal has the latest origin, so it defines the common start point. To reach it, the first Signal has to skip 1.123 s (1123 samples) and the second 0.123 s (123 samples).
Since the Offset is 0 and the tick resolutions are the same, no additional adjustments are made.
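The skip counts can be reproduced with a few lines of stand-alone arithmetic. The sketch below is not part of openDAQ: `samplesToSkip` is a hypothetical helper, the origins are given as milliseconds relative to an arbitrary shared reference rather than as ISO-8601 strings, and all signals are assumed to share one sample rate.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// For every signal, computes how many samples lie before the latest origin.
// The division rounds up so that the first kept sample is at or after the
// common start point.
std::vector<std::int64_t> samplesToSkip(const std::vector<std::int64_t>& originsMs,
                                        std::int64_t samplesPerSecond)
{
    assert(!originsMs.empty());
    const std::int64_t latest = *std::max_element(originsMs.begin(), originsMs.end());

    std::vector<std::int64_t> skip;
    skip.reserve(originsMs.size());
    for (std::int64_t origin : originsMs)
    {
        const std::int64_t gapMs = latest - origin;  // always >= 0
        skip.push_back((gapMs * samplesPerSecond + 999) / 1000);
    }
    return skip;
}
```

Expressing the three origins as milliseconds past 00:02:00 gives 3000, 4000 and 4123. At 1000 samples per second the helper yields 1123, 123 and 0 samples to skip, matching the numbers above.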
-
Cpp
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
After the Reader is constructed, each Signal produces one Data Packet. The packets differ in size, but together they do not contain enough samples to align on the start domain point.
The Reader therefore reports 0 available samples: the samples that lie before the start domain point were dropped during the call that checks the number of available samples.
In the examples, a helper function |
Reading synchronized data
After some time, more data packets arrive and the Reader finally has enough samples to align the start. The situation after 3 data packets for each signal is:
- 523 * 3 = 1569 samples (1.569s)
  - need 1123 to sync
  - 1569 - 1123 = 446 remaining
- 732 * 3 = 2196 samples (2.196s)
  - need 123 to sync
  - 2196 - 123 = 2073 remaining
- 843 * 3 = 2529 samples (2.529s)
  - need 0 to sync
  - 2529 remaining
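The bookkeeping above can be checked with a short stand-alone sketch. It models only the arithmetic, not the Reader itself: `alignedAvailable` is a hypothetical function that takes the packet size and the number of samples to skip for each signal and returns how many aligned samples are available after a given number of packets.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Number of aligned samples available once every signal has sent
// `packetsSent` packets. The signal with the fewest remaining samples
// limits what can be read.
std::int64_t alignedAvailable(const std::vector<std::int64_t>& packetSizes,
                              const std::vector<std::int64_t>& skip,
                              std::int64_t packetsSent)
{
    assert(!packetSizes.empty() && packetSizes.size() == skip.size());
    std::int64_t available = std::numeric_limits<std::int64_t>::max();
    for (std::size_t i = 0; i < packetSizes.size(); ++i)
    {
        // Samples received so far minus the ones dropped to reach the start
        const std::int64_t remaining =
            std::max<std::int64_t>(0, packetSizes[i] * packetsSent - skip[i]);
        available = std::min(available, remaining);
    }
    return available;
}
```

With packet sizes 523, 732 and 843 and skip counts 1123, 123 and 0, the function returns 0 after one packet per signal and 446 after three, because the first Signal is the limiting one in both cases.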
To issue read calls, you first need to pre-allocate buffers for the Reader to fill. The procedure is the same as with other Readers, except that instead of providing a pointer to the start of a single buffer, you now provide an array of per-signal buffer pointers (called a jagged array).
We request 523 samples from the Reader, but since it had to drop 1123 samples from the first Signal to align the start, only 446 aligned samples remain, and those are returned.
-
Cpp
constexpr const auto NUM_SIGNALS = 3;
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
available = reader.getAvailableCount(); // 0
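When the number of Signals or samples is only known at run time, the same jagged layout can be built from dynamically sized storage. The following is a stand-alone sketch under stated assumptions: `JaggedBuffer` is a hypothetical helper, not an openDAQ type, and it assumes the read call accepts a plain `void**`, as the array-to-pointer decay in the example above suggests.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Owns one contiguous buffer per signal and exposes them as an array of
// untyped pointers (a jagged array).
struct JaggedBuffer
{
    JaggedBuffer(std::size_t signalCount, std::size_t samplesPerSignal)
        : storage(signalCount, std::vector<double>(samplesPerSignal))
    {
        assert(signalCount > 0);
        pointers.reserve(signalCount);
        for (auto& buffer : storage)
            pointers.push_back(buffer.data());
    }

    // Copying would leave `pointers` referring to the other object's storage
    JaggedBuffer(const JaggedBuffer&) = delete;
    JaggedBuffer& operator=(const JaggedBuffer&) = delete;

    // Pointer to the first per-signal buffer pointer
    void** data() { return pointers.data(); }

    std::vector<std::vector<double>> storage;  // one buffer per signal
    std::vector<void*> pointers;               // pointers[i] -> storage[i]
};
```

An instance created as `JaggedBuffer values(3, 523);` could then take the place of the `values` and `valuesPerSignal` pair, with `values.data()` passed where the pointer array is expected. A second instance with a different element type would be needed for the domain buffers.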
Using the Multi-Reader to read time-stamps
The Time Reader presented in Read With Absolute Time-Stamps can also be used with the Multi-Reader.
-
Cpp
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
//
// To install the domain transform function to system time-stamps
//
TimeReader timeReader(reader);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
auto available = reader.getAvailableCount(); // 0
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
//
// Use time-stamps as a buffer instead of the domain-type
//
std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count{SAMPLES};
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
available = reader.getAvailableCount(); // 0
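Conceptually, turning a domain tick into a time-stamp means scaling the tick by the Tick Resolution and adding the result to the origin. The sketch below shows that conversion in isolation. It is not how the Time Reader is implemented: `toTimePoint` is a hypothetical function, the origin is assumed to be already parsed into a `time_point`, and the intermediate nanosecond count is assumed to fit into 64 bits.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Converts a domain tick to an absolute time-stamp.
// `num / den` is the tick resolution in seconds per tick.
std::chrono::system_clock::time_point toTimePoint(std::chrono::system_clock::time_point origin,
                                                  std::int64_t tick,
                                                  std::int64_t num,
                                                  std::int64_t den)
{
    assert(den > 0);
    // Elapsed time since the origin, computed in nanoseconds
    const std::chrono::nanoseconds sinceOrigin(tick * 1'000'000'000 * num / den);
    // system_clock's native resolution differs between platforms
    return origin + std::chrono::duration_cast<std::chrono::system_clock::duration>(sinceOrigin);
}
```

With a resolution of 1/1000, tick 1123 maps to a point 1.123 s after the origin, which is why the first Signal's tick 1123 and the third Signal's tick 0 produce the same time-stamp in the example.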
Full listing
The following is a self-contained file with all of the above examples of aligned reading of multiple Signals. To properly illustrate the point and provide reproducibility, the data is generated manually, but the same should hold when connecting to a real device.
-
Cpp
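#include <opendaq/opendaq.h>
#include <array>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <sstream>
#include <string>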
using namespace daq;
struct ReadSignal
{
explicit ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize);
void sendPacket();
int packetIndex{0};
std::int64_t packetSize;
SignalConfigPtr signal;
DataDescriptorPtr valueDescriptor;
};
template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values);
SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch);
ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin);
/*
* Aligns 3 signals to the same domain position and starts reading from there
*/
void exampleSimple()
{
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
[[maybe_unused]]
auto available = reader.getAvailableCount(); // 0
assert(available == 0);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
assert(available == 446);
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<ClockTick[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
assert(count == 446);
available = reader.getAvailableCount(); // 0
assert(available == 0);
/* Should print:
*
* Signal 0
* |d: 1123 |v: 1123.0
* |d: 1124 |v: 1124.0
* |d: 1125 |v: 1125.0
* |d: 1126 |v: 1126.0
* |d: 1127 |v: 1127.0
* --------
* Signal 1
* |d: 123 |v: 123.0
* |d: 124 |v: 124.0
* |d: 125 |v: 125.0
* |d: 126 |v: 126.0
* |d: 127 |v: 127.0
* --------
* Signal 2
* |d: 0 |v: 0.0
* |d: 1 |v: 1.0
* |d: 2 |v: 2.0
* |d: 3 |v: 3.0
* |d: 4 |v: 4.0
*/
printData(5, domain, values);
}
/*
* The same as example 1 but read domain in `std::chrono::system_clock::time_point` values
*/
void exampleWithTimeStamps()
{
constexpr const auto NUM_SIGNALS = 3;
auto logger = Logger();
auto context = Context(Scheduler(logger, 1), logger, nullptr);
ReadSignal sig0 = demoSignal(context, 523, "2022-09-27T00:02:03+00:00");
ReadSignal sig1 = demoSignal(context, 732, "2022-09-27T00:02:04+00:00");
ReadSignal sig2 = demoSignal(context, 843, "2022-09-27T00:02:04.123+00:00");
ListPtr<ISignal> signals{sig0.signal, sig1.signal, sig2.signal};
auto reader = MultiReader(signals);
TimeReader timeReader(reader);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
[[maybe_unused]]
auto available = reader.getAvailableCount(); // 0
assert(available == 0);
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
sig0.sendPacket();
sig1.sendPacket();
sig2.sendPacket();
// Samples per signal
// 523 * 3 = 1569 (1.569s) need 1123 to sync
// 732 * 3 = 2196 (2.196s) need 123 to sync
// 843 * 3 = 2529 (2.529s) need 0 to sync
available = reader.getAvailableCount(); // 446
assert(available == 446);
constexpr const SizeT SAMPLES = 523;
std::array<double[SAMPLES], NUM_SIGNALS> values{};
std::array<std::chrono::system_clock::time_point[SAMPLES], NUM_SIGNALS> domain{};
void* valuesPerSignal[NUM_SIGNALS]{values[0], values[1], values[2]};
void* domainPerSignal[NUM_SIGNALS]{domain[0], domain[1], domain[2]};
SizeT count = SAMPLES;
reader.readWithDomain(valuesPerSignal, domainPerSignal, &count);
// count = 446
assert(count == 446);
available = reader.getAvailableCount(); // 0
assert(available == 0);
/* Should print:
*
* Signal 0
* |d: 2022-09-27 00:02:04.1230000 |v: 1123.0
* |d: 2022-09-27 00:02:04.1240000 |v: 1124.0
* |d: 2022-09-27 00:02:04.1250000 |v: 1125.0
* |d: 2022-09-27 00:02:04.1260000 |v: 1126.0
* |d: 2022-09-27 00:02:04.1270000 |v: 1127.0
* --------
* Signal 1
* |d: 2022-09-27 00:02:04.1230000 |v: 123.0
* |d: 2022-09-27 00:02:04.1240000 |v: 124.0
* |d: 2022-09-27 00:02:04.1250000 |v: 125.0
* |d: 2022-09-27 00:02:04.1260000 |v: 126.0
* |d: 2022-09-27 00:02:04.1270000 |v: 127.0
* --------
* Signal 2
* |d: 2022-09-27 00:02:04.1230000 |v: 0.0
* |d: 2022-09-27 00:02:04.1240000 |v: 1.0
* |d: 2022-09-27 00:02:04.1250000 |v: 2.0
* |d: 2022-09-27 00:02:04.1260000 |v: 3.0
* |d: 2022-09-27 00:02:04.1270000 |v: 4.0
*/
printData(5, domain, values);
}
void drawBoxMessage(const std::string& message);
int main(int /*argc*/, const char* /*argv*/[])
{
drawBoxMessage("Example 1");
exampleSimple();
drawBoxMessage("Example 2");
exampleWithTimeStamps();
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////// Utility functions ///////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////////////
SignalConfigPtr createDomainSignal(const ContextPtr& context, std::string epoch)
{
daq::DataDescriptorPtr dataDescriptor = daq::DataDescriptorBuilder()
.setSampleType(SampleTypeFromType<ClockTick>::SampleType)
.setOrigin(epoch)
.setTickResolution(Ratio(1, 1000))
.setRule(LinearDataRule(1, 0))
.setUnit(daq::Unit("s", -1, "seconds", "time"))
.build();
auto domain = Signal(context, nullptr, "time");
domain.setDescriptor(dataDescriptor);
return domain;
}
ReadSignal demoSignal(const ContextPtr& context, std::int64_t packetSize, const std::string& domainOrigin)
{
static int counter = 0;
auto newSignal = Signal(context, nullptr, fmt::format("sig{}", counter++));
newSignal.setDescriptor(DataDescriptorBuilder().setSampleType(SampleType::Float64).build());
newSignal.setDomainSignal(createDomainSignal(context, domainOrigin));
return ReadSignal(newSignal, packetSize);
}
ReadSignal::ReadSignal(const SignalConfigPtr& signal, std::int64_t packetSize)
: packetSize(packetSize)
, signal(signal)
, valueDescriptor(signal.getDescriptor())
{
}
void ReadSignal::sendPacket()
{
auto domainSignal = signal.getDomainSignal();
auto domainDescriptor = domainSignal.getDescriptor();
Int delta = domainDescriptor.getRule().getParameters()["delta"];
auto offset = (packetSize * delta) * packetIndex;
auto domainPacket = daq::DataPacket(domainDescriptor, packetSize, offset);
auto packet = daq::DataPacketWithDomain(domainPacket, valueDescriptor, packetSize);
// Zero-out data
memset(packet.getData(), 0, packet.getSampleCount() * packet.getSampleMemSize());
auto* data = static_cast<double*>(packet.getData());
for (auto i = 0; i < packetSize; ++i)
{
data[i] = offset + i;
}
signal.sendPacket(packet);
packetIndex++;
}
template <typename T, typename U>
void printData(std::int64_t samples, T& times, U& values)
{
using namespace std::chrono;
using namespace reader;
int numSignals = std::size(times);
for (int sigIndex = 0; sigIndex < numSignals; ++sigIndex)
{
fmt::print("--------\n");
fmt::print("Signal {}\n", sigIndex);
for (int sampleIndex = 0; sampleIndex < samples; ++sampleIndex)
{
std::stringstream ss;
ss << times[sigIndex][sampleIndex];
fmt::print(" |d: {} |v: {}\n", ss.str(), values[sigIndex][sampleIndex]);
}
}
}
void drawBoxMessage(const std::string& message)
{
fmt::print("┌{0:─^{2}}┐\n"
"│{1: ^{2}}│\n"
"└{0:─^{2}}┘\n",
"",
message,
20);
}