/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#ifndef mozilla_MultiWriterQueue_h_
#define mozilla_MultiWriterQueue_h_

#include "mozilla/Atomics.h"
#include "mozilla/MemoryReporting.h"
#include "mozilla/Move.h"
#include "mozilla/Mutex.h"
#include "prthread.h"
#include "RollingNumber.h"
#include <cstdint>

namespace mozilla {

// Default reader locking strategy, using a mutex to ensure that concurrent
// PopAll calls won't overlap.
class MultiWriterQueueReaderLocking_Mutex
{
public:
  MultiWriterQueueReaderLocking_Mutex()
    : mMutex("MultiWriterQueueReaderLocking_Mutex")
  {
  }
  void Lock() { mMutex.Lock(); };
  void Unlock() { mMutex.Unlock(); };

private:
  Mutex mMutex;
};

// Reader non-locking strategy, trusting that PopAll will never be called
// concurrently (e.g., by only calling it from a specific thread).
class MultiWriterQueueReaderLocking_None
{
public:
#ifndef DEBUG
  void Lock(){};
  void Unlock(){};
#else
  // DEBUG-mode checks to catch concurrent misuses.
  void Lock() { MOZ_ASSERT(mLocked.compareExchange(false, true)); };
  void Unlock() { MOZ_ASSERT(mLocked.compareExchange(true, false)); };

private:
  Atomic<bool> mLocked{ false };
#endif
};

static constexpr uint32_t MultiWriterQueueDefaultBufferSize = 8192;

// Multi-writer, single-reader queue of elements of type `T`.
// Elements are bunched together in buffers of `BufferSize` elements.
//
// This queue is heavily optimized for pushing. In most cases pushes will only
// cost a couple of atomic reads and a few non-atomic reads. Worst cases:
// - Once per buffer, a push will allocate or reuse a buffer for later pushes;
// - During the above new-buffer push, other pushes will be blocked.
//
// By default, popping is protected by mutex; it may be disabled if popping is
// guaranteed never to be concurrent.
// In any case, popping will never negatively impact pushes.
// (However, *not* popping will add runtime costs, as unread buffers will not
// be freed, or made available to future pushes; Push functions provide
// feedback as to when popping would be most efficient.)
template<typename T,
         uint32_t BufferSize = MultiWriterQueueDefaultBufferSize,
         typename ReaderLocking = MultiWriterQueueReaderLocking_Mutex>
class MultiWriterQueue
{
  static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer");

public:
  // Constructor.
  // Allocates the initial buffer that will receive the first `BufferSize`
  // elements. Also allocates one reusable buffer, which will definitely be
  // needed after the first `BufferSize` elements have been pushed.
  // Ideally (if the reader can process each buffer quickly enough), there
  // won't be a need for more buffer allocations.
  MultiWriterQueue()
    : mBuffersCoverAtLeastUpTo(BufferSize - 1)
    , mMostRecentBuffer(new Buffer{})
    , mReusableBuffers(new Buffer{})
    , mOldestBuffer(static_cast<Buffer*>(mMostRecentBuffer))
    , mLiveBuffersStats(1)
    , mReusableBuffersStats(1)
    , mAllocatedBuffersStats(2)
  {
  }

  ~MultiWriterQueue()
  {
    auto DestroyList = [](Buffer* aBuffer)
    {
      while (aBuffer) {
        Buffer* older = aBuffer->Older();
        delete aBuffer;
        aBuffer = older;
      }
    };
    DestroyList(mMostRecentBuffer);
    DestroyList(mReusableBuffers);
  }

  // We need the index to be order-resistant to overflow, i.e., numbers before
  // an overflow should test smaller-than numbers after the overflow.
  // This is because we keep pushing elements with increasing Index, and this
  // Index is used to find the appropriate buffer based on a range; and this
  // need to work smoothly when crossing the overflow boundary.
  using Index = RollingNumber<uint32_t>;

  // Pushes indicate whether they have just reached the end of a buffer.
  using DidReachEndOfBuffer = bool;

