/* GStreamer
 *
 * Copyright (C) 2013 Collabora Ltd.
 *   @author Julien Isorce <julien.isorce@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

#include <gst/check/gstcheck.h>
#include <gst/check/gstconsistencychecker.h>

#include <gst/rtp/gstrtpbuffer.h>

/* For ease of programming we use globals to keep refs for our floating
 * src and sink pads we create; otherwise we always have to do get_pad,
 * get_peer, and then remove references in every test function.
 *
 * Both are assigned in setup_rtprtx() and deactivated again in
 * cleanup_rtprtx(). */
static GstPad *srcpad, *sinkpad;
/* List of the source buffers created by setup_rtprtx(). The list nodes are
 * released in cleanup_rtprtx(); the buffers themselves are not freed there. */
static GList *inbuffers = NULL;

/* Caps describing the hand-made RTP stream; test_push_forward_seq parses
 * this string and hands the caps to gst_check_setup_events(). */
#define RTP_CAPS_STRING    \
    "application/x-rtp, "               \
    "media = (string)audio, "           \
    "payload = (int) 0, "               \
    "clock-rate = (int) 8000, "         \
    "ssrc = (uint) 42, "                \
    "encoding-name = (string)PCMU"

/* Number of samples per generated buffer; setup_rtprtx() uses it both for
 * the buffer duration (at 8000 Hz) and for the RTP timestamp increment. */
#define RTP_FRAME_SIZE 20

/* Template for the test sink pad that setup_rtprtx() creates on
 * rtprtxreceive via gst_check_setup_sink_pad(). */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );
/* Template for the test source pad that setup_rtprtx() creates on
 * rtprtxsend via gst_check_setup_src_pad(). */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* Build the harness used by test_push_forward_seq.
 *
 * Creates the test source pad on @rtprtxsend and the test sink pad on
 * @rtprtxreceive (stored in the globals srcpad / sinkpad), links
 * rtprtxsend:src to rtprtxreceive:sink, activates all four pads and appends
 * @num_buffers RTP buffers to the global inbuffers list.
 *
 * Every generated buffer carries the same 20 byte payload. The first one has
 * the marker bit set; each following buffer has the sequence number increased
 * by one and the RTP timestamp increased by RTP_FRAME_SIZE. PTS/DTS start at
 * 0 and advance by the duration of RTP_FRAME_SIZE samples at 8000 Hz.
 */
static void
setup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive,
    gint num_buffers)
{
  GstBuffer *buffer;
  GstPad *sendsrcpad;
  GstPad *receivesinkpad;
  gboolean ret = FALSE;

  /* a 20 sample audio block (2,5 ms) generated with
   * gst-launch audiotestsrc wave=silence blocksize=40 num-buffers=3 !
   *    "audio/x-raw,channels=1,rate=8000" ! mulawenc ! rtppcmupay !
   *     fakesink dump=1
   */
  guint8 in[] = {               /* 12 byte RTP header, then 20 payload bytes */
    0x80, 0x80, 0x1c, 0x24, 0x46, 0xcd, 0xb7, 0x11, 0x3c, 0x3a, 0x7c, 0x5b,
    0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
    0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
  };
  /* The sequence number (header bytes 2-3) and the RTP timestamp (header
   * bytes 4-7) are big-endian. Keep them as integers so that increments
   * carry into the upper bytes instead of wrapping inside the lowest one. */
  guint16 seqnum = (guint16) ((in[2] << 8) | in[3]);
  guint32 rtptime = ((guint32) in[4] << 24) | ((guint32) in[5] << 16) |
      ((guint32) in[6] << 8) | (guint32) in[7];
  GstClockTime ts = G_GUINT64_CONSTANT (0);
  GstClockTime tso = gst_util_uint64_scale (RTP_FRAME_SIZE, GST_SECOND, 8000);
  gint i;

  srcpad = gst_check_setup_src_pad (rtprtxsend, &srctemplate);
  sendsrcpad = gst_element_get_static_pad (rtprtxsend, "src");
  ret = gst_pad_set_active (srcpad, TRUE);
  fail_if (ret == FALSE);

  sinkpad = gst_check_setup_sink_pad (rtprtxreceive, &sinktemplate);
  receivesinkpad = gst_element_get_static_pad (rtprtxreceive, "sink");
  ret = gst_pad_set_active (sinkpad, TRUE);
  fail_if (ret == FALSE);

  fail_if (gst_pad_link (sendsrcpad, receivesinkpad) != GST_PAD_LINK_OK);

  ret = gst_pad_set_active (sendsrcpad, TRUE);
  fail_if (ret == FALSE);
  ret = gst_pad_set_active (receivesinkpad, TRUE);
  fail_if (ret == FALSE);

  gst_object_unref (sendsrcpad);
  gst_object_unref (receivesinkpad);

  for (i = 0; i < num_buffers; i++) {
    buffer = gst_buffer_new_and_alloc (sizeof (in));
    gst_buffer_fill (buffer, 0, in, sizeof (in));
    GST_BUFFER_DTS (buffer) = ts;
    GST_BUFFER_PTS (buffer) = ts;
    GST_BUFFER_DURATION (buffer) = tso;
    GST_DEBUG ("created buffer: %p", buffer);

    inbuffers = g_list_append (inbuffers, buffer);

    /* prepare the header of the next packet */
    in[1] = 0x00;               /* marker bit only set on the first packet */

    seqnum++;
    in[2] = (guint8) (seqnum >> 8);
    in[3] = (guint8) (seqnum & 0xff);

    rtptime += RTP_FRAME_SIZE;  /* inc. timestamp with framesize */
    in[4] = (guint8) (rtptime >> 24);
    in[5] = (guint8) ((rtptime >> 16) & 0xff);
    in[6] = (guint8) ((rtptime >> 8) & 0xff);
    in[7] = (guint8) (rtptime & 0xff);

    ts += tso;
  }
}

/* Switch @element to PLAYING and wait, without timeout, until the state
 * change has completed. Fails the running test if either step reports
 * GST_STATE_CHANGE_FAILURE.
 *
 * Returns the result of the final gst_element_get_state() call.
 */
static GstStateChangeReturn
start_rtprtx (GstElement * element)
{
  GstStateChangeReturn ret = gst_element_set_state (element, GST_STATE_PLAYING);

  ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE);

  /* block until a possibly asynchronous transition is done */
  ret = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE);
  ck_assert_int_ne (ret, GST_STATE_CHANGE_FAILURE);

  return ret;
}

/* Undo what setup_rtprtx() built.
 *
 * Releases the global inbuffers list, deactivates the global test pads and
 * tears down the test pads and both elements through the gst_check helpers.
 */
static void
cleanup_rtprtx (GstElement * rtprtxsend, GstElement * rtprtxreceive)
{
  GST_DEBUG ("cleanup_rtprtx");

  /* Only the list nodes are freed here, not the buffers they point to;
   * test_push_forward_seq unrefs every buffer itself after pushing it. */
  g_list_free (inbuffers);
  inbuffers = NULL;

  /* sender side: test src pad first, then the element */
  gst_pad_set_active (srcpad, FALSE);
  gst_check_teardown_src_pad (rtprtxsend);
  gst_check_teardown_element (rtprtxsend);

  /* receiver side: test sink pad first, then the element */
  gst_pad_set_active (sinkpad, FALSE);
  gst_check_teardown_sink_pad (rtprtxreceive);
  gst_check_teardown_element (rtprtxreceive);
}

/* Verify the retransmission statistics after test_push_forward_seq has run.
 *
 * Both elements must report exactly 3 retransmission requests and 3
 * retransmitted packets; the receiver must additionally report 3 associated
 * retransmission packets. @num_buffers is currently not used.
 */
static void
check_rtprtx_results (GstElement * rtprtxsend, GstElement * rtprtxreceive,
    gint num_buffers)
{
  guint nbrtxrequests = 0;
  guint nbrtxpackets = 0;

  /* sender statistics */
  g_object_get (G_OBJECT (rtprtxsend),
      "num-rtx-requests", &nbrtxrequests,
      "num-rtx-packets", &nbrtxpackets, NULL);
  fail_unless_equals_int (nbrtxrequests, 3);
  fail_unless_equals_int (nbrtxpackets, 3);

  /* receiver statistics */
  g_object_get (G_OBJECT (rtprtxreceive),
      "num-rtx-requests", &nbrtxrequests,
      "num-rtx-packets", &nbrtxpackets, NULL);
  fail_unless_equals_int (nbrtxrequests, 3);
  fail_unless_equals_int (nbrtxpackets, 3);

  /* the counter variable is reused for the association count */
  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
      &nbrtxpackets, NULL);
  fail_unless_equals_int (nbrtxpackets, 3);
}


