Multi Reader specification

Overview

The Multi Reader is used to read several signals simultaneously, synchronizing them in time. This document outlines some features, use cases, setup requirements, as well as limitations of the Multi Reader. The article assumes previous knowledge of openDAQ reader usage and focuses on the intricacies of the Multi Reader instead of basic Reader concepts. The guide uses C++ examples to showcase how the multi reader behaves.

Multi reader example

Simple example on how to use the Multi Reader to read data of signals of equal sampling rates. The following sections will break down this example and explain each individual section of code, as well as provide additional examples highlighting different ways of reading data with the Multi Reader.

  • Cpp

using namespace daq;
using namespace std::chrono_literals;

int main()
{
    auto instance = Instance();
    auto refDevice = instance.addDevice("daqref://device0");
    auto signals = refDevice.getSignalsRecursive();

    // Create reader that converts values to `double` and time data to `int64`
    auto multiReaderBuilder = MultiReaderBuilder().setValueReadType(SampleType::Float64).setDomainReadType(SampleType::Int64);
    for (const auto& signal : signals)
        multiReaderBuilder.addSignal(signal);
    auto multiReader = multiReaderBuilder.build();

    // Allocate buffers for each signal
    auto signalsCount = signals.getCount();
    auto kBufferSize = SizeT{0};
    auto domainBuffers = std::vector<void*>(signalsCount, nullptr);
    auto dataBuffers = std::vector<void*>(signalsCount, nullptr);

    // read data every 50ms, up to a maximum of kBufferSize samples
    for (size_t readCount = 0; readCount < 20; readCount++)
    {
        auto dataAvailable = multiReader.getAvailableCount();
        auto count = std::min(kBufferSize, dataAvailable);
        auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

        if (status.getReadStatus() == ReadStatus::Event)
        {
            // Set buffer size based on sample rate, allocate buffers
            // Buffers have 100ms worth of memory for each signal
            auto sampleRate = reader::getSampleRate(
                status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR));

            kBufferSize = sampleRate / 10;

            for (size_t i = 0; i < signalsCount; ++i)
            {
                dataBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Float64));
                domainBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Int64));
            }
        }
        else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
        {
            std::cout << "Data: ";
            for (const auto& buf : dataBuffers)
                std::cout << std::to_string(static_cast<double*>(buf)[0]) << "; ";
            std::cout << "\n";
        }

        std::this_thread::sleep_for(50ms);
    }
}

Using the Multi Reader

The Multi Reader has a large suite of configuration options, and its usage patterns vary based on how it is configured. We will start by highlighting a basic example where the reader is used to read data of equal sampling rates and continue with more advanced options.

Creating the Multi Reader

A set of different factories exist that allow users to create a Multi Reader. In this guide we will mainly be using the MultiReaderBuilder factory, as it is the only one that allows for full configuration of parameters, whereas other factories only provide access to a limited set of settings.

Basic options

The basic Multi Reader options are as follows:

  • Value read type - The data type to which value signal data should be converted. If set to SampleType::Undefined, data in signal packets will not be converted, but left as-is.

  • Domain read type - The data type to which domain signal data should be converted. If set to SampleType::Undefined, data in signal packets will not be converted, but left as-is.

  • Read mode - Specifies whether Scaling should be applied. Can also affect the Value read type. Can be one of:

    • Scaled - Applies Post Scaling, Reference Domain Offset, and calculates Implicit Data Rule on packet data. Output type is equal to Value read type.

    • Unscaled - Does not apply Post Scaling and Reference Domain Offset do packet data. Data samples output type is equal to Value read type. Unscaled mode works only with Explicit signals

    • Raw - Same as Unscaled, with the exception Value read type being forced to SampleType::Undefined.

Signals vs. Ports

Depending on usage, a Multi Reader can be created with signals or input ports as builder input parameters. While similar, there are some differences:

  • Signals: The Multi Reader creates an internal input port and connects the signal to said port automatically. Users do not have access to the port. Mainly used when an application has a pre-defined list of signals it wants to read.

  • Input Ports: The port is already created and can be accessed by users. In this case, the Multi Reader takes ownership of the port and waits for signals to be connected. Mainly used in function blocks where users connect signals into a set of pre-created input ports after the function block has been created.

  • Cpp

