/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "IPCBlobInputStreamChild.h"
#include "IPCBlobInputStreamThread.h"

#include "mozilla/ipc/IPCStreamUtils.h"
#include "WorkerHolder.h"
#include "WorkerPrivate.h"
#include "WorkerRunnable.h"

namespace mozilla {
namespace dom {

using namespace workers;

namespace {

// This runnable is used in case the last stream is forgotten on the 'wrong'
// thread.
class ShutdownRunnable final : public CancelableRunnable
{
public:
  explicit ShutdownRunnable(IPCBlobInputStreamChild* aActor)
    : CancelableRunnable("dom::ShutdownRunnable")
    , mActor(aActor)
  {}

  NS_IMETHOD
  Run() override
  {
    mActor->Shutdown();
    return NS_OK;
  }

private:
  RefPtr<IPCBlobInputStreamChild> mActor;
};

// This runnable is used in case StreamNeeded() has been called on a non-owning
// thread.
class StreamNeededRunnable final : public CancelableRunnable
{
public:
  explicit StreamNeededRunnable(IPCBlobInputStreamChild* aActor)
    : CancelableRunnable("dom::StreamNeededRunnable")
    , mActor(aActor)
  {}

  NS_IMETHOD
  Run() override
  {
    MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
               mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
    if (mActor->State() == IPCBlobInputStreamChild::eActive) {
      mActor->SendStreamNeeded();
    }
    return NS_OK;
  }

private:
  RefPtr<IPCBlobInputStreamChild> mActor;
};

// When the stream has been received from the parent, we inform the
// IPCBlobInputStream.
class StreamReadyRunnable final : public CancelableRunnable
{
public:
  StreamReadyRunnable(IPCBlobInputStream* aDestinationStream,
                      already_AddRefed<nsIInputStream> aCreatedStream)
    : CancelableRunnable("dom::StreamReadyRunnable")
    , mDestinationStream(aDestinationStream)
    , mCreatedStream(Move(aCreatedStream))
  {
    MOZ_ASSERT(mDestinationStream);
    // mCreatedStream can be null.
  }

  NS_IMETHOD
  Run() override
  {
    mDestinationStream->StreamReady(mCreatedStream.forget());
    return NS_OK;
  }

private:
  RefPtr<IPCBlobInputStream> mDestinationStream;
  nsCOMPtr<nsIInputStream> mCreatedStream;
};

class IPCBlobInputStreamWorkerHolder final : public WorkerHolder
{
public:
  IPCBlobInputStreamWorkerHolder()
    : WorkerHolder("IPCBlobInputStreamWorkerHolder")
  {}

  bool Notify(Status aStatus) override
  {
    // We must keep the worker alive until the migration is completed.
    return true;
  }
};

class ReleaseWorkerHolderRunnable final : public CancelableRunnable
{
public:
  explicit ReleaseWorkerHolderRunnable(
    UniquePtr<workers::WorkerHolder>&& aWorkerHolder)
    : CancelableRunnable("dom::ReleaseWorkerHolderRunnable")
    , mWorkerHolder(Move(aWorkerHolder))
  {}

  NS_IMETHOD
  Run() override
  {
    mWorkerHolder = nullptr;
    return NS_OK;
  }

  nsresult
  Cancel() override
  {
    return Run();
  }

private:
  UniquePtr<workers::WorkerHolder> mWorkerHolder;
};

} // anonymous

IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
                                                 uint64_t aSize)
  : mMutex("IPCBlobInputStreamChild::mMutex")
  , mID(aID)
  , mSize(aSize)
  , mState(eActive)
  , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
{
  // If we are running in a worker, we need to send a Close() to the parent side
  // before the thread is released.
  if (!NS_IsMainThread()) {
    WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
    if (workerPrivate) {
      UniquePtr<WorkerHolder> workerHolder(
        new IPCBlobInputStreamWorkerHolder());
      if (workerHolder->HoldWorker(workerPrivate, Canceling)) {
        mWorkerHolder.swap(workerHolder);
      }
    }
  }
}

IPCBlobInputStreamChild::~IPCBlobInputStreamChild()
{}

void
IPCBlobInputStreamChild::Shutdown()
{
  MutexAutoLock lock(mMutex);

  RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;

  mWorkerHolder = nullptr;
  mPendingOperations.Clear();

  if (mState == eActive) {
    SendClose();
    mState = eInactive;
  }
}

void
IPCBlobInputStreamChild::ActorDestroy(IProtocol::ActorDestroyReason aReason)
{
  bool migrating = false;

  {
    MutexAutoLock lock(mMutex);
    migrating = mState == eActiveMigrating;
    mState = migrating ? eInactiveMigrating : eInactive;
  }

  if (migrating) {
    // We were waiting for this! Now we can migrate the actor in the correct
    // thread.
    RefPtr<IPCBlobInputStreamThread> thread =
      IPCBlobInputStreamThread::GetOrCreate();
    MOZ_ASSERT(thread, "We cannot continue without DOMFile thread.");

    ResetManager();
    thread->MigrateActor(this);
    return;
  }

  // Let's cleanup the workerHolder and the pending operation queue.
  Shutdown();
}

IPCBlobInputStreamChild::ActorState
IPCBlobInputStreamChild::State()
{
  MutexAutoLock lock(mMutex);
  return mState;
}

already_AddRefed<IPCBlobInputStream>
IPCBlobInputStreamChild::CreateStream()
{
  bool shouldMigrate = false;

  RefPtr<IPCBlobInputStream> stream = new IPCBlobInputStream(this);

  {
    MutexAutoLock lock(mMutex);

    if (mState == eInactive) {
      return nullptr;
    }

    // The stream is active but maybe it is not running in the DOM-File thread.
    // We should migrate it there.
    if (mState == eActive &&
        !IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget)) {
      MOZ_ASSERT(mStreams.IsEmpty());
      shouldMigrate = true;
      mState = eActiveMigrating;
    }

    mStreams.AppendElement(stream);
  }