/* Push four consecutive RTP buffers through rtprtxsend ! rtprtxreceive and
 * request a retransmission for each of the first three right after it has
 * been pushed. The statistics of both elements are then verified by
 * check_rtprtx_results().
 */
GST_START_TEST (test_push_forward_seq)
{
  GstElement *rtprtxsend;
  GstElement *rtprtxreceive;
  const guint num_buffers = 4;
  GList *node;
  gint i = 0;
  GstCaps *caps = NULL;
  GstStructure *pt_map;

  rtprtxsend = gst_check_setup_element ("rtprtxsend");
  rtprtxreceive = gst_check_setup_element ("rtprtxreceive");
  setup_rtprtx (rtprtxsend, rtprtxreceive, num_buffers);
  GST_DEBUG ("setup_rtprtx");

  fail_unless (start_rtprtx (rtprtxsend)
      == GST_STATE_CHANGE_SUCCESS, "could not set to playing");

  fail_unless (start_rtprtx (rtprtxreceive)
      == GST_STATE_CHANGE_SUCCESS, "could not set to playing");

  caps = gst_caps_from_string (RTP_CAPS_STRING);
  gst_check_setup_events (srcpad, rtprtxsend, caps, GST_FORMAT_TIME);
  gst_caps_unref (caps);

  /* map main stream payload type 0 to retransmission payload type 97 on
   * both elements */
  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      "0", G_TYPE_UINT, 97, NULL);
  g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
  g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
  gst_structure_free (pt_map);

  /* push every input buffer; request a retransmission for the first three */
  for (node = inbuffers; node; node = g_list_next (node)) {
    GstEvent *event = NULL;
    GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
    GstBuffer *buffer = (GstBuffer *) node->data;
    GList *last_out_buffer;
    guint64 end_time;
    gboolean res;

    /* keep our own reference: the buffer is still read below after the push
     * has been given one reference */
    gst_buffer_ref (buffer);
    fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK);

    if (i < 3) {
      /* build a retransmission request for the packet just pushed */
      gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
      event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
          gst_structure_new ("GstRTPRetransmissionRequest",
              "seqnum", G_TYPE_UINT, (guint) gst_rtp_buffer_get_seq (&rtp),
              "ssrc", G_TYPE_UINT, (guint) gst_rtp_buffer_get_ssrc (&rtp),
              "payload-type", G_TYPE_UINT,
              (guint) gst_rtp_buffer_get_payload_type (&rtp), NULL));
      gst_rtp_buffer_unmap (&rtp);

      /* synchronize with the chain() function of the "sinkpad"
       * to make sure that rtxsend has pushed the rtx buffer out
       * before continuing.
       * NOTE(review): buffers, check_mutex and check_cond are not defined in
       * this file; presumably they are the globals exported by gstcheck.h */
      last_out_buffer = g_list_last (buffers);
      g_mutex_lock (&check_mutex);
      fail_unless (gst_pad_push_event (sinkpad, event));
      /* wait at most one second for a new buffer to show up in the list */
      end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
      do
        res = g_cond_wait_until (&check_cond, &check_mutex, end_time);
      while (res == TRUE && last_out_buffer == g_list_last (buffers));
      g_mutex_unlock (&check_mutex);
    }
    /* drop the reference taken before the push */
    gst_buffer_unref (buffer);
    ++i;
  }

  /* check the retransmission statistics */
  check_rtprtx_results (rtprtxsend, rtprtxreceive, num_buffers);

  /* cleanup */
  cleanup_rtprtx (rtprtxsend, rtprtxreceive);
}

GST_END_TEST;

/* Bus signal handler for the single sender test.
 *
 * EOS sets *@eos to TRUE, warnings are printed, errors are printed and fail
 * the running test. All other message types are ignored. @eos is only
 * dereferenced for EOS messages.
 */
static void
message_received (GstBus * bus, GstMessage * message, gboolean * eos)
{
  GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
      GST_MESSAGE_SRC (message), message);

  if (message->type == GST_MESSAGE_EOS) {
    *eos = TRUE;
  } else if (message->type == GST_MESSAGE_WARNING) {
    GError *err;
    gchar *details;

    gst_message_parse_warning (message, &err, &details);
    gst_object_default_error (GST_MESSAGE_SRC (message), err, details);
    g_error_free (err);
    g_free (details);
  } else if (message->type == GST_MESSAGE_ERROR) {
    GError *err;
    gchar *details;

    gst_message_parse_error (message, &err, &details);
    gst_object_default_error (GST_MESSAGE_SRC (message), err, details);
    fail ("Error: %s / %s", err->message, details);
    g_error_free (err);
    g_free (details);
  }
}

/* Bookkeeping for rtprtxsend_srcpad_probe (single sender test). */
typedef struct
{
  /* position inside the current drop cycle; incremented for every main
   * stream packet that is let through and reset to 1 when one is dropped */
  guint count;
  /* number of main stream packets seen by the probe, dropped or not */
  guint nb_packets;
  /* a main stream packet is dropped once count has reached this value */
  guint drop_every_n_packets;
} RTXSendData;

/* Bookkeeping for rtprtxreceive_srcpad_probe (single sender test). */
typedef struct
{
  /* number of buffers seen by the probe */
  guint nb_packets;
  /* step added to seqnum_prev to obtain the next expected sequence number;
   * read from the payloader's "seqnum-offset" property */
  guint seqnum_offset;
  /* sequence number of the last buffer that was at or ahead of the expected
   * one; initialised from the payloader's "seqnum" property */
  guint seqnum_prev;
} RTXReceiveData;

/* Run a per-buffer probe @callback over every buffer of a buffer list.
 *
 * Each buffer is presented to @callback through a copy of @info whose type is
 * rewritten to BUFFER | PUSH. Buffers for which the callback returns
 * GST_PAD_PROBE_DROP are left out of a newly built list.
 *
 * Returns GST_PAD_PROBE_DROP when no buffer survived; otherwise the original
 * list in @info is unreffed and replaced by the new one and GST_PAD_PROBE_OK
 * is returned.
 */
static GstPadProbeReturn
do_buffer_list_as_buffers_probe (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data, GstPadProbeCallback callback)
{
  GstPadProbeInfo single = *info;
  GstBufferList *list;
  GstBufferList *kept;
  guint idx, total;

  GST_INFO_OBJECT (pad, "probing each buffer in list individually");

  list = gst_pad_probe_info_get_buffer_list (info);

  g_return_val_if_fail (list != NULL, GST_PAD_PROBE_REMOVE);

  total = gst_buffer_list_length (list);
  kept = gst_buffer_list_new_sized (total);

  single.type = GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH;
  for (idx = 0; idx < total; idx++) {
    GstBuffer *candidate = gst_buffer_list_get (list, idx);

    single.data = candidate;
    if (callback (pad, &single, user_data) == GST_PAD_PROBE_DROP)
      continue;

    /* the new list needs its own reference on the surviving buffer */
    gst_buffer_list_insert (kept, -1, gst_buffer_ref (candidate));
  }

  if (gst_buffer_list_length (kept) != 0) {
    /* hand the filtered list over in place of the original one */
    gst_buffer_list_unref (list);
    info->data = kept;
    return GST_PAD_PROBE_OK;
  }

  /* nothing survived, the replacement list is not needed */
  gst_buffer_list_unref (kept);
  return GST_PAD_PROBE_DROP;
}

/* Probe on the rtprtxsend src pad of the single sender test.
 *
 * Buffer lists are processed buffer by buffer through
 * do_buffer_list_as_buffers_probe(). For a single buffer, packets with
 * payload type 96 (the main stream) are counted in the RTXSendData passed as
 * @user_data, and one of them is dropped each time its cycle counter has
 * reached drop_every_n_packets. Packets with any other payload type always
 * pass.
 */
static GstPadProbeReturn
rtprtxsend_srcpad_probe (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  RTXSendData *state = (RTXSendData *) user_data;
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  GstPadProbeReturn verdict = GST_PAD_PROBE_OK;

  GST_LOG_OBJECT (pad, "here");

  if (info->type == (GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH))
    return do_buffer_list_as_buffers_probe (pad, info, user_data,
        rtprtxsend_srcpad_probe);

  if (info->type != (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH))
    return verdict;

  gst_rtp_buffer_map (GST_BUFFER (info->data), GST_MAP_READ, &rtp);

  if (gst_rtp_buffer_get_payload_type (&rtp) == 96) {
    /* main stream packet: count it, then decide whether it is dropped */
    ++state->nb_packets;

    if (state->count >= state->drop_every_n_packets) {
      /* end of the cycle reached: drop this one and start over */
      state->count = 1;
      verdict = GST_PAD_PROBE_DROP;
    } else {
      ++state->count;
    }
  }

  gst_rtp_buffer_unmap (&rtp);

  return verdict;
}

