// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/logging/encoding_event_subscriber.h"

#include <stdint.h>

#include <algorithm>
#include <cstring>
#include <utility>

#include "base/logging.h"
#include "base/numerics/safe_conversions.h"
#include "media/cast/logging/proto/proto_utils.h"

using google::protobuf::RepeatedPtrField;
using media::cast::proto::AggregatedFrameEvent;
using media::cast::proto::AggregatedPacketEvent;
using media::cast::proto::BasePacketEvent;
using media::cast::proto::LogMetadata;

namespace {

// A size limit on maps to keep lookups fast.
const size_t kMaxMapSize = 200;

// The smallest (oredered by RTP timestamp) |kNumMapEntriesToTransfer| entries
// will be moved when the map size reaches |kMaxMapSize|.
// Must be smaller than |kMaxMapSize|.
const size_t kNumMapEntriesToTransfer = 100;

template <typename ProtoPtr>
bool IsRtpTimestampLessThan(const ProtoPtr& lhs, const ProtoPtr& rhs) {
  return lhs->relative_rtp_timestamp() < rhs->relative_rtp_timestamp();
}

BasePacketEvent* GetNewBasePacketEvent(AggregatedPacketEvent* event_proto,
    int packet_id, int size) {
  BasePacketEvent* base = event_proto->add_base_packet_event();
  base->set_packet_id(packet_id);
  base->set_size(size);
  return base;
}

}

namespace media {
namespace cast {

EncodingEventSubscriber::EncodingEventSubscriber(
    EventMediaType event_media_type,
    size_t max_frames)
    : event_media_type_(event_media_type), max_frames_(max_frames) {
  Reset();
}

EncodingEventSubscriber::~EncodingEventSubscriber() {
  DCHECK(thread_checker_.CalledOnValidThread());
}

void EncodingEventSubscriber::OnReceiveFrameEvent(
    const FrameEvent& frame_event) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (event_media_type_ != frame_event.media_type)
    return;

  const RtpTimeDelta relative_rtp_timestamp =
      GetRelativeRtpTimestamp(frame_event.rtp_timestamp);
  uint32_t lower_32_bits = relative_rtp_timestamp.lower_32_bits();
  FrameEventMap::iterator it = frame_event_map_.find(relative_rtp_timestamp);
  linked_ptr<AggregatedFrameEvent> event_proto;

  // Look up existing entry. If not found, create a new entry and add to map.
  if (it == frame_event_map_.end()) {
    if (!ShouldCreateNewProto(lower_32_bits))
      return;

    IncrementStoredProtoCount(lower_32_bits);
    event_proto.reset(new AggregatedFrameEvent);
    event_proto->set_relative_rtp_timestamp(lower_32_bits);
    frame_event_map_.insert(
        std::make_pair(relative_rtp_timestamp, event_proto));
  } else {
    event_proto = it->second;
    if (event_proto->event_type_size() >= kMaxEventsPerProto) {
      DVLOG(2) << "Too many events in frame " << frame_event.rtp_timestamp
               << ". Using new frame event proto.";
      AddFrameEventToStorage(event_proto);
      if (!ShouldCreateNewProto(lower_32_bits)) {
        frame_event_map_.erase(it);
        return;
      }

      IncrementStoredProtoCount(lower_32_bits);
      event_proto.reset(new AggregatedFrameEvent);
      event_proto->set_relative_rtp_timestamp(lower_32_bits);
      it->second = event_proto;
    }
  }

  event_proto->add_event_type(ToProtoEventType(frame_event.type));
  event_proto->add_event_timestamp_ms(
      (frame_event.timestamp - base::TimeTicks()).InMilliseconds());

  if (frame_event.type == FRAME_CAPTURE_END) {
    if (frame_event.media_type == VIDEO_EVENT &&
        frame_event.width > 0 && frame_event.height > 0) {
      event_proto->set_width(frame_event.width);
      event_proto->set_height(frame_event.height);
    }
  } else if (frame_event.type == FRAME_ENCODED) {
    event_proto->set_encoded_frame_size(frame_event.size);
    if (frame_event.encoder_cpu_utilization >= 0.0) {
      event_proto->set_encoder_cpu_percent_utilized(
          base::saturated_cast<int32_t>(
              frame_event.encoder_cpu_utilization * 100.0 + 0.5));
    }
    if (frame_event.idealized_bitrate_utilization >= 0.0) {
      event_proto->set_idealized_bitrate_percent_utilized(
          base::saturated_cast<int32_t>(
              frame_event.idealized_bitrate_utilization * 100.0 + 0.5));
    }
    if (frame_event.media_type == VIDEO_EVENT) {
      event_proto->set_key_frame(frame_event.key_frame);
      event_proto->set_target_bitrate(frame_event.target_bitrate);
    }
  } else if (frame_event.type == FRAME_PLAYOUT) {
    event_proto->set_delay_millis(frame_event.delay_delta.InMilliseconds());
  }