  // Send__delete__ will call ActorDestroy(). mMutex cannot be locked at this
  // time.
  if (shouldMigrate) {
    Send__delete__(this);
  }

  return stream.forget();
}

void
IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream)
{
  MOZ_ASSERT(aStream);

  RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;

  {
    MutexAutoLock lock(mMutex);
    mStreams.RemoveElement(aStream);

    if (!mStreams.IsEmpty() || mState != eActive) {
      return;
    }
  }

  if (mOwningEventTarget->IsOnCurrentThread()) {
    Shutdown();
    return;
  }

  RefPtr<ShutdownRunnable> runnable = new ShutdownRunnable(this);
  mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
}

void
IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
                                      nsIEventTarget* aEventTarget)
{
  MutexAutoLock lock(mMutex);

  if (mState == eInactive) {
    return;
  }

  MOZ_ASSERT(mStreams.Contains(aStream));

  PendingOperation* opt = mPendingOperations.AppendElement();
  opt->mStream = aStream;
  opt->mEventTarget = aEventTarget;

  if (mState == eActiveMigrating || mState == eInactiveMigrating) {
    // This operation will be continued when the migration is completed.
    return;
  }

  MOZ_ASSERT(mState == eActive);

  if (mOwningEventTarget->IsOnCurrentThread()) {
    SendStreamNeeded();
    return;
  }

  RefPtr<StreamNeededRunnable> runnable = new StreamNeededRunnable(this);
  mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
}

mozilla::ipc::IPCResult
IPCBlobInputStreamChild::RecvStreamReady(const OptionalIPCStream& aStream)
{
  nsCOMPtr<nsIInputStream> stream = mozilla::ipc::DeserializeIPCStream(aStream);

  RefPtr<IPCBlobInputStream> pendingStream;
  nsCOMPtr<nsIEventTarget> eventTarget;

  {
    MutexAutoLock lock(mMutex);
    MOZ_ASSERT(!mPendingOperations.IsEmpty());
    MOZ_ASSERT(mState == eActive);

    pendingStream = mPendingOperations[0].mStream;
    eventTarget = mPendingOperations[0].mEventTarget;

    mPendingOperations.RemoveElementAt(0);
  }

  RefPtr<StreamReadyRunnable> runnable =
    new StreamReadyRunnable(pendingStream, stream.forget());

  // If IPCBlobInputStream::AsyncWait() has been executed without passing an
  // event target, we run the callback synchronous because any thread could be
  // result to be the wrong one. See more in nsIAsyncInputStream::asyncWait
  // documentation.
  if (eventTarget) {
    eventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
  } else {
    runnable->Run();
  }

  return IPC_OK();
}

void
IPCBlobInputStreamChild::Migrated()
{
  MutexAutoLock lock(mMutex);
  MOZ_ASSERT(mState == eInactiveMigrating);

  if (mWorkerHolder) {
    RefPtr<ReleaseWorkerHolderRunnable> runnable =
      new ReleaseWorkerHolderRunnable(Move(mWorkerHolder));
    mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
  }

  mOwningEventTarget = GetCurrentThreadSerialEventTarget();
  MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget));

  // Maybe we have no reasons to keep this actor alive.
  if (mStreams.IsEmpty()) {
    mState = eInactive;
    SendClose();
    return;
  }

  mState = eActive;

  // Let's processing the pending operations. We need a stream for each pending
  // operation.
  for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
    SendStreamNeeded();
  }
}

} // namespace dom
} // namespace mozilla
