// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/net/pacing/paced_sender.h"

#include "base/big_endian.h"
#include "base/bind.h"
#include "base/debug/dump_without_crashing.h"
#include "base/message_loop/message_loop.h"
#include "base/numerics/safe_conversions.h"

namespace media {
namespace cast {

namespace {

static const int64_t kPacingIntervalMs = 10;
// Each frame will be split into no more than kPacingMaxBurstsPerFrame
// bursts of packets.
static const size_t kPacingMaxBurstsPerFrame = 3;
static const size_t kMaxDedupeWindowMs = 500;

// "Impossible" upper-bound on the maximum number of packets that should ever be
// enqueued in the pacer.  This is used to detect bugs, reported as crash dumps.
static const size_t kHugeQueueLengthSeconds = 10;
static const size_t kRidiculousNumberOfPackets =
    kHugeQueueLengthSeconds * (kMaxBurstSize * 1000 / kPacingIntervalMs);

}  // namespace

DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {}

PacketKey::PacketKey() : ssrc(0), packet_id(0) {}

PacketKey::PacketKey(base::TimeTicks capture_time,
                     uint32_t ssrc,
                     FrameId frame_id,
                     uint16_t packet_id)
    : capture_time(capture_time),
      ssrc(ssrc),
      frame_id(frame_id),
      packet_id(packet_id) {}

PacketKey::PacketKey(const PacketKey& other) = default;

PacketKey::~PacketKey() {}

struct PacedSender::PacketSendRecord {
  PacketSendRecord()
      : last_byte_sent(0), last_byte_sent_for_audio(0), cancel_count(0) {}

  base::TimeTicks time;    // Time when the packet was sent.
  int64_t last_byte_sent;  // Number of bytes sent to network just after this
                           // packet was sent.
  int64_t last_byte_sent_for_audio;  // Number of bytes sent to network from
                                     // audio stream just before this packet.
  int cancel_count;  // Number of times the packet was canceled (debugging).
};

struct PacedSender::RtpSession {
  explicit RtpSession(bool is_audio_stream)
      : last_byte_sent(0), is_audio(is_audio_stream) {}
  RtpSession() {}

