/////////////////////////////////////////////////////////////////////////////
// Name:        samples/sockbase/client.cpp
// Purpose:     Sockets sample for wxBase
// Author:      Lukasz Michalski
// Modified by:
// Created:     27.06.2005
// Copyright:   (c) 2005 Lukasz Michalski <lmichalski@sf.net>
// Licence:     wxWindows licence
/////////////////////////////////////////////////////////////////////////////

// ============================================================================
// declarations
// ============================================================================

// ----------------------------------------------------------------------------
// headers
// ----------------------------------------------------------------------------

#include "wx/wx.h"
#include "wx/socket.h"
#include "wx/event.h"
#include "wx/list.h"
#include "wx/cmdline.h"
#include "wx/ffile.h"
#include "wx/datetime.h"
#include "wx/timer.h"
#include "wx/thread.h"

const wxEventType wxEVT_WORKER = wxNewEventType();
#define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),

const int timeout_val = 1000;

class WorkerEvent : public wxEvent {
public:
    typedef enum {
        CONNECTING,
        SENDING,
        RECEIVING,
        DISCONNECTING,
        DONE
    } evt_type;
    WorkerEvent(void* pSender, evt_type type)
    {
        SetId(-1);
        SetEventType(wxEVT_WORKER);
        m_sender = pSender;
        m_eventType = type;
        m_isFailed = false;
    }

    void setFailed() { m_isFailed = true; }
    bool isFailed() const { return m_isFailed; }

    virtual wxEvent* Clone() const
    {
        return new WorkerEvent(*this);
    }
    void* m_sender;
    bool m_isFailed;
    wxString m_workerIdent;
    evt_type m_eventType;
};

typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);

class ThreadWorker;
class EventWorker;

WX_DECLARE_LIST(ThreadWorker, TList);
WX_DECLARE_LIST(EventWorker, EList);

class Client : public wxApp {
    wxDECLARE_EVENT_TABLE();
public:
    void RemoveEventWorker(EventWorker* p_worker);
private:
    typedef enum
    {
      THREADS,
      EVENTS
    } workMode;

    typedef enum
    {
      SEND_RANDOM,
      SEND_MESSAGE,
      STRESS_TEST
    } sendType;

    workMode m_workMode;
    sendType m_sendType;
    wxString m_message;
    wxString m_host;
    long m_stressWorkers;

    virtual bool OnInit();
    virtual int OnRun();
    virtual int OnExit();
    void OnInitCmdLine(wxCmdLineParser& pParser);
    bool OnCmdLineParsed(wxCmdLineParser& pParser);
    void OnWorkerEvent(WorkerEvent& pEvent);
    void OnTimerEvent(wxTimerEvent& pEvent);

    void StartWorker(workMode pMode, const wxString& pMessage);
    void StartWorker(workMode pMode);
    char* CreateBuffer(int *msgsize);

    void dumpStatistics();

    TList m_threadWorkers;
    EList m_eventWorkers;

    unsigned m_statConnecting;
    unsigned m_statSending;
    unsigned m_statReceiving;
    unsigned m_statDisconnecting;
    unsigned m_statDone;
    unsigned m_statFailed;

    wxTimer mTimer;
};

DECLARE_APP(Client);

class ThreadWorker : public wxThread
{
public:
    ThreadWorker(const wxString& p_host, char* p_buf, int p_size);
    virtual ExitCode Entry();
private:
    wxString m_host;
    wxSocketClient* m_clientSocket;
    char* m_inbuf;
    char* m_outbuf;
    int m_outsize;
    int m_insize;
    wxString m_workerIdent;
};

class EventWorker : public wxEvtHandler
{
    wxDECLARE_EVENT_TABLE();
public:
    EventWorker(const wxString& p_host, char* p_buf, int p_size);
    void Run();
    virtual ~EventWorker();
private:
    wxString m_host;
    wxSocketClient* m_clientSocket;
    char* m_inbuf;
    char* m_outbuf;
    int m_outsize;
    int m_written;
    int m_insize;
    int m_readed;