// Creating a reader from signals
auto multiReaderBuilderSignals = daq::MultiReaderBuilder();
for (const auto& signal : signals)
    multiReaderBuilderSignals.addSignal(signal);
auto multiReaderSignals = multiReaderBuilderSignals.build();

// Creating a reader from ports
auto multiReaderBuilderPorts = daq::MultiReaderBuilder();
for (const auto& inputPort : inputPorts)
    multiReaderBuilderPorts.addInputPort(inputPort);
auto multiReaderPorts = multiReaderBuilderPorts.build();

Domain signal requirements

The domain signals of all signals read by the multi reader must fulfil the following requirements:

  • Domain: The domain signals must represent time in seconds. The domain unit must have the symbol "s" and the quantity "time".

  • Rule: The data rule must be linear.

  • Sampling Rates (SRs): All sampling rates must be compatible and measured as an integer number of samples per second. Sample rate (or sampling rate) is the number of samples of a continuous signal taken per domain unit during its conversion into a digital signal. It can be calculated from domain signal decsriptor as sr = 1 / (r * d), where r - signal tickResolution, d - signal linearRule.delta.

  • Reference Domain Info: All domain signals can be grouped by pair of reference domain ID and time source. The Multi Reader requires the following:

    • At least one of the read signals in a given reference domain ID group must have a known time source, the others can have an unknown source.

    • Signals that do not belong to the same reference domain group can still be read together as long as they have a matching time source.

    • Signals without a reference domain will be treated as wildcards and the reader will attempt to synchronize them, but might not be successful if the signals are not synchronized.

Reading signal descriptors

The first read of the Multi Reader always returns a sample count of 0 and read status Event because the Multi Reader provides signal descriptors during the initial read. These can be used to determine the input signal sampling rates, as well as the signal data types. If the descriptor of any read signals changes, the following read call will also have the read status Event, notifying you of the change.

  • Cpp

auto dataAvailable = multiReader.getAvailableCount();
auto count = std::min(kBufferSize, dataAvailable);

// Read and check for whether an event was encountered.
auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);
if (status.getReadStatus() == ReadStatus::Event)
{
    std::cout << "Event received\n";
}

When an event is encountered, the signal descriptors can be obtained from the read status. They can be used to validate signal compatibility with the user application, and used to calculate optimal buffer sizes.

  • Cpp

if (status.getReadStatus() == ReadStatus::Event)
{
    // Set buffer size based on sample rate (in hertz), allocate buffers
    // Buffers have 100ms worth of memory for each signal
    auto sampleRate = reader::getSampleRate(
        status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR));
    kBufferSize = sampleRate / 10;
}

If data descriptors are not consistent with what a function block or application expects, the Multi Reader can be deactivated by calling multiReader.setActive(false). While deactivated, the Multi Reader will drop data packets, ensuring it does not run out of memory. Event packets, however, will still be received, allowing users to re-enable the reader if a new, accepted descriptor is set for the input signal.

Reading data

After the user has initially read the data descriptors of the packets, it’s time to read the data. The Multi Reader returns data in a "jagged array." The allocated memory for the jagged array is provided through a void** pointer in the read()/readWithDomain() call. The buffers should be allocated to have space for the maximum read amount times the memory size of the data type read. In our example, we read data as double and domain data as int64. A helper, daq::getSampleSize, allowing for calculating the required memory size given a SampleType is also provided by openDAQ.

  • Cpp

if (status.getReadStatus() == ReadStatus::Event)
{
    // ...

    for (size_t i = 0; i < signalsCount; ++i)
    {
        dataBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Float64));
        domainBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Int64));
    }
}

Reading data in a loop

There are two options for reading data with readers: in a loop, or in a callback. When reading data in a loop, the application must provide a thread in which read is called periodically. In the below example, readWithDomain is called every 50ms.

  • Cpp