  if (frame_event_map_.size() > kMaxMapSize)
    TransferFrameEvents(kNumMapEntriesToTransfer);

  DCHECK(frame_event_map_.size() <= kMaxMapSize);
  DCHECK(frame_event_storage_.size() <= max_frames_);
}

void EncodingEventSubscriber::OnReceivePacketEvent(
    const PacketEvent& packet_event) {
  DCHECK(thread_checker_.CalledOnValidThread());

  if (event_media_type_ != packet_event.media_type)
    return;

  const RtpTimeDelta relative_rtp_timestamp =
      GetRelativeRtpTimestamp(packet_event.rtp_timestamp);
  uint32_t lower_32_bits = relative_rtp_timestamp.lower_32_bits();
  PacketEventMap::iterator it =
      packet_event_map_.find(relative_rtp_timestamp);
  linked_ptr<AggregatedPacketEvent> event_proto;
  BasePacketEvent* base_packet_event_proto = NULL;

  // Look up existing entry. If not found, create a new entry and add to map.
  if (it == packet_event_map_.end()) {
    if (!ShouldCreateNewProto(lower_32_bits))
      return;

    IncrementStoredProtoCount(lower_32_bits);
    event_proto.reset(new AggregatedPacketEvent);
    event_proto->set_relative_rtp_timestamp(lower_32_bits);
    packet_event_map_.insert(
        std::make_pair(relative_rtp_timestamp, event_proto));
    base_packet_event_proto = GetNewBasePacketEvent(
        event_proto.get(), packet_event.packet_id, packet_event.size);
  } else {
    // Found existing entry, now look up existing BasePacketEvent using packet
    // ID. If not found, create a new entry and add to proto.
    event_proto = it->second;
    RepeatedPtrField<BasePacketEvent>* field =
        event_proto->mutable_base_packet_event();
    for (RepeatedPtrField<BasePacketEvent>::pointer_iterator base_it =
             field->pointer_begin();
         base_it != field->pointer_end();
         ++base_it) {
      if ((*base_it)->packet_id() == packet_event.packet_id) {
        base_packet_event_proto = *base_it;
        break;
      }
    }
    if (!base_packet_event_proto) {
      if (event_proto->base_packet_event_size() >= kMaxPacketsPerFrame) {
        DVLOG(3) << "Too many packets in AggregatedPacketEvent "
                 << packet_event.rtp_timestamp << ". "
                 << "Using new packet event proto.";
        AddPacketEventToStorage(event_proto);
        if (!ShouldCreateNewProto(lower_32_bits)) {
          packet_event_map_.erase(it);
          return;
        }

        IncrementStoredProtoCount(lower_32_bits);
        event_proto.reset(new AggregatedPacketEvent);
        event_proto->set_relative_rtp_timestamp(lower_32_bits);
        it->second = event_proto;
      }

      base_packet_event_proto = GetNewBasePacketEvent(
          event_proto.get(), packet_event.packet_id, packet_event.size);
    } else if (base_packet_event_proto->event_type_size() >=
               kMaxEventsPerProto) {
      DVLOG(3) << "Too many events in packet "
               << packet_event.rtp_timestamp << ", "
               << packet_event.packet_id << ". Using new packet event proto.";
      AddPacketEventToStorage(event_proto);
      if (!ShouldCreateNewProto(lower_32_bits)) {
        packet_event_map_.erase(it);
        return;
      }

      IncrementStoredProtoCount(lower_32_bits);
      event_proto.reset(new AggregatedPacketEvent);
      event_proto->set_relative_rtp_timestamp(lower_32_bits);
      it->second = event_proto;
      base_packet_event_proto = GetNewBasePacketEvent(
          event_proto.get(), packet_event.packet_id, packet_event.size);
    }
  }

  base_packet_event_proto->add_event_type(
      ToProtoEventType(packet_event.type));
  base_packet_event_proto->add_event_timestamp_ms(
      (packet_event.timestamp - base::TimeTicks()).InMilliseconds());

  // |base_packet_event_proto| could have been created with a receiver event
  // which does not have the packet size and we would need to overwrite it when
  // we see a sender event, which does have the packet size.
  if (packet_event.size > 0) {
    base_packet_event_proto->set_size(packet_event.size);
  }

  if (packet_event_map_.size() > kMaxMapSize)
    TransferPacketEvents(kNumMapEntriesToTransfer);

  DCHECK(packet_event_map_.size() <= kMaxMapSize);
  DCHECK(packet_event_storage_.size() <= max_frames_);
}

void EncodingEventSubscriber::GetEventsAndReset(LogMetadata* metadata,
    FrameEventList* frame_events, PacketEventList* packet_events) {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Flush all events.
  TransferFrameEvents(frame_event_map_.size());
  TransferPacketEvents(packet_event_map_.size());
  std::sort(frame_event_storage_.begin(), frame_event_storage_.end(),
            &IsRtpTimestampLessThan<linked_ptr<AggregatedFrameEvent> >);
  std::sort(packet_event_storage_.begin(), packet_event_storage_.end(),
            &IsRtpTimestampLessThan<linked_ptr<AggregatedPacketEvent> >);

  metadata->set_is_audio(event_media_type_ == AUDIO_EVENT);
  metadata->set_first_rtp_timestamp(first_rtp_timestamp_.lower_32_bits());
  metadata->set_num_frame_events(frame_event_storage_.size());
  metadata->set_num_packet_events(packet_event_storage_.size());
  metadata->set_reference_timestamp_ms_at_unix_epoch(
      (base::TimeTicks::UnixEpoch() - base::TimeTicks()).InMilliseconds());
  frame_events->swap(frame_event_storage_);
  packet_events->swap(packet_event_storage_);
  Reset();
}

void EncodingEventSubscriber::TransferFrameEvents(size_t max_num_entries) {
  DCHECK(frame_event_map_.size() >= max_num_entries);

  FrameEventMap::iterator it = frame_event_map_.begin();
  for (size_t i = 0;
       i < max_num_entries && it != frame_event_map_.end();
       i++, ++it) {
    AddFrameEventToStorage(it->second);
  }

  frame_event_map_.erase(frame_event_map_.begin(), it);
}

void EncodingEventSubscriber::TransferPacketEvents(size_t max_num_entries) {
  PacketEventMap::iterator it = packet_event_map_.begin();
  for (size_t i = 0;
       i < max_num_entries && it != packet_event_map_.end();
       i++, ++it) {
    AddPacketEventToStorage(it->second);
  }

  packet_event_map_.erase(packet_event_map_.begin(), it);
}

void EncodingEventSubscriber::AddFrameEventToStorage(
    const linked_ptr<AggregatedFrameEvent>& frame_event_proto) {
  if (frame_event_storage_.size() >= max_frames_) {
    auto& entry = frame_event_storage_[frame_event_storage_index_];
    DecrementStoredProtoCount(entry->relative_rtp_timestamp());
    entry = frame_event_proto;
  } else {
    frame_event_storage_.push_back(frame_event_proto);
  }

  frame_event_storage_index_ = (frame_event_storage_index_ + 1) % max_frames_;
}

void EncodingEventSubscriber::AddPacketEventToStorage(
    const linked_ptr<AggregatedPacketEvent>& packet_event_proto) {
  if (packet_event_storage_.size() >= max_frames_) {
    auto& entry = packet_event_storage_[packet_event_storage_index_];
    DecrementStoredProtoCount(entry->relative_rtp_timestamp());
    entry = packet_event_proto;
  } else {
    packet_event_storage_.push_back(packet_event_proto);
  }

  packet_event_storage_index_ = (packet_event_storage_index_ + 1) % max_frames_;
}

bool EncodingEventSubscriber::ShouldCreateNewProto(
    uint32_t relative_rtp_timestamp_lower_32_bits) const {
  auto it = stored_proto_counts_.find(relative_rtp_timestamp_lower_32_bits);
  int proto_count = it == stored_proto_counts_.end() ? 0 : it->second;
  DVLOG_IF(2, proto_count >= kMaxProtosPerFrame)
      << relative_rtp_timestamp_lower_32_bits
      << " already reached max number of protos.";
  return proto_count < kMaxProtosPerFrame;
}

void EncodingEventSubscriber::IncrementStoredProtoCount(
    uint32_t relative_rtp_timestamp_lower_32_bits) {
  stored_proto_counts_[relative_rtp_timestamp_lower_32_bits]++;
  DCHECK_LE(stored_proto_counts_[relative_rtp_timestamp_lower_32_bits],
            kMaxProtosPerFrame)
      << relative_rtp_timestamp_lower_32_bits
      << " exceeded max number of event protos.";
}

void EncodingEventSubscriber::DecrementStoredProtoCount(
    uint32_t relative_rtp_timestamp_lower_32_bits) {
  auto it = stored_proto_counts_.find(relative_rtp_timestamp_lower_32_bits);
  DCHECK(it != stored_proto_counts_.end())
      << "no event protos for " << relative_rtp_timestamp_lower_32_bits;
  if (it->second > 1)
    it->second--;
  else
    stored_proto_counts_.erase(it);
}

RtpTimeDelta EncodingEventSubscriber::GetRelativeRtpTimestamp(
    RtpTimeTicks rtp_timestamp) {
  if (!seen_first_rtp_timestamp_) {
    seen_first_rtp_timestamp_ = true;
    first_rtp_timestamp_ = rtp_timestamp;
  }

  return rtp_timestamp - first_rtp_timestamp_;
}

void EncodingEventSubscriber::Reset() {
  frame_event_map_.clear();
  frame_event_storage_.clear();
  frame_event_storage_index_ = 0;
  packet_event_map_.clear();
  packet_event_storage_.clear();
  packet_event_storage_index_ = 0;
  stored_proto_counts_.clear();
  seen_first_rtp_timestamp_ = false;
  first_rtp_timestamp_ = RtpTimeTicks();
}

}  // namespace cast
}  // namespace media