    WorkerEvent::evt_type m_currentType;
    bool m_doneSent;
    wxIPV4address m_localaddr;

    void OnSocketEvent(wxSocketEvent& pEvent);
    void SendEvent(bool failed);
};

/******************* Implementation ******************/
IMPLEMENT_APP_CONSOLE(Client);

#include <wx/listimpl.cpp>
WX_DEFINE_LIST(TList);
WX_DEFINE_LIST(EList);

wxString
CreateIdent(const wxIPV4address& addr)
{
    return wxString::Format(wxT("%s:%d"),addr.IPAddress().c_str(),addr.Service());
}

void
Client::OnInitCmdLine(wxCmdLineParser& pParser)
{
    wxApp::OnInitCmdLine(pParser);
    pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
    pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
}


bool
Client::OnCmdLineParsed(wxCmdLineParser& pParser)
{
    wxString fname;
    m_workMode = EVENTS;
    m_stressWorkers = 50;

    if (pParser.Found(_("verbose")))
    {
        wxLog::AddTraceMask(wxT("wxSocket"));
        wxLog::AddTraceMask(wxT("epolldispatcher"));
        wxLog::AddTraceMask(wxT("selectdispatcher"));
        wxLog::AddTraceMask(wxT("thread"));
        wxLog::AddTraceMask(wxT("events"));
    }

    if (pParser.Found(wxT("t")))
        m_workMode = THREADS;
    m_sendType = SEND_RANDOM;

    if (pParser.Found(wxT("m"),&m_message))
        m_sendType = SEND_MESSAGE;
    else if (pParser.Found(wxT("f"),&fname))
    {
        wxFFile file(fname);
        if (!file.IsOpened()) {
            wxLogError(wxT("Cannot open file %s"),fname.c_str());
            return false;
        };
        if (!file.ReadAll(&m_message)) {
            wxLogError(wxT("Cannot read conten of file %s"),fname.c_str());
            return false;
        };
        m_sendType = SEND_MESSAGE;
    };

    if (pParser.Found(wxT("s"),&m_stressWorkers))
        m_sendType = STRESS_TEST;

    m_host = wxT("127.0.0.1");
    pParser.Found(wxT("H"),&m_host);
    return wxApp::OnCmdLineParsed(pParser);
};

bool
Client::OnInit()
{
    if (!wxApp::OnInit())
        return false;
    srand(wxDateTime::Now().GetTicks());
    mTimer.SetOwner(this);
    m_statConnecting = 0;
    m_statSending = 0;
    m_statReceiving = 0;
    m_statDisconnecting = 0;
    m_statDone = 0;
    m_statFailed = 0;
    return true;
}

int
Client::OnRun()
{
    int i;
    switch(m_sendType)
    {
        case STRESS_TEST:
            switch(m_workMode)
            {
                case THREADS:
                    for (i = 0; i < m_stressWorkers; i++) {
                        if (m_message.empty())
                            StartWorker(THREADS);
                        else
                            StartWorker(THREADS, m_message);
                    }
                    break;
                case EVENTS:
                    for (i = 0; i < m_stressWorkers; i++) {
                        if (m_message.empty())
                            StartWorker(EVENTS);
                        else
                            StartWorker(EVENTS, m_message);
                    }
                    break;
                default:
                    for (i = 0; i < m_stressWorkers; i++) {
                        if (m_message.empty())
                            StartWorker(i % 5 == 0 ? THREADS : EVENTS);
                        else
                            StartWorker(i % 5 == 0 ? THREADS : EVENTS, m_message);
                    }
                break;
            }
        break;
        case SEND_MESSAGE:
            StartWorker(m_workMode,m_message);
        break;
        case SEND_RANDOM:
            StartWorker(m_workMode);
        break;
    }
    mTimer.Start(timeout_val,true);
    return wxApp::OnRun();
}

int
Client::OnExit()
{
    for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it->GetNext()) {
        delete it->GetData();
    }
    return 0;
}