for (size_t readCount = 0; readCount < 20; readCount++)
{
    auto dataAvailable = multiReader.getAvailableCount();
    auto count = std::min(kBufferSize, dataAvailable);
    auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

    if (status.getReadStatus() == ReadStatus::Event)
    {
        // ...
    }
    else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
    {
        std::cout << "Data: ";
        for (const auto& buf : dataBuffers)
            std::cout << std::to_string(static_cast<double*>(buf)[0]) << "; ";
        std::cout << "\n";
    }

    std::this_thread::sleep_for(50ms);
}

Reading data in callbacks

To read data in a callback, the multi reader setOnDataAvailable() method can be used. When a callback is provided via said method, the callback will be triggered whenever the Multi Reader has data that can be read, or an event has been encountered.

  • Cpp

std::mutex mutex;
bool running = true;

// Create lambda that is invoked when data is available
auto readData = [&]
{
    // Read data under lock, stop reading once application terminates
    std::scoped_lock lock(mutex);
    if (!running)
        return;

    auto dataAvailable = multiReader.getAvailableCount();
    auto count = std::min(kBufferSize, dataAvailable);
    auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

    if (status.getReadStatus() == ReadStatus::Event)
    {
        // ...
    }
    else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
    {
        std::cout << "Data: ";
        for (const auto& buf : dataBuffers)
            std::cout << std::to_string(static_cast<double*>(buf)[0]) << "; ";
        std::cout << "\n";
    }
};

multiReader.setOnDataAvailable(readData);

// ...

{
    // Stop reading on application termination to prevent access to destroyed objects
    std::scoped_lock lock(mutex);
    running = false;
}

Reusing domain data

To simplify the creation of output domain signals in function blocks that uses the Multi Reader, the read status provides a "main descriptor" that can be obtained through the getMainDescriptor() reader function. In function blocks that aggregate signals (ie. multiplication or summation of signals), the output signal likely has the same sampling rate and timestamps as its inputs. As such, the domain descriptor of the "main signal" can be used as the descriptor for the output domain signal.

The main descriptor simply corresponds to the first signal in the list of signals read by the Multi Reader.
  • Cpp

auto eventPacket = status.getMainDescriptor();
auto outputDomainDescriptor = eventPacket.getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR);
auto outputDomainSignal = SignalWithDescriptor(context, outputDomainDescriptor, parent, "outputDomainSignal");

As the Multi Reader allows only Implicit rule signals, the output domain packets of the outputDomainSignal should contain no buffers, they should only have the PacketOffset configured. The reader status getOffset function can be used to obtain the PacketOffset for output domain signal packet creation.

  • Cpp

// `count` corresponds to the amount of samples read
auto outputDomainPacket = DataPacket(outputDomainDescriptor, count, status.getOffset());
outputDomainSignal.sendPacket(outputDomainPacket);

Advanced usage

This section highlights the remaining Multi Reader options, and details when and how the Multi Reader can be used to read signals with different sampling rates.