  // Tracks recently-logged RTP timestamps so that it can expand the truncated
  // values found in packets.
  RtpTimeTicks last_logged_rtp_timestamp_;
  int64_t last_byte_sent;
  bool is_audio;
};

PacedSender::PacedSender(
    size_t target_burst_size,
    size_t max_burst_size,
    base::TickClock* clock,
    std::vector<PacketEvent>* recent_packet_events,
    PacketTransport* transport,
    const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
    : clock_(clock),
      recent_packet_events_(recent_packet_events),
      transport_(transport),
      transport_task_runner_(transport_task_runner),
      last_byte_sent_for_audio_(0),
      target_burst_size_(target_burst_size),
      max_burst_size_(max_burst_size),
      current_max_burst_size_(target_burst_size_),
      next_max_burst_size_(target_burst_size_),
      next_next_max_burst_size_(target_burst_size_),
      current_burst_size_(0),
      state_(State_Unblocked),
      has_reached_upper_bound_once_(false),
      weak_factory_(this) {}

PacedSender::~PacedSender() {}

void PacedSender::RegisterSsrc(uint32_t ssrc, bool is_audio) {
  if (sessions_.find(ssrc) != sessions_.end())
    DVLOG(1) << "Re-register ssrc: " << ssrc;

  sessions_[ssrc] = RtpSession(is_audio);
}

void PacedSender::RegisterPrioritySsrc(uint32_t ssrc) {
  priority_ssrcs_.push_back(ssrc);
}

int64_t PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) {
  PacketSendHistory::const_iterator it = send_history_.find(packet_key);
  if (it == send_history_.end())
    return 0;
  return it->second.last_byte_sent;
}

int64_t PacedSender::GetLastByteSentForSsrc(uint32_t ssrc) {
  auto it = sessions_.find(ssrc);
  // Return 0 for unknown session.
  if (it == sessions_.end())
    return 0;
  return it->second.last_byte_sent;
}

bool PacedSender::SendPackets(const SendPacketVector& packets) {
  if (packets.empty()) {
    return true;
  }
  const bool high_priority = IsHighPriority(packets.begin()->first);
  for (size_t i = 0; i < packets.size(); i++) {
    if (VLOG_IS_ON(2)) {
      PacketSendHistory::const_iterator history_it =
          send_history_.find(packets[i].first);
      if (history_it != send_history_.end() &&
          history_it->second.cancel_count > 0) {
        VLOG(2) << "PacedSender::SendPackets() called for packet CANCELED "
                << history_it->second.cancel_count << " times: "
                << "ssrc=" << packets[i].first.ssrc
                << ", frame_id=" << packets[i].first.frame_id
                << ", packet_id=" << packets[i].first.packet_id;
      }
    }

    DCHECK(IsHighPriority(packets[i].first) == high_priority);
    if (high_priority) {
      priority_packet_list_[packets[i].first] =
          make_pair(PacketType_Normal, packets[i].second);
    } else {
      packet_list_[packets[i].first] =
          make_pair(PacketType_Normal, packets[i].second);
    }
  }
  if (state_ == State_Unblocked) {
    SendStoredPackets();
  }
  return true;
}

bool PacedSender::ShouldResend(const PacketKey& packet_key,
                               const DedupInfo& dedup_info,
                               const base::TimeTicks& now) {
  PacketSendHistory::const_iterator it = send_history_.find(packet_key);

  // No history of previous transmission. It might be sent too long ago.
  if (it == send_history_.end())
    return true;

  // Suppose there is request to retransmit X and there is an audio
  // packet Y sent just before X. Reject retransmission of X if ACK for
  // Y has not been received.
  // Only do this for video packets.
  //
  // TODO(miu): This sounds wrong.  Audio packets are always transmitted first
  // (because they are put in |priority_packet_list_|, see PopNextPacket()).
  auto session_it = sessions_.find(packet_key.ssrc);
  // The session should always have been registered in |sessions_|.
  DCHECK(session_it != sessions_.end());
  if (!session_it->second.is_audio) {
    if (dedup_info.last_byte_acked_for_audio &&
        it->second.last_byte_sent_for_audio &&
        dedup_info.last_byte_acked_for_audio <
        it->second.last_byte_sent_for_audio) {
      return false;
    }
  }
  // Retransmission interval has to be greater than |resend_interval|.
  if (now - it->second.time < dedup_info.resend_interval)
    return false;
  return true;
}

bool PacedSender::ResendPackets(const SendPacketVector& packets,
                                const DedupInfo& dedup_info) {
  if (packets.empty()) {
    return true;
  }
  const bool high_priority = IsHighPriority(packets.begin()->first);
  const base::TimeTicks now = clock_->NowTicks();
  for (size_t i = 0; i < packets.size(); i++) {
    if (VLOG_IS_ON(2)) {
      PacketSendHistory::const_iterator history_it =
          send_history_.find(packets[i].first);
      if (history_it != send_history_.end() &&
          history_it->second.cancel_count > 0) {
        VLOG(2) << "PacedSender::ReendPackets() called for packet CANCELED "
                << history_it->second.cancel_count << " times: "
                << "ssrc=" << packets[i].first.ssrc
                << ", frame_id=" << packets[i].first.frame_id
                << ", packet_id=" << packets[i].first.packet_id;
      }
    }

    if (!ShouldResend(packets[i].first, dedup_info, now)) {
      LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
      continue;
    }

    DCHECK(IsHighPriority(packets[i].first) == high_priority);
    if (high_priority) {
      priority_packet_list_[packets[i].first] =
          make_pair(PacketType_Resend, packets[i].second);
    } else {
      packet_list_[packets[i].first] =
          make_pair(PacketType_Resend, packets[i].second);
    }
  }
  if (state_ == State_Unblocked) {
    SendStoredPackets();
  }
  return true;
}

bool PacedSender::SendRtcpPacket(uint32_t ssrc, PacketRef packet) {
  if (state_ == State_TransportBlocked) {
    const PacketKey key(base::TimeTicks(), ssrc, FrameId::first(), 0);
    priority_packet_list_[key] = make_pair(PacketType_RTCP, packet);
  } else {
    // We pass the RTCP packets straight through.
    if (!transport_->SendPacket(
            packet,
            base::Bind(&PacedSender::SendStoredPackets,
                       weak_factory_.GetWeakPtr()))) {
      state_ = State_TransportBlocked;
    }
  }
  return true;
}

void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
  packet_list_.erase(packet_key);
  priority_packet_list_.erase(packet_key);