  // Push new element and call aF on it.
  // Element may be in just-created state, or recycled after a PopAll call.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  template<typename F>
  DidReachEndOfBuffer PushF(F&& aF)
  {
    // Atomically claim ownership of the next available element.
    const Index index{ mNextElementToWrite++ };
    // And now go and set that element.
    for (;;) {
      Index lastIndex{ mBuffersCoverAtLeastUpTo };

      if (MOZ_UNLIKELY(index == lastIndex)) {
        // We have claimed the last element in the current head -> Allocate a
        // new head in advance of more pushes. Make it point at the current
        // most-recent buffer.
        // This whole process is effectively guarded:
        // - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to
        //   one that can accept their claimed index.
        // - Readers will stop until the last element is marked as valid.
        Buffer* ourBuffer = mMostRecentBuffer;
        Buffer* newBuffer = NewBuffer(ourBuffer, index + 1);
        // Because we have claimed this very specific index, we should be the
        // only one touching the most-recent buffer pointer.
        MOZ_ASSERT(mMostRecentBuffer == ourBuffer);
        // Just pivot the most-recent buffer pointer to our new buffer.
        mMostRecentBuffer = newBuffer;
        // Because we have claimed this very specific index, we should be the
        // only one touching the buffer coverage watermark.
        MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value());
        // Update it to include the just-added most-recent buffer.
        mBuffersCoverAtLeastUpTo = index.Value() + BufferSize;
        // We know for sure that `ourBuffer` is the correct one for this index.
        ourBuffer->SetAndValidateElement(aF, index);
        // And indicate that we have reached the end of a buffer.
        return true;
      }

      if (MOZ_UNLIKELY(index > lastIndex)) {
        // We have claimed an element in a yet-unavailable buffer, wait for our
        // target buffer to be created (see above).
        while (Index(mBuffersCoverAtLeastUpTo) < index) {
          PR_Sleep(PR_INTERVAL_NO_WAIT); // Yield
        }
        // Then loop to examine the new situation.
        continue;
      }

      // Here, we have claimed a number that is covered by current buffers.
      // These buffers cannot be destroyed, because our buffer is not filled
      // yet (we haven't written in it yet), therefore the reader thread will
      // have to stop there (or before) and won't destroy our buffer or more
      // recent ones.
      MOZ_ASSERT(index < lastIndex);
      Buffer* ourBuffer = mMostRecentBuffer;

      // In rare situations, another thread may have had the time to create a
      // new more-recent buffer, in which case we need to find our older buffer.
      while (MOZ_UNLIKELY(index < ourBuffer->Origin())) {
        // We assume that older buffers with still-invalid elements (e.g., the
        // one we have just claimed) cannot be destroyed.
        MOZ_ASSERT(ourBuffer->Older());
        ourBuffer = ourBuffer->Older();
      }

      // Now we can set&validate the claimed element, and indicate that we have
      // not reached the end of a buffer.
      ourBuffer->SetAndValidateElement(aF, index);
      return false;
    }
  }

  // Push new element and assign it a value.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  DidReachEndOfBuffer Push(const T& aT)
  {
    return PushF([&aT](T& aElement, Index) { aElement = aT; });
  }

  // Push new element and move-assign it a value.
  // Atomically thread-safe; in the worst case some pushes may be blocked
  // while a new buffer is created/reused for them.
  // Returns whether that push reached the end of a buffer; useful if caller
  // wants to trigger processing regularly at the most efficient time.
  DidReachEndOfBuffer Push(T&& aT)
  {
    return PushF([&aT](T& aElement, Index) { aElement = Move(aT); });
  }

  // Pop all elements before the first invalid one, running aF on each of them
  // in FIFO order.
  // Thread-safety with other PopAll calls is controlled by the `Locking`
  // template argument.
  // Concurrent pushes are always allowed, because:
  // - PopAll won't read elements until valid,
  // - Pushes do not interfere with pop-related members -- except for
  //   mReusableBuffers, which is accessed atomically.
  template<typename F>
  void PopAll(F&& aF)
  {
    mReaderLocking.Lock();
    // Destroy every second fully-read buffer.
    // TODO: Research a better algorithm, probably based on stats.
    bool destroy = false;
    for (;;) {
      Buffer* b = mOldestBuffer;
      MOZ_ASSERT(!b->Older());
      // The next element to pop must be in that oldest buffer.
      MOZ_ASSERT(mNextElementToPop >= b->Origin());
      MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize);

      // Start reading each element.
      if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) {
        // Found an invalid element, stop popping.
        mReaderLocking.Unlock();
        return;
      }

      // Reached the end of this oldest buffer
      MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize);
      // Delete this oldest buffer.
      // Since the last element was valid, it must mean that there is a newer
      // buffer.
      MOZ_ASSERT(b->Newer());
      MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin());
      StopUsing(b, destroy);
      destroy = !destroy;

      // We will loop and start reading the now-oldest buffer.
    }
  }

  // Size of all buffers (used, or recyclable), excluding external data.
  size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const
  {
    return mAllocatedBuffersStats.Count() * sizeof(Buffer);
  }

  struct CountAndWatermark
  {
    int mCount;
    int mWatermark;
  };

  CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats.Get(); }
  CountAndWatermark ReusableBuffersStats() const
  {
    return mReusableBuffersStats.Get();
  }
  CountAndWatermark AllocatedBuffersStats() const
  {
    return mAllocatedBuffersStats.Get();
  }