/* Probe on the rtprtxreceive src pad of the single sender test.
 *
 * Compares the sequence number of each pushed buffer with the one expected
 * next (seqnum_prev + seqnum_offset of the RTXReceiveData passed as
 * @user_data). When the buffer is ahead, a GstRTPRetransmissionRequest event
 * is pushed to the peer pad for every missing sequence number. Every buffer
 * is counted and always let through.
 */
static GstPadProbeReturn
rtprtxreceive_srcpad_probe (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  RTXReceiveData *state = (RTXReceiveData *) user_data;
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  guint seqnum, expected;

  if (info->type != (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH))
    return GST_PAD_PROBE_OK;

  gst_rtp_buffer_map (GST_BUFFER (info->data), GST_MAP_READ, &rtp);
  seqnum = gst_rtp_buffer_get_seq (&rtp);
  expected = state->seqnum_prev + state->seqnum_offset;

  if (seqnum > expected) {
    /* at least one packet is missing: request each of them again */
    GstPad *peerpad = gst_pad_get_peer (pad);
    guint missing = expected;

    while (missing < seqnum) {
      GstEvent *request = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
          gst_structure_new ("GstRTPRetransmissionRequest",
              "seqnum", G_TYPE_UINT, missing,
              "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp),
              "payload-type", G_TYPE_UINT,
              gst_rtp_buffer_get_payload_type (&rtp),
              NULL));

      gst_pad_push_event (peerpad, request);
      missing += state->seqnum_offset;
    }
    gst_object_unref (peerpad);
  }

  /* remember this sequence number unless the buffer is behind the expected
   * one */
  if (seqnum >= expected)
    state->seqnum_prev = seqnum;

  gst_rtp_buffer_unmap (&rtp);

  ++state->nb_packets;

  return GST_PAD_PROBE_OK;
}

/* Run the single sender pipeline once and verify the retransmission
 * statistics.
 *
 * The payloader is configured with pt 96 and seqnum-offset 1, and both rtx
 * elements get a payload type map from 96 to 99. @send_rtxdata and
 * @receive_rtxdata are reset, the pipeline in @bin is brought to PLAYING and
 * the default main context is iterated until *@eos becomes TRUE. After the
 * checks @bin is set back to NULL.
 *
 * @drop_every_n_packets must not be 0, because the expected number of
 * requests is derived by dividing by it.
 */
static void
start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader,
    GstElement * rtprtxsend, GstElement * rtprtxreceive,
    RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata,
    guint drop_every_n_packets, gboolean * eos)
{
  GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE;
  guint nbrtxrequests = 0;
  guint nbrtxpackets = 0;
  guint nb_expected_requests = 0;
  GstStructure *pt_map;

  GST_INFO ("starting test");

  /* a drop rate of 0 would lead to a division by zero in the checks below */
  ck_assert_int_ne (drop_every_n_packets, 0);

  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      "96", G_TYPE_UINT, 99, NULL);
  g_object_set (rtppayloader, "pt", 96, NULL);
  g_object_set (rtppayloader, "seqnum-offset", 1, NULL);
  g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
  g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
  gst_structure_free (pt_map);

  send_rtxdata->count = 1;
  send_rtxdata->nb_packets = 0;
  send_rtxdata->drop_every_n_packets = drop_every_n_packets;

  receive_rtxdata->nb_packets = 0;
  receive_rtxdata->seqnum_offset = 0;
  receive_rtxdata->seqnum_prev = 0;

  *eos = FALSE;

  /* retrieve offset before going to paused */
  g_object_get (G_OBJECT (rtppayloader), "seqnum-offset",
      &receive_rtxdata->seqnum_offset, NULL);

  /* prepare playing */
  state_res = gst_element_set_state (bin, GST_STATE_PAUSED);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);

  /* wait for completion */
  state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);

  /* retrieve seqnum_prev here to make sure it has been reset */
  g_object_get (G_OBJECT (rtppayloader), "seqnum",
      &receive_rtxdata->seqnum_prev, NULL);

  /* run pipeline */
  state_res = gst_element_set_state (bin, GST_STATE_PLAYING);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);

  GST_INFO ("running main loop");
  while (!*eos)
    g_main_context_iteration (NULL, TRUE);

  /* check results */

  if (send_rtxdata->nb_packets % drop_every_n_packets == 0) {
    /* special case because the last buffer will be dropped
     * so the receiver cannot know if it has been dropped (no next packet)
     */
    nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets - 1;
    fail_unless_equals_int (send_rtxdata->nb_packets,
        receive_rtxdata->nb_packets + 1);
  } else {
    nb_expected_requests = send_rtxdata->nb_packets / drop_every_n_packets;
    fail_unless_equals_int (send_rtxdata->nb_packets,
        receive_rtxdata->nb_packets);
  }

  /* both elements must agree on the number of requests and of retransmitted
   * packets, and every retransmitted packet must have been associated */
  g_object_get (G_OBJECT (rtprtxsend), "num-rtx-requests", &nbrtxrequests,
      NULL);
  fail_unless_equals_int (nbrtxrequests, nb_expected_requests);

  g_object_get (G_OBJECT (rtprtxsend), "num-rtx-packets", &nbrtxpackets, NULL);
  fail_unless_equals_int (nbrtxpackets, nb_expected_requests);

  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-requests", &nbrtxrequests,
      NULL);
  fail_unless_equals_int (nbrtxrequests, nb_expected_requests);

  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets", &nbrtxpackets,
      NULL);
  fail_unless_equals_int (nbrtxpackets, nb_expected_requests);

  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
      &nbrtxpackets, NULL);
  fail_unless_equals_int (nbrtxpackets, nb_expected_requests);

  state_res = gst_element_set_state (bin, GST_STATE_NULL);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
}

/* This test builds the pipeline
 *   videotestsrc ! rtpvrawpay ! rtprtxsend ! rtprtxreceive ! fakesink
 * and drops some buffers between rtprtxsend and rtprtxreceive.
 * It then checks that every dropped packet has been re-sent and that not
 * too many requests have been sent.
 * The pipeline is run once for every drop rate from 2 to 9.
 */
GST_START_TEST (test_drop_one_sender)
{
  GstElement *bin, *src, *rtppayloader, *rtprtxsend, *rtprtxreceive, *sink;
  GstBus *bus;
  gboolean res;
  /* NOTE: these locals hide the file-scope srcpad / sinkpad globals */
  GstPad *srcpad, *sinkpad;
  GstStreamConsistency *chk_1, *chk_2, *chk_3;
  gint num_buffers = 20;
  guint drop_every_n_packets = 0;
  RTXSendData send_rtxdata;
  RTXReceiveData receive_rtxdata;
  gboolean eos = FALSE;

  GST_INFO ("preparing test");

  /* build pipeline */
  bin = gst_pipeline_new ("pipeline");
  bus = gst_element_get_bus (bin);
  gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);

  src = gst_element_factory_make ("videotestsrc", "src");
  g_object_set (src, "num-buffers", num_buffers, NULL);
  rtppayloader = gst_element_factory_make ("rtpvrawpay", "rtppayloader");
  rtprtxsend = gst_element_factory_make ("rtprtxsend", "rtprtxsend");
  rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive");
  sink = gst_element_factory_make ("fakesink", "sink");
  gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, rtprtxreceive,
      sink, NULL);

  res = gst_element_link (src, rtppayloader);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (rtppayloader, rtprtxsend);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (rtprtxsend, rtprtxreceive);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (rtprtxreceive, sink);
  fail_unless (res == TRUE, NULL);

  /* create consistency checkers for the pads */

  srcpad = gst_element_get_static_pad (rtppayloader, "src");
  chk_1 = gst_consistency_checker_new (srcpad);
  gst_object_unref (srcpad);

  /* the probe on rtprtxsend:src drops main stream packets; the checker
   * watches the peer pad */
  srcpad = gst_element_get_static_pad (rtprtxsend, "src");
  gst_pad_add_probe (srcpad,
      (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
          GST_PAD_PROBE_TYPE_PUSH),
      (GstPadProbeCallback) rtprtxsend_srcpad_probe, &send_rtxdata, NULL);
  sinkpad = gst_pad_get_peer (srcpad);
  fail_if (sinkpad == NULL);
  chk_2 = gst_consistency_checker_new (sinkpad);
  gst_object_unref (sinkpad);
  gst_object_unref (srcpad);

  /* the probe on rtprtxreceive:src detects gaps and requests
   * retransmissions */
  srcpad = gst_element_get_static_pad (rtprtxreceive, "src");
  gst_pad_add_probe (srcpad,
      (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
      (GstPadProbeCallback) rtprtxreceive_srcpad_probe, &receive_rtxdata, NULL);
  sinkpad = gst_pad_get_peer (srcpad);
  fail_if (sinkpad == NULL);
  chk_3 = gst_consistency_checker_new (sinkpad);
  gst_object_unref (sinkpad);
  gst_object_unref (srcpad);

  /* message_received only dereferences its user data for EOS messages, so
   * the error and warning handlers can be connected with NULL */
  g_signal_connect (bus, "message::error", (GCallback) message_received, NULL);
  g_signal_connect (bus, "message::warning", (GCallback) message_received,
      NULL);
  g_signal_connect (bus, "message::eos", (GCallback) message_received, &eos);

  /* run the same pipeline once per drop rate */
  for (drop_every_n_packets = 2; drop_every_n_packets < 10;
      drop_every_n_packets++) {
    start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend,
        rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets,
        &eos);
  }

  /* cleanup */
  gst_consistency_checker_free (chk_1);
  gst_consistency_checker_free (chk_2);
  gst_consistency_checker_free (chk_3);
  gst_bus_remove_signal_watch (bus);
  gst_object_unref (bus);
  gst_object_unref (bin);
}

