diff --git a/examples/examples-communication/a2dp/basic-player-a2dp/basic-player-a2dp.ino b/examples/examples-communication/a2dp/basic-player-a2dp/basic-player-a2dp.ino index 99ee8f181..e67b186e7 100644 --- a/examples/examples-communication/a2dp/basic-player-a2dp/basic-player-a2dp.ino +++ b/examples/examples-communication/a2dp/basic-player-a2dp/basic-player-a2dp.ino @@ -23,7 +23,7 @@ const char* ext="mp3"; AudioSourceSDFAT source(startFilePath, ext, PIN_AUDIO_KIT_SD_CARD_CS); MP3DecoderHelix decoder; //Setup of synchronized buffer -SynchronizedNBuffer buffer(buffer_size,buffer_count, portMAX_DELAY, 10); +SynchronizedNBuffer buffer(buffer_size,buffer_count, portMAX_DELAY, 10); QueueStream out(buffer); // convert Buffer to Stream AudioPlayer player(source, out, decoder); BluetoothA2DPSource a2dp; diff --git a/examples/examples-communication/http-client/streams-url_mp3-pwm/streams-url_mp3-pwm.ino b/examples/examples-communication/http-client/streams-url_mp3-pwm/streams-url_mp3-pwm.ino new file mode 100644 index 000000000..15b96d25b --- /dev/null +++ b/examples/examples-communication/http-client/streams-url_mp3-pwm/streams-url_mp3-pwm.ino @@ -0,0 +1,49 @@ +/** + * @file streams-url_mp3-out.ino + * @author Phil Schatzmann + * @brief decode MP3 stream from url and output it with the help of PWM + * @version 0.1 + * @date 2021-96-25 + * + * @copyright Copyright (c) 2021 + */ + +// install https://github.com/pschatzmann/arduino-libhelix.git + +#include "AudioTools.h" +#include "AudioTools/AudioCodecs/CodecMP3Helix.h" + + +URLStream url("ssid","password"); +PWMAudioOutput out; // final output of decoded stream +EncodedAudioStream dec(&out, new MP3DecoderHelix()); // Decoding stream +StreamCopy copier(dec, url); // copy url to decoder + + +void setup(){ + Serial.begin(115200); + AudioToolsLogger.begin(Serial, AudioToolsLogLevel::Info); + + // setup out + auto config = out.defaultConfig(TX_MODE); + //config.resolution = 8; // must be between 8 and 11 -> drives pwm frequency (8 is default) + // alternative 1 + //config.start_pin = 3; + // alternative 2 + int pins[] = {22, 23}; + // alternative 3 + //Pins pins = {3}; + //config.setPins(pins); + out.begin(config); + + // setup I2S based on sampling rate provided by decoder + dec.begin(); + +// mp3 radio + url.begin("http://stream.srg-ssr.ch/m/rsj/mp3_128","audio/mp3"); + +} + +void loop(){ + copier.copy(); +} diff --git a/examples/tests/basic/test-queue/test-queue.ino b/examples/tests/basic/test-queue/test-queue.ino index 0c8b84032..0726b6f84 100644 --- a/examples/tests/basic/test-queue/test-queue.ino +++ b/examples/tests/basic/test-queue/test-queue.ino @@ -1,6 +1,6 @@ #include "AudioTools.h" #include "AudioTools/Concurrency/QueueLockFree.h" -#include "AudioTools/Concurrency/QueueRTOS.h" +#include "AudioTools/Concurrency/RTOS/QueueRTOS.h" // test different queue implementations: diff --git a/examples/tests/concurrency/BufferRTOS/BufferRTOS.ino b/examples/tests/concurrency/BufferRTOS/BufferRTOS.ino index 2241891b9..25e9b5d4a 100644 --- a/examples/tests/concurrency/BufferRTOS/BufferRTOS.ino +++ b/examples/tests/concurrency/BufferRTOS/BufferRTOS.ino @@ -9,7 +9,7 @@ * */ #include "AudioTools.h" -#include "AudioTools/AudioLibs/Concurrency.h" +#include "AudioTools/Concurrency/RTOS.h" BufferRTOS buffer(512 * 10); diff --git a/examples/tests/concurrency/NBuffer/NBuffer.ino b/examples/tests/concurrency/NBuffer/NBuffer.ino index 7c9a8a759..a5ab24e70 100644 --- a/examples/tests/concurrency/NBuffer/NBuffer.ino +++ b/examples/tests/concurrency/NBuffer/NBuffer.ino @@ -9,7 +9,6 @@ * */ #include "AudioTools.h" -#include "AudioTools/AudioLibs/Concurrency.h" NBuffer buffer(512, 4); diff --git a/examples/tests/concurrency/SynchronizedNBuffer/SynchronizedNBuffer.ino b/examples/tests/concurrency/SynchronizedNBuffer/SynchronizedNBuffer.ino deleted file mode 100644 index b0d406c6f..000000000 --- a/examples/tests/concurrency/SynchronizedNBuffer/SynchronizedNBuffer.ino +++ /dev/null @@ -1,67 +0,0 @@ -/** - * @file SynchonizedNBuffer.ino - * @author Phil Schatzmann - * @brief Multitask with SynchronizedBuffer using NBuffer - * @version 0.1 - * @date 2022-11-25 - * - * @copyright Copyright (c) 2022 - * - */ -#include "AudioTools.h" -#include "AudioTools/AudioLibs/Concurrency.h" - -SynchronizedNBuffer buffer(1024, 10); - -Task writeTask("write", 3000, 10, 0); - -Task readTask("read", 3000, 10, 1); - -void setup() { - Serial.begin(115200); - - writeTask.begin([]() { - int16_t data[512]; - for (int j = 0; j < 512; j++) { - data[j] = j; - } - assert(buffer.writeArray(data, 512)==512); - delay(1); - - }); - - readTask.begin([]() { - static uint64_t start = millis(); - static size_t total_bytes = 0; - static size_t errors = 0; - static int16_t data[512]; - - // read data - assert(buffer.readArray(data, 512)==512); - delay(1); - - // check data - for (int j = 0; j < 512; j++) { - if (data[j] != j) errors++; - } - // calculate bytes per second - total_bytes += sizeof(int16_t) * 512; - if (total_bytes >= 1024000) { - uint64_t duration = millis() - start; - float mbps = static_cast(total_bytes) / duration / 1000.0; - - // print result - Serial.print("Mbytes per second: "); - Serial.print(mbps); - Serial.print(" with "); - Serial.print(errors); - Serial.println(" errors"); - - start = millis(); - errors = 0; - total_bytes = 0; - } - }); -} - -void loop() { delay(1000); } \ No newline at end of file diff --git a/examples/tests/concurrency/audio-test/audio-test.ino b/examples/tests/concurrency/audio-test/audio-test.ino index 333b4867d..dd15f6c8e 100644 --- a/examples/tests/concurrency/audio-test/audio-test.ino +++ b/examples/tests/concurrency/audio-test/audio-test.ino @@ -1,7 +1,7 @@ #include "AudioTools.h" #include "AudioTools/AudioLibs/AudioBoardStream.h" -#include "AudioTools/AudioLibs/Concurrency.h" +#include "AudioTools/Concurrency/RTOS.h" AudioInfo info(44100, 2, 16); // source and sink @@ -9,7 +9,7 @@ SineWaveGenerator sineWave(32000); GeneratedSoundStream sound(sineWave); AudioBoardStream out(AudioKitEs8388V1); // queue -// SynchronizedNBuffer buffer(1024, 10); +// SynchronizedNBuffer buffer(1024, 10); BufferRTOS buffer(1024 * 10); QueueStream queue(buffer); // copy diff --git a/examples/tests/concurrency/synchNBuffer/synchNBuffer.ino b/examples/tests/concurrency/synchNBuffer/synchNBuffer.ino index ff5d4075c..da6044ec3 100644 --- a/examples/tests/concurrency/synchNBuffer/synchNBuffer.ino +++ b/examples/tests/concurrency/synchNBuffer/synchNBuffer.ino @@ -9,9 +9,9 @@ * */ #include "AudioTools.h" -#include "AudioTools/AudioLibs/Concurrency.h" +#include "AudioTools/Concurrency/RTOS.h" -audio_tools::Mutex mutex; +MutexRTOS mutex; NBuffer nbuffer(512, 10); SynchronizedBuffer buffer(nbuffer, mutex); diff --git a/examples/tests/concurrency/synchRingBuffer/synchRingBuffer.ino b/examples/tests/concurrency/synchRingBuffer/synchRingBuffer.ino index 334dbed54..440a5faec 100644 --- a/examples/tests/concurrency/synchRingBuffer/synchRingBuffer.ino +++ b/examples/tests/concurrency/synchRingBuffer/synchRingBuffer.ino @@ -9,9 +9,9 @@ * */ #include "AudioTools.h" -#include "AudioTools/AudioLibs/Concurrency.h" +#include "AudioTools/Concurrency/RTOS.h" -audio_tools::Mutex mutex; +MutexRTOS mutex; RingBuffer nbuffer(512 * 4); SynchronizedBuffer buffer(nbuffer, mutex); diff --git a/examples/tests/performance/mp3-SynchronizedNBuffer/mp3-SynchronizedNBuffer.ino b/examples/tests/performance/mp3-SynchronizedNBuffer/mp3-SynchronizedNBuffer.ino index 4aab9d579..3c07019ed 100644 --- a/examples/tests/performance/mp3-SynchronizedNBuffer/mp3-SynchronizedNBuffer.ino +++ b/examples/tests/performance/mp3-SynchronizedNBuffer/mp3-SynchronizedNBuffer.ino @@ -21,7 +21,7 @@ const char *startFilePath = "/"; const char *ext = "mp3"; AudioSourceSDFAT source(startFilePath, ext, PIN_AUDIO_KIT_SD_CARD_CS); MP3DecoderHelix decoder; -SynchronizedNBuffer buffer(buffer_size, buffer_count, portMAX_DELAY, +SynchronizedNBuffer buffer(buffer_size, buffer_count, portMAX_DELAY, 10); QueueStream out(buffer); // convert Buffer to Stream AudioPlayer player(source, out, decoder); diff --git a/examples/tests/rp2040/buffer2040/buffer2040.ino b/examples/tests/rp2040/buffer2040/buffer2040.ino new file mode 100644 index 000000000..3941d754b --- /dev/null +++ b/examples/tests/rp2040/buffer2040/buffer2040.ino @@ -0,0 +1,31 @@ + +#include "AudioTools.h" +#include "AudioTools/Concurrency/RP2040.h" + +AudioInfo info(44100, 2, 16); +SineWaveGenerator sineWave(32000); // subclass of SoundGenerator with max amplitude of 32000 +GeneratedSoundStream sound(sineWave); // Stream generated from sine wave +BufferRP2040 buffer(1024, 10); +QueueStream queue(buffer); +CsvOutput csv(Serial); +StreamCopy copierFill(queue, sound); // copies sound into i2s +StreamCopy copierConsume(csv, queue); // copies sound into i2s + + +void setup(){ + Serial.begin(115200); + while(!Serial); + AudioToolsLogger.begin(Serial, AudioToolsLogLevel::Info); + + queue.begin(); + sineWave.begin(info, N_B4); + csv.begin(info); +} + +void loop(){ + copierFill.copy(); +} + +void loop1(){ + copierConsume.copy(); +} \ No newline at end of file diff --git a/examples/tests/rp2040/mutex/mutex.ino b/examples/tests/rp2040/mutex/mutex.ino new file mode 100644 index 000000000..ff9f7c816 --- /dev/null +++ b/examples/tests/rp2040/mutex/mutex.ino @@ -0,0 +1,61 @@ +#include "AudioTools.h" +#include "AudioTools/Concurrency/RP2040.h" + +SpinLock mutex; +NBuffer nbuffer(512, 10); +SynchronizedBuffer buffer(nbuffer, mutex); + +void setup() { + Serial.begin(115200); + while (!Serial); + Serial.println("starting..."); +} + +void loop() { + int16_t data[512]; + for (int j = 0; j < 512; j++) { + data[j] = j; + } + size_t result = buffer.writeArray(data, 512); + if(result != 512){ + // queue is full: give the reading queue some chance to empty + delay(5); + } +} + +void loop1() { + static uint64_t start = millis(); + static size_t total_bytes = 0; + static size_t errors = 0; + static int16_t data[512]; + + // read data + size_t result = buffer.readArray(data, 512); + if(result == 0){ + // reading queue is empty: give some time to fill + delay(5); + return; + } + + // check data + for (int j = 0; j < 512; j++) { + if (data[j] != j) errors++; + } + // calculate bytes per second + total_bytes += sizeof(int16_t) * 512; + if (total_bytes >= 1024000) { + uint64_t duration = millis() - start; + float mbps = static_cast(total_bytes) / duration / 1000.0; + + // print result + Serial.print("Mbytes per second: "); + Serial.print(mbps); + Serial.print(" with "); + Serial.print(errors); + Serial.println(" errors"); + + start = millis(); + errors = 0; + total_bytes = 0; + } +} \ No newline at end of file diff --git a/examples/tests/rp2040/mutex2040/mutex2040.ino b/examples/tests/rp2040/mutex2040/mutex2040.ino new file mode 100644 index 000000000..38f7bab2b --- /dev/null +++ b/examples/tests/rp2040/mutex2040/mutex2040.ino @@ -0,0 +1,60 @@ +#include "AudioTools.h" +#include "AudioTools/Concurrency/RP2040.h" + +MutexRP2040 mutex; +NBuffer nbuffer(512, 10); +SynchronizedBuffer buffer(nbuffer, mutex); + +void setup() { + Serial.begin(115200); + while (!Serial); + Serial.println("starting..."); +} + +void loop() { + int16_t data[512]; + for (int j = 0; j < 512; j++) { + data[j] = j; + } + size_t result = buffer.writeArray(data, 512); + if(result != 512){ + Serial.print("write failed: "); + Serial.println(result); + } +} + +void loop1() { + static uint64_t start = millis(); + static size_t total_bytes = 0; + static size_t errors = 0; + static int16_t data[512]; + + // read data + size_t result = buffer.readArray(data, 512); + if(result != 512){ + Serial.print("read failed: "); + Serial.println(result); + } + + // check data + for (int j = 0; j < 512; j++) { + if (data[j] != j) errors++; + } + // calculate bytes per second + total_bytes += sizeof(int16_t) * 512; + if (total_bytes >= 1024000) { + uint64_t duration = millis() - start; + float mbps = static_cast(total_bytes) / duration / 1000.0; + + // print result + Serial.print("Mbytes per second: "); + Serial.print(mbps); + Serial.print(" with "); + Serial.print(errors); + Serial.println(" errors"); + + start = millis(); + errors = 0; + total_bytes = 0; + } +} diff --git a/src/AudioTools/AudioLibs/A2DPStream.h b/src/AudioTools/AudioLibs/A2DPStream.h index 22f3d2c27..18d328245 100644 --- a/src/AudioTools/AudioLibs/A2DPStream.h +++ b/src/AudioTools/AudioLibs/A2DPStream.h @@ -13,7 +13,7 @@ #include "BluetoothA2DPSink.h" #include "BluetoothA2DPSource.h" #include "AudioTools/CoreAudio/AudioStreams.h" -#include "AudioTools/Concurrency/BufferRTOS.h" +#include "AudioTools/Concurrency/RTOS/BufferRTOS.h" #include "AudioTools/CoreAudio/AudioBasic/StrView.h" diff --git a/src/AudioTools/AudioLibs/VBANStream.h b/src/AudioTools/AudioLibs/VBANStream.h index 88493d309..315d5fbfb 100644 --- a/src/AudioTools/AudioLibs/VBANStream.h +++ b/src/AudioTools/AudioLibs/VBANStream.h @@ -4,7 +4,7 @@ #include "AudioTools/AudioLibs/vban/vban.h" #include "AudioTools/CoreAudio/AudioStreams.h" -#include "AudioTools/Concurrency/BufferRTOS.h" +#include "AudioTools/Concurrency/RTOS/BufferRTOS.h" namespace audio_tools { diff --git a/src/AudioTools/Concurrency/Mutex.h b/src/AudioTools/Concurrency/Mutex.h index 0b9f30e0d..1b23b1341 100644 --- a/src/AudioTools/Concurrency/Mutex.h +++ b/src/AudioTools/Concurrency/Mutex.h @@ -20,9 +20,8 @@ class MutexBase { virtual void unlock() {} }; - class SpinLock : public MutexBase { - std::atomic lock_ = {0}; + volatile std::atomic lock_ = {0}; void lock() { for (;;) { diff --git a/src/AudioTools/Concurrency/RP2040.h b/src/AudioTools/Concurrency/RP2040.h new file mode 100644 index 000000000..016581ea1 --- /dev/null +++ b/src/AudioTools/Concurrency/RP2040.h @@ -0,0 +1,6 @@ +#pragma once +#include "AudioTools/Concurrency/Mutex.h" +#include "AudioTools/Concurrency/SynchronizedBuffer.h" +#include "AudioTools/Concurrency/SynchronizedQueue.h" +#include "AudioTools/Concurrency/RP2040/BufferRP2040.h" +#include "AudioTools/Concurrency/RP2040/MutexRP2040.h" diff --git a/src/AudioTools/Concurrency/RP2040/BufferRP2040.h b/src/AudioTools/Concurrency/RP2040/BufferRP2040.h new file mode 100644 index 000000000..48bb23ef1 --- /dev/null +++ b/src/AudioTools/Concurrency/RP2040/BufferRP2040.h @@ -0,0 +1,164 @@ +#pragma once + +#ifndef ARDUINO_ARCH_RP2040 +# error "Unsupported architecture" +#endif + +#include "AudioConfig.h" +#include "AudioTools/CoreAudio/AudioLogger.h" +#include "AudioTools/CoreAudio/Buffers.h" + +namespace audio_tools { + +// #if ESP_IDF_VERSION_MAJOR >= 4 + +/** + * @brief Buffer implementation which is based on a RP2040 queue. This + * class is intended to be used to exchange data between the 2 different + * cores. + * @ingroup buffers + * @ingroup concurrency + * @author Phil Schatzmann + * @copyright GPLv3 * + * @tparam T + */ +template +class BufferRP2040T : public BaseBuffer { + public: + BufferRP2040T(size_t bufferSize, int bufferCount = 2) : BaseBuffer() { + buffer_size = bufferSize * sizeof(T); + buffer_size_req_bytes = buffer_size * bufferCount; + } + + ~BufferRP2040T() { reset(); } + + /// Re-Allocats the memory and the queue + bool resize(size_t size) { + buffer_size_req_bytes = size; + if (buffer_size_total_bytes != buffer_size_req_bytes) { + LOGI("resize %d -> %d", buffer_size_total_bytes, buffer_size_req_bytes); + assert(buffer_size > 0); + write_buffer.resize(buffer_size); + read_buffer.resize(buffer_size * 2); + // release existing queue + if (buffer_size_total_bytes > 0) { + queue_free(&queue); + } + // create new queu + if (buffer_size_req_bytes > 0) { + int count = buffer_size_req_bytes / buffer_size; + LOGI("queue_init(size:%d, count:%d)", buffer_size, count); + queue_init(&queue, buffer_size, count); + } + buffer_size_total_bytes = buffer_size_req_bytes; + } + return true; + } + + // reads a single value + T read() { + T data = 0; + readArray(&data, 1); + return data; + } + + // peeks the actual entry from the buffer + T peek() { + LOGE("peek not implmented"); + return 0; + } + + // reads multiple values + int readArray(T data[], int len) override { + LOGD("readArray: %d", len); + // handle unalloc;ated queue + if (buffer_size_total_bytes == 0) return 0; + if (isEmpty() && read_buffer.isEmpty()) return 0; + // fill read buffer if necessary + while (read_buffer.availableForWrite() >= buffer_size) { + LOGD("reading %d %d ", buffer_size, read_buffer.availableForWrite()); + T tmp[buffer_size]; + if (queue_try_remove(&queue, tmp)){ + LOGD("queue_try_remove -> success"); + read_buffer.writeArray(tmp, buffer_size); + } else { + LOGD("queue_try_remove -> failed"); + break; + } + } + LOGD("read_buffer.available: %d, availableForWrite: %d ", read_buffer.available(), read_buffer.availableForWrite()); + int result = read_buffer.readArray(data, len); + LOGD("=> readArray: %d -> %d", len, result); + return result; + } + + int writeArray(const T data[], int len) override { + LOGD("writeArray: %d", len); + // make sure that we have the data allocated + resize(buffer_size_req_bytes); + + // blocking write: wait for available space + while (availableForWrite()<=len){ + delay(5); + }; + + // fill the write buffer and when it is full flush it to the queue + for (int j = 0; j < len; j++) { + write_buffer.write(data[j]); + if (write_buffer.isFull()) { + LOGD("queue_add_blocking"); + queue_add_blocking(&queue, write_buffer.data()); + write_buffer.reset(); + } + } + return len; + } + + // checks if the buffer is full + bool isFull() override { + if (buffer_size_total_bytes == 0) return false; + return queue_is_full(&queue); + } + + bool isEmpty() { + if (buffer_size_total_bytes == 0) return true; + return queue_is_empty(&queue); + } + + // write add an entry to the buffer + bool write(T data) override { return writeArray(&data, 1) == 1; } + + // clears the buffer + void reset() override { + queue_free(&queue); + buffer_size_total_bytes = 0; + } + + // provides the number of entries that are available to read + int available() override { + return buffer_size / sizeof(T); + } + + // provides the number of entries that are available to write + int availableForWrite() override { return size() - available(); } + + // returns the address of the start of the physical read buffer + T *address() override { + LOGE("address() not implemented"); + return nullptr; + } + + size_t size() { return buffer_size_req_bytes / sizeof(T); } + + protected: + queue_t queue; + int buffer_size_total_bytes = 0; + int buffer_size_req_bytes = 0; + int buffer_size = 0; + SingleBuffer write_buffer{0}; + audio_tools::RingBuffer read_buffer{0}; +}; + +using BufferRP2040 = BufferRP2040T; + +} // namespace audio_tools diff --git a/src/AudioTools/Concurrency/RP2040/MutexRP2040.h b/src/AudioTools/Concurrency/RP2040/MutexRP2040.h new file mode 100644 index 000000000..53627db61 --- /dev/null +++ b/src/AudioTools/Concurrency/RP2040/MutexRP2040.h @@ -0,0 +1,60 @@ +#pragma once +#include "AudioLogger.h" +#include "AudioTools/Concurrency/Mutex.h" + +namespace audio_tools { + +/** + * @brief Disable, enable interrupts (only on the actual core) + * @ingroup concurrency + * @author Phil Schatzmann + * @copyright GPLv3 * + */ +class NoInterruptHandler : public MutexBase { + public: + void lock() override { + TRACED(); + noInterrupts(); + } + void unlock() override { + TRACED(); + interrupts(); + } +}; + +/** + * @brief Mutex API for non IRQ mutual exclusion between cores. + * Mutexes are application level locks usually used protecting data structures + * that might be used by multiple threads of execution. Unlike critical + * sections, the mutex protected code is not necessarily required/expected + * to complete quickly, as no other sytem wide locks are held on account of + * an acquired mutex. + * @ingroup concurrency + * @author Phil Schatzmann + * @copyright GPLv3 + */ + +class MutexRP2040 : public MutexBase { + public: + MutexRP2040() { + TRACED(); + mutex_init(&mtx); + } + virtual ~MutexRP2040() = default; + + void lock() override { + TRACED(); + mutex_enter_blocking(&mtx); + } + void unlock() override { + TRACED(); + mutex_exit(&mtx); + } + + protected: + mutex_t mtx; +}; + +using Mutex = MutexRP2040; + +} // namespace audio_tools \ No newline at end of file diff --git a/src/AudioTools/Concurrency/RTOS.h b/src/AudioTools/Concurrency/RTOS.h index 38de2126d..1d70ea7dd 100644 --- a/src/AudioTools/Concurrency/RTOS.h +++ b/src/AudioTools/Concurrency/RTOS.h @@ -1,7 +1,8 @@ #pragma once -#include "AudioTools/Concurrency/QueueRTOS.h" -#include "AudioTools/Concurrency/BufferRTOS.h" -#include "AudioTools/Concurrency/SynchronizedBuffers.h" -#include "AudioTools/Concurrency/Task.h" -#include "MutexRTOS.h" -#include "AudioTools/Concurrency/LockGuard.h" \ No newline at end of file +#include "AudioTools/Concurrency/RTOS/QueueRTOS.h" +#include "AudioTools/Concurrency/RTOS/BufferRTOS.h" +#include "AudioTools/Concurrency/RTOS/Task.h" +#include "AudioTools/Concurrency/RTOS/MutexRTOS.h" +#include "AudioTools/Concurrency/RTOS/SynchronizedNBufferRTOS.h" +#include "AudioTools/Concurrency/LockGuard.h" +#include "AudioTools/Concurrency/SynchronizedQueue.h" diff --git a/src/AudioTools/Concurrency/BufferRTOS.h b/src/AudioTools/Concurrency/RTOS/BufferRTOS.h similarity index 100% rename from src/AudioTools/Concurrency/BufferRTOS.h rename to src/AudioTools/Concurrency/RTOS/BufferRTOS.h diff --git a/src/AudioTools/Concurrency/MutexRTOS.h b/src/AudioTools/Concurrency/RTOS/MutexRTOS.h similarity index 82% rename from src/AudioTools/Concurrency/MutexRTOS.h rename to src/AudioTools/Concurrency/RTOS/MutexRTOS.h index 44ade8869..314ce9379 100644 --- a/src/AudioTools/Concurrency/MutexRTOS.h +++ b/src/AudioTools/Concurrency/RTOS/MutexRTOS.h @@ -1,6 +1,6 @@ #pragma once #include "AudioConfig.h" -#include "Mutex.h" +#include "AudioTools/Concurrency/Mutex.h" #ifdef ESP32 # include "freertos/FreeRTOS.h" @@ -18,14 +18,14 @@ namespace audio_tools { * @author Phil Schatzmann * @copyright GPLv3 * */ -class Mutex : public MutexBase { +class MutexRTOS : public MutexBase { public: - Mutex() { + MutexRTOS() { TRACED(); xSemaphore = xSemaphoreCreateBinary(); xSemaphoreGive(xSemaphore); } - ~Mutex() { + virtual ~MutexRTOS() { TRACED(); vSemaphoreDelete(xSemaphore); } @@ -42,4 +42,6 @@ class Mutex : public MutexBase { SemaphoreHandle_t xSemaphore = NULL; }; +using Mutex = MutexRTOS; + } \ No newline at end of file diff --git a/src/AudioTools/Concurrency/QueueRTOS.h b/src/AudioTools/Concurrency/RTOS/QueueRTOS.h similarity index 100% rename from src/AudioTools/Concurrency/QueueRTOS.h rename to src/AudioTools/Concurrency/RTOS/QueueRTOS.h diff --git a/src/AudioTools/Concurrency/RTOS/SynchronizedNBufferRTOS.h b/src/AudioTools/Concurrency/RTOS/SynchronizedNBufferRTOS.h new file mode 100644 index 000000000..5390175cd --- /dev/null +++ b/src/AudioTools/Concurrency/RTOS/SynchronizedNBufferRTOS.h @@ -0,0 +1,122 @@ +#pragma once +#include "AudioConfig.h" +#include "QueueRTOS.h" + +namespace audio_tools { + +/** + * @brief NBuffer which uses some RTOS queues to manage the available and filled buffers + * @ingroup buffers + * @ingroup concurrency + * @tparam T + * @tparam COUNT number of buffers + */ +template +class SynchronizedNBufferRTOST : public NBuffer { +public: + SynchronizedNBufferRTOST(int bufferSize, int bufferCount, int writeMaxWait=portMAX_DELAY, int readMaxWait=portMAX_DELAY) { + TRACED(); + read_max_wait = readMaxWait; + write_max_wait = writeMaxWait; + resize(bufferSize, bufferCount); + } + ~SynchronizedNBufferRTOST(){ + cleanup(); + } + + void resize(int bufferSize, int bufferCount) { + TRACED(); + if (buffer_size == bufferSize && buffer_count == bufferCount){ + return; + } + + max_size = bufferSize * bufferCount; + NBuffer::buffer_count = bufferCount; + NBuffer::buffer_size = bufferSize; + + cleanup(); + available_buffers.resize(bufferCount); + filled_buffers.resize(bufferCount); + + setReadMaxWait(read_max_wait); + setWriteMaxWait(write_max_wait); + + // setup buffers + for (int j = 0; j < bufferCount; j++) { + BaseBuffer *tmp = new SingleBuffer(bufferSize); + if (tmp != nullptr) { + available_buffers.enqueue(tmp); + } else { + LOGE("Not Enough Memory for buffer %d", j); + } + } + } + + void setReadMaxWait(TickType_t ticks){ + available_buffers.setReadMaxWait(ticks); + filled_buffers.setReadMaxWait(ticks); + } + + void setWriteMaxWait(TickType_t ticks){ + available_buffers.setWriteMaxWait(ticks); + filled_buffers.setWriteMaxWait(ticks); + } + + size_t size() { + return max_size; + } + + int bufferCountFilled() { + return filled_buffers.size(); + } + + int bufferCountEmpty() { + return available_buffers.size(); + } + +protected: + QueueRTOS*> available_buffers{0,portMAX_DELAY,0}; + QueueRTOS*> filled_buffers{0,portMAX_DELAY,0}; + size_t max_size; + size_t read_max_wait, write_max_wait; + int buffer_size = 0, buffer_count = 0; + + /// Removes all allocated buffers + void cleanup(){ + TRACED(); + BaseBuffer* buffer = nullptr;; + while (available_buffers.dequeue(buffer)){ + delete buffer; + } + while (filled_buffers.dequeue(buffer)){ + delete buffer; + } + } + + BaseBuffer *getNextAvailableBuffer() { + TRACED(); + BaseBuffer* result; + return available_buffers.dequeue(result) ? result : nullptr; + } + + bool addAvailableBuffer(BaseBuffer *buffer) { + TRACED(); + return available_buffers.enqueue(buffer); + } + + BaseBuffer *getNextFilledBuffer() { + TRACED(); + BaseBuffer* result; + return filled_buffers.dequeue(result) ? result : nullptr; + } + + bool addFilledBuffer(BaseBuffer *buffer) { + TRACED(); + return filled_buffers.enqueue(buffer); + } +}; + +using SynchronizedNBufferRTOS = SynchronizedNBufferRTOST; +using SynchronizedNBuffer = SynchronizedNBufferRTOS; + +} // namespace audio_tools \ No newline at end of file diff --git a/src/AudioTools/Concurrency/Task.h b/src/AudioTools/Concurrency/RTOS/Task.h similarity index 100% rename from src/AudioTools/Concurrency/Task.h rename to src/AudioTools/Concurrency/RTOS/Task.h diff --git a/src/AudioTools/Concurrency/SynchronizedBuffer.h b/src/AudioTools/Concurrency/SynchronizedBuffer.h new file mode 100644 index 000000000..3efdc403c --- /dev/null +++ b/src/AudioTools/Concurrency/SynchronizedBuffer.h @@ -0,0 +1,119 @@ + +#pragma once +#include "AudioConfig.h" +#include "AudioTools/CoreAudio/AudioTypes.h" +#include "AudioTools/CoreAudio/Buffers.h" +#include "AudioTools/CoreAudio/AudioLogger.h" +#include "Mutex.h" +#include "LockGuard.h" + +namespace audio_tools { + +/** + * @brief Wrapper class that can turn any Buffer into a thread save + * implementation. + * @ingroup buffers + * @ingroup concurrency + * @author Phil Schatzmann + * @copyright GPLv3 * + * @tparam T + */ +template +class SynchronizedBuffer : public BaseBuffer { +public: + SynchronizedBuffer(BaseBuffer &buffer, MutexBase &mutex, bool syncAvailable=false) { + p_buffer = &buffer; + p_mutex = &mutex; + is_sync_available = syncAvailable; + } + + // reads a single value + T read() override { + TRACED(); + LockGuard guard(p_mutex); + return p_buffer->read(); + } + + // reads multiple values + int readArray(T data[], int len) { + TRACED(); + LockGuard guard(p_mutex); + int lenResult = MIN(len, available()); + for (int j = 0; j < lenResult; j++) { + data[j] = p_buffer->read(); + } + return lenResult; + } + + int writeArray(const T data[], int len) { + LOGD("%s: %d", LOG_METHOD, len); + LockGuard guard(p_mutex); + int result = 0; + for (int j = 0; j < len; j++) { + if (p_buffer->write(data[j]) == 0) { + break; + } + result = j + 1; + } + return result; + } + + // peeks the actual entry from the buffer + T peek() override { + TRACED(); + LockGuard guard(p_mutex); + return p_buffer->peek(); + } + + // checks if the buffer is full + bool isFull() override { return p_buffer->isFull(); } + + bool isEmpty() { return available() == 0; } + + // write add an entry to the buffer + bool write(T data) override { + TRACED(); + LockGuard guard(p_mutex); + return p_buffer->write(data); + } + + // clears the buffer + void reset() override { + TRACED(); + LockGuard guard(p_mutex); + p_buffer->reset(); + } + + // provides the number of entries that are available to read + int available() override { + TRACED(); + if (is_sync_available) LockGuard guard(p_mutex); + return p_buffer->available(); + } + + // provides the number of entries that are available to write + int availableForWrite() override { + TRACED(); + if (is_sync_available) LockGuard guard(p_mutex); + return p_buffer->availableForWrite(); + } + + // returns the address of the start of the physical read buffer + T *address() override { + TRACED(); + return p_buffer->address(); + } + + size_t size() { + return p_buffer->size(); + } + +protected: + BaseBuffer *p_buffer = nullptr; + MutexBase *p_mutex = nullptr; + bool is_sync_available = false; +}; + + +} // namespace audio_tools + diff --git a/src/AudioTools/Concurrency/SynchronizedBuffers.h b/src/AudioTools/Concurrency/SynchronizedBuffers.h deleted file mode 100644 index 5862b34a2..000000000 --- a/src/AudioTools/Concurrency/SynchronizedBuffers.h +++ /dev/null @@ -1,243 +0,0 @@ - -#pragma once -#include "AudioConfig.h" -#include "AudioTools/CoreAudio/AudioTypes.h" -#include "AudioTools/CoreAudio/Buffers.h" -#include "AudioTools/CoreAudio/AudioLogger.h" - - -#ifdef ESP32 -# include "freertos/FreeRTOS.h" -# include "AudioTools/Concurrency/QueueRTOS.h" -# if ESP_IDF_VERSION_MAJOR >= 4 -# include -# endif -#else -# include "stream_buffer.h" -#endif - -#include "Mutex.h" -#include "LockGuard.h" - -namespace audio_tools { - -/** - * @brief Wrapper class that can turn any Buffer into a thread save - * implementation. - * @ingroup buffers - * @ingroup concurrency - * @author Phil Schatzmann - * @copyright GPLv3 * - * @tparam T - */ -template -class SynchronizedBuffer : public BaseBuffer { -public: - SynchronizedBuffer(BaseBuffer &buffer, MutexBase &mutex, bool syncAvailable=false) { - p_buffer = &buffer; - p_mutex = &mutex; - is_sync_available = syncAvailable; - } - - // reads a single value - T read() override { - TRACED(); - LockGuard guard(p_mutex); - return p_buffer->read(); - } - - // reads multiple values - int readArray(T data[], int len) { - TRACED(); - LockGuard guard(p_mutex); - int lenResult = MIN(len, available()); - for (int j = 0; j < lenResult; j++) { - data[j] = p_buffer->read(); - } - return lenResult; - } - - int writeArray(const T data[], int len) { - LOGD("%s: %d", LOG_METHOD, len); - LockGuard guard(p_mutex); - int result = 0; - for (int j = 0; j < len; j++) { - if (p_buffer->write(data[j]) == 0) { - break; - } - result = j + 1; - } - return result; - } - - // peeks the actual entry from the buffer - T peek() override { - TRACED(); - LockGuard guard(p_mutex); - return p_buffer->peek(); - } - - // checks if the buffer is full - bool isFull() override { return p_buffer->isFull(); } - - bool isEmpty() { return available() == 0; } - - // write add an entry to the buffer - bool write(T data) override { - TRACED(); - LockGuard guard(p_mutex); - return p_buffer->write(data); - } - - // clears the buffer - void reset() override { - TRACED(); - LockGuard guard(p_mutex); - p_buffer->reset(); - } - - // provides the number of entries that are available to read - int available() override { - TRACED(); - if (is_sync_available) LockGuard guard(p_mutex); - return p_buffer->available(); - } - - // provides the number of entries that are available to write - int availableForWrite() override { - TRACED(); - if (is_sync_available) LockGuard guard(p_mutex); - return p_buffer->availableForWrite(); - } - - // returns the address of the start of the physical read buffer - T *address() override { - TRACED(); - return p_buffer->address(); - } - - size_t size() { - return p_buffer->size(); - } - -protected: - BaseBuffer *p_buffer = nullptr; - MutexBase *p_mutex = nullptr; - bool is_sync_available = false; -}; - -/** - * @brief NBuffer which uses some RTOS queues to manage the available and filled buffers - * @ingroup buffers - * @ingroup concurrency - * @tparam T - * @tparam COUNT number of buffers - */ -template -class SynchronizedNBuffer : public NBuffer { -public: - SynchronizedNBuffer(int bufferSize, int bufferCount, int writeMaxWait=portMAX_DELAY, int readMaxWait=portMAX_DELAY) { - TRACED(); - read_max_wait = readMaxWait; - write_max_wait = writeMaxWait; - resize(bufferSize, bufferCount); - } - ~SynchronizedNBuffer(){ - cleanup(); - } - - void resize(int bufferSize, int bufferCount) { - TRACED(); - if (buffer_size == bufferSize && buffer_count == bufferCount){ - return; - } - - max_size = bufferSize * bufferCount; - NBuffer::buffer_count = bufferCount; - NBuffer::buffer_size = bufferSize; - - cleanup(); - available_buffers.resize(bufferCount); - filled_buffers.resize(bufferCount); - - setReadMaxWait(read_max_wait); - setWriteMaxWait(write_max_wait); - - // setup buffers - for (int j = 0; j < bufferCount; j++) { - BaseBuffer *tmp = new SingleBuffer(bufferSize); - if (tmp != nullptr) { - available_buffers.enqueue(tmp); - } else { - LOGE("Not Enough Memory for buffer %d", j); - } - } - } - - void setReadMaxWait(TickType_t ticks){ - available_buffers.setReadMaxWait(ticks); - filled_buffers.setReadMaxWait(ticks); - } - - void setWriteMaxWait(TickType_t ticks){ - available_buffers.setWriteMaxWait(ticks); - filled_buffers.setWriteMaxWait(ticks); - } - - size_t size() { - return max_size; - } - - int bufferCountFilled() { - return filled_buffers.size(); - } - - int bufferCountEmpty() { - return available_buffers.size(); - } - -protected: - QueueRTOS*> available_buffers{0,portMAX_DELAY,0}; - QueueRTOS*> filled_buffers{0,portMAX_DELAY,0}; - size_t max_size; - size_t read_max_wait, write_max_wait; - int buffer_size = 0, buffer_count = 0; - - /// Removes all allocated buffers - void cleanup(){ - TRACED(); - BaseBuffer* buffer = nullptr;; - while (available_buffers.dequeue(buffer)){ - delete buffer; - } - while (filled_buffers.dequeue(buffer)){ - delete buffer; - } - } - - BaseBuffer *getNextAvailableBuffer() { - TRACED(); - BaseBuffer* result; - return available_buffers.dequeue(result) ? result : nullptr; - } - - bool addAvailableBuffer(BaseBuffer *buffer) { - TRACED(); - return available_buffers.enqueue(buffer); - } - - BaseBuffer *getNextFilledBuffer() { - TRACED(); - BaseBuffer* result; - return filled_buffers.dequeue(result) ? result : nullptr; - } - - bool addFilledBuffer(BaseBuffer *buffer) { - TRACED(); - return filled_buffers.enqueue(buffer); - } -}; - - -} // namespace audio_tools - diff --git a/src/AudioTools/Concurrency/SynchronizedQueue.h b/src/AudioTools/Concurrency/SynchronizedQueue.h new file mode 100644 index 000000000..39c1c7943 --- /dev/null +++ b/src/AudioTools/Concurrency/SynchronizedQueue.h @@ -0,0 +1,59 @@ +#pragma once +#include "AudioTools/CoreAudio/AudioBasic/Collections/List.h" + +namespace audio_tools { + +/** + * @brief FIFO Queue which is based on a List that is thread + * save. + * @ingroup collections + * @author Phil Schatzmann + * @copyright GPLv3 + * @tparam T + * @tparam TMutex + */ +template +class SynchronizedQueue { + public: + SynchronizedQueue() = default; + + bool enqueue(T& data) { + LockGuard(mutex); + return l.push_front(data); + } + + bool peek(T& data) { + LockGuard(mutex); + if (l.end()->prior == nullptr) return false; + data = *(l.end()->prior); + return true; + } + + bool dequeue(T& data) { + LockGuard(mutex); + return l.pop_back(data); + } + + size_t size() { + LockGuard(mutex); + return l.size(); + } + + bool clear() { + LockGuard(mutex); + return l.clear(); + } + + bool empty() { + LockGuard(mutex); + return l.empty(); + } + + void setAllocator(Allocator& allocator) { l.setAllocator(allocator); } + + protected: + List l; + TMutex mutex; +}; + +} // namespace audio_tools \ No newline at end of file diff --git a/src/AudioTools/CoreAudio/AudioHttp/URLStreamBuffered.h b/src/AudioTools/CoreAudio/AudioHttp/URLStreamBuffered.h index 18ee6dd60..3e23d62c0 100644 --- a/src/AudioTools/CoreAudio/AudioHttp/URLStreamBuffered.h +++ b/src/AudioTools/CoreAudio/AudioHttp/URLStreamBuffered.h @@ -118,7 +118,7 @@ class BufferedTaskStream : public AudioStream { bool active = false; Task task{"BufferedTaskStream", STACK_SIZE, URL_STREAM_PRIORITY, URL_STREAM_CORE}; - SynchronizedNBuffer buffers{DEFAULT_BUFFER_SIZE, + SynchronizedNBuffer buffers{DEFAULT_BUFFER_SIZE, URL_STREAM_BUFFER_COUNT}; bool ready = false; diff --git a/src/AudioTools/CoreAudio/AudioStreams.h b/src/AudioTools/CoreAudio/AudioStreams.h index 5e3b2f441..90bd77bad 100644 --- a/src/AudioTools/CoreAudio/AudioStreams.h +++ b/src/AudioTools/CoreAudio/AudioStreams.h @@ -762,6 +762,10 @@ class MeasuringStream : public ModifyingStream { report_bytes = flag; } + void setName(const char* name){ + this->name = name; + } + protected: int max_count=0; int count=0; @@ -774,6 +778,7 @@ class MeasuringStream : public ModifyingStream { NullStream null; Print *p_logout=nullptr; bool report_bytes = false; + const char* name = ""; size_t measure(size_t len) { count--; @@ -796,9 +801,9 @@ class MeasuringStream : public ModifyingStream { void printResult() { char msg[70]; if (report_bytes || frame_size==0){ - snprintf(msg, 70, "==> Bytes per second: %d", bytes_per_second); + snprintf(msg, 70, "%s ==> Bytes per second: %d", name, bytes_per_second); } else { - snprintf(msg, 70, "==> Samples per second: %d", bytes_per_second/frame_size); + snprintf(msg, 70, "%s ==> Samples per second: %d", name, bytes_per_second/frame_size); } if (p_logout!=nullptr){ p_logout->println(msg); diff --git a/src/AudioTools/CoreAudio/BaseStream.h b/src/AudioTools/CoreAudio/BaseStream.h index e687df7a9..7dc8afaa2 100644 --- a/src/AudioTools/CoreAudio/BaseStream.h +++ b/src/AudioTools/CoreAudio/BaseStream.h @@ -87,7 +87,7 @@ class BaseStream : public Stream { RingBuffer tmp_out{0}; void refillReadBuffer() { - tmp_in.resize(MAX_SINGLE_CHARS); + tmp_in.resize(DEFAULT_BUFFER_SIZE); if (tmp_in.isEmpty()) { TRACED(); const int len = tmp_in.size(); diff --git a/src/AudioTools/CoreAudio/StreamCopy.h b/src/AudioTools/CoreAudio/StreamCopy.h index ba69f7d93..94e1d9b12 100644 --- a/src/AudioTools/CoreAudio/StreamCopy.h +++ b/src/AudioTools/CoreAudio/StreamCopy.h @@ -70,7 +70,7 @@ class StreamCopyT { /// assign a new output and input stream void begin(Print &to, Stream &from){ is_cleanup_from = true; - this->from = new AudioStreamWrapper(from); + this->from = &from; this->to = &to; begin(); } @@ -78,6 +78,7 @@ class StreamCopyT { /// assign a new output and input stream void begin(Print &to, AudioStream &from){ this->from = &from; + this->from_audio = &from; this->to = &to; begin(); } @@ -339,8 +340,8 @@ class StreamCopyT { /// Determine frame size int minCopySize() { - if (min_copy_size==0){ - AudioInfo info = from->audioInfoOut(); + if (min_copy_size==0 && from_audio != nullptr){ + AudioInfo info = from_audio->audioInfoOut(); min_copy_size = info.bits_per_sample / 8 * info.channels; } return min_copy_size; @@ -357,7 +358,8 @@ class StreamCopyT { } protected: - AudioStream *from = nullptr; + Stream *from = nullptr; + AudioStream *from_audio = nullptr; Print *to = nullptr; Vector buffer{0}; int buffer_size = DEFAULT_BUFFER_SIZE; @@ -384,8 +386,8 @@ class StreamCopyT { void syncAudioInfo(){ // synchronize audio info - if (is_sync_audio_info && from != nullptr && p_audio_info_support != nullptr){ - AudioInfo info_from = from->audioInfoOut(); + if (is_sync_audio_info && from_audio != nullptr && p_audio_info_support != nullptr){ + AudioInfo info_from = from_audio->audioInfoOut(); AudioInfo info_to = p_audio_info_support->audioInfo(); if (info_from != info_to){ LOGI("--> StreamCopy: ");