private:
  // Structure containing the element to be stored, and a validity-marker.
  class BufferedElement
  {
  public:
    // Run aF on an invalid element, and mark it as valid.
    template<typename F>
    void SetAndValidate(F&& aF, Index aIndex)
    {
      MOZ_ASSERT(!mValid);
      aF(mT, aIndex);
      mValid = true;
    }

    // Run aF on a valid element and mark it as invalid, return true.
    // Return false if element was invalid.
    template<typename F>
    bool ReadAndInvalidate(F&& aF)
    {
      if (!mValid) {
        return false;
      }
      aF(mT);
      mValid = false;
      return true;
    }

  private:
    T mT;
    // mValid should be atomically changed to true *after* mT has been written,
    // so that the reader can only see valid data.
    // ReleaseAcquire, because when set to `true`, we want the just-written mT
    // to be visible to the thread reading this `true`; and when set to `false`,
    // we want the previous reads to have completed.
    Atomic<bool, ReleaseAcquire> mValid{ false };
  };

  // Buffer contains a sequence of BufferedElements starting at a specific
  // index, and it points to the next-older buffer (if any).
  class Buffer
  {
  public:
    // Constructor of the very first buffer.
    Buffer()
      : mOlder(nullptr)
      , mNewer(nullptr)
      , mOrigin(0)
    {
    }

    // Constructor of later buffers.
    Buffer(Buffer* aOlder, Index aOrigin)
      : mOlder(aOlder)
      , mNewer(nullptr)
      , mOrigin(aOrigin)
    {
      MOZ_ASSERT(aOlder);
      aOlder->mNewer = this;
    }

    Buffer* Older() const { return mOlder; }
    void SetOlder(Buffer* aOlder) { mOlder = aOlder; }

    Buffer* Newer() const { return mNewer; }
    void SetNewer(Buffer* aNewer) { mNewer = aNewer; }

    Index Origin() const { return mOrigin; }
    void SetOrigin(Index aOrigin) { mOrigin = aOrigin; }

    // Run aF on a yet-invalid element.
    // Not thread-safe by itself, but nothing else should write this element,
    // and reader won't access it until after it becomes valid.
    template<typename F>
    void SetAndValidateElement(F&& aF, Index aIndex)
    {
      MOZ_ASSERT(aIndex >= Origin());
      MOZ_ASSERT(aIndex < Origin() + BufferSize);
      mElements[aIndex - Origin()].SetAndValidate(aF, aIndex);
    }

    using DidReadLastElement = bool;

    // Read all valid elements starting at aIndex, marking them invalid and
    // updating aIndex.
    // Returns true if we ended up reading the last element in this buffer.
    // Accessing the validity bit is thread-safe (as it's atomic), but once
    // an element is valid, the reading itself is not thread-safe and should be
    // guarded.
    template<typename F>
    DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex)
    {
      MOZ_ASSERT(aIndex >= Origin());
      MOZ_ASSERT(aIndex < Origin() + BufferSize);
      for (; aIndex < Origin() + BufferSize; ++aIndex) {
        if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) {
          // Found an invalid element, stop here. (aIndex will not be updated
          // past it, so we will start from here next time.)
          return false;
        }
      }
      return true;
    }

  private:
    Buffer* mOlder;
    Buffer* mNewer;
    Index mOrigin;
    BufferedElement mElements[BufferSize];
  };

  // Reuse a buffer, or create a new one.
  // All buffered elements will be invalid.
  Buffer* NewBuffer(Buffer* aOlder, Index aOrigin)
  {
    MOZ_ASSERT(aOlder);
    for (;;) {
      Buffer* head = mReusableBuffers;
      if (!head) {
        ++mAllocatedBuffersStats;
        ++mLiveBuffersStats;
        Buffer* buffer = new Buffer(aOlder, aOrigin);
        return buffer;
      }
      Buffer* older = head->Older();
      // Try to pivot the reusable-buffer pointer from the current head to the
      // next buffer in line.
      if (mReusableBuffers.compareExchange(head, older)) {
        // Success! The reusable-buffer pointer now points at the older buffer,
        // so we can recycle this ex-head.
        --mReusableBuffersStats;
        ++mLiveBuffersStats;
        head->SetOlder(aOlder);
        aOlder->SetNewer(head);
        // We will be the newest; newer-pointer should already be null.
        MOZ_ASSERT(!head->Newer());
        head->SetOrigin(aOrigin);
        return head;
      }
      // Failure, someone else must have touched the list, loop to try again.
    }
  }

  // Discard a fully-read buffer.
  // If aDestroy is true, delete it.
  // If aDestroy is false, move the buffer to a reusable-buffer stack.
  void StopUsing(Buffer* aBuffer, bool aDestroy)
  {
    --mLiveBuffersStats;

    // We should only stop using the oldest buffer.
    MOZ_ASSERT(!aBuffer->Older());
    // The newest buffer should not be modified here.
    MOZ_ASSERT(aBuffer->Newer());
    MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer);
    // Detach from the second-oldest buffer.
    aBuffer->Newer()->SetOlder(nullptr);
    // Make the second-oldest buffer the now-oldest buffer.
    mOldestBuffer = aBuffer->Newer();

    if (aDestroy) {
      --mAllocatedBuffersStats;
      delete aBuffer;
    } else {
      ++mReusableBuffersStats;
      // The recycling stack only uses mOlder; mNewer is not needed.
      aBuffer->SetNewer(nullptr);

      // Make the given buffer the new head of reusable buffers.
      for (;;) {
        Buffer* head = mReusableBuffers;
        aBuffer->SetOlder(head);
        if (mReusableBuffers.compareExchange(head, aBuffer)) {
          break;
        }
      }
    }
  }

  // Index of the next element to write. Modified when an element index is
  // claimed for a push. If the last element of a buffer is claimed, that push
  // will be responsible for adding a new head buffer.
  // Relaxed, because there is no synchronization based on this variable, each
  // thread just needs to get a different value, and will then write different
  // things (which themselves have some atomic validation before they may be
  // read elsewhere, independent of this `mNextElementToWrite`.)
  Atomic<Index::ValueType, Relaxed> mNextElementToWrite{ 0 };

  // Index that a live recent buffer reaches. If a push claims a lesser-or-
  // equal number, the corresponding buffer is guaranteed to still be alive:
  // - It will have been created before this index was updated,
  // - It will not be destroyed until all its values have been written,
  //   including the one that just claimed a position within it.
  // Also, the push that claims this exact number is responsible for adding the
  // next buffer and updating this value accordingly.
  // ReleaseAcquire, because when set to a certain value, the just-created
  // buffer covering the new range must be visible to readers.
  Atomic<Index::ValueType, ReleaseAcquire> mBuffersCoverAtLeastUpTo;

  // Pointer to the most recent buffer. Never null.
  // This is the most recent of a deque of yet-unread buffers.
  // Only modified when adding a new head buffer.
  // ReleaseAcquire, because when modified, the just-created new buffer must be
  // visible to readers.
  Atomic<Buffer*, ReleaseAcquire> mMostRecentBuffer;

  // Stack of reusable buffers.
  // ReleaseAcquire, because when modified, the just-added buffer must be
  // visible to readers.
  Atomic<Buffer*, ReleaseAcquire> mReusableBuffers;

  // Template-provided locking mechanism to protect PopAll()-only member
  // variables below.
  ReaderLocking mReaderLocking;

  // Pointer to the oldest buffer, which contains the new element to be popped.
  // Never null.
  Buffer* mOldestBuffer;

  // Index of the next element to be popped.
  Index mNextElementToPop{ 0 };

  // Stats.
  class AtomicCountAndWatermark
  {
  public:
    explicit AtomicCountAndWatermark(int aCount)
      : mCount(aCount)
      , mWatermark(aCount)
    {
    }

    int Count() const { return int(mCount); }

    CountAndWatermark Get() const
    {
      return CountAndWatermark{ int(mCount), int(mWatermark) };
    }

    int operator++()
    {
      int count = int(++mCount);
      // Update watermark.
      for (;;) {
        int watermark = int(mWatermark);
        if (watermark >= count) {
          // printf("++[%p] -=> %d-%d\n", this, count, watermark);
          break;
        }
        if (mWatermark.compareExchange(watermark, count)) {
          // printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark, count);
          break;
        }
      }
      return count;
    }

    int operator--()
    {
      int count = int(--mCount);
      // printf("--[%p] -> %d\n", this, count);
      return count;
    }

  private:
    // Relaxed, as these are just gathering stats, so consistency is not
    // critical.
    Atomic<int, Relaxed> mCount;
    Atomic<int, Relaxed> mWatermark;
  };
  // All buffers in the mMostRecentBuffer deque.
  AtomicCountAndWatermark mLiveBuffersStats;
  // All buffers in the mReusableBuffers stack.
  AtomicCountAndWatermark mReusableBuffersStats;
  // All allocated buffers (sum of above).
  AtomicCountAndWatermark mAllocatedBuffersStats;
};

} // namespace mozilla

#endif // mozilla_MultiWriterQueue_h_