GST_END_TEST;

/* Bus signal handler for the multiple sender test.
 *
 * Warnings are printed, errors are printed and fail the running test. Every
 * other message type is ignored. @data is not used.
 */
static void
message_received_multiple (GstBus * bus, GstMessage * message, gpointer data)
{
  GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT,
      GST_MESSAGE_SRC (message), message);

  if (message->type == GST_MESSAGE_WARNING) {
    GError *err;
    gchar *details;

    gst_message_parse_warning (message, &err, &details);
    gst_object_default_error (GST_MESSAGE_SRC (message), err, details);
    g_error_free (err);
    g_free (details);
  } else if (message->type == GST_MESSAGE_ERROR) {
    GError *err;
    gchar *details;

    gst_message_parse_error (message, &err, &details);
    gst_object_default_error (GST_MESSAGE_SRC (message), err, details);
    fail ("Error: %s / %s", err->message, details);
    g_error_free (err);
    g_free (details);
  }
}

/* Per-sender bookkeeping for rtprtxsend_srcpad_probe_multiple; one instance
 * is allocated by add_sender() for every sender branch. */
typedef struct
{
  /* position inside the current drop cycle; incremented for every main
   * stream packet that is let through and reset to 1 when one is dropped */
  guint count;
  /* number of main stream packets seen by the probe, dropped or not */
  guint nb_packets;
  /* a main stream packet is dropped once count has reached this value */
  guint drop_every_n_packets;
  /* payload type that identifies the main stream of this sender */
  guint payload_type_master;
  /* number of buffers the source is configured to produce; the main stream
   * packet that brings nb_packets to this value is never dropped */
  guint total_packets;
} RTXSendMultipleData;

/* Probe on the rtprtxsend src pad of one sender branch: drops some of the
 * main stream packets.
 *
 * Packets whose payload type equals payload_type_master of the
 * RTXSendMultipleData passed as @user_data are counted. One of them is
 * dropped each time the cycle counter has reached drop_every_n_packets,
 * except for the packet that brings the count to total_packets: with nothing
 * following it, nothing would trigger its retransmission. Packets with any
 * other payload type always pass.
 */
static GstPadProbeReturn
rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  RTXSendMultipleData *state = (RTXSendMultipleData *) user_data;
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  GstPadProbeReturn verdict = GST_PAD_PROBE_OK;

  if (info->type != (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH))
    return verdict;

  gst_rtp_buffer_map (GST_BUFFER (info->data), GST_MAP_READ, &rtp);

  if (gst_rtp_buffer_get_payload_type (&rtp) == state->payload_type_master) {
    /* main stream packet: count it, then decide whether it is dropped */
    ++state->nb_packets;

    if (state->count >= state->drop_every_n_packets &&
        state->nb_packets != state->total_packets) {
      /* end of the cycle reached and not the last packet: drop it */
      state->count = 1;
      verdict = GST_PAD_PROBE_DROP;
    } else {
      ++state->count;
    }
  }

  gst_rtp_buffer_unmap (&rtp);

  return verdict;
}

/* Event probe on a source's src pad: swallows EOS events and lets every
 * other event through, so that one source finishing does not end the
 * stream while other sources are still sending their buffers. */
static GstPadProbeReturn
source_srcpad_probe_multiple_drop_eos (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  return (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == GST_EVENT_EOS)
      ? GST_PAD_PROBE_DROP : GST_PAD_PROBE_OK;
}

/* Receiver-side bookkeeping shared by the probes of the multiple sender
 * test. */
typedef struct
{
  /* ssrc -> number of buffers seen for that ssrc (values are stored with
   * GUINT_TO_POINTER) */
  GHashTable *ssrc_to_nb_packets_map;
  /* ssrc -> last sequence number recorded for that ssrc; despite the field
   * name the stored value is a sequence number, not an offset */
  GHashTable *ssrc_to_seqnum_offset_map;
  /* step added to the recorded sequence number to get the expected one */
  guint seqnum_offset;

  /* total number of buffers all senders are going to produce; increased by
   * add_sender() */
  gint to_send;
  /* The two counters below are only accessed through g_atomic_int_*().
   * They are deliberately not volatile: GLib's atomic operations do not
   * need the qualifier. */
  gint dropped_requests;        /* retransmission requests that got lost */
  gint received;                /* buffers seen at the rtprtxreceive src pad */
  /* set to TRUE by rtprtxreceive_sinkpad_probe_check_drop when it sees a
   * retransmission request; cleared before each request is pushed */
  gboolean request_passed;
} RTXReceiveMultipleData;

/* Add one sender branch
 *   <src_name> ! <payloader_name> ! rtprtxsend ! queue ! funnel
 * to @bin. The element named "funnel" must already be part of @bin.
 *
 * The source is live and produces 25 buffers of 20x10 video at 30 fps; that
 * amount is added to @rtxdata->to_send. The payloader uses
 * @payload_type_master and rtprtxsend maps it to @payload_type_aux. A probe
 * on rtprtxsend:src drops packets, and a probe on the source's src pad
 * swallows EOS.
 *
 * Returns the newly allocated per-sender state used by the drop probe.
 */
static RTXSendMultipleData *
add_sender (GstElement * bin, const gchar * src_name,
    const gchar * payloader_name, guint payload_type_master,
    guint payload_type_aux, RTXReceiveMultipleData * rtxdata)
{
  RTXSendMultipleData *send_rtxdata = g_slice_new0 (RTXSendMultipleData);
  GstElement *src, *rtppayloader, *rtprtxsend, *queue, *funnel;
  GstStructure *pt_map;
  GstCaps *caps;
  GstPad *pad;
  gchar *pt_master;
  gboolean res = FALSE;

  /* nb_packets and drop_every_n_packets are already zero after
   * g_slice_new0() */
  send_rtxdata->count = 1;
  send_rtxdata->payload_type_master = payload_type_master;
  send_rtxdata->total_packets = 25;
  rtxdata->to_send += send_rtxdata->total_packets;

  src = gst_element_factory_make (src_name, NULL);
  rtppayloader = gst_element_factory_make (payloader_name, NULL);
  rtprtxsend = gst_element_factory_make ("rtprtxsend", NULL);
  queue = gst_element_factory_make ("queue", NULL);
  funnel = gst_bin_get_by_name (GST_BIN (bin), "funnel");

  /* the payload type map uses the master payload type as field name */
  pt_master = g_strdup_printf ("%" G_GUINT32_FORMAT, payload_type_master);
  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      pt_master, G_TYPE_UINT, payload_type_aux, NULL);
  g_free (pt_master);

  g_object_set (src,
      "num-buffers", send_rtxdata->total_packets, "is-live", TRUE, NULL);
  g_object_set (rtppayloader,
      "pt", payload_type_master, "seqnum-offset", 1, NULL);
  g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL);
  gst_structure_free (pt_map);
  /* we want every dropped packet to be resent fast */
  g_object_set (queue,
      "max-size-buffers", 1, "flush-on-eos", FALSE, NULL);

  gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL);

  /* Make sure we have one buffer per frame, makes it easier to count! */
  caps =
      gst_caps_from_string ("video/x-raw, width=20, height=10, framerate=30/1");
  res = gst_element_link_filtered (src, rtppayloader, caps);
  gst_caps_unref (caps);
  fail_unless (res == TRUE, NULL);

  res = gst_element_link (rtppayloader, rtprtxsend);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (rtprtxsend, queue);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (queue, funnel);
  fail_unless (res == TRUE, NULL);
  gst_object_unref (funnel);

  /* probe that drops some of the main stream packets */
  pad = gst_element_get_static_pad (rtprtxsend, "src");
  gst_pad_add_probe (pad,
      (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
      (GstPadProbeCallback) rtprtxsend_srcpad_probe_multiple, send_rtxdata,
      NULL);
  gst_object_unref (pad);

  /* probe that keeps the source's EOS from travelling downstream */
  pad = gst_element_get_static_pad (src, "src");
  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
      (GstPadProbeCallback) source_srcpad_probe_multiple_drop_eos, NULL, NULL);
  gst_object_unref (pad);

  return send_rtxdata;
}