// Create buffer to be sent by client. Buffer contains test indicator
// message size and place for data
// msgsize parameter contains size of data in bytes and
// if input value does not fit into 250 bytes then
// on exit is updated to new value that is multiply of 1024 bytes
char*
Client::CreateBuffer(int* msgsize)
{
    int bufsize = 0;
    char* buf;
    //if message should have more than 256 bytes then set it as
    //test3 for compatibility with GUI server sample
    if ((*msgsize) > 250)
    {
        //send at least one kb of data
        int size = (*msgsize)/1024 + 1;
        //returned buffer will contain test indicator, message size in kb and data
        bufsize = size*1024+2;
        buf = new char[bufsize];
        buf[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
        buf[1] = (char)(size);
        *msgsize = size*1024;
    }
    else
    {
        //returned buffer will contain test indicator, message size in kb and data
        bufsize = (*msgsize)+2;
        buf = new char[bufsize];
        buf[0] = (unsigned char)0xBE; //second byte contains size in bytes
        buf[1] = (char)(*msgsize);
    }
    return buf;
}

void
Client::StartWorker(workMode pMode) {
    int msgsize = 1 + (int) (250000.0 * (rand() / (RAND_MAX + 1.0)));
    char* buf = CreateBuffer(&msgsize);

    //fill data part of buffer with random bytes
    for (int i = 2; i < (msgsize); i++) {
        buf[i] = i % 256;
    }

    if (pMode == THREADS) {
        ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
        if (c->Create() != wxTHREAD_NO_ERROR) {
            wxLogError(wxT("Cannot create more threads"));
        } else {
            c->Run();
            m_threadWorkers.Append(c);
        }
    } else {
        EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
        e->Run();
        m_eventWorkers.Append(e);
    }
    m_statConnecting++;
}

void
Client::StartWorker(workMode pMode, const wxString& pMessage) {
    char* tmpbuf = wxStrdup(pMessage.mb_str());
    int msgsize = strlen(tmpbuf);
    char* buf = CreateBuffer(&msgsize);
    memset(buf+2,0x0,msgsize);
    memcpy(buf+2,tmpbuf,msgsize);
    free(tmpbuf);

    if (pMode == THREADS) {
        ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
        if (c->Create() != wxTHREAD_NO_ERROR) {
            wxLogError(wxT("Cannot create more threads"));
        } else {
            c->Run();
            m_threadWorkers.Append(c);
        }
    } else {
        EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
        e->Run();
        m_eventWorkers.Append(e);
    }
    m_statConnecting++;
}

void
Client::OnWorkerEvent(WorkerEvent& pEvent) {
    switch (pEvent.m_eventType) {
        case WorkerEvent::CONNECTING:
            if (pEvent.isFailed())
            {
                m_statConnecting--;
                m_statFailed++;
            }
        break;
        case WorkerEvent::SENDING:
            if (pEvent.isFailed())
            {
                m_statFailed++;
                m_statSending--;
            }
            else
            {
                m_statConnecting--;
                m_statSending++;
            }
        break;
        case WorkerEvent::RECEIVING:
            if (pEvent.isFailed())
            {
                m_statReceiving--;
                m_statFailed++;
            }
            else
            {
                m_statSending--;
                m_statReceiving++;
            }
        break;
        case WorkerEvent::DISCONNECTING:
            if (pEvent.isFailed())
            {
                m_statDisconnecting--;
                m_statFailed++;
            }
            else
            {
                m_statReceiving--;
                m_statDisconnecting++;
            }
        break;
        case WorkerEvent::DONE:
            m_statDone++;
            m_statDisconnecting--;
        break;
    };

    if (pEvent.isFailed() || pEvent.m_eventType == WorkerEvent::DONE)
    {
        for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext()) {
            if (it->GetData() == pEvent.m_sender) {
                m_threadWorkers.DeleteNode(it);
                break;
            }
        }
        for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
        {
            if (it2->GetData() == pEvent.m_sender) {
                delete it2->GetData();
                m_eventWorkers.DeleteNode(it2);
                break;
            }
        }
        if ((m_threadWorkers.GetCount() == 0) && (m_eventWorkers.GetCount() == 0))
        {
            mTimer.Stop();
            dumpStatistics();
            wxSleep(2);
            ExitMainLoop();
        }
        else
        {
            mTimer.Start(timeout_val,true);
        }
    }
}

