/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "IPCStreamSource.h"
#include "mozilla/webrender/WebRenderTypes.h"
#include "nsIAsyncInputStream.h"
#include "nsICancelableRunnable.h"
#include "nsIRunnable.h"
#include "nsISerialEventTarget.h"
#include "nsStreamUtils.h"
#include "nsThreadUtils.h"

using mozilla::dom::workers::Canceling;
using mozilla::dom::workers::GetCurrentThreadWorkerPrivate;
using mozilla::dom::workers::Status;
using mozilla::dom::workers::WorkerPrivate;
using mozilla::wr::ByteBuffer;

namespace mozilla {
namespace ipc {

class IPCStreamSource::Callback final : public nsIInputStreamCallback
                                      , public nsIRunnable
                                      , public nsICancelableRunnable
{
public:
  explicit Callback(IPCStreamSource* aSource)
    : mSource(aSource)
    , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
  {
    MOZ_ASSERT(mSource);
  }

  NS_IMETHOD
  OnInputStreamReady(nsIAsyncInputStream* aStream) override
  {
    // any thread
    if (mOwningEventTarget->IsOnCurrentThread()) {
      return Run();
    }

    // If this fails, then it means the owning thread is a Worker that has
    // been shutdown.  Its ok to lose the event in this case because the
    // IPCStreamChild listens for this event through the WorkerHolder.
    nsresult rv = mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
    if (NS_FAILED(rv)) {
      NS_WARNING("Failed to dispatch stream readable event to owning thread");
    }

    return NS_OK;
  }

  NS_IMETHOD
  Run() override
  {
    MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
    if (mSource) {
      mSource->OnStreamReady(this);
    }
    return NS_OK;
  }

  nsresult
  Cancel() override
  {
    // Cancel() gets called when the Worker thread is being shutdown.  We have
    // nothing to do here because IPCStreamChild handles this case via
    // the WorkerHolder.
    return NS_OK;
  }

  void
  ClearSource()
  {
    MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
    MOZ_ASSERT(mSource);
    mSource = nullptr;
  }

private:
  ~Callback()
  {
    // called on any thread

    // ClearSource() should be called before the Callback is destroyed
    MOZ_ASSERT(!mSource);
  }

  // This is a raw pointer because the source keeps alive the callback and,
  // before beeing destroyed, it nullifies this pointer (this happens when
  // ActorDestroyed() is called).
  IPCStreamSource* mSource;

  nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;

  NS_DECL_THREADSAFE_ISUPPORTS
};

NS_IMPL_ISUPPORTS(IPCStreamSource::Callback, nsIInputStreamCallback,
                                             nsIRunnable,
                                             nsICancelableRunnable);

IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
  : WorkerHolder("IPCStreamSource")
  , mStream(aInputStream)
  , mWorkerPrivate(nullptr)
  , mState(ePending)
{
  MOZ_ASSERT(aInputStream);
}

IPCStreamSource::~IPCStreamSource()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  MOZ_ASSERT(mState == eClosed);
  MOZ_ASSERT(!mCallback);
  MOZ_ASSERT(!mWorkerPrivate);
}

bool
IPCStreamSource::Initialize()
{
  bool nonBlocking = false;
  MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
  // IPCStreamChild reads in the current thread, so it is only supported on
  // non-blocking, async channels
  if (!nonBlocking) {
    return false;
  }

  // A source can be used on any thread, but we only support IPCStream on
  // main thread, Workers and PBackground thread right now.  This is due
  // to the requirement  that the thread be guaranteed to live long enough to
  // receive messages. We can enforce this guarantee with a WorkerHolder on
  // worker threads, but not other threads. Main-thread and PBackground thread
  // do not need anything special in order to be kept alive.
  WorkerPrivate* workerPrivate = nullptr;
  if (!NS_IsMainThread()) {
    workerPrivate = GetCurrentThreadWorkerPrivate();
    if (workerPrivate) {
      bool result = HoldWorker(workerPrivate, Canceling);
      if (!result) {
        return false;
      }

      mWorkerPrivate = workerPrivate;
    } else {
      AssertIsOnBackgroundThread();
    }
  }

  return true;
}

void
IPCStreamSource::ActorConstructed()
{
  MOZ_ASSERT(mState == ePending);
  mState = eActorConstructed;
}

bool
IPCStreamSource::Notify(Status aStatus)
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);

  // Keep the worker thread alive until the stream is finished.
  return true;
}

void
IPCStreamSource::ActorDestroyed()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);

  mState = eClosed;

  if (mCallback) {
    mCallback->ClearSource();
    mCallback = nullptr;
  }

  if (mWorkerPrivate) {
    ReleaseWorker();
    mWorkerPrivate = nullptr;
  }
}

void
IPCStreamSource::Start()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  DoRead();
}

void
IPCStreamSource::StartDestroy()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  OnEnd(NS_ERROR_ABORT);
}

void
IPCStreamSource::DoRead()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  MOZ_ASSERT(mState == eActorConstructed);
  MOZ_ASSERT(!mCallback);

  // The input stream (likely a pipe) probably uses a segment size of
  // 4kb.  If there is data already buffered it would be nice to aggregate
  // multiple segments into a single IPC call.  Conversely, don't send too
  // too large of a buffer in a single call to avoid spiking memory.
  static const uint64_t kMaxBytesPerMessage = 32 * 1024;
  static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
                "kMaxBytesPerMessage must cleanly cast to uint32_t");

  char buffer[kMaxBytesPerMessage];

  while (true) {
    // It should not be possible to transition to closed state without
    // this loop terminating via a return.
    MOZ_ASSERT(mState == eActorConstructed);

    // See if the stream is closed by checking the return of Available.
    uint64_t dummy;
    nsresult rv = mStream->Available(&dummy);
    if (NS_FAILED(rv)) {
      OnEnd(rv);
      return;
    }

    uint32_t bytesRead = 0;
    rv = mStream->Read(buffer, kMaxBytesPerMessage, &bytesRead);

    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
      MOZ_ASSERT(bytesRead == 0);
      Wait();
      return;
    }

    if (NS_FAILED(rv)) {
      MOZ_ASSERT(bytesRead == 0);
      OnEnd(rv);
      return;
    }

    // Zero-byte read indicates end-of-stream.
    if (bytesRead == 0) {
      OnEnd(NS_BASE_STREAM_CLOSED);
      return;
    }

    // We read some data from the stream, send it across.
    SendData(ByteBuffer(bytesRead, reinterpret_cast<uint8_t*>(buffer)));
  }
}

void
IPCStreamSource::Wait()
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  MOZ_ASSERT(mState == eActorConstructed);
  MOZ_ASSERT(!mCallback);

  // Set mCallback immediately instead of waiting for success.  Its possible
  // AsyncWait() will callback synchronously.
  mCallback = new Callback(this);
  nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
  if (NS_FAILED(rv)) {
    OnEnd(rv);
    return;
  }
}

void
IPCStreamSource::OnStreamReady(Callback* aCallback)
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  MOZ_ASSERT(mCallback);
  MOZ_ASSERT(aCallback == mCallback);
  mCallback->ClearSource();
  mCallback = nullptr;
  DoRead();
}

void
IPCStreamSource::OnEnd(nsresult aRv)
{
  NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
  MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);

  if (mState == eClosed) {
    return;
  }

  mState = eClosed;

  mStream->CloseWithStatus(aRv);

  if (aRv == NS_BASE_STREAM_CLOSED) {
    aRv = NS_OK;
  }

  // This will trigger an ActorDestroy() from the other side
  Close(aRv);
}

} // namespace ipc
} // namespace mozilla
