// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/mirroring/service/remoting_sender.h"

#include <algorithm>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/logging.h"
#include "base/time/default_tick_clock.h"
#include "media/cast/constants.h"
#include "media/cast/sender/sender_encoded_frame.h"
#include "media/mojo/common/mojo_data_pipe_read_write.h"

namespace mirroring {

// Constructs a sender on top of FrameSender, using a fixed congestion control
// pinned at |config.max_bitrate|. Takes ownership of the consumer end of the
// data pipe (|pipe|) and binds |request| to this object. The sender starts
// with a flow restart pending. |error_callback| is run by
// OnRemotingDataStreamError().
RemotingSender::RemotingSender(
    scoped_refptr<media::cast::CastEnvironment> cast_environment,
    media::cast::CastTransport* transport,
    const media::cast::FrameSenderConfig& config,
    mojo::ScopedDataPipeConsumerHandle pipe,
    media::mojom::RemotingDataStreamSenderRequest request,
    base::OnceClosure error_callback)
    : FrameSender(cast_environment,
                  transport,
                  config,
                  media::cast::NewFixedCongestionControl(config.max_bitrate)),
      clock_(cast_environment->Clock()),
      error_callback_(std::move(error_callback)),
      data_pipe_reader_(
          std::make_unique<media::MojoDataPipeReader>(std::move(pipe))),
      binding_(this, std::move(request)),
      input_queue_discards_remaining_(0),
      is_reading_(false),
      flow_restart_pending_(true),
      weak_factory_(this) {
  // |binding_| is a member of this object, so the handler bound with
  // base::Unretained(this) is held by something this object owns.
  auto on_connection_error = base::BindOnce(
      &RemotingSender::OnRemotingDataStreamError, base::Unretained(this));
  binding_.set_connection_error_handler(std::move(on_connection_error));
}

// No explicit teardown is performed here; members are destroyed implicitly.
RemotingSender::~RemotingSender() = default;

// Queues the two input tasks needed to forward one frame of |frame_size|
// bytes: first read the bytes from the data pipe, then try to send them. If
// the queue was empty beforehand, processing is kicked off immediately;
// otherwise the queue is left as-is and not kicked from here.
void RemotingSender::SendFrame(uint32_t frame_size) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  // Sample this before pushing, since the pushes below make the queue
  // non-empty.
  const bool queue_was_idle = input_queue_.empty();

  auto read_task = base::BindRepeating(&RemotingSender::ReadFrame,
                                       base::Unretained(this), frame_size);
  auto send_task = base::BindRepeating(&RemotingSender::TrySendFrame,
                                       base::Unretained(this));
  input_queue_.push(std::move(read_task));
  input_queue_.push(std::move(send_task));

  if (queue_was_idle)
    ProcessNextInputTask();
}

// Marks every input task currently in the queue as "discard" and flags that
// the flow must be restarted, which makes the next frame actually sent a key
// frame (see TrySendFrame()).
void RemotingSender::CancelInFlightData() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

// TODO(miu): The disabled code below is a desired optimization, but it is not
// quite correct as written. Only frames for which no packets have actually
// hit the network yet can be canceled. In other words, only frames the
// receiver has definitely not seen any part of (including kickstarting!) are
// eligible. http://crbug.com/647423
#if 0
  if (latest_acked_frame_id_ < last_sent_frame_id_) {
    std::vector<media::cast::FrameId> frames_to_cancel;
    do {
      ++latest_acked_frame_id_;
      frames_to_cancel.push_back(latest_acked_frame_id_);
    } while (latest_acked_frame_id_ < last_sent_frame_id_);
    transport_->CancelSendingFrames(ssrc_, frames_to_cancel);
  }
#endif

  flow_restart_pending_ = true;

  // Everything queued right now belongs to the canceled data: each of those
  // tasks will consume one "discard" when it completes.
  input_queue_discards_remaining_ = input_queue_.size();

  VLOG(1) << "Now restarting because in-flight data was just canceled.";
}

// Not expected to be called on this class: the body is guarded by
// NOTREACHED(). The zero return value is only a placeholder for builds where
// NOTREACHED() does not abort.
// NOTE(review): presumably unused because this sender forwards data that is
// already encoded -- confirm against FrameSender's callers.
int RemotingSender::GetNumberOfFramesInEncoder() const {
  NOTREACHED();
  return 0;
}

// Not expected to be called on this class: the body is guarded by
// NOTREACHED(). The default-constructed base::TimeDelta return value is only a
// placeholder for builds where NOTREACHED() does not abort.
// NOTE(review): presumably unused because of the fixed congestion control
// installed by the constructor -- confirm against FrameSender's callers.
base::TimeDelta RemotingSender::GetInFlightMediaDuration() const {
  NOTREACHED();
  return base::TimeDelta();
}

// Notification that one or more frames were canceled.
void RemotingSender::OnCancelSendingFrames() {
  // Canceled frames no longer count as in flight, so an input task that
  // previously could not proceed (e.g. TrySendFrame() backing off because too
  // many frames were unacknowledged) may now be able to complete. Resume
  // processing of the input queue.
  ProcessNextInputTask();
}

// Runs the task at the front of the input queue, unless the queue is empty or
// a data pipe read is still outstanding. The task is not popped here; each
// task pops itself via OnInputTaskComplete() once it has finished.
void RemotingSender::ProcessNextInputTask() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  const bool can_run_front_task = !input_queue_.empty() && !is_reading_;
  if (can_run_front_task)
    input_queue_.front().Run();
}