/* Event probe that records whether a retransmission request went by.
 *
 * Sets request_passed in the RTXReceiveMultipleData passed as @user_data to
 * TRUE when the probed event is a custom upstream event whose structure is
 * named "GstRTPRetransmissionRequest". The event is never blocked.
 */
static GstPadProbeReturn
rtprtxreceive_sinkpad_probe_check_drop (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data;
  GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);

  if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM)
    return GST_PAD_PROBE_OK;

  if (gst_event_get_structure (event) == NULL)
    return GST_PAD_PROBE_OK;

  if (gst_structure_has_name (gst_event_get_structure (event),
          "GstRTPRetransmissionRequest"))
    rtxdata->request_passed = TRUE;

  return GST_PAD_PROBE_OK;
}

/* Tell whether the receiver has seen everything it can expect: the number
 * of received buffers has reached the number of buffers to send minus the
 * number of retransmission requests that got dropped. */
static gboolean
check_finished (RTXReceiveMultipleData * rtxdata)
{
  gint got = g_atomic_int_get (&rtxdata->received);
  gint lost = g_atomic_int_get (&rtxdata->dropped_requests);

  return got >= rtxdata->to_send - lost;
}

/* Probe on the rtprtxreceive src pad of the multiple sender test.
 *
 * Counts every buffer (globally and per ssrc) and wakes up the default main
 * context once check_finished() holds. The first buffer of an ssrc only
 * records its sequence number. For later buffers that are ahead of the
 * expected sequence number, a GstRTPRetransmissionRequest event is pushed to
 * the peer pad for every missing one; a request for which request_passed
 * stays FALSE is counted as dropped. Buffers are always let through.
 */
static GstPadProbeReturn
rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data;
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  gpointer recorded = NULL;
  guint ssrc, seqnum, expected, seen;

  if (info->type != (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH))
    return GST_PAD_PROBE_OK;

  gst_rtp_buffer_map (GST_BUFFER (info->data), GST_MAP_READ, &rtp);
  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
  seqnum = gst_rtp_buffer_get_seq (&rtp);

  g_atomic_int_inc (&rtxdata->received);
  if (check_finished (rtxdata))
    g_main_context_wakeup (NULL);

  if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map,
          GUINT_TO_POINTER (ssrc), NULL, &recorded)) {
    /* first buffer of this ssrc (the test never drops the first one):
     * just start tracking it */
    g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map,
        GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum));
    g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map,
        GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (1));
    gst_rtp_buffer_unmap (&rtp);
    return GST_PAD_PROBE_OK;
  }

  expected = GPOINTER_TO_UINT (recorded) + rtxdata->seqnum_offset;

  /* in this test every packet arrives in increasing order, so being ahead
   * of the expected sequence number means packets were dropped */
  if (seqnum > expected) {
    GstPad *peerpad = gst_pad_get_peer (pad);
    guint missing;

    for (missing = expected; missing < seqnum;
        missing += rtxdata->seqnum_offset) {
      GstEvent *request = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
          gst_structure_new ("GstRTPRetransmissionRequest",
              "seqnum", G_TYPE_UINT, missing,
              "ssrc", G_TYPE_UINT, gst_rtp_buffer_get_ssrc (&rtp),
              "payload-type", G_TYPE_UINT,
              gst_rtp_buffer_get_payload_type (&rtp),
              NULL));

      rtxdata->request_passed = FALSE;
      gst_pad_push_event (peerpad, request);
      if (!rtxdata->request_passed) {
        /* the request never showed up at the check probe */
        g_atomic_int_inc (&rtxdata->dropped_requests);
        if (check_finished (rtxdata))
          g_main_context_wakeup (NULL);
      }
    }
    gst_object_unref (peerpad);
  }

  /* record the sequence number unless this is a retransmitted packet, which
   * lies behind the expected one */
  if (seqnum >= expected)
    g_hash_table_insert (rtxdata->ssrc_to_seqnum_offset_map,
        GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seqnum));

  gst_rtp_buffer_unmap (&rtp);

  seen = GPOINTER_TO_UINT (g_hash_table_lookup (rtxdata->ssrc_to_nb_packets_map,
          GUINT_TO_POINTER (ssrc)));
  g_hash_table_insert (rtxdata->ssrc_to_nb_packets_map,
      GUINT_TO_POINTER (ssrc), GUINT_TO_POINTER (seen + 1));

  return GST_PAD_PROBE_OK;
}

/* GFunc: put one sender's state back to its start values and install the
 * drop rate that @data points to (a guint). */
static void
reset_rtx_send_data (RTXSendMultipleData * send_rtxdata, gpointer data)
{
  const guint *drop_rate = (const guint *) data;

  send_rtxdata->nb_packets = 0;
  send_rtxdata->count = 1;
  send_rtxdata->drop_every_n_packets = *drop_rate;
}

/* GFunc: add the number of main stream packets counted for one sender to
 * the guint accumulator that @data points to. */
static void
compute_total_packets_sent (RTXSendMultipleData * send_rtxdata, gpointer data)
{
  *(guint *) data += send_rtxdata->nb_packets;
}

/* GHFunc: add one hash table value, a packet count stored with
 * GUINT_TO_POINTER, to the guint accumulator that @data points to. The key
 * is ignored. */
static void
compute_total_packets_received (gpointer key, gpointer value, gpointer data)
{
  *(guint *) data += GPOINTER_TO_UINT (value);
}

static void
start_test_drop_multiple_and_check_results (GstElement * bin,
    GList * send_rtxdata_list, RTXReceiveMultipleData * receive_rtxdata,
    guint drop_every_n_packets)
{
  GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE;
  GstElement *rtprtxreceive =
      gst_bin_get_by_name (GST_BIN (bin), "rtprtxreceive");
  guint sum_all_packets_sent = 0;
  guint sum_rtx_packets_sent = 0;
  guint sum_all_packets_received = 0;
  guint sum_rtx_packets_received = 0;
  guint sum_rtx_assoc_packets_received = 0;
  guint sum_rtx_dropped_packets_received = 0;
  gdouble error_sent_recv = 0;
  GstIterator *itr_elements = NULL;
  gboolean done = FALSE;
  GValue item = { 0 };
  GstElement *element = NULL;
  gchar *name = NULL;

  GST_INFO ("starting test");

  g_atomic_int_set (&receive_rtxdata->received, 0);
  g_atomic_int_set (&receive_rtxdata->dropped_requests, 0);

  g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map);
  g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map);

  g_list_foreach (send_rtxdata_list, (GFunc) reset_rtx_send_data,
      &drop_every_n_packets);

  /* run pipeline */
  state_res = gst_element_set_state (bin, GST_STATE_PLAYING);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);

  state_res = gst_element_get_state (bin, NULL, NULL, GST_CLOCK_TIME_NONE);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);

  GST_INFO ("running main loop");
  while (!check_finished (receive_rtxdata))
    g_main_context_iteration (NULL, TRUE);

  /* check results */
  itr_elements = gst_bin_iterate_elements (GST_BIN (bin));
  done = FALSE;
  while (!done) {
    switch (gst_iterator_next (itr_elements, &item)) {
      case GST_ITERATOR_OK:
        element = GST_ELEMENT (g_value_get_object (&item));
        name = gst_element_get_name (element);
        if (g_str_has_prefix (name, "rtprtxsend") > 0) {
          guint nb_packets = 0;
          g_object_get (G_OBJECT (element), "num-rtx-packets", &nb_packets,
              NULL);
          sum_rtx_packets_sent += nb_packets;
        }
        g_free (name);
        g_value_reset (&item);
        break;
      case GST_ITERATOR_RESYNC:
        gst_iterator_resync (itr_elements);
        break;
      case GST_ITERATOR_ERROR:
        done = TRUE;
        break;
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
    }
  }
  g_value_unset (&item);
  gst_iterator_free (itr_elements);

  /* compute number of all packets sent by all sender */
  g_list_foreach (send_rtxdata_list, (GFunc) compute_total_packets_sent,
      &sum_all_packets_sent);

  /* compute number of all packets received by rtprtxreceive::src pad */
  g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map,
      compute_total_packets_received, (gpointer) & sum_all_packets_received);

  sum_all_packets_received +=
      g_atomic_int_get (&receive_rtxdata->dropped_requests);
  fail_if (sum_all_packets_sent < sum_all_packets_received);

  /* some packet are not received, I still have to figure out why
   * but I suspect it comes from pipeline setup/shutdown
   */
  if (sum_all_packets_sent != sum_all_packets_received) {
    error_sent_recv =
        1 - sum_all_packets_received / (gdouble) sum_all_packets_sent;
    fail_if (error_sent_recv > 0.30);
    /* it should be 0% */
  }

  /* retrieve number of retransmit packets received by rtprtxreceive */
  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-packets",
      &sum_rtx_packets_received, NULL);

  /* some of rtx packet are not received because the receiver avoids
   * collision (= requests that have the same seqnum)
   */
  fail_if (sum_rtx_packets_sent < sum_rtx_packets_received);
  g_object_get (G_OBJECT (rtprtxreceive), "num-rtx-assoc-packets",
      &sum_rtx_assoc_packets_received, NULL);
  sum_rtx_dropped_packets_received =
      sum_rtx_packets_received - sum_rtx_assoc_packets_received;
  fail_unless_equals_int (sum_rtx_packets_sent,
      sum_rtx_assoc_packets_received + sum_rtx_dropped_packets_received);

  gst_object_unref (rtprtxreceive);
  state_res = gst_element_set_state (bin, GST_STATE_NULL);
  ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE);
}