Advanced builder options

  • Min read count: Specifies the minimal amount of samples that can be read. If there are less samples than specified available, getAvailableCount() will return 0, while /read()/readWithDomain()` will not read any data. If there are less samples available than the minimum read count before the next event packet would be read, the samples before the event are discarded.

  • Required common sample rate: Common sample rate of of the Multi Reader calculated as the Least Common Multiple (LCM) of all signals sample rates. If common sample rate setup manually, all signal dividers will be calculated according to those sample rate.

  • Start on full unit of domain: Align common starting point of all singals to even numbers of domain units from common origin, eg. on a full second.

Different sample rates

In this section two terms will be used:

  • Common sample rate (SR): The least common sampling rate multiple of all signals read by the multi reader. Obtained via reader.getCommonSampleRate() that returns the sampling rate in Hz. The function should be used only after the first read/readWithDomain() call, as the reader does not have information on signal rates before that.

  • SR divider: The value obtained via by dividing the common sample rate with the sampling rate of an individual read signal: commonSampleRate / signalSampleRate.

When reading signals with different sample rates the following changes:

  • The output sample count will be different for each signal and can be calculated using SR dividers.

  • The main descriptor and offset from the Multi Reader status still correspond only to the first signal in the reader, but do not apply generally to all signal rates. These fields should be used with caution.

To obtain the sample rate of any individual signal, the function reader::getSampleRate(domainDescriptor) is available. It can be used on any domain descriptor obtained via events when reading data.

When reading signals with different rates, no fewer samples than the least common multiple (LCM) of the dividers of all signals connected to the Multi Reader can be read. Additionally, when allocating buffers, the SR dividers should be used to determine the sizes of said buffers for any given signal. For example, if we would wish to read at the minimum amount of data (equal to the LCM), the buffer for each individual signal would be allocated as LCM / signal.SRDiv. To hold more data, the buffers can be expanded, by multiplying the minimum size (LCM / signal.SRDiv) by an integer coefficient.

  • Cpp

std::vector<size_t> dividers;

auto dataAvailable = multiReader.getAvailableCount();
auto count = std::min(kBufferSize, dataAvailable);
auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

if (status.getReadStatus() == ReadStatus::Event)
{
    auto packets = status.getEventPackets();
    if (!(packets.getValueList()[0].getEventId() == event_packet_id::DATA_DESCRIPTOR_CHANGED))
        continue;

    // SRDiv calculation
    size_t commonSampleRate = multiReader.getCommonSampleRate();
    dividers.clear();
    std::cout << "Dividers: ";
    for (const auto& [_, eventPacket] : status.getEventPackets())
    {
        auto descriptor = eventPacket.getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR);
        auto sampleRate = reader::getSampleRate(descriptor);
        dividers.push_back(commonSampleRate / sampleRate);
        std::cout << dividers.back() << ", ";
    }
    std::cout << "\n";

    // Allocate buffers for 100ms according to commonSampleRate
    size_t lcm = 1;
    for (const auto& div : dividers)
        lcm = std::lcm<std::size_t>(lcm, div);

    // Calculate k as the minimum number of LCM-size blocks to read ~100ms of data
    size_t k = std::max(commonSampleRate / lcm / 10, static_cast<size_t>(1));
    kBufferSize = k * lcm;

    std::cout << "Buffer sizes: ";
    for (size_t i = 0; i < signalsCount; ++i)
    {
        dataBuffers[i] = std::calloc(kBufferSize / dividers[i], getSampleSize(SampleType::Float64));
        domainBuffers[i] = std::calloc(kBufferSize / dividers[i], getSampleSize(SampleType::Int64));
        std::cout << kBufferSize / dividers[i] << ", ";
    }
    std::cout << "\n";
}

Signal synchronization

Synchronization of multiple readers refers to identifying a common point in the domain space and beginning to read all signal samples simultaneously from that point or immediately afterward.

Synchronization happens during three Multi Reader calls - getAvailableCount() and read()/readWithDomain(). Synchronization within getAvailableCount() is not a complete procedure. during getAvailableCount(), the Multi Reader does not obtain domain samples and only checks sample counts in connection queues, making it impossible to find common starting point.

To synchronize two signals, the Multi Reader initially reads their domain descriptors from the connection packet queue. This is triggerd by calling read()` or readWithDomain(). Before those calls, getAvailableCount() will return 0, and the getCommonSampleRate() will not yet be available.

On the first read call, the Multi Reader saves the resolution for each signal, saves their origin, and calculates the sampling rates and SR dividers. At this point, if a change in sampling rate is detected, the reader switches to an invalid state. The common sample rate among all signals is calculated as the least common multiplier (LCM) of the sampling rates of all signals. If the required sample rate paramater is configured, said rate will be enforced as the common one.

The divider for each individual signal is calculated using the common rate - it must be divisible by an integer divider without remainder, or the Multi Reader switches to the invalid state. Here, the LCM of all dividers is calculated. It is later used to calculate the count of available, read, and skipped samples, as the LCM of dividers represents the minimum read count.

When event packets are removed from the beginning of input port queues, synchronization occurs. The earliest orign and highest resolution of all signals are calculated (the system resolution is also considered as one of the resolutions and often becomes the highest one). Then, for each signal, the offset in maximum resolution ticks from the earliest epoch value is calculated. Those, along with a multiplier that represents the ratio of the signal resolution to the maximum resolution (multiplier = signal_resolution / maximum_resolution), are used to convert signal ticks from the signal resolution to the common maximum resolution: max_resolution_ticks = signal_resolution_ticks * multiplier