void
Client::RemoveEventWorker(EventWorker* p_worker) {
    for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) {
        if (it->GetData() == p_worker) {
            //wxLogDebug(wxT("Deleting event worker"));
            delete it->GetData();
            m_eventWorkers.DeleteNode(it);
            return;
        }
    }
}

void
Client::dumpStatistics() {
    wxString msg(
        wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
                m_statConnecting,
                m_statSending,
                m_statReceiving,
                m_statDisconnecting,
                m_statDone,
                m_statFailed
                ));

    wxLogMessage(wxT("Current status:\n%s\n"),msg.c_str());
}

void
Client::OnTimerEvent(wxTimerEvent&) {
    dumpStatistics();
}

wxBEGIN_EVENT_TABLE(Client,wxEvtHandler)
    EVT_WORKER(Client::OnWorkerEvent)
    EVT_TIMER(wxID_ANY,Client::OnTimerEvent)
wxEND_EVENT_TABLE()



EventWorker::EventWorker(const wxString& p_host, char* p_buf, int p_size)
  : m_host(p_host),
    m_outbuf(p_buf),
    m_outsize(p_size),
    m_written(0),
    m_readed(0)
{
    m_clientSocket = new wxSocketClient(wxSOCKET_NOWAIT);
    m_clientSocket->SetEventHandler(*this);
    m_insize = m_outsize - 2;
    m_inbuf = new char[m_insize];
}

void
EventWorker::Run() {
    wxIPV4address ca;
    ca.Hostname(m_host);
    ca.Service(3000);
    m_clientSocket->SetNotify(wxSOCKET_CONNECTION_FLAG|wxSOCKET_LOST_FLAG|wxSOCKET_OUTPUT_FLAG|wxSOCKET_INPUT_FLAG);
    m_clientSocket->Notify(true);
    m_currentType = WorkerEvent::CONNECTING;
    m_doneSent = false;
    //wxLogMessage(wxT("EventWorker: Connecting....."));
    m_clientSocket->Connect(ca,false);
}

void
EventWorker::OnSocketEvent(wxSocketEvent& pEvent) {
    switch(pEvent.GetSocketEvent()) {
        case wxSOCKET_INPUT:
            //wxLogDebug(wxT("EventWorker: INPUT"));
            do {
                if (m_readed == m_insize)
                    return; //event already posted
                m_clientSocket->Read(m_inbuf + m_readed, m_insize - m_readed);
                if (m_clientSocket->Error())
                {
                    if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK)
                    {
                        wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr).c_str());
                        SendEvent(true);
                    }
                }

                m_readed += m_clientSocket->LastCount();
                //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
                if (m_readed == m_insize)
                {
                    if (!memcmp(m_inbuf,m_outbuf,m_insize)) {
                        wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str());
                        SendEvent(true);
                    }
                    m_currentType = WorkerEvent::DISCONNECTING;
                    wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr).c_str());
                    SendEvent(false);

                    //wxLogDebug(wxT("EventWorker %p closing"),this);
                    m_clientSocket->Close();

                    m_currentType = WorkerEvent::DONE;
                    wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr).c_str());
                    SendEvent(false);
                }
            } while (!m_clientSocket->Error());
        break;
        case wxSOCKET_OUTPUT:
            //wxLogDebug(wxT("EventWorker: OUTPUT"));
            do {
                if (m_written == m_outsize)
                    return;
                if (m_written == 0)
                {
                    m_currentType = WorkerEvent::SENDING;
                    wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr).c_str());
                }
                m_clientSocket->Write(m_outbuf + m_written, m_outsize - m_written);
                if (m_clientSocket->Error())
                {
                    if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) {
                        wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr).c_str());
                        SendEvent(true);
                    }
                }
                m_written += m_clientSocket->LastCount();
                if (m_written != m_outsize)
                {
                    //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
                }
                else
                {
                    //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
                    m_currentType = WorkerEvent::RECEIVING;
                    wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr).c_str());
                    SendEvent(false);
                }
            } while(!m_clientSocket->Error());
        break;
        case wxSOCKET_CONNECTION:
        {
            //wxLogMessage(wxT("EventWorker: got connection"));
            wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2);
            if (!m_clientSocket->GetLocal(m_localaddr))
            {
                wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket);
            }
            m_currentType = WorkerEvent::SENDING;
            wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str());
            SendEvent(false);
        }
        break;
        case wxSOCKET_LOST:
        {
            wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr).c_str());
            SendEvent(true);
        }
        break;
    }
}