/* Destroy callback for the sender list: hands one RTXSendMultipleData back
 * to the slice allocator. */
static void
free_rtx_send_data (gpointer data)
{
  RTXSendMultipleData *send_rtxdata = data;

  g_slice_free (RTXSendMultipleData, send_rtxdata);
}

/* This test builds the pipeline
 *
 *   funnel name=funnel
 *   videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel.
 *   videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel.
 *   ... (four senders in total)
 *   funnel. ! rtprtxreceive ! fakesink
 *
 * and drops some buffers just after each rtprtxsend.
 * Then it checks that every dropped packet has been re-sent and that not
 * too many requests have been sent. The whole run is repeated for drop
 * intervals 2 to 9.
 */
GST_START_TEST (test_drop_multiple_sender)
{
  GstElement *bin, *funnel, *rtprtxreceive, *sink;
  GstBus *bus;
  gboolean res;
  GstPad *receive_srcpad, *receive_sinkpad;
  guint drop_every_n_packets;
  guint pt;
  GList *send_rtxdata_list = NULL;
  RTXReceiveMultipleData receive_rtxdata = { NULL };
  GstStructure *pt_map;

  GST_INFO ("preparing test");

  /* both maps are keyed by ssrc and hold plain integers as values */
  receive_rtxdata.seqnum_offset = 1;
  receive_rtxdata.ssrc_to_nb_packets_map =
      g_hash_table_new (g_direct_hash, g_direct_equal);
  receive_rtxdata.ssrc_to_seqnum_offset_map =
      g_hash_table_new (g_direct_hash, g_direct_equal);

  /* build pipeline */
  bin = gst_pipeline_new ("pipeline");
  bus = gst_element_get_bus (bin);
  gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);

  funnel = gst_element_factory_make ("funnel", "funnel");
  rtprtxreceive = gst_element_factory_make ("rtprtxreceive", "rtprtxreceive");
  sink = gst_element_factory_make ("fakesink", "sink");
  g_object_set (sink, "sync", TRUE, "qos", FALSE, NULL);
  gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL);

  /* four senders: payload types 96..99, rtx payload types 121..124 */
  for (pt = 96; pt <= 99; pt++) {
    send_rtxdata_list =
        g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc",
            "rtpvrawpay", pt, pt + 25, &receive_rtxdata));
  }

  /* the receiver needs the same payload type -> rtx payload type mapping */
  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      "96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122,
      "98", G_TYPE_UINT, 123, "99", G_TYPE_UINT, 124, NULL);
  g_object_set (rtprtxreceive, "payload-type-map", pt_map, NULL);
  gst_structure_free (pt_map);

  res = gst_element_link (funnel, rtprtxreceive);
  fail_unless (res == TRUE, NULL);
  res = gst_element_link (rtprtxreceive, sink);
  fail_unless (res == TRUE, NULL);

  /* buffers leaving the receiver are inspected by the src pad probe */
  receive_srcpad = gst_element_get_static_pad (rtprtxreceive, "src");
  gst_pad_add_probe (receive_srcpad,
      (GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH),
      (GstPadProbeCallback) rtprtxreceive_srcpad_probe_multiple,
      &receive_rtxdata, NULL);
  gst_object_unref (receive_srcpad);

  /* upstream events leaving the receiver are inspected by the sink pad probe */
  receive_sinkpad = gst_element_get_static_pad (rtprtxreceive, "sink");
  gst_pad_add_probe (receive_sinkpad,
      GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
      (GstPadProbeCallback) rtprtxreceive_sinkpad_probe_check_drop,
      &receive_rtxdata, NULL);
  gst_object_unref (receive_sinkpad);

  g_signal_connect (bus, "message::error",
      (GCallback) message_received_multiple, NULL);
  g_signal_connect (bus, "message::warning",
      (GCallback) message_received_multiple, NULL);

  /* one full run per drop interval */
  drop_every_n_packets = 2;
  while (drop_every_n_packets < 10) {
    start_test_drop_multiple_and_check_results (bin, send_rtxdata_list,
        &receive_rtxdata, drop_every_n_packets);
    drop_every_n_packets++;
  }

  /* cleanup */

  g_list_free_full (send_rtxdata_list, free_rtx_send_data);
  g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map);
  g_hash_table_destroy (receive_rtxdata.ssrc_to_seqnum_offset_map);

  gst_bus_remove_signal_watch (bus);
  gst_object_unref (bus);
  gst_object_unref (bin);
}

GST_END_TEST;

/* State shared between generate_test_buffers() and the fakesink "handoff"
 * callback while the buffer-generating pipeline is running. */
struct GenerateTestBuffersData
{
  /* elements of the generating pipeline */
  GstElement *src;
  GstElement *capsfilter;
  GstElement *payloader;
  GstElement *sink;

  /* mutex is held by the handoff callback while it updates the fields below;
   * cond is signalled once enough buffers have been collected */
  GMutex mutex;
  GCond cond;

  /* collected buffers, each one referenced by the callback */
  GList *buffers;
  /* buffers still to collect; decremented on every handoff */
  gint num_buffers;
  /* payloader "seqnum" property, read when num_buffers reaches zero */
  guint last_seqnum;
};

/* "handoff" handler of the collecting fakesink.
 *
 * user_data is the struct GenerateTestBuffersData of the running generator.
 * While buffers are still wanted, a reference to buf is appended to the list.
 * The countdown is decremented on every call; once it is at or below zero the
 * waiting thread is signalled, and exactly when it hits zero the payloader's
 * "seqnum" property is stored in last_seqnum.
 */
static void
fakesink_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad,
    gpointer user_data)
{
  struct GenerateTestBuffersData *data = user_data;
  gint remaining;

  g_mutex_lock (&data->mutex);

  if (data->num_buffers > 0)
    data->buffers = g_list_append (data->buffers, gst_buffer_ref (buf));

  data->num_buffers -= 1;
  remaining = data->num_buffers;

  if (remaining <= 0) {
    /* if we have collected enough buffers, unblock the main thread to stop */
    g_cond_signal (&data->cond);

    if (remaining == 0)
      g_object_get (data->payloader, "seqnum", &data->last_seqnum, NULL);
  }

  g_mutex_unlock (&data->mutex);
}

/* Produces RTP test buffers by running
 * videotestsrc ! capsfilter ! rtpvrawpay ! fakesink
 * until the fakesink has handed over num_buffers buffers.
 *
 * num_buffers: how many buffers to collect, must be > 0.
 * ssrc: value for the payloader's "ssrc" property.
 * payload_type: out parameter, receives the payloader's "pt" property.
 *
 * Returns the list of collected buffers; every entry carries its own
 * reference. The function asserts that exactly num_buffers were collected and
 * that the payloader's last seqnum equals num_buffers.
 */