Finally, the Multi Reader reads the start domain value of each signal. The latest domain value among all signals becomes the common starting point from which reading should start. This starting point is also rounded up to an interval defined as either the ratio of the LCM of the sample rate dividers to the common sample rate, or to full units of the domain if such an option was used during creation.

When the starting point is determined, each signal skips samples until said point is reached. When the domain value of a signal becomes greater than or equal to this starting point, it is considered synchronized. When all signals are synchronized, the entire Multi Reader state also becomes synchronized.

Resynchronization

Resynchronization is triggered by domain descriptor updates or changes in the active state via the setActive() call. Changes in resolution or origin set the reader to an unsynchronized state, while changes in sample rate set it to "invalid".

Resynchronization includes all the steps described in the synchronization process.

Drop Conditions

Synchronization is dropped if inputs violate domain or sample rate rules.

Multi reader limitations

  • No Asynchronous Signals: The Multi Reader does not support asynchronous signals. Only signals with domain signals that have a linear data rule are accepted.

  • Gap Packets: Gap packets can only be detected when the reader is created from ports, not directly from signals.

  • Fixed Sample Rates: Changing input sample rates invalidates the reader, making it impossible to re-use the Multi Reader if one of the signals sample rate was changed. Instead, a new Multi Reader must be created, using the previous one.

  • Additions Post-Creation: Adding new signals or ports to the reader after creation is not supported.

  • Domain Offsets: Reference domain offsets must have the same time source, as the Multi Reader does not yet account for the time differences between them (eg. UTC vs TAI).

Full example source

  • Cpp

#include <opendaq/event_packet_params.h>
#include <opendaq/opendaq.h>
#include <iostream>

using namespace daq;
using namespace std::chrono_literals;

void readDataSameRatesSignals(const ListPtr<ISignal>& signals);
void readDataSameRatesPortsAndOutput(const ListPtr<ISignal>& signals);
void readDataDifferentRates(const ListPtr<ISignal>& signals);

int main()
{
    auto instance = InstanceBuilder().setGlobalLogLevel(LogLevel::Error).build();
    auto refDevice = instance.addDevice("daqref://device0");
    refDevice.setPropertyValue("NumberOfChannels", 4);
    auto signals = refDevice.getSignalsRecursive();

    std::cout << "Same rate data, signals, read in a loop:\n";
    readDataSameRatesSignals(signals);

    std::cout << "\nSame rate data, using input ports, read in callbacks, data is output:\n";
    readDataSameRatesPortsAndOutput(signals);

    const auto channels = refDevice.getChannelsRecursive();
    channels[0].setPropertyValue("UseGlobalSampleRate", false);
    channels[0].setPropertyValue("SampleRate", 100);
    channels[1].setPropertyValue("UseGlobalSampleRate", false);
    channels[1].setPropertyValue("SampleRate", 200);
    channels[2].setPropertyValue("UseGlobalSampleRate", false);
    channels[2].setPropertyValue("SampleRate", 500);
    std::cout << "\nDifferent rate data:\n";
    readDataDifferentRates(signals);

    std::cout << "\nPress \"enter\" to exit the application..." << std::endl;
    std::cin.get();
    return 0;
}

