/*
 * Copyright © 2016 Mozilla Foundation
 *
 * This program is made available under an ISC-style license.  See the
 * accompanying file LICENSE for details.
 */

#ifndef CUBEB_RING_BUFFER_H
#define CUBEB_RING_BUFFER_H

#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>

/**
 * Single producer single consumer lock-free and wait-free ring buffer.
 *
 * This data structure allows producing data from one thread, and consuming it on
 * another thread, safely and without explicit synchronization. If used on two
 * threads, this data structure uses atomics for thread safety. It is possible
 * to disable the use of atomics at compile time and only use this data
 * structure on one thread.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking if
 *   the ring buffer is empty or full.
 * - One slot of the storage is always left unused, so that an empty and a full
 *   ring buffer can be told apart: the ring buffer is empty when the write
 *   index is at the same position as the read index, and it is full when the
 *   write index is exactly one position before the read index (modulo the
 *   size of the storage).
 * - We synchronize updates to the read index after having read the data, and
 *   updates to the write index after having written the data. This means that
 *   each thread can only touch a portion of the buffer that is not touched by
 *   the other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory,
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class ring_buffer_base
{
public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation, but is the only allocation that will happen
   * for the life time of a `ring_buffer_base`.
   *
   * @param capacity The maximum number of elements this ring buffer will hold.
   * It must be non-negative, and small enough for the internal index
   * arithmetic (both conditions are checked by assertions).
   */
  ring_buffer_base(int capacity)
    /* One more element to distinguish from empty and full buffer. */
    : capacity_(capacity + 1)
  {
    /* Indices are `int` and are summed before being wrapped (see
     * `increment_index` and `available_read_internal`): keeping the storage
     * size below half of the maximum `int` guarantees that those sums cannot
     * overflow. */
    assert(storage_capacity() <
           std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);

    data_.reset(new T[storage_capacity()]);
    /* Both indices start at the same position: the ring buffer is empty.
     * Assigning to a `std::atomic` is a sequentially consistent store; doing
     * it as the last action of the constructor orders it after the
     * initialization of the other members. */
    write_index_ = 0;
    read_index_ = 0;
  }
  /**
   * Push `count` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param count The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  int enqueue_default(int count)
  {
    return enqueue(nullptr, count);
  }
  /**
   * @brief Put an element in the queue
   *
   * Only safely called on the producer thread.
   *
   * @param element The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  int enqueue(T& element)
  {
    return enqueue(&element, 1);
  }
  /**
   * Push `count` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param elements a pointer to a buffer containing at least `count` elements.
   * If `elements` is nullptr, zero or default constructed elements are enqueued.
   * @param count The number of elements to read from `elements`
   * @return The number of elements successfully copied from `elements` and
   * inserted into the ring buffer. This is 0 if the ring buffer is full or if
   * `count` is not strictly positive.
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif

    /* A negative count would otherwise be handed to the copy helpers as a
     * length and move the write index backwards. */
    if (count <= 0) {
      return 0;
    }

    /* Acquire: pairs with the release store of the read index in `dequeue`,
     * so that the consumer is done reading the slots it has freed before we
     * overwrite them. */
    int rd_idx = read_index_.load(std::memory_order_acquire);
    /* The write index is only ever modified by this (producer) thread. */
    int wr_idx = write_index_.load(std::memory_order_relaxed);

    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_write =
      std::min(available_write_internal(rd_idx, wr_idx), count);

    /* First part, from the write index to the end of the array. */
    int first_part = std::min(storage_capacity() - wr_idx,
                              to_write);
    /* Second part, from the beginning of the array */
    int second_part = to_write - first_part;

    if (elements) {
      Copy(data_.get() + wr_idx, elements, first_part);
      Copy(data_.get(), elements + first_part, second_part);
    } else {
      ConstructDefault(data_.get() + wr_idx, first_part);
      ConstructDefault(data_.get(), second_part);
    }

    /* Release: publish the elements written above together with the new
     * write index. Pairs with the acquire load in `dequeue`. */
    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order_release);

    return to_write;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, the elements are discarded.
   * @param count The maximum number of elements to dequeue.
   * @return The number of elements removed from the ring buffer. This is 0 if
   * the ring buffer is empty or if `count` is not strictly positive.
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif

    /* A negative count would otherwise be handed to the copy helper as a
     * length and move the read index backwards. */
    if (count <= 0) {
      return 0;
    }

    /* Acquire: pairs with the release store of the write index in `enqueue`,
     * so that the elements written by the producer are visible here. */
    int wr_idx = write_index_.load(std::memory_order_acquire);
    /* The read index is only ever modified by this (consumer) thread. */
    int rd_idx = read_index_.load(std::memory_order_relaxed);

    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_read =
      std::min(available_read_internal(rd_idx, wr_idx), count);

    /* First part, from the read index to the end of the array. */
    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    /* Second part, from the beginning of the array. */
    int second_part = to_read - first_part;

    if (elements) {
      Copy(elements, data_.get() + rd_idx, first_part);
      Copy(elements + first_part, data_.get(), second_part);
    }

    /* Release: the reads of the storage above must be complete before the
     * producer can observe these slots as free and overwrite them. Pairs with
     * the acquire load of the read index in `enqueue`. */
    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order_release);

    return to_read;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available elements for reading.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    /* Relaxed is enough: only the indices are inspected here, `dequeue`
     * performs its own acquire load before touching the storage. */
    return available_read_internal(read_index_.load(std::memory_order_relaxed),
                                   write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Get the number of free slots available for producing.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    /* Relaxed is enough: only the indices are inspected here, `enqueue`
     * performs its own acquire load before touching the storage. */
    return available_write_internal(read_index_.load(std::memory_order_relaxed),
                                    write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Get the total capacity, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const
  {
    /* One slot of the storage is never used, see `storage_capacity`. */
    return storage_capacity() - 1;
  }
  /**
   * Reset the consumer and producer thread identifier, in case the thread are
   * being changed. This has to be externally synchronized. This is no-op when
   * asserts are disabled.
   */
  void reset_thread_ids()
  {
#ifndef NDEBUG
    consumer_id = producer_id = std::thread::id();
#endif
  }
private:
  /** Return true if the ring buffer is empty.
   *
   * This happens if the write index and the read index are at the same
   * position.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool empty_internal(int read_index,
                      int write_index) const
  {
    return write_index == read_index;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool full_internal(int read_index,
                     int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * @return the number of slots in the internal storage.
   */
  int storage_capacity() const
  {
    return capacity_;
  }
  /**
   * Returns the number of elements available for reading.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of available elements for reading.
   */
  int
  available_read_internal(int read_index,
                          int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    } else {
      /* The write index has wrapped around the end of the storage. */
      return write_index + storage_capacity() - read_index;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of elements that can be written into the array.
   */
  int
  available_write_internal(int read_index,
                           int write_index) const
  {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * @param index the index to increment.
   * @param increment the number by which `index` is incremented, must not be
   * negative.
   * @return the new index.
   */
  int
  increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
  /**
   * @brief This allows checking that enqueue (resp. dequeue) are always called
   * by the right thread.
   *
   * @param id the id of the thread that has called the calling method first,
   * or a default constructed id if no thread has called it yet, in which case
   * it is set to the id of the current thread.
   */
#ifndef NDEBUG
  static void assert_correct_thread(std::thread::id& id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  /** Index of the oldest element available for reading. Only modified by
   * `dequeue`, after construction. */
  std::atomic<int> read_index_;
  /** Index at which to write new elements. Only modified by `enqueue`, after
   * construction. It is equal to `read_index_` when the ring buffer is empty,
   * and one position before `read_index_` when the ring buffer is full. */
  std::atomic<int> write_index_;
  /** Size of the storage, in elements: one more than the maximum number of
   * elements that can be stored in the ring buffer. */
  const int capacity_;
  /** Data storage */
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id consumer_id;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id producer_id;
#endif
};

/**
 * Adapter for `ring_buffer_base` that exposes an interface in frames.
 */
template <typename T>
class audio_ring_buffer_base
{
public:
  /**
   * @brief Build a ring buffer whose interface counts in frames.
   *
   * The underlying storage is sized to hold
   * `capacity_in_frames * channel_count` samples.
   *
   * @param channel_count       Number of channels, must be strictly positive.
   * @param capacity_in_frames  The capacity in frames.
   */
  audio_ring_buffer_base(int channel_count, int capacity_in_frames)
    : channel_count(channel_count)
    , ring_buffer(frames_to_samples(capacity_in_frames))
  {
    assert(channel_count > 0);
  }
  /**
   * @brief Enqueue frames of default constructed samples (silence).
   *
   * Only safely called on the producer thread.
   *
   * @param frame_count The number of frames of silence to enqueue.
   * @return  The number of frames of silence actually written to the queue.
   */
  int enqueue_default(int frame_count)
  {
    /* A null source buffer asks for default constructed samples. */
    return enqueue(nullptr, frame_count);
  }
  /**
   * @brief Enqueue `frame_count` frames of audio.
   *
   * Only safely called from the producer thread.
   *
   * @param [in] frames If non-null, the frames to enqueue.
   *                    Otherwise, silent frames are enqueued.
   * @param frame_count The number of frames to enqueue.
   *
   * @return The number of frames enqueued
   */
  int enqueue(T * frames, int frame_count)
  {
    const int requested_samples = frames_to_samples(frame_count);
    const int written_samples = ring_buffer.enqueue(frames, requested_samples);
    return samples_to_frames(written_samples);
  }

  /**
   * @brief Remove up to `frame_count` frames from the buffer, copying them
   *        to `frames` when it is non-null.
   *
   * Only safely called on the consumer thread.
   *
   * @param frames      If non-null, the frames are copied to `frames`.
   *                    Otherwise, they are dropped.
   * @param frame_count The number of frames to remove.
   *
   * @return  The number of frames actually dequeued.
   */
  int dequeue(T * frames, int frame_count)
  {
    const int requested_samples = frames_to_samples(frame_count);
    const int read_samples = ring_buffer.dequeue(frames, requested_samples);
    return samples_to_frames(read_samples);
  }
  /**
   * Get the number of frames of audio that can currently be dequeued.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available frames of audio for reading.
   */
  int available_read() const
  {
    const int readable_samples = ring_buffer.available_read();
    return samples_to_frames(readable_samples);
  }
  /**
   * Get the number of frames of audio that can currently be enqueued.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty frames in the buffer, available for writing.
   */
  int available_write() const
  {
    const int writable_samples = ring_buffer.available_write();
    return samples_to_frames(writable_samples);
  }
  /**
   * Get the total capacity, in frames, for this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer, in frames.
   */
  int capacity() const
  {
    const int capacity_in_samples = ring_buffer.capacity();
    return samples_to_frames(capacity_in_samples);
  }
private:
  /**
   * @brief Convert a number of frames to a number of samples.
   *
   * @param frames The number of frames.
   *
   * @return  `frames` multiplied by the number of channels.
   */
  int frames_to_samples(int frames) const
  {
    return channel_count * frames;
  }
  /**
   * @brief Convert a number of samples to a number of frames.
   *
   * @param samples The number of samples.
   *
   * @return  `samples` divided by the number of channels (integer division).
   */
  int samples_to_frames(int samples) const
  {
    return samples / channel_count;
  }
  /** Number of channels of audio that will stream through this ring buffer.
   * Declared before `ring_buffer` because it is used to compute the size
   * passed to the constructor of `ring_buffer`. */
  int channel_count;
  /** The underlying ring buffer, storing the audio as individual samples. */
  ring_buffer_base<T> ring_buffer;
};

/**
 * Alias template for `ring_buffer_base<T>`: a queue of elements of type `T`.
 * `ring_buffer_base` is documented as safe to use from two threads, one
 * producer, one consumer (that never change role), without explicit
 * synchronization.
 */
template<typename T>
using lock_free_queue = ring_buffer_base<T>;
/**
 * Alias template for `audio_ring_buffer_base<T>`: a ring buffer of audio
 * samples of type `T`, with an interface expressed in frames. It wraps a
 * `ring_buffer_base<T>`, which is documented as safe to use from two threads,
 * one producer, one consumer (that never change role), without explicit
 * synchronization.
 */
template<typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;

#endif // CUBEB_RING_BUFFER_H