static GList *
generate_test_buffers (const gint num_buffers, guint ssrc, guint * payload_type)
{
  struct GenerateTestBuffersData data;
  GstElement *bin;
  GstCaps *videotestsrc_caps;
  gboolean res;

  fail_unless (num_buffers > 0);

  data.buffers = NULL;
  data.num_buffers = num_buffers;
  g_mutex_init (&data.mutex);
  g_cond_init (&data.cond);

  bin = gst_pipeline_new (NULL);

  data.src = gst_element_factory_make ("videotestsrc", NULL);
  data.capsfilter = gst_element_factory_make ("capsfilter", NULL);
  data.payloader = gst_element_factory_make ("rtpvrawpay", NULL);
  data.sink = gst_element_factory_make ("fakesink", NULL);

  g_object_set (data.src, "do-timestamp", TRUE, NULL);

  /* small frame size will cause vrawpay to generate exactly one rtp packet
   * per video frame, which we need for the max-size-time test */
  videotestsrc_caps =
      gst_caps_from_string
      ("video/x-raw,format=I420,width=10,height=10,framerate=30/1");
  g_object_set (data.capsfilter, "caps", videotestsrc_caps, NULL);
  gst_caps_unref (videotestsrc_caps);

  g_object_set (data.payloader, "seqnum-offset", 1, "ssrc", ssrc, NULL);

  /* the sink reports every buffer to fakesink_handoff() */
  g_object_set (data.sink, "signal-handoffs", TRUE, NULL);
  g_signal_connect (data.sink, "handoff", (GCallback) fakesink_handoff, &data);

  gst_bin_add_many (GST_BIN (bin), data.src, data.capsfilter, data.payloader,
      data.sink, NULL);
  res = gst_element_link_many (data.src, data.capsfilter, data.payloader,
      data.sink, NULL);
  fail_unless_equals_int (res, TRUE);

  /* start the pipeline with the mutex held and sleep until the handoff
   * callback has counted num_buffers down */
  g_mutex_lock (&data.mutex);
  ASSERT_SET_STATE (bin, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
  while (data.num_buffers > 0)
    g_cond_wait (&data.cond, &data.mutex);
  g_mutex_unlock (&data.mutex);

  g_object_get (data.payloader, "pt", payload_type, NULL);

  ASSERT_SET_STATE (bin, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);

  fail_unless_equals_int (g_list_length (data.buffers), num_buffers);
  fail_unless_equals_int (num_buffers, data.last_seqnum);

  g_mutex_clear (&data.mutex);
  g_cond_clear (&data.cond);
  gst_object_unref (bin);

  return data.buffers;
}

/* Builds a custom upstream event carrying a "GstRTPRetransmissionRequest"
 * structure with the given seqnum, ssrc and payload type (all as uint
 * fields). Returns the new event. */
static GstEvent *
create_rtx_event (guint seqnum, guint ssrc, guint payload_type)
{
  GstStructure *request;

  request = gst_structure_new ("GstRTPRetransmissionRequest",
      "seqnum", G_TYPE_UINT, seqnum,
      "ssrc", G_TYPE_UINT, ssrc,
      "payload-type", G_TYPE_UINT, payload_type, NULL);

  return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, request);
}

/* Checks how many packets a standalone rtprtxsend keeps for retransmission.
 *
 * test_with_time: FALSE limits the queue with "max-size-packets" (10 test
 * buffers), TRUE limits it with "max-size-time" (30 test buffers). In both
 * modes the queue is expected to hold at most half of the test buffers.
 *
 * Before each buffer is pushed, a retransmission is requested for every
 * buffer pushed so far. Afterwards the element statistics and the exact
 * order and contents of the buffers that arrived on the check sink pad are
 * verified.
 */
static void
test_rtxsender_packet_retention (gboolean test_with_time)
{
  const gint num_buffers = test_with_time ? 30 : 10;
  const gint half_buffers = num_buffers / 2;
  const guint ssrc = 1234567;
  const guint rtx_ssrc = 7654321;
  const guint rtx_payload_type = 99;
  GstStructure *pt_map;
  GstStructure *ssrc_map;
  GList *in_buffers, *node;
  guint payload_type;
  GstElement *rtxsend;
  GstPad *srcpad, *sinkpad;
  GstCaps *caps;
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  gint i, j;
  gboolean res;

  /* generate test data  */
  in_buffers = generate_test_buffers (num_buffers, ssrc, &payload_type);

  /* clear the global buffers list, which we are going to use later */
  gst_check_drop_buffers ();

  /* setup element & pads */
  rtxsend = gst_check_setup_element ("rtprtxsend");

  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      "96", G_TYPE_UINT, rtx_payload_type, NULL);
  ssrc_map = gst_structure_new ("application/x-rtp-ssrc-map",
      "1234567", G_TYPE_UINT, rtx_ssrc, NULL);

  /* in both cases we want the rtxsend queue to store 'half_buffers'
   * amount of buffers at most. In max-size-packets mode, it's trivial.
   * In max-size-time mode, we specify almost half a second, which is
   * the equivalent of 15 frames in a 30fps video stream */
  g_object_set (rtxsend,
      "max-size-packets", test_with_time ? 0 : half_buffers,
      "max-size-time", test_with_time ? 499 : 0,
      "payload-type-map", pt_map, "ssrc-map", ssrc_map, NULL);
  gst_structure_free (pt_map);
  gst_structure_free (ssrc_map);

  srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate);
  fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE);

  sinkpad = gst_check_setup_sink_pad (rtxsend, &sinktemplate);
  fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE);

  ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);

  caps = gst_caps_from_string ("application/x-rtp, "
      "media = (string)video, payload = (int)96, "
      "ssrc = (uint)1234567, clock-rate = (int)90000, "
      "encoding-name = (string)RAW");
  gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME);
  gst_caps_unref (caps);

  /* now push all buffers and request retransmission every time for all of them */
  node = in_buffers;
  for (i = 1; i <= num_buffers; i++) {
    GstBuffer *buffer = GST_BUFFER (node->data);

    /* verify that the original packets are correct */
    res = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
    fail_unless_equals_int (res, TRUE);
    fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
    fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
        payload_type);
    fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
    gst_rtp_buffer_unmap (&rtp);

    /* retransmit all the previous ones */
    for (j = 1; j < i; j++) {
      GList *last_out_buffer;

      /* synchronize with the chain() function of the "sinkpad"
       * to make sure that rtxsend has pushed the rtx buffer out
       * before continuing */
      g_mutex_lock (&check_mutex);
      /* the output list is shared with the chain function, so its tail is
       * only looked at while check_mutex is held */
      last_out_buffer = g_list_last (buffers);
      fail_unless_equals_int (gst_pad_push_event (sinkpad,
              create_rtx_event (j, ssrc, payload_type)), TRUE);
      /* wait for the rtx packet only if we expect the element
       * to actually retransmit something */
      if (j >= MAX (i - half_buffers, 1)) {
        guint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;

        while (last_out_buffer == g_list_last (buffers))
          fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time));
      }
      g_mutex_unlock (&check_mutex);
    }

    /* push this one; a flow error here would invalidate everything below */
    fail_unless_equals_int (gst_pad_push (srcpad, gst_buffer_ref (buffer)),
        GST_FLOW_OK);
    node = g_list_next (node);
  }

  /* verify the result. buffers should be in this order (numbers are seqnums):
   * 1, 1rtx, 2, 1rtx, 2rtx, 3, ... , 9, 5rtx, 6rtx, 7rtx, 8rtx, 9rtx, 10 */
  {
    GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT;
    gint expected_rtx_requests, expected_rtx_packets;
    gint real_rtx_requests, real_rtx_packets;

    /* verify statistics first.
     * Before buffer i is pushed, min (i - 1, half_buffers) packets are
     * retransmitted: the first half_buffers iterations yield
     * 0 + 1 + ... + (half_buffers - 1) packets, the remaining half_buffers
     * iterations yield half_buffers packets each. The product of two
     * consecutive integers is even, so the division is exact. */
    expected_rtx_packets = half_buffers * half_buffers +
        ((half_buffers - 1) * half_buffers) / 2;
    for (i = 1, expected_rtx_requests = 0; i < num_buffers; i++)
      expected_rtx_requests += i;

    g_object_get (rtxsend, "num-rtx-requests", &real_rtx_requests,
        "num-rtx-packets", &real_rtx_packets, NULL);
    fail_unless_equals_int (expected_rtx_requests, real_rtx_requests);
    fail_unless_equals_int (expected_rtx_packets, real_rtx_packets);

    /* and the number of actual buffers that we were pushed out of rtxsend */
    fail_unless_equals_int (g_list_length (buffers),
        num_buffers + expected_rtx_packets);

    node = buffers;
    for (i = 1; i <= num_buffers; i++) {
      /* verify the retransmission packets */
      for (j = MAX (i - half_buffers, 1); j < i; j++) {
        GST_INFO ("checking %d, %d", i, j);

        res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
        fail_unless_equals_int (res, TRUE);

        fail_if (gst_rtp_buffer_get_ssrc (&rtp) == ssrc);
        fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), rtx_ssrc);
        fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
            rtx_payload_type);
        /* the first two payload bytes hold the original seqnum (j) */
        fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload
                (&rtp)), j);

        /* open the original packet for this rtx packet and verify timestamps */
        res = gst_rtp_buffer_map (GST_BUFFER (g_list_nth_data (in_buffers,
                    j - 1)), GST_MAP_READ, &orig_rtp);
        fail_unless_equals_int (res, TRUE);
        fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&orig_rtp),
            gst_rtp_buffer_get_timestamp (&rtp));
        gst_rtp_buffer_unmap (&orig_rtp);

        gst_rtp_buffer_unmap (&rtp);
        node = g_list_next (node);
      }

      /* verify the normal rtp flow packet */
      res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
      fail_unless_equals_int (res, TRUE);
      fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
      fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
          payload_type);
      fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
      gst_rtp_buffer_unmap (&rtp);
      node = g_list_next (node);
    }
  }

  g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref);
  gst_check_drop_buffers ();

  gst_check_teardown_src_pad (rtxsend);
  gst_check_teardown_sink_pad (rtxsend);
  gst_check_teardown_element (rtxsend);
}