void readDataSameRatesSignals(const ListPtr<ISignal>& signals)
{
    // Create reader that converts values to `double` and time data to `int64`
    auto multiReaderBuilder = MultiReaderBuilder().setValueReadType(SampleType::Float64).setDomainReadType(SampleType::Int64);
    for (const auto& signal : signals)
        multiReaderBuilder.addSignal(signal);
    auto multiReader = multiReaderBuilder.build();

    // Allocate buffers for each signal
    auto signalsCount = signals.getCount();
    auto kBufferSize = SizeT{0};
    auto domainBuffers = std::vector<void*>(signalsCount, nullptr);
    auto dataBuffers = std::vector<void*>(signalsCount, nullptr);

    // read data every 50ms, up to a maximum of kBufferSize samples
    for (size_t readCount = 0; readCount < 20; readCount++)
    {
        auto dataAvailable = multiReader.getAvailableCount();
        auto count = std::min(kBufferSize, dataAvailable);
        auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

        if (status.getReadStatus() == ReadStatus::Event)
        {
            // Set buffer size based on sample rate, allocate buffers
            // Buffers have 100ms worth of memory for each signal
            auto sampleRate = reader::getSampleRate(
                status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR));
            kBufferSize = sampleRate / 10;

            for (size_t i = 0; i < signalsCount; ++i)
            {
                dataBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Float64));
                domainBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Int64));
            }
        }
        else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
        {
            std::cout << "Data: ";
            for (const auto& buf : dataBuffers)
                std::cout << std::to_string(static_cast<double*>(buf)[0]) << "; ";
            std::cout << "\n";
        }

        std::this_thread::sleep_for(50ms);
    }

    for (size_t i = 0; i < signalsCount; ++i)
    {
        free(dataBuffers[i]);
        free(domainBuffers[i]);
    }
}

void readDataSameRatesPortsAndOutput(const ListPtr<ISignal>& signals)
{
    ListPtr<IInputPort> ports = List<IInputPort>();
    auto signalsCount = signals.getCount();
    auto context = signals[0].getContext();
    for (size_t i = 0; i < signalsCount; ++i)
    {
        const auto port = InputPort(context, nullptr, "port" + std::to_string(i));
        port.setNotificationMethod(PacketReadyNotification::SameThread);
        ports.pushBack(port);
    }

    // Create reader that converts values to `double` and time data to `int64`
    auto multiReaderBuilder = MultiReaderBuilder()
                              .setValueReadType(SampleType::Float64)
                              .setDomainReadType(SampleType::Int64);

    for (const auto& port : ports)
        multiReaderBuilder.addInputPort(port);
    auto multiReader = multiReaderBuilder.build();

    SignalConfigPtr outputSignal;
    SignalConfigPtr outputDomainSignal;

    // Allocate buffers for each signal
    auto kBufferSize = SizeT{0};
    auto domainBuffers = std::vector<void*>(signalsCount, nullptr);
    auto dataBuffers = std::vector<void*>(signalsCount, nullptr);

    std::mutex mutex;
    bool running = true;

    // read data a maximum of kBufferSize samples
    auto readData = [&]
    {
        std::scoped_lock lock(mutex);
        if (!running)
            return;

        auto dataAvailable = multiReader.getAvailableCount();
        auto count = std::min(kBufferSize, dataAvailable);
        auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

        if (status.getReadStatus() == ReadStatus::Event)
        {
            // Set buffer size based on sample rate, allocate buffers
            // Buffers have 100ms worth of memory for each signal
            auto domainDataDescriptor = status.getMainDescriptor().getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR);
            auto sampleRate = reader::getSampleRate(domainDataDescriptor);
            kBufferSize = sampleRate / 10;

            for (size_t i = 0; i < signalsCount; ++i)
            {
                dataBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Float64));
                domainBuffers[i] = std::calloc(kBufferSize, getSampleSize(SampleType::Int64));
            }

            // Configure output signals
            outputSignal = SignalWithDescriptor(context, DataDescriptorBuilder().setSampleType(SampleType::Float64).build(), nullptr, "Avg");
            outputDomainSignal = SignalWithDescriptor(context, domainDataDescriptor, nullptr, "AvgTime");
            outputSignal.setDomainSignal(outputDomainSignal);
        }
        else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
        {
            auto domainPacket = DataPacket(outputDomainSignal.getDescriptor(), count, status.getOffset());
            auto valuePacket = DataPacketWithDomain(domainPacket, outputSignal.getDescriptor(), count);

            // Average all signals and send output
            double* avgData = static_cast<double*>(valuePacket.getRawData());
            for (size_t i = 0; i < count; ++i)
            {
                avgData[i] = 0;
                for (const auto& buf : dataBuffers)
                    avgData[i] += static_cast<double*>(buf)[i];

                avgData[i] /= static_cast<double>(signalsCount);
            }

            outputSignal.sendPacket(valuePacket);
            outputDomainSignal.sendPacket(domainPacket);
        }
    };

    // Set read callback
    multiReader.setOnDataAvailable(readData);


    // Connect signals to ports
    for (size_t i = 0; i < signalsCount; ++i)
        ports[i].connect(signals[i]);

    // Read avg data with stream reader, pre-allocate 100ms of data, assuming 1KHz rates
    auto streamReader = StreamReader<double, int64_t>(outputSignal);
    double avgValues[100];

    for (size_t readCount = 0; readCount < 20; readCount++)
    {
        auto count = streamReader.getAvailableCount();
        streamReader.read(&avgValues, &count);

        if (count > 0)
            std::cout << "Avg data: " << avgValues[0] << "\n";

        std::this_thread::sleep_for(50ms);
    }

    {
        std::scoped_lock lock(mutex);
        running = false;
    }

    for (size_t i = 0; i < signalsCount; ++i)
    {
        free(dataBuffers[i]);
        free(domainBuffers[i]);
    }
}