  if (VLOG_IS_ON(2)) {
    PacketSendHistory::iterator history_it = send_history_.find(packet_key);
    if (history_it != send_history_.end())
      ++history_it->second.cancel_count;
  }
}

PacketRef PacedSender::PopNextPacket(PacketType* packet_type,
                                     PacketKey* packet_key) {
  // Always pop from the priority list first.
  PacketList* list = !priority_packet_list_.empty() ?
      &priority_packet_list_ : &packet_list_;
  DCHECK(!list->empty());

  // Determine which packet in the frame should be popped by examining the
  // |send_history_| for prior transmission attempts.  Packets that have never
  // been transmitted will be popped first.  If all packets have transmitted
  // before, pop the one that has not been re-attempted for the longest time.
  PacketList::iterator it = list->begin();
  PacketKey last_key = it->first;
  last_key.packet_id = UINT16_C(0xffff);
  PacketSendHistory::const_iterator history_it =
      send_history_.lower_bound(it->first);
  base::TimeTicks earliest_send_time =
      base::TimeTicks() + base::TimeDelta::Max();
  PacketList::iterator found_it = it;
  while (true) {
    if (history_it == send_history_.end() || it->first < history_it->first) {
      // There is no send history for this packet, which means it has not been
      // transmitted yet.
      found_it = it;
      break;
    }

    DCHECK(it->first == history_it->first);
    if (history_it->second.time < earliest_send_time) {
      earliest_send_time = history_it->second.time;
      found_it = it;
    }

    // Advance to next packet for the current frame, or break if there are no
    // more.
    ++it;
    if (it == list->end() || last_key < it->first)
      break;

    // Advance to next history entry.  Since there may be "holes" in the packet
    // list (e.g., due to packets canceled for retransmission), it's possible
    // |history_it| will have to be advanced more than once even though |it| was
    // only advanced once.
    do {
      ++history_it;
    } while (history_it != send_history_.end() &&
             history_it->first < it->first);
  }

  *packet_type = found_it->second.first;
  *packet_key = found_it->first;
  PacketRef ret = found_it->second.second;
  list->erase(found_it);
  return ret;
}

bool PacedSender::IsHighPriority(const PacketKey& packet_key) const {
  return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(),
                   packet_key.ssrc) != priority_ssrcs_.end();
}

bool PacedSender::empty() const {
  return packet_list_.empty() && priority_packet_list_.empty();
}

size_t PacedSender::size() const {
  return packet_list_.size() + priority_packet_list_.size();
}