/* Runs the packet retention test with test_with_time == FALSE, i.e. with the
 * rtxsend queue limited through "max-size-packets". */
GST_START_TEST (test_rtxsender_max_size_packets)
{
  test_rtxsender_packet_retention (FALSE);
}

GST_END_TEST;

/* Runs the packet retention test with test_with_time == TRUE, i.e. with the
 * rtxsend queue limited through "max-size-time". */
GST_START_TEST (test_rtxsender_max_size_time)
{
  test_rtxsender_packet_retention (TRUE);
}

GST_END_TEST;

/* Asserts that two RTP buffers carry the same packet.
 *
 * Both buffers are mapped for reading (a failed map fails the test right
 * away instead of comparing unmapped data), then every header field read
 * below and the payload bytes are compared for equality.
 */
static void
compare_rtp_packets (GstBuffer * a, GstBuffer * b)
{
  GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
  GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
  gboolean res;

  res = gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
  fail_unless_equals_int (res, TRUE);
  res = gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
  fail_unless_equals_int (res, TRUE);

  /* header fields */
  fail_unless_equals_int (gst_rtp_buffer_get_header_len (&rtp_a),
      gst_rtp_buffer_get_header_len (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_version (&rtp_a),
      gst_rtp_buffer_get_version (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp_a),
      gst_rtp_buffer_get_ssrc (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp_a),
      gst_rtp_buffer_get_seq (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_csrc_count (&rtp_a),
      gst_rtp_buffer_get_csrc_count (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_marker (&rtp_a),
      gst_rtp_buffer_get_marker (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp_a),
      gst_rtp_buffer_get_payload_type (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&rtp_a),
      gst_rtp_buffer_get_timestamp (&rtp_b));
  fail_unless_equals_int (gst_rtp_buffer_get_extension (&rtp_a),
      gst_rtp_buffer_get_extension (&rtp_b));

  /* payload: lengths must match before the bytes are compared */
  fail_unless_equals_int (gst_rtp_buffer_get_payload_len (&rtp_a),
      gst_rtp_buffer_get_payload_len (&rtp_b));
  fail_unless_equals_int (memcmp (gst_rtp_buffer_get_payload (&rtp_a),
          gst_rtp_buffer_get_payload (&rtp_b),
          gst_rtp_buffer_get_payload_len (&rtp_a)), 0);

  gst_rtp_buffer_unmap (&rtp_a);
  gst_rtp_buffer_unmap (&rtp_b);
}

/* Links rtprtxsend directly to rtprtxreceive, pushes one RTP buffer through
 * both, requests its retransmission and checks that the second buffer that
 * comes out of rtprtxreceive is identical to the first one. */
GST_START_TEST (test_rtxreceive_data_reconstruction)
{
  const guint ssrc = 1234567;
  GList *in_buffers;
  guint payload_type;
  GstElement *rtxsend, *rtxrecv;
  GstPad *srcpad, *sinkpad;
  GstCaps *caps;
  GstBuffer *buffer;
  GstStructure *pt_map;

  /* generate test data  */
  in_buffers = generate_test_buffers (1, ssrc, &payload_type);

  /* clear the global buffers list, which we are going to use later */
  gst_check_drop_buffers ();

  /* setup element & pads */
  rtxsend = gst_check_setup_element ("rtprtxsend");
  rtxrecv = gst_check_setup_element ("rtprtxreceive");

  /* sender and receiver share the same payload type mapping */
  pt_map = gst_structure_new ("application/x-rtp-pt-map",
      "96", G_TYPE_UINT, 99, NULL);
  g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
  g_object_set (rtxrecv, "payload-type-map", pt_map, NULL);
  gst_structure_free (pt_map);

  fail_unless_equals_int (gst_element_link (rtxsend, rtxrecv), TRUE);

  srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate);
  fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE);

  sinkpad = gst_check_setup_sink_pad (rtxrecv, &sinktemplate);
  fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE);

  ASSERT_SET_STATE (rtxsend, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
  ASSERT_SET_STATE (rtxrecv, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);

  caps = gst_caps_from_string ("application/x-rtp, "
      "media = (string)video, payload = (int)96, "
      "ssrc = (uint)1234567, clock-rate = (int)90000, "
      "encoding-name = (string)RAW");
  gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME);
  gst_caps_unref (caps);

  /* push buffer */
  buffer = gst_buffer_ref (GST_BUFFER (in_buffers->data));
  fail_unless_equals_int (gst_pad_push (srcpad, buffer), GST_FLOW_OK);

  /* push retransmission request */
  {
    GList *last_out_buffer;
    guint64 end_time;

    /* synchronize with the chain() function of the "sinkpad"
     * to make sure that rtxsend has pushed the rtx buffer out
     * before continuing */
    g_mutex_lock (&check_mutex);
    /* the output list is shared with the chain function, so its tail is
     * only looked at while check_mutex is held */
    last_out_buffer = g_list_last (buffers);
    fail_unless_equals_int (gst_pad_push_event (sinkpad,
            create_rtx_event (1, ssrc, payload_type)), TRUE);
    end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
    /* check the predicate before every wait; a timeout fails the test */
    while (last_out_buffer == g_list_last (buffers))
      fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time));
    g_mutex_unlock (&check_mutex);
  }

  /* verify: original packet followed by its reconstructed copy */
  fail_unless_equals_int (g_list_length (buffers), 2);
  compare_rtp_packets (GST_BUFFER (buffers->data),
      GST_BUFFER (buffers->next->data));

  /* cleanup */
  g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref);
  gst_check_drop_buffers ();

  gst_check_teardown_src_pad (rtxsend);
  gst_check_teardown_sink_pad (rtxrecv);
  gst_element_unlink (rtxsend, rtxrecv);
  gst_check_teardown_element (rtxsend);
  gst_check_teardown_element (rtxrecv);
}

GST_END_TEST;

/* Creates the "rtprtx" suite: a single test case named "general" with a
 * timeout of 120 that holds every test of this file. Returns the suite. */
static Suite *
rtprtx_suite (void)
{
  Suite *suite = suite_create ("rtprtx");
  TCase *tc_general = tcase_create ("general");

  suite_add_tcase (suite, tc_general);
  tcase_set_timeout (tc_general, 120);

  /* basic forwarding and drop/retransmit pipelines */
  tcase_add_test (tc_general, test_push_forward_seq);
  tcase_add_test (tc_general, test_drop_one_sender);
  tcase_add_test (tc_general, test_drop_multiple_sender);

  /* rtprtxsend queue limits */
  tcase_add_test (tc_general, test_rtxsender_max_size_packets);
  tcase_add_test (tc_general, test_rtxsender_max_size_time);

  /* rtprtxreceive packet reconstruction */
  tcase_add_test (tc_general, test_rtxreceive_data_reconstruction);

  return suite;
}

/* Test program entry point, generated by the check helper macro for the
 * "rtprtx" suite built by rtprtx_suite(). */
GST_CHECK_MAIN (rtprtx);