void readDataDifferentRates(const ListPtr<ISignal>& signals)
{
    // Create reader that converts values to `double` and time data to `int64`
    auto multiReaderBuilder = MultiReaderBuilder().setValueReadType(SampleType::Float64).setDomainReadType(SampleType::Int64);
    for (const auto& signal : signals)
        multiReaderBuilder.addSignal(signal);
    auto multiReader = multiReaderBuilder.build();

    // Allocate buffers for each signal
    auto signalsCount = signals.getCount();
    auto kBufferSize = SizeT{0};
    auto domainBuffers = std::vector<void*>(signalsCount, nullptr);
    auto dataBuffers = std::vector<void*>(signalsCount, nullptr);
    std::vector<size_t> dividers;

    // read data every 50ms, up to a maximum of kBufferSize samples
    for (size_t readCount = 0; readCount < 20; readCount++)
    {
        auto dataAvailable = multiReader.getAvailableCount();
        auto count = std::min(kBufferSize, dataAvailable);
        auto status = multiReader.readWithDomain(dataBuffers.data(), domainBuffers.data(), &count);

        if (status.getReadStatus() == ReadStatus::Event)
        {
            auto packets = status.getEventPackets();
            if (!(packets.getValueList()[0].getEventId() == event_packet_id::DATA_DESCRIPTOR_CHANGED))
                continue;

            // SRDiv calculation
            size_t commonSampleRate = multiReader.getCommonSampleRate();
            dividers.clear();
            std::cout << "Dividers: ";
            for (const auto& [_, eventPacket] : status.getEventPackets())
            {
                auto descriptor = eventPacket.getParameters().get(event_packet_param::DOMAIN_DATA_DESCRIPTOR);
                auto sampleRate = reader::getSampleRate(descriptor);
                dividers.push_back(commonSampleRate / sampleRate);
                std::cout << dividers.back() << ", ";
            }
            std::cout << "\n";

            // Allocate buffers for 100ms according to commonSampleRate
            size_t lcm = 1;
            for (const auto& div : dividers)
                lcm = std::lcm<std::size_t>(lcm, div);

            // Calculate k as the minimum number of LCM-size blocks to read ~100ms of data
            size_t k = std::max(commonSampleRate / lcm / 10, static_cast<size_t>(1));
            kBufferSize = k * lcm;

            std::cout << "Buffer sizes: ";
            for (size_t i = 0; i < signalsCount; ++i)
            {
                dataBuffers[i] = std::calloc(kBufferSize / dividers[i], getSampleSize(SampleType::Float64));
                domainBuffers[i] = std::calloc(kBufferSize / dividers[i], getSampleSize(SampleType::Int64));
                std::cout << kBufferSize / dividers[i] << ", ";
            }
            std::cout << "\n";
        }
        else if (status.getReadStatus() == ReadStatus::Ok && count > 0)
        {
            std::cout << "Data: ";
            for (const auto& buf : dataBuffers)
                std::cout << std::to_string(static_cast<double*>(buf)[0]) << "; ";
            std::cout << "\n";
        }

        std::this_thread::sleep_for(50ms);
    }

    for (size_t i = 0; i < signalsCount; ++i)
    {
        free(dataBuffers[i]);
        free(domainBuffers[i]);
    }
}