mirror of
https://github.com/dbalsom/x86_microcode.git
synced 2026-06-16 13:07:07 +03:00
839 lines
25 KiB
C++
839 lines
25 KiB
C++
// Classes for Filter graphs for manipulating streams of samples.
|
|
//
|
|
// A Filter is an object which can produce data, consume data or both (or, in
|
|
// the trivial case, neither).
|
|
//
|
|
// A Filter that produces data but does not consume is called a Source.
|
|
// A Filter that consumes data but does not produce is called a Sink.
|
|
// A Filter that both produces and consumes is called a Pipe.
|
|
//
|
|
// Filters communicate through Source and Sink objects. A combination of a
|
|
// Source and a Sink forms a circular buffer internally. These buffers always
|
|
// grow (never shrink) and are always a power of two number of samples.
|
|
// Therefore it is important to limit the amount of data a Filter produces
|
|
// before that data is consumed.
|
|
//
|
|
// Rather than accessing the buffer directly, a Filter obtains an Accessor
|
|
// object from the Source or Sink which encapsulates reading and writing
|
|
// operations. Calls to Accessor methods (especially the "one sample per call"
|
|
// methods) are expected to be inlined for maximum filter performance.
|
|
//
|
|
// Filters can work in two modes, "push mode" (Source driven) or "pull mode"
|
|
// (Sink driven). Not all filters in a filter graph are required to operate in
|
|
// the same mode, and any given filter may operate in both push and pull modes
|
|
// as necessary.
|
|
//
|
|
// In "push mode", a Filter produces data via one or more Source objects. When
|
|
// a Source's buffer gets too full, the connected Sink is notified to consume
|
|
// some data. These Filters that these Sinks are connected to may in turn
|
|
// produce more data and so on.
|
|
//
|
|
// In "pull mode" a Filter consumes data via one or more Sink objects. When a
|
|
// Sink's buffer is too empty, the connected Source is notified to produce some
|
|
// data. That Filter may in turn consume more data and so on.
|
|
//
|
|
// Filters are responsible for deciding how much data they will produce at once
|
|
// and how much data they will consume at once (which is also how much data the
|
|
// attached buffer will hold before it is considered "full"), though a hint is
|
|
// passed in to produce() (the number of samples required) and to consume()
|
|
// (the number of samples available). For most purposes this should generally
|
|
// be on the order of a kilobyte, depending on the number of Filters in the
|
|
// graph.
|
|
//
|
|
// Because Filters can be connected and disconnected at run time, each Filter
|
|
// doesn't know at compile time what it is connected to. In an ideal world it
|
|
// would be possible to instantiate the compiler back-end at runtime when
|
|
// connections are made to generate ideal code for a given Filter graph.
|
|
// However, that technology is not currently practical.
|
|
//
|
|
// Hence, a virtual function call is required for each produce() or consume()
|
|
// operation. A virtual function call cannot be inlined and interferes with a
|
|
// CPU's prediction heuristics, so is relatively expensive - perhaps on the
|
|
// order of 40 cycles. For data sampled at tens of MHz passing through tens of
|
|
// filters, this quickly becomes the limiting factor if a Filter only
|
|
// processes one sample per call. Hence Filters process multiple samples per
|
|
// call.
|
|
//
|
|
// An application will perform best if all the data required by its inner loop
|
|
// resides in L1 cache, which is generally 32Kb, so the average buffer size
|
|
// multiplied by the number of Connections should be less than this.
|
|
//
|
|
// A Pipe with a single Source and a single Sink can be implemented using the
|
|
// Pipe helper class.
|
|
//
|
|
// Sources can specify how many samples are remaining by calling "remaining".
|
|
// Sinks can obtain this figure (suitably adjusted for the latency caused by
|
|
// intermediate pipes) by calling "remaining".
|
|
//
|
|
// See also: http://dancinghacker.com/code/dataflow/ which is similar.
|
|
|
|
#include "alfe/main.h"
|
|
|
|
#ifndef INCLUDED_PIPES_H
|
|
#define INCLUDED_PIPES_H
|
|
|
|
//#include "alfe/thread.h"
|
|
|
|
// Infrastructure
|
|
|
|
// Number of samples a Sink will accumulate by default before consume() is
|
|
// called.
|
|
static const int defaultSampleCount = 1024;
|
|
|
|
// Base class for all filters. Filters are not copyable because connected
|
|
// filters will have pointers to them or their members.
|
|
class Filter : Uncopyable { };
|
|
|
|
|
|
// Describes a circular buffer that can be read from or written to.
|
|
template<class T> class Accessor
|
|
{
|
|
public:
|
|
Accessor() { }
|
|
Accessor(T* buffer, int position, int mask)
|
|
: _buffer(buffer),
|
|
_position(position),
|
|
_mask(mask)
|
|
{ }
|
|
void advance(int count) { _position = offset(count); }
|
|
T& item(int position) { return _buffer[offset(position)]; }
|
|
T& item()
|
|
{
|
|
T& sample = _buffer[_position];
|
|
advance(1);
|
|
return sample;
|
|
}
|
|
template<class F> void items(F& f, int count)
|
|
{
|
|
int n = min(count, _mask + 1 - _position);
|
|
f(_buffer + _position, n);
|
|
f(_buffer, count - n);
|
|
advance(count);
|
|
}
|
|
template<class F> void items(F& f, int position, int count)
|
|
{
|
|
int start = offset(position);
|
|
int n = min(count, _mask + 1 - (_position + position));
|
|
f(_buffer + start, n);
|
|
f(_buffer, count - n);
|
|
}
|
|
private:
|
|
int offset(int delta) { return (delta + _position)&_mask; }
|
|
|
|
T* _buffer;
|
|
int _position;
|
|
int _mask;
|
|
};
|
|
|
|
|
|
// Functor to memcpy from an Accessor.
|
|
template<class T> class CopyTo
|
|
{
|
|
public:
|
|
CopyTo(T* destination) : _destination(destination) { }
|
|
void operator()(T* source, int n)
|
|
{
|
|
memcpy(_destination, source, n*sizeof(T));
|
|
_destination += n;
|
|
}
|
|
private:
|
|
T* _destination;
|
|
};
|
|
|
|
|
|
// Functor to memcpy to an Accessor.
|
|
template<class T> class CopyFrom
|
|
{
|
|
public:
|
|
CopyFrom(T* source) : _source(source) { }
|
|
void operator()(T* destination, int n)
|
|
{
|
|
memcpy(destination, _source, n*sizeof(T));
|
|
_source += n;
|
|
}
|
|
private:
|
|
T* _source;
|
|
};
|
|
|
|
|
|
// Specialization to copy from one Accessor to another.
|
|
template<class T> class CopyTo<Accessor<T>>
|
|
{
|
|
public:
|
|
CopyTo(Accessor<T> destination) : _destination(destination) { }
|
|
void operator()(T* source, int n)
|
|
{
|
|
_destination.items(CopyFrom<T>(source), n);
|
|
}
|
|
private:
|
|
Accessor<T> _destination;
|
|
};
|
|
|
|
|
|
// Specialization to copy from a one Accessor to another.
|
|
template<class T> class CopyFrom<Accessor<T>>
|
|
{
|
|
public:
|
|
CopyFrom(Accessor<T> source) : _source(source) { }
|
|
void operator()(T* destination, int n)
|
|
{
|
|
_source.items(CopyTo<T>(destination), n);
|
|
}
|
|
private:
|
|
Accessor<T> _source;
|
|
};
|
|
|
|
|
|
// Functor to zero samples in an Accessor using memset.
|
|
template<class T> class Zero
|
|
{
|
|
public:
|
|
void operator()(T* destination, int n)
|
|
{
|
|
memset(destination, 0, n*sizeof(T));
|
|
}
|
|
};
|
|
|
|
|
|
// Functor to read samples from a Stream.
|
|
template<class T> class ReadFrom
|
|
{
|
|
public:
|
|
ReadFrom(Stream stream) : _stream(stream) { }
|
|
void operator()(T* destination, int n)
|
|
{
|
|
_stream.read(destination, n*sizeof(T));
|
|
}
|
|
private:
|
|
Stream _stream;
|
|
};
|
|
|
|
|
|
// Functor to write samples to a Stream.
|
|
template<class T> class WriteTo
|
|
{
|
|
public:
|
|
WriteTo(Stream* stream) : _stream(stream) { }
|
|
void operator()(T* source, int n) { _stream->write(source, n*sizeof(T)); }
|
|
private:
|
|
Stream* _stream;
|
|
};
|
|
|
|
|
|
// Base class for filter endpoints. An EndPoint is either a Source or a Sink.
|
|
template<class T> class EndPoint : public Filter
|
|
{
|
|
protected:
|
|
Accessor<T> accessor() { return Accessor<T>(_buffer, _position, _mask); }
|
|
int offset(int delta) { return (delta + _position)&_mask; }
|
|
|
|
T* _buffer;
|
|
int _mask;
|
|
int _position;
|
|
int _count;
|
|
int _size;
|
|
int _remaining;
|
|
};
|
|
|
|
|
|
template<class T> class Source;
|
|
|
|
// A Sink is the means by which a Filter consumes data. Filters with a single
|
|
// Sink and no Source can just inherit from Sink, others should include it as
|
|
// a member. The argument to the constructor should be increased if
|
|
// defaultSampleCount is too few samples for consume() to do anything with,
|
|
// or decreased if it's so many that latency will be too high.
|
|
template<class T> class Sink : public EndPoint<T>
|
|
{
|
|
public:
|
|
Sink(int n = defaultSampleCount) : _n(n), _finite(false), _source(0) { }
|
|
void connect(Source<T>* source)
|
|
{
|
|
if (_source == source)
|
|
return;
|
|
_source = source;
|
|
this->_buffer = source->_buffer;
|
|
this->_count = source->_count;
|
|
this->_size = source->_size;
|
|
this->_mask = source->_mask;
|
|
this->_position =
|
|
(source->_position + this->_size - this->_count) & this->_mask;
|
|
source->connect(this);
|
|
}
|
|
bool connected() const { return _source != 0; }
|
|
virtual void consume(int n) = 0;
|
|
Accessor<T> reader(int n)
|
|
{
|
|
_source->ensureData(n);
|
|
return this->accessor();
|
|
}
|
|
void read(int n)
|
|
{
|
|
this->_count -= n;
|
|
this->_position = this->offset(n);
|
|
_source->_count -= n;
|
|
_remaining -= n;
|
|
}
|
|
void consume()
|
|
{
|
|
while (this->_count >= _n)
|
|
consume(this->_count);
|
|
}
|
|
int remaining()
|
|
{
|
|
if (!_finite)
|
|
return 0x40000000;
|
|
else
|
|
return _remaining;
|
|
}
|
|
bool finite() { return _finite; }
|
|
protected:
|
|
int _n;
|
|
private:
|
|
void remaining(int n) { _remaining = n; _finite = true; }
|
|
Source<T>* _source;
|
|
bool _finite;
|
|
int _remaining;
|
|
|
|
friend class Source<T>;
|
|
};
|
|
|
|
|
|
// A Source is the means by which a Filter produces data. Filters with a
|
|
// single Source and no Sink can just inherit from Source, others should
|
|
// include it as a member.
|
|
template<class T> class Source : public EndPoint<T>
|
|
{
|
|
public:
|
|
Source() : _pulling(false), _sink(0)
|
|
{
|
|
this->_position = 0;
|
|
this->_size = 1;
|
|
this->_count = 0;
|
|
this->_mask = 0;
|
|
this->_buffer = new T[1];
|
|
}
|
|
~Source() { delete[] this->_buffer; }
|
|
void connect(Sink<T>* sink)
|
|
{
|
|
if (_sink == sink)
|
|
return;
|
|
_sink = sink;
|
|
sink->connect(this);
|
|
}
|
|
bool connected() const { return _sink != 0; }
|
|
virtual void produce(int n) = 0;
|
|
Accessor<T> writer(int n)
|
|
{
|
|
// Make sure we have enough space for an additional n items
|
|
int newN = n + this->_count;
|
|
if (this->_size < newN) {
|
|
// Double the size of the buffer until it's big enough.
|
|
int newSize = this->_size;
|
|
while (newSize < newN)
|
|
newSize <<= 1;
|
|
// Since buffers never shrink, this doesn't need to be particularly
|
|
// fast. Just copy all the data to the start of the new buffer.
|
|
T* newBuffer = new T[newSize];
|
|
int start = this->offset(-this->_count);
|
|
int n1 = min(this->_count, this->_size - start);
|
|
memcpy(newBuffer, this->_buffer + start, n1*sizeof(T));
|
|
memcpy(newBuffer + n1, this->_buffer,
|
|
(this->_count - n1)*sizeof(T));
|
|
delete[] this->_buffer;
|
|
this->_position = this->_count;
|
|
this->_size = newSize;
|
|
this->_buffer = newBuffer;
|
|
this->_mask = this->_size - 1;
|
|
_sink->_buffer = this->_buffer;
|
|
_sink->_size = this->_size;
|
|
_sink->_mask = this->_mask;
|
|
_sink->_position = 0;
|
|
}
|
|
return this->accessor();
|
|
}
|
|
void written(int n)
|
|
{
|
|
this->_position = this->offset(n);
|
|
this->_count += n;
|
|
_sink->_count += n;
|
|
// If we're pulling, the consumer will process anyway so we don't
|
|
// do it here.
|
|
if (!_pulling)
|
|
_sink->consume();
|
|
}
|
|
void ensureData(int n)
|
|
{
|
|
_pulling = true;
|
|
while (this->_count < n)
|
|
produce(n - this->_count);
|
|
_pulling = false;
|
|
}
|
|
void remaining(int n) { _sink->remaining(n + this->_count); }
|
|
private:
|
|
Sink<T>* _sink;
|
|
bool _pulling;
|
|
|
|
friend class Sink<T>;
|
|
};
|
|
|
|
|
|
// Base classes to make filter implementation easier.
|
|
|
|
// Base class that can be used by a filter with a single Source and a single
|
|
// Sink.
|
|
template<class ConsumedT, class ProducedT, class P> class Pipe
|
|
: public Filter
|
|
{
|
|
public:
|
|
Pipe(P* p, int n = defaultSampleCount)
|
|
: _source(p), _sink(p, n)
|
|
{ }
|
|
Source<ProducedT>* source() { return &_source; }
|
|
Sink<ConsumedT>* sink() { return &_sink; }
|
|
virtual void produce(int n) { consume(n); }
|
|
virtual void consume(int n) { produce(n); }
|
|
protected:
|
|
class PipeSource : public Source<ProducedT>
|
|
{
|
|
public:
|
|
PipeSource(P* p) : _p(p) { }
|
|
void produce(int n) { _p->produce(n); }
|
|
private:
|
|
P* _p;
|
|
};
|
|
class PipeSink : public Sink<ConsumedT>
|
|
{
|
|
public:
|
|
PipeSink(P* p, int n) : Sink<ConsumedT>(n), _p(p) { }
|
|
void consume(int n) { _p->consume(n); }
|
|
private:
|
|
P* _p;
|
|
};
|
|
PipeSource _source;
|
|
PipeSink _sink;
|
|
};
|
|
|
|
|
|
// Filter implementations
|
|
|
|
// A Sink that does nothing with its data. Useful when you have a Filter with
|
|
// multiple Sources but don't need the data from all of them. Push only.
|
|
template<class T> class BitBucketSink : public Sink<T>
|
|
{
|
|
public:
|
|
void consume(int n) { this->reader(n); this->read(n); }
|
|
};
|
|
|
|
|
|
// A Source that always produces the same sample. Pull only. Infinite.
|
|
template<class T> class ConstantSource : public Source<T>
|
|
{
|
|
public:
|
|
ConstantSource(T sample) : _sample(sample) { }
|
|
void produce(int n)
|
|
{
|
|
Accessor<T> w = this->writer(n);
|
|
for (int i = 0; i < n; ++i)
|
|
w.write(_sample);
|
|
this->written(n);
|
|
}
|
|
private:
|
|
T _sample;
|
|
};
|
|
|
|
|
|
// Data for a PeriodicSource filter. This is separate from PeriodicSource so
|
|
// that several producers can use the same data.
|
|
// TODO: refactor File constructor with File::contents().
|
|
template<class T> class PeriodicSourceData
|
|
{
|
|
public:
|
|
PeriodicSourceData(int size) : _buffer(size) { }
|
|
PeriodicSourceData(File file)
|
|
{
|
|
FileStream stream = file.openRead();
|
|
UInt64 size = stream.size();
|
|
if (size >= 0x80000000)
|
|
throw Exception("2Gb or more in file " + file.path());
|
|
_buffer.resize(size / sizeof(T));
|
|
stream.read(&_buffer[0], size);
|
|
}
|
|
|
|
// Function for accessing the underlying buffer directly (to fill it, and
|
|
// for use by PeriodicSource).
|
|
T* buffer() { return &_buffer[0]; }
|
|
|
|
// For use by PeriodicSource.
|
|
int length() const { return _buffer.count(); }
|
|
private:
|
|
Array<T> _buffer;
|
|
};
|
|
|
|
|
|
// A Source that produces the same information repeatedly. Pull only. Never
|
|
// finishes.
|
|
template<class T> class PeriodicSource : public Source<T>
|
|
{
|
|
public:
|
|
PeriodicSource(PeriodicSourceData<T>* data, int offset)
|
|
: _data(data),
|
|
_offset(offset)
|
|
{ }
|
|
void produce(int n)
|
|
{
|
|
int length = _data->length();
|
|
T* buffer = _data->buffer();
|
|
if (n < length - _offset) {
|
|
this->writer(n).items(CopyFrom<T>(buffer + _offset), n);
|
|
_offset += n;
|
|
}
|
|
else {
|
|
n = length - _offset;
|
|
this->writer(n).items(CopyFrom<T>(buffer + _offset), n);
|
|
_offset = 0;
|
|
}
|
|
this->written(n);
|
|
}
|
|
private:
|
|
PeriodicSourceData<T>* _data;
|
|
int _offset;
|
|
};
|
|
|
|
|
|
// A Source that takes data from a file. Pull only. Finishes.
|
|
template<class T> class FileSource : public Source<T>
|
|
{
|
|
public:
|
|
FileSource(File file) : _stream(file.openRead())
|
|
{
|
|
_size = _stream.size() / sizeof(T);
|
|
}
|
|
void produce(int n)
|
|
{
|
|
int nRead = n;
|
|
int nRemaining = n;
|
|
if (nRead > _size)
|
|
nRead = static_cast<int>(_size);
|
|
Accessor<T> w = this->writer(n);
|
|
if (nRead > 0) {
|
|
w.items(ReadFrom<T>(_stream), nRead);
|
|
nRemaining -= nRead;
|
|
}
|
|
if (nRemaining > 0)
|
|
w.items(Zero<T>(), nRemaining);
|
|
_size -= n;
|
|
this->written(n);
|
|
if (_size < 0x40000000)
|
|
this->remaining(static_cast<int>(_size));
|
|
}
|
|
private:
|
|
FileStream _stream;
|
|
SInt64 _size;
|
|
};
|
|
|
|
|
|
// A pipe that just takes samples from its Sink and sends them directly to its
|
|
// Source. This is useful as a pump: external code can call the produce()
|
|
// method directly to cause data to be pulled via the Sink and then pushed via
|
|
// the Source.
|
|
template<class T> class NopPipe : public Pipe<T, T, NopPipe<T>>
|
|
{
|
|
public:
|
|
void produce(int n)
|
|
{
|
|
this->_source.writer(n).items(
|
|
CopyFrom<Accessor<T> >(this->_sink.reader(n)), n);
|
|
this->_sink.read(n);
|
|
this->_source.written(n);
|
|
if (this->_sink.finite())
|
|
this->_source.remaining(this->_sink.remaining());
|
|
}
|
|
};
|
|
|
|
|
|
// A filter that converts from one type to another (with static_cast<>).
|
|
template<class ProducedT, class ConsumedT> class CastPipe
|
|
: public Pipe<ProducedT, ConsumedT, CastPipe<ProducedT, ConsumedT>>
|
|
{
|
|
public:
|
|
void produce(int n)
|
|
{
|
|
Accessor<ConsumedT> reader = this->_sink.reader(n);
|
|
Accessor<ProducedT> writer = this->_source.writer(n);
|
|
for (int i = 0; i < n; ++i)
|
|
writer.item() = static_cast<ProducedT>(reader.item());
|
|
this->_sink.read(n);
|
|
this->_source.written(n);
|
|
if (this->_sink.finite())
|
|
this->_source.remaining(this->_sink.remaining());
|
|
}
|
|
};
|
|
|
|
|
|
// A filter with one sink and two sources, each of which yields identical data
|
|
// to the Sink. A pull from one Source may result in a push from the other.
|
|
template<class T> class Tee
|
|
{
|
|
public:
|
|
Tee(int n = defaultSampleCount)
|
|
: _source1(this), _source2(this), _sink(this, n) { }
|
|
Sink<T>* sink() { return &_sink; }
|
|
Source<T>* source1() { return &_source1; }
|
|
Source<T>* source2() { return &_source2; }
|
|
void process(int n)
|
|
{
|
|
Accessor<T> r = _sink.reader(n);
|
|
_source1.writer(n).items(CopyFrom<Accessor<T> >(r), n);
|
|
_source1.written(n);
|
|
_source2.writer(n).items(CopyFrom<Accessor<T> >(r), n);
|
|
_source2.written(n);
|
|
_sink.read(n);
|
|
if (_sink.finite()) {
|
|
_source1.remaining(_sink.remaining());
|
|
_source2.remaining(_sink.remaining());
|
|
}
|
|
}
|
|
protected:
|
|
class TeeSource : public Source<T>
|
|
{
|
|
public:
|
|
TeeSource(Tee* t) : _t(t) { }
|
|
void produce(int n) { _t->process(n); }
|
|
private:
|
|
Tee* _t;
|
|
};
|
|
class TeeSink : public Sink<T>
|
|
{
|
|
public:
|
|
TeeSink(Tee* t, int n) : Sink<T>(n), _t(t) { }
|
|
void consume(int n) { _t->process(n); }
|
|
void remaining(int n) { _t->remaining(n); }
|
|
private:
|
|
Tee* _t;
|
|
};
|
|
TeeSink _sink;
|
|
TeeSource _source1;
|
|
TeeSource _source2;
|
|
};
|
|
|
|
|
|
// A pipe that interpolates using the nearest-neighbor algorithm.
|
|
template<class T, class Rate = int> class NearestNeighborInterpolator
|
|
: public Pipe<T, T, NearestNeighborInterpolator<T, Rate>>
|
|
{
|
|
public:
|
|
// For every "consumerRate" samples consumed we will produce "producerRate"
|
|
// samples.
|
|
NearestNeighborInterpolator(Rate producerRate, Rate consumerRate,
|
|
Rate offset = 0, int n = defaultSampleCount)
|
|
: Pipe<T, T, NearestNeighborInterpolator<T, Rate>>(this, n),
|
|
_producerRate(producerRate),
|
|
_consumerRate(consumerRate),
|
|
_offset(offset)
|
|
{ }
|
|
void produce(int n)
|
|
{
|
|
// TODO: We can probably speed this up somewhat by copying blocks
|
|
Accessor<T> reader = this->_sink.reader(n);
|
|
Accessor<T> writer = this->_source.writer(toProduce(n) + 1);
|
|
int written = 0;
|
|
for (int i = 0; i < n; ++i) {
|
|
T sample = reader.item();
|
|
while (_offset >= 0) {
|
|
writer.item() = sample;
|
|
++written;
|
|
_offset -= _producerRate;
|
|
}
|
|
_offset += _consumerRate;
|
|
}
|
|
this->_sink.read(n);
|
|
this->_source.written(written);
|
|
// TODO: Correct for _offset so that remaining goes down smoothly
|
|
if (this->_sink.finite())
|
|
this->_source.remaining(toProduce(this->_sink.remaining()));
|
|
}
|
|
private:
|
|
int toProduce(int consume)
|
|
{
|
|
return static_cast<int>(
|
|
(static_cast<Rate>(consume)*_producerRate)/_consumerRate);
|
|
}
|
|
Rate _producerRate;
|
|
Rate _consumerRate;
|
|
Rate _offset;
|
|
};
|
|
|
|
|
|
// A pipe that interpolates using the linear interpolation. TODO: modify this
|
|
// so it downsamples as well.
|
|
template<class T, class Rate = int> class LinearInterpolator
|
|
: public Pipe<T, T, LinearInterpolator<T, Rate>>
|
|
{
|
|
public:
|
|
// For every "consumerRate" samples consumed we will produce "producerRate"
|
|
// samples.
|
|
LinearInterpolator(Rate producerRate, Rate consumerRate, Rate offset = 0,
|
|
T previous = 0, int n = defaultSampleCount)
|
|
: Pipe<T, T, LinearInterpolator<T, Rate>>(this, n),
|
|
_producerRate(producerRate),
|
|
_consumerRate(consumerRate),
|
|
_offset(offset),
|
|
_previous(0)
|
|
{ }
|
|
void produce(int n)
|
|
{
|
|
Accessor<T> reader = this->_sink.reader(n);
|
|
Accessor<T> writer = this->_source.writer(toProduce(n) + 1);
|
|
int written = 0;
|
|
for (int i = 0; i < n; ++i) {
|
|
T sample = reader.item();
|
|
while (_offset >= 0) {
|
|
writer.item() = sample + static_cast<T>(
|
|
(static_cast<Rate>(_previous - sample)*_offset)/
|
|
_consumerRate);
|
|
++written;
|
|
_offset -= _producerRate;
|
|
}
|
|
_offset += _consumerRate;
|
|
_previous = sample;
|
|
}
|
|
this->_sink.read(n);
|
|
this->_source.written(written);
|
|
// TODO: Correct for _offset so that remaining goes down smoothly
|
|
if (this->_sink.finite())
|
|
this->_source.remaining(toProduce(this->_sink.remaining()));
|
|
}
|
|
private:
|
|
int toProduce(int consume)
|
|
{
|
|
return static_cast<int>(
|
|
(static_cast<Rate>(consume)*_producerRate)/_consumerRate);
|
|
}
|
|
Rate _producerRate;
|
|
Rate _consumerRate;
|
|
Rate _offset;
|
|
T _previous;
|
|
};
|
|
|
|
#if 0
|
|
// A pipe that neither pushes or pulls. If you try to push to it without
|
|
// pulling, it continues to accumulate data until it runs out of memory. If
|
|
// you try to pull from it without pushing, it blocks until data is pushed.
|
|
template<class T, class C = Tank<T>> class Tank : public Pipe<T, T, C>
|
|
{
|
|
public:
|
|
Tank()
|
|
: _buffer(new T[1]),
|
|
_size(1),
|
|
_count(0),
|
|
_mask(0),
|
|
_readPosition(0),
|
|
_writePosition(0)
|
|
{ }
|
|
void produce(int n)
|
|
{
|
|
while (_count < n)
|
|
_event.wait();
|
|
Lock lock(&_mutex);
|
|
_source.writer(n).items(CopyFrom<Accessor<T> >(reader()), n);
|
|
_readPosition = (_readPosition + n) & _mask;
|
|
_count -= n;
|
|
_source.written(n);
|
|
if (_sink.finite())
|
|
_source.remaining(_sink.remaining() + _count);
|
|
}
|
|
void consume(int n)
|
|
{
|
|
Lock lock(&_mutex);
|
|
// Make sure we have enough space for an additional n items
|
|
int newN = n + _count;
|
|
if (_size < newN) {
|
|
// Double the size of the buffer until it's big enough.
|
|
int newSize = _size;
|
|
while (newSize < newN)
|
|
newSize <<= 1;
|
|
T* newBuffer = new T[newSize];
|
|
int start = offset(-_count);
|
|
int n1 = min(_count, _size - start);
|
|
memcpy(newBuffer, _buffer + start, n1*sizeof(T));
|
|
memcpy(newBuffer + n1, _buffer, (_size - n1)*sizeof(T));
|
|
delete[] _buffer;
|
|
_writePosition = _count;
|
|
_readPosition = 0;
|
|
_size = newSize;
|
|
_buffer = newBuffer;
|
|
_mask = _size - 1;
|
|
}
|
|
writer().items(CopyFrom<Accessor<T> >(_sink.reader(n)), n);
|
|
_sink.read(n);
|
|
_writePosition = (_writePosition + n) & _mask;
|
|
_count += n;
|
|
_event.signal();
|
|
}
|
|
private:
|
|
Mutex _mutex;
|
|
Event _event;
|
|
|
|
Accessor<T> reader() { return Accessor<T>(_buffer, _readPosition, _mask); }
|
|
Accessor<T> writer() { return Accessor<T>(_buffer, _writePosition, _mask); }
|
|
|
|
T* _buffer;
|
|
int _size;
|
|
volatile int _count;
|
|
int _mask;
|
|
int _readPosition;
|
|
int _writePosition;
|
|
};
|
|
|
|
|
|
// A pipe that can be both pulled from and pushed to, and which adjusts the
|
|
// rate of a connected interpolator to match the pull rate to the push rate.
|
|
// The "timeConstant" parameter must be tuned. If it is too small, the
|
|
// resampling rate will fluctuate wildly as samples are pushed and pulled. If
|
|
// it is too large, it will take too long to adjust to changes in the push or
|
|
// pull rates, leading to high latencies or stalls waiting for the connected
|
|
// Source to push (PushPullPipe will never push or pull itself). "timeConstant"
|
|
// is measured in samples consumed.
|
|
template<class T, class Interpolator> class PushPullPipe
|
|
: public Tank<T, PushPullPipe<T, Interpolator>>
|
|
{
|
|
public:
|
|
PushPullPipe(int timeConstant, Interpolator* interpolator)
|
|
: _timeConstant(timeConstant),
|
|
_interpolator(interpolator)
|
|
{ }
|
|
void produce(int n)
|
|
{
|
|
_produced += n*_rate;
|
|
updateRate();
|
|
Tank::produce(n);
|
|
}
|
|
void consume(int n)
|
|
{
|
|
double c = exp(-n/_timeConstant);
|
|
_produced *= c;
|
|
_consumed *= c;
|
|
_consumed += n;
|
|
updateRate();
|
|
Tank::consume(n);
|
|
}
|
|
private:
|
|
void updateRate()
|
|
{
|
|
// TODO: Adjust slightly so that we speed up if we have a large number
|
|
// of samples
|
|
_rate = _produced/_consumed;
|
|
_interpolator->setRate(_rate);
|
|
}
|
|
|
|
int _timeConstant;
|
|
Interpolator* _interpolator;
|
|
double _produced;
|
|
double _consumed;
|
|
double _rate;
|
|
};
|
|
#endif
|
|
|
|
#endif // INCLUDED_PIPES_H
|