// This function can be called from three places:
// 1. User called one of the Send* functions and we were in an unblocked state.
// 2. state_ == State_TransportBlocked and the transport is calling us to
//    let us know that it's ok to send again.
// 3. state_ == State_BurstFull and there are still packets to send. In this
//    case we called PostDelayedTask on this function to start a new burst.
void PacedSender::SendStoredPackets() {
  State previous_state = state_;
  state_ = State_Unblocked;
  if (empty()) {
    return;
  }

  // If the queue ever becomes impossibly long, send a crash dump without
  // actually crashing the process.
  if (size() > kRidiculousNumberOfPackets && !has_reached_upper_bound_once_) {
    NOTREACHED();
    // Please use Cr=Internals-Cast label in bug reports:
    base::debug::DumpWithoutCrashing();
    has_reached_upper_bound_once_ = true;
  }

  base::TimeTicks now = clock_->NowTicks();
  // I don't actually trust that PostDelayTask(x - now) will mean that
  // now >= x when the call happens, so check if the previous state was
  // State_BurstFull too.
  if (now >= burst_end_ || previous_state == State_BurstFull) {
    // Start a new burst.
    current_burst_size_ = 0;
    burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);

    // The goal here is to try to send out the queued packets over the next
    // three bursts, while trying to keep the burst size below 10 if possible.
    // We have some evidence that sending more than 12 packets in a row doesn't
    // work very well, but we don't actually know why yet. Sending out packets
    // sooner is better than sending out packets later as that gives us more
    // time to re-send them if needed. So if we have less than 30 packets, just
    // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
    // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
    // which is more bandwidth than the cast library should need, and sending
    // out more data per second is unlikely to be helpful.
    size_t max_burst_size = std::min(
        max_burst_size_,
        std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame));
    current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
    next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
    next_next_max_burst_size_ = max_burst_size;
  }

  base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
                                weak_factory_.GetWeakPtr());
  while (!empty()) {
    if (current_burst_size_ >= current_max_burst_size_) {
      transport_task_runner_->PostDelayedTask(FROM_HERE,
                                              cb,
                                              burst_end_ - now);
      state_ = State_BurstFull;
      return;
    }
    PacketType packet_type;
    PacketKey packet_key;
    PacketRef packet = PopNextPacket(&packet_type, &packet_key);
    PacketSendRecord* const send_record = &(send_history_[packet_key]);
    send_record->time = now;

    if (send_record->cancel_count > 0 && packet_type != PacketType_RTCP) {
      VLOG(2) << "PacedSender is sending a packet known to have been CANCELED "
              << send_record->cancel_count << " times: "
              << "ssrc=" << packet_key.ssrc
              << ", frame_id=" << packet_key.frame_id
              << ", packet_id=" << packet_key.packet_id;
    }

    switch (packet_type) {
      case PacketType_Resend:
        LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
        break;
      case PacketType_Normal:
        LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
        break;
      case PacketType_RTCP:
        break;
    }

    const bool socket_blocked = !transport_->SendPacket(packet, cb);

    // Save the send record.
    send_record->last_byte_sent = transport_->GetBytesSent();
    send_record->last_byte_sent_for_audio = last_byte_sent_for_audio_;
    send_history_buffer_[packet_key] = *send_record;

    auto it = sessions_.find(packet_key.ssrc);
    // The session should always have been registered in |sessions_|.
    DCHECK(it != sessions_.end());
    it->second.last_byte_sent = send_record->last_byte_sent;
    if (it->second.is_audio)
      last_byte_sent_for_audio_ = send_record->last_byte_sent;

    if (socket_blocked) {
      state_ = State_TransportBlocked;
      return;
    }
    current_burst_size_++;
  }

  // Keep ~0.5 seconds of data (1000 packets).
  //
  // TODO(miu): This has no relation to the actual size of the frames, and so
  // there's no way to reason whether 1000 is enough or too much, or whatever.
  if (send_history_buffer_.size() >=
      max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) {
    send_history_.swap(send_history_buffer_);
    send_history_buffer_.clear();
  }
  DCHECK_LE(send_history_buffer_.size(),
            max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs);
  state_ = State_Unblocked;
}

void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent type) {
  if (!recent_packet_events_)
    return;

  recent_packet_events_->push_back(PacketEvent());
  PacketEvent& event = recent_packet_events_->back();

  // Populate the new PacketEvent by parsing the wire-format |packet|.
  //
  // TODO(miu): This parsing logic belongs in RtpParser.
  event.timestamp = clock_->NowTicks();
  event.type = type;
  base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[0]),
                               packet.size());
  bool success = reader.Skip(4);
  uint32_t truncated_rtp_timestamp;
  success &= reader.ReadU32(&truncated_rtp_timestamp);
  uint32_t ssrc;
  success &= reader.ReadU32(&ssrc);

  auto it = sessions_.find(ssrc);
  // The session should always have been registered in |sessions_|.
  DCHECK(it != sessions_.end());
  event.rtp_timestamp = it->second.last_logged_rtp_timestamp_ =
      it->second.last_logged_rtp_timestamp_.Expand(truncated_rtp_timestamp);
  event.media_type = it->second.is_audio ? AUDIO_EVENT : VIDEO_EVENT;
  success &= reader.Skip(2);
  success &= reader.ReadU16(&event.packet_id);
  success &= reader.ReadU16(&event.max_packet_id);
  event.size = base::checked_cast<uint32_t>(packet.size());
  DCHECK(success);
}

}  // namespace cast
}  // namespace media