void
EventWorker::SendEvent(bool failed) {
    if (m_doneSent)
        return;
    WorkerEvent e(this,m_currentType);
    if (failed) e.setFailed();
    wxGetApp().AddPendingEvent(e);
    m_doneSent = failed || m_currentType == WorkerEvent::DONE;
};

EventWorker::~EventWorker() {
    m_clientSocket->Destroy();
    delete [] m_outbuf;
    delete [] m_inbuf;
}

wxBEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
    EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
wxEND_EVENT_TABLE()


ThreadWorker::ThreadWorker(const wxString& p_host, char* p_buf, int p_size)
  : wxThread(wxTHREAD_DETACHED),
    m_host(p_host),
    m_outbuf(p_buf),
    m_outsize(p_size)
{
    m_clientSocket = new wxSocketClient(wxSOCKET_BLOCK|wxSOCKET_WAITALL);
    m_insize = m_outsize - 2;
    m_inbuf = new char[m_insize];
}

wxThread::ExitCode ThreadWorker::Entry()
{
    wxIPV4address ca;
    ca.Hostname(m_host);
    ca.Service(5678);
    //wxLogDebug(wxT("ThreadWorker: Connecting....."));
    m_clientSocket->SetTimeout(60);
    bool failed = false;
    WorkerEvent::evt_type etype = WorkerEvent::CONNECTING;
    if (!m_clientSocket->Connect(ca)) {
        wxLogError(wxT("Cannot connect to %s:%d"),ca.IPAddress().c_str(), ca.Service());
        failed = true;
    } else {
        //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
        etype = WorkerEvent::SENDING;
        WorkerEvent e(this,etype);
        wxGetApp().AddPendingEvent(e);
        int to_process = m_outsize;
        do {
            m_clientSocket->Write(m_outbuf,m_outsize);
            if (m_clientSocket->Error()) {
                wxLogError(wxT("ThreadWorker: Write error"));
                failed  = true;
            }
            to_process -= m_clientSocket->LastCount();
            //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
        } while(!m_clientSocket->Error() && to_process != 0);

        if (!failed) {
            etype = WorkerEvent::RECEIVING;
            WorkerEvent e(this,etype);
            wxGetApp().AddPendingEvent(e);
            to_process = m_insize;
            do {
                m_clientSocket->Read(m_inbuf,m_insize);
                if (m_clientSocket->Error()) {
                    wxLogError(wxT("ThreadWorker: Read error"));
                    failed = true;
                    break;
                }
                to_process -= m_clientSocket->LastCount();
                //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
            } while(!m_clientSocket->Error() && to_process != 0);
        }

        char* outdat = (char*)m_outbuf+2;
        if (!failed && (memcmp(m_inbuf,outdat,m_insize) != 0))
        {
            wxLogError(wxT("Data mismatch"));
            failed = true;
        }
    }
    //wxLogDebug(wxT("ThreadWorker: Finished"));
    if (!failed) {
        etype = WorkerEvent::DISCONNECTING;
        WorkerEvent e(this,etype);
        wxGetApp().AddPendingEvent(e);
    };
    m_clientSocket->Close();
    m_clientSocket->Destroy();
    m_clientSocket = NULL;
    delete [] m_outbuf;
    delete [] m_inbuf;
    if (!failed)
        etype = WorkerEvent::DONE;
    WorkerEvent e(this,etype);
    if (failed) e.setFailed();
    wxGetApp().AddPendingEvent(e);
    return 0;
}

