Read last N samples

This guide assumes that you’ve read the background section about the common behavior of Readers or gone through the Read basic value and Domain data how-to guide.

For brevity, the C++ snippets assume that all code is in namespace daq (or that it has been imported via using namespace daq;) and omit the necessary header includes until the final complete listing.

When you are only interested in the last few samples of a Signal at any given time, a Tail Reader is the right tool. The Reader connects to a Signal and always keeps at least the last N samples in its cache, ready to read. This number is referred to as the history size.

To create the Reader, first decide how many of the last samples you are interested in, then pass that number to the Reader as a parameter. In the example below, we choose to keep the last 5 samples of the Signal signal.

Creating a default Tail Reader
  • Cpp

// These calls all create the same Reader
auto reader = TailReader(signal, 5);
auto reader = TailReader<double, Int>(signal, 5);
auto reader = TailReader(signal, 5, SampleType::Float64, SampleType::Int64);

  • Python

# These calls all create the same Reader
reader = opendaq.TailReader(signal, 5)
reader = opendaq.TailReader(signal, 5, value_type=opendaq.SampleType.Float64, domain_type=opendaq.SampleType.Int64)
reader = opendaq.TailReader(signal, 5, value_type=opendaq.SampleType.Float64)

By default, if no explicit read Sample Types are provided, they’re assumed to be:

  • SampleType::Float64 / daq::Float / double for value.

  • SampleType::Int64 / daq::Int / std::int64_t for domain.

After creating the Reader, we can issue the usual read calls. Until the Signal has produced at least as many samples as the history size (5 in our case), a read call returns only the samples that are available.

  • Cpp

auto reader = TailReader(signal, 5);

// Signal produces 3 samples: { 1, 2, 3 }

SizeT count{5};
double values[5]{};
Int domain[5]{};

reader.readWithDomain(values, domain, &count); // count = 3, values = { 1, 2, 3, 0, 0 }

  • Python

reader = opendaq.TailReader(signal, 5)

# Signal produces 3 samples: 1, 2, 3

reader.read(5) # [1, 2, 3]
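
Because a read during this warm-up phase can return fewer samples than requested, code that needs a full window has to check the length of what it received. The following is a minimal sketch of such a check. It assumes only that read(n) returns a sequence of at most n samples, as in the Python example above. FakeTailReader and mean_of_full_tail are hypothetical names used so that the snippet runs without openDAQ or a device; they are not part of the library.

```python
class FakeTailReader:
    """Hypothetical stand-in for a Tail Reader; not part of openDAQ."""

    def __init__(self):
        self.samples = []

    def push(self, *values):
        # Simulates the Signal producing new samples.
        self.samples.extend(values)

    def read(self, count):
        # Returns at most `count` of the newest samples without consuming them.
        return self.samples[-count:] if count > 0 else []


def mean_of_full_tail(reader, history_size):
    """Return the mean of the last `history_size` samples, or None while warming up."""
    values = reader.read(history_size)
    if len(values) < history_size:
        return None  # not enough samples have been produced yet
    return sum(values) / len(values)


reader = FakeTailReader()
reader.push(1, 2, 3)
warming_up = mean_of_full_tail(reader, 5)   # None: only 3 of 5 samples exist

reader.push(4, 5, 6)
full_window = mean_of_full_tail(reader, 5)  # mean of 2, 3, 4, 5, 6
```

Returning None instead of a partial result keeps the warm-up case explicit for the caller; whether a partial window is acceptable depends on the application.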

A read does not advance the reading position, so subsequent calls may return overlapping samples if not enough new samples have been produced between them.

  • Cpp

auto reader = TailReader(signal, 5);

// The Signal produces 3 samples: 1, 2, 3

SizeT count{5};
double values[5]{};

reader.read(values, &count); // count = 3, values = { 1, 2, 3, 0, 0 }

// The Signal produces 3 samples: 4, 5, 6

count = 5;
reader.read(values, &count); // count = 5, values = { 2, 3, 4, 5, 6 }
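
When only the samples that arrived since the previous read matter, the overlap has to be filtered out by the caller. One possible approach, sketched below, is to read values together with their domain values and keep only the samples whose domain value is newer than the last one already seen. This assumes the domain values are strictly increasing (as with the linear rule used in this guide). new_samples is a hypothetical helper, not an openDAQ function, and the lists stand in for the buffers a read with domain would fill.

```python
def new_samples(values, domain, last_seen):
    """Return (fresh_values, new_last_seen) for samples with domain > last_seen.

    `last_seen` is the newest domain value already processed, or None
    if nothing has been processed yet.
    """
    fresh = [v for v, d in zip(values, domain) if last_seen is None or d > last_seen]
    if len(domain) > 0:
        newest = domain[-1]
        last_seen = newest if last_seen is None else max(last_seen, newest)
    return fresh, last_seen


# First read: samples 1, 2, 3 at domain ticks 0, 1, 2
first, last_seen = new_samples([1, 2, 3], [0, 1, 2], None)

# Second read overlaps the first: samples 2..6 at domain ticks 1..5
second, last_seen = new_samples([2, 3, 4, 5, 6], [1, 2, 3, 4, 5], last_seen)
```

After the second call, second holds only 4, 5 and 6, which are the samples that were not part of the first read.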

The Reader cannot keep an unlimited number of Packets indefinitely. Once more samples than the history size have been produced, it starts releasing the oldest Packets, but only as long as the remaining Packets still hold at least history size samples. Consequently, requesting more samples than the history size results in an error, unless the cached Packets happen to hold enough samples because the Reader had to keep a larger Packet to stay at or above the limit.

Reading above history size
  • Cpp

auto reader = TailReader(signal, 5);

/*
 * The Signal produces 3 Packets
 * [Packet 1]: 4 samples
 * [Packet 2]: 3 samples
 * [Packet 3]: 1 sample
 * -------------------------------
 *      Total: 8 samples
 */

SizeT count{10};
double values[10]{};

// Will throw a SizeTooLargeException
reader.read(values, &count);

// Will succeed as [Packet 3] and [Packet 2] together are less than 5 samples
// and we still need to keep [Packet 1] around to satisfy the minimum of 5 samples
count = 8;
reader.read(values, &count);

  • Python

reader = opendaq.TailReader(signal, 5)

#  The Signal produces 3 Packets
#  [Packet 1]: 4 samples
#  [Packet 2]: 3 samples
#  [Packet 3]: 1 sample
#  -------------------------------
#       Total: 8 samples

# Will throw
values = reader.read(10)

# Will succeed as [Packet 3] and [Packet 2] together are less than 5 samples
# and we still need to keep [Packet 1] around to satisfy the minimum of 5 samples
values = reader.read(8)
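
The retention rule described above can be illustrated with a small toy model. This is a sketch built only from the behavior stated in this guide, not the openDAQ implementation: PacketTailModel is a hypothetical class, Packets are plain lists, and a ValueError stands in for the library's SizeTooLargeException.

```python
from collections import deque


class PacketTailModel:
    """Toy model of packet-granular retention; not the openDAQ implementation."""

    def __init__(self, history_size):
        self.history_size = history_size
        self.packets = deque()

    def available(self):
        # Total number of samples in all cached Packets.
        return sum(len(p) for p in self.packets)

    def send(self, packet):
        self.packets.append(list(packet))
        # Drop the oldest Packet only while the rest still covers the history size.
        while (len(self.packets) > 1
               and self.available() - len(self.packets[0]) >= self.history_size):
            self.packets.popleft()

    def read(self, count):
        # Requests above the history size fail unless the cache happens to hold enough.
        if count > max(self.available(), self.history_size):
            raise ValueError("requested more samples than the cache holds")
        samples = [s for p in self.packets for s in p]
        return samples[-count:] if count > 0 else []


model = PacketTailModel(5)
model.send([1, 2, 3, 4])   # Packet 1: 4 samples
model.send([5, 6, 7])      # Packet 2: 3 samples
model.send([8])            # Packet 3: 1 sample

# Packets 2 and 3 hold only 4 samples, so Packet 1 must stay: 8 samples cached.
cached_after_three = model.available()
all_eight = model.read(8)

try:
    model.read(10)
    too_large_failed = False
except ValueError:
    too_large_failed = True

# A fourth Packet lets the model release Packet 1: 3 + 1 + 2 = 6 samples remain.
model.send([9, 10])
cached_after_four = model.available()
```

In this model the number of readable samples depends on how the Signal happened to split its data into Packets, which is why reads above the history size should not be relied upon.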

The following is a fully working example with most of the Tail Reader edge cases explained above. To properly illustrate the point and provide reproducibility, the data is manually generated, but the same should hold when connecting to a real device.

The full example code listing
  • Cpp

#include <opendaq/context_factory.h>
#include <opendaq/data_rule_factory.h>
#include <opendaq/packet_factory.h>
#include <opendaq/reader_factory.h>
#include <opendaq/scheduler_factory.h>
#include <opendaq/signal_factory.h>

#include <cassert>
#include <iostream>

using namespace daq;

SignalConfigPtr setupExampleSignal();
DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset = 0);
DataDescriptorPtr setupDescriptor(SampleType type, DataRulePtr rule = nullptr);

/*
 * Example 1: Behavior of the Tail Reader before getting the full history-size samples
 */
void example1(const SignalConfigPtr& signal)
{
    auto reader = TailReader(signal, 5);
    assert(reader.getAvailableCount() == 0u);

    // Allocate the buffers for the reader to copy data into
    SizeT count{};
    double values[5]{};
    Int domain[5]{};

    // Is below the history-size
    count = 3;
    reader.readWithDomain(values, domain, &count);
    assert(count == 0);

    try
    {
        // Is more than the history-size
        count = 6;
        reader.readWithDomain(values, domain, &count);
    }
    catch (const SizeTooLargeException& e)
    {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    // The Signal produces 3 samples: 1, 2, 3
    auto packet = createPacketForSignal(signal, 3);
    auto data = static_cast<double*>(packet.getData());
    data[0] = 1;
    data[1] = 2;
    data[2] = 3;
    signal.sendPacket(packet);

    count = 5;
    reader.readWithDomain(values, domain, &count);

    // count = 3, values = { 1, 2, 3, 0, 0 }
    assert(count == 3u);
    assert(values[0] == 1);
    assert(values[1] == 2);
    assert(values[2] == 3);
    assert(values[3] == 0);
    assert(values[4] == 0);
}

/*
 * Example 2: Subsequent reads can have overlapping samples
 */
void example2(const SignalConfigPtr& signal)
{
    auto reader = TailReader(signal, 5);

    // The Signal produces 3 samples: 1, 2, 3
    const SizeT FIRST_PACKET_SAMPLES = 3u;
    auto packet = createPacketForSignal(signal, FIRST_PACKET_SAMPLES);
    auto data = static_cast<double*>(packet.getData());
    data[0] = 1;
    data[1] = 2;
    data[2] = 3;
    signal.sendPacket(packet);

    // Allocate the buffers for the reader to copy data into
    SizeT count{5};
    double values[5]{};
    reader.read(values, &count);

    // count = 3, values = { 1, 2, 3, 0, 0 }
    assert(count == 3u);
    assert(values[0] == 1);
    assert(values[1] == 2);
    assert(values[2] == 3);
    assert(values[3] == 0);
    assert(values[4] == 0);

    // The Signal produces 3 samples: 4, 5, 6
    auto packet2 = createPacketForSignal(signal, 3, FIRST_PACKET_SAMPLES);
    auto data2 = static_cast<double*>(packet2.getData());
    data2[0] = 4;
    data2[1] = 5;
    data2[2] = 6;
    signal.sendPacket(packet2);

    count = 5;
    reader.read(values, &count);

    // count = 5, values = { 2, 3, 4, 5, 6 }
    assert(count == 5);
    assert(values[0] == 2);
    assert(values[1] == 3);
    assert(values[2] == 4);
    assert(values[3] == 5);
    assert(values[4] == 6);
}

/*
 * Example 3: Reading more samples than the history-size
 */
void example3(const SignalConfigPtr& signal)
{
    auto reader = TailReader(signal, 5);

    /*
     * The Signal produces 3 Packets
     * [Packet 1]: 4 samples
     * [Packet 2]: 3 samples
     * [Packet 3]: 1 sample
     * -------------------------------
     *      Total: 8 samples
     */

    auto packet1 = createPacketForSignal(signal, 4);
    auto packet2 = createPacketForSignal(signal, 3);
    auto packet3 = createPacketForSignal(signal, 1);
    signal.sendPacket(packet1);
    signal.sendPacket(packet2);
    signal.sendPacket(packet3);

    assert(reader.getAvailableCount() == 8u);

    // Allocate the buffers for the reader to copy data into
    SizeT count{};
    double values[10]{};

    try
    {
        count = 10;

        // Will throw a SizeTooLargeException
        reader.read(values, &count);
    }
    catch (const SizeTooLargeException& e)
    {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    // Will succeed as [Packet 3] and [Packet 2] together are less than 5 samples,
    // and we still need to keep [Packet 1] around to satisfy the minimum of 5 samples
    count = 8;
    reader.read(values, &count);

    assert(count == 8u);
}

/*
 * ENTRY POINT
 */
int main(int /*argc*/, const char* /*argv*/[])
{
    SignalConfigPtr signal = setupExampleSignal();

    example1(signal);
    example2(signal);
    example3(signal);

    return 0;
}

/*
 * Set up the Signal with Float64 data
 */
SignalConfigPtr setupExampleSignal()
{
    auto logger = Logger();
    auto context = Context(Scheduler(logger, 1), logger, nullptr, nullptr);

    auto signal = Signal(context, nullptr, "example signal");
    signal.setDescriptor(setupDescriptor(SampleType::Float64));

    return signal;
}

DataDescriptorPtr setupDescriptor(SampleType type, DataRulePtr rule)
{
    // Set up the data descriptor with the provided Sample-Type
    const auto dataDescriptor = DataDescriptorBuilder().setSampleType(type);

    // For the Domain, we provide a Linear Rule to generate time-stamps
    if (rule.assigned())
        dataDescriptor.setRule(rule);

    return dataDescriptor.build();
}

DataPacketPtr createPacketForSignal(const SignalPtr& signal, SizeT numSamples, Int offset)
{
    // Create a Data Packet where the values are generated via the +1 rule starting at 0
    auto domainPacket = DataPacket(setupDescriptor(SampleType::Int64, LinearDataRule(1, 0)),
                                   numSamples,
                                   offset  // offset from 0 to start the sample generation at
    );

    return DataPacketWithDomain(domainPacket, signal.getDescriptor(), numSamples);
}