// Input task: reads the next |size| bytes from the data pipe. The bytes are
// stored into |next_frame_data_|, or thrown away if this task was flagged for
// discard by CancelInFlightData(). OnFrameRead() is invoked when the read
// finishes. On a stream error, the error is reported and the task is left in
// the queue.
void RemotingSender::ReadFrame(uint32_t size) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!is_reading_);

  // OnRemotingDataStreamError() destroys |data_pipe_reader_| but does not
  // drain |input_queue_|, so this task can be run again afterwards (e.g. from
  // OnCancelSendingFrames(), or from a ProcessNextInputTask() that was already
  // posted). The error has been reported by then; bail out instead of
  // dereferencing a null reader.
  if (!data_pipe_reader_)
    return;

  if (!data_pipe_reader_->IsPipeValid()) {
    VLOG(1) << "Data pipe handle no longer valid.";
    OnRemotingDataStreamError();
    return;
  }

  // A null destination tells the reader to consume |size| bytes without
  // storing them; that is used while canceled data is being flushed.
  uint8_t* destination = nullptr;
  if (!(input_queue_discards_remaining_ > 0)) {
    next_frame_data_.resize(size);
    destination = reinterpret_cast<uint8_t*>(base::data(next_frame_data_));
  }

  // Set the flag before issuing the read so that it is already in place when
  // OnFrameRead() runs, whenever the reader invokes it.
  is_reading_ = true;
  data_pipe_reader_->Read(
      destination, size,
      base::BindOnce(&RemotingSender::OnFrameRead, base::Unretained(this)));
}

void RemotingSender::TrySendFrame() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!is_reading_);
  if (input_queue_discards_remaining_ > 0) {
    OnInputTaskComplete();
    return;
  }

  // If there would be too many frames in-flight, do not proceed.
  if (GetUnacknowledgedFrameCount() >= media::cast::kMaxUnackedFrames) {
    VLOG(1) << "Cannot send frame now because too many frames are in flight.";
    return;
  }

  const bool is_first_frame_to_be_sent = last_send_time_.is_null();
  const media::cast::FrameId frame_id = is_first_frame_to_be_sent
                                            ? media::cast::FrameId::first()
                                            : (last_sent_frame_id_ + 1);

  base::TimeTicks last_frame_reference_time = last_send_time_;
  auto remoting_frame = std::make_unique<media::cast::SenderEncodedFrame>();
  remoting_frame->frame_id = frame_id;
  if (flow_restart_pending_) {
    remoting_frame->dependency = media::cast::EncodedFrame::KEY;
    flow_restart_pending_ = false;
  } else {
    DCHECK(!is_first_frame_to_be_sent);
    remoting_frame->dependency = media::cast::EncodedFrame::DEPENDENT;
  }
  remoting_frame->referenced_frame_id =
      remoting_frame->dependency == media::cast::EncodedFrame::KEY
          ? frame_id
          : frame_id - 1;
  remoting_frame->reference_time = clock_->NowTicks();
  remoting_frame->encode_completion_time = remoting_frame->reference_time;
  media::cast::RtpTimeTicks last_frame_rtp_timestamp;
  if (is_first_frame_to_be_sent) {
    last_frame_reference_time = remoting_frame->reference_time;
    last_frame_rtp_timestamp =
        media::cast::RtpTimeTicks() - media::cast::RtpTimeDelta::FromTicks(1);
  } else {
    last_frame_rtp_timestamp = GetRecordedRtpTimestamp(frame_id - 1);
  }
  // Ensure each successive frame's RTP timestamp is unique, but otherwise just
  // base it on the reference time.
  remoting_frame->rtp_timestamp =
      last_frame_rtp_timestamp +
      std::max(media::cast::RtpTimeDelta::FromTicks(1),
               media::cast::RtpTimeDelta::FromTimeDelta(
                   remoting_frame->reference_time - last_frame_reference_time,
                   media::cast::kRemotingRtpTimebase));
  remoting_frame->data.swap(next_frame_data_);

  SendEncodedFrame(0, std::move(remoting_frame));

  OnInputTaskComplete();
}

// Completion callback for the read started by ReadFrame(). Clears the
// "reading" flag, then either completes the current input task (on success) or
// reports a stream error (on failure, leaving the task in the queue).
void RemotingSender::OnFrameRead(bool success) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(is_reading_);
  is_reading_ = false;

  if (success) {
    OnInputTaskComplete();
  } else {
    OnRemotingDataStreamError();
  }
}

// Retires the task at the front of the input queue, consuming one pending
// "discard" if any remain, and schedules processing of the next task.
void RemotingSender::OnInputTaskComplete() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!input_queue_.empty());

  if (input_queue_discards_remaining_ > 0)
    --input_queue_discards_remaining_;
  input_queue_.pop();

  // Continue via a posted task rather than a direct call, so that a long run
  // of tasks does not grow the stack. The callback is bound to a pointer
  // obtained from |weak_factory_| rather than to a raw |this|.
  auto continue_processing = base::BindOnce(
      &RemotingSender::ProcessNextInputTask, weak_factory_.GetWeakPtr());
  base::ThreadTaskRunnerHandle::Get()->PostTask(
      FROM_HERE, std::move(continue_processing));
}

// Tears down the data stream: destroys the data pipe reader, closes the
// binding, and runs the error callback if one is still set. Installed as the
// binding's connection error handler and also called directly on read
// failures.
void RemotingSender::OnRemotingDataStreamError() {
  data_pipe_reader_.reset();
  binding_.Close();

  // Nothing to notify if the callback is unset.
  if (error_callback_.is_null())
    return;
  std::move(error_callback_).Run();
}

}  // namespace mirroring
