#if HAVE_CRT
#define _CRTDBG_MAP_ALLOC 
#include <stdlib.h> 
#include <crtdbg.h>
#endif //HAVE_CRT
/*
* Copyright (C) 2017, University of the Basque Country (UPV/EHU)
* Contact for licensing options: <licensing-mcpttclient(at)mcopenplatform(dot)com>
*
* The original file was part of Open Source Doubango Framework
* Copyright (C) 2010-2011 Mamadou Diop.
* Copyright (C) 2012 Doubango Telecom <http://doubango.org>
*
* This file is part of Open Source Doubango Framework.
*
* DOUBANGO is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* DOUBANGO is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with DOUBANGO.
*
*/

/**@file tnet_transport_poll.c
 * @brief Network transport layer using polling.
 *
 */
#include "tnet_transport.h"
#include "tsk_memory.h"
#include "tsk_string.h"
#include "tsk_debug.h"
#include "tsk_thread.h"
#include "tsk_buffer.h"
#include "tsk_safeobj.h"

#if USE_POLL && !(__IPHONE_OS_VERSION_MIN_REQUIRED >= 40000)

#include "tnet_poll.h"

#if HAVE_SYS_PARAM_H
#   include <sys/param.h> /* http://www.freebsd.org/doc/en/books/porters-handbook/porting-versions.html */
#endif

#if !defined(TNET_MAX_FDS)
#	define TNET_MAX_FDS		0xFFFF /* Default "FD_SIZE". Will be updated using the OS limit when the transport starts. */
#endif

/*== Socket description ==*/
/* Per-socket bookkeeping entry stored in transport_context_t::sockets[]. */
typedef struct transport_socket_xs
{
	tnet_fd_t fd; /* descriptor registered for poll() */
	tsk_bool_t owner; /* when true, removeSocket() closes (or shuts down) the descriptor */
	tsk_bool_t connected; /* set by the main thread on the first POLLOUT event */
	tsk_bool_t paused; /* when true, the main thread does not read on POLLIN */

	tnet_socket_type_t type; /* socket type given to addSocket() */
	tnet_tls_socket_handle_t* tlshandle; /* TLS handle; stays null when addSocket() does not attach one */
}
transport_socket_xt;

/*== Transport context structure definition ==*/
/* Holds the poll() set and the matching socket descriptions for one transport. */
typedef struct transport_context_s
{
	TSK_DECLARE_OBJECT;
	
	tsk_size_t count; /* number of used entries in ufds[] and sockets[] */
	tnet_fd_t pipeW; /* write side of the wake-up pipe (-1 while no pipe exists) */
	tnet_fd_t pipeR; /* read side of the wake-up pipe; registered in ufds[] like a socket */
	tnet_pollfd_t ufds[TNET_MAX_FDS]; /* descriptors handed to tnet_poll(); entry i pairs with sockets[i] */
	transport_socket_xt* sockets[TNET_MAX_FDS]; /* socket descriptions, parallel to ufds[] */
	tsk_bool_t polling; // whether we are poll()ing

	TSK_DECLARE_SAFEOBJ; /* lock taken through tsk_safeobj_lock()/tsk_safeobj_unlock() */
}
transport_context_t;

/* Forward declarations of the file-local helpers defined further down in this file. */
/* Looks up the socket description registered for fd (null when absent). */
static transport_socket_xt* getSocket(transport_context_t *context, tnet_fd_t fd);
/* Appends a new socket description and poll entry to the transport's context. */
static int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *transport, tsk_bool_t take_ownership, tsk_bool_t is_client, tnet_tls_socket_handle_t* tlsHandle);
/* Removes the entry at the given index from the context. */
static int removeSocket(int index, transport_context_t *context);


/**
 * Registers a socket with the transport and wakes the polling thread so the
 * new descriptor is taken into account.
 *
 * @param handle          Transport handle.
 * @param fd              Descriptor to register.
 * @param type            Socket type; a TLS/WSS type switches TLS on for the transport.
 * @param take_ownership  Whether the transport closes the descriptor on removal.
 * @param isClient        Selects the client or server TLS context when a TLS handle is created.
 * @param tlsHandle       Optional existing TLS handle to attach to the socket.
 * @return 0 on success, -1 for an invalid handle, -2 for a missing context,
 *         otherwise the error reported by addSocket() or write().
 */
int tnet_transport_add_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd, tnet_socket_type_t type, tsk_bool_t take_ownership, tsk_bool_t isClient, tnet_tls_socket_handle_t* tlsHandle)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	transport_context_t* context;
	static char c = '\0';
	int ret = -1;

	if(!transport){
		TSK_DEBUG_ERROR("Invalid server handle.");
		return ret;
	}
	
	if(!(context = (transport_context_t*)transport->context)){
		TSK_DEBUG_ERROR("Invalid context.");
		return -2;
	}

	if(TNET_SOCKET_TYPE_IS_TLS(type) || TNET_SOCKET_TYPE_IS_WSS(type)){
		transport->tls.enabled = 1;
	}
	
	if((ret = addSocket(fd, type, transport, take_ownership, isClient, tlsHandle))){
		TSK_DEBUG_ERROR("Failed to add new Socket.");
		return ret;
	}

	// signal
	// The context uses -1 (not 0) as the "pipe not created" sentinel, so the
	// write side must be compared against -1 before it is written to.
	if(context->pipeW != -1 && (TSK_RUNNABLE(transport)->running || TSK_RUNNABLE(transport)->started)){
		if((ret = write(context->pipeW, &c, 1)) > 0){
			TSK_DEBUG_INFO("Socket added (external call) %d", fd);
			return 0;
		}
		TSK_DEBUG_ERROR("Failed to add new Socket.");
		return ret;
	}

	TSK_DEBUG_INFO("pipeW (write site) not initialized yet.");
	return 0; // Will be picked up when the main thread starts
}

/**
 * Sets or clears the "paused" flag of a registered socket.
 *
 * @param handle Transport handle.
 * @param fd     Descriptor whose flag is updated.
 * @param pause  New value of the flag.
 * @return -1 when the handle or its context is missing, 0 otherwise
 *         (including when the descriptor is not registered).
 */
int tnet_transport_pause_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd, tsk_bool_t pause)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	transport_context_t *context = tsk_null;
	transport_socket_xt* entry;

	if(transport){
		context = (transport_context_t*)transport->context;
	}
	if(!context){
		TSK_DEBUG_ERROR("Invalid parameter");
		return -1;
	}

	entry = getSocket(context, fd);
	if(!entry){
		TSK_DEBUG_WARN("Socket does not exist in this context");
		return 0;
	}

	entry->paused = pause;
	return 0;
}

/* Remove socket */
/**
 * Removes the socket whose descriptor equals *pfd from the transport, queues an
 * event_removed notification and wakes the polling thread.
 *
 * @param handle Transport handle.
 * @param pfd    Pointer to the descriptor to remove. Set to TNET_INVALID_FD on
 *               success unless it points into the socket entry being removed.
 * @return 0 on success, -2 when the context is missing, -1 for invalid
 *         arguments or when the descriptor is not registered, otherwise the
 *         failing write() result.
 */
int tnet_transport_remove_socket(const tnet_transport_handle_t *handle, tnet_fd_t *pfd)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	transport_context_t *context;
	int ret = -1;
	tsk_size_t i;
	tsk_bool_t found = tsk_false;
	tnet_fd_t fd;

	if(!transport){
		TSK_DEBUG_ERROR("Invalid server handle.");
		return ret;
	}

	/* pfd must be validated before it is dereferenced */
	if(!pfd){
		TSK_DEBUG_ERROR("Invalid parameter");
		return ret;
	}
	
	if(!(context = (transport_context_t*)transport->context)){
		TSK_DEBUG_ERROR("Invalid context.");
		return -2;
	}

	fd = *pfd;
	TSK_DEBUG_INFO("Removing socket %d", fd);
	
	tsk_safeobj_lock(context);

	for(i=0; i<context->count; i++){
		if(context->sockets[i]->fd == fd){
			tsk_bool_t self_ref = (&context->sockets[i]->fd == pfd);
			removeSocket((int)i, context); // sockets[i] will be destroyed
			found = tsk_true;
			TSK_RUNNABLE_ENQUEUE(transport, event_removed, transport->callback_data, fd);
			if(!self_ref){ // if self_ref then, pfd no longer valid after removeSocket()
				*pfd = TNET_INVALID_FD;				
			}
			break;
		}
	}

	tsk_safeobj_unlock(context);
	
	if(found){
		/* Signal */
		static char c = '\0';
		ret = write(context->pipeW, &c, 1);
		return (ret > 0 ? 0 : ret);
	}
	
	// ...
	
	return -1;
}


/**
 * Sends a buffer on a connected socket of the transport.
 *
 * When TLS is enabled on the transport the data is sent through the TLS handle
 * attached to the socket; nothing is sent if the socket has no such handle.
 * Otherwise the data is sent with tnet_sockfd_send().
 *
 * @param handle Transport handle.
 * @param from   Descriptor to send on.
 * @param buf    Data to send.
 * @param size   Number of bytes in buf.
 * @return The send result converted to tsk_size_t (0 when nothing was sent).
 */
tsk_size_t tnet_transport_send(const tnet_transport_handle_t *handle, tnet_fd_t from, const void* buf, tsk_size_t size)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	int sent = 0;

	if(!transport){
		TSK_DEBUG_ERROR("Invalid transport handle.");
		return sent;
	}

	if(!transport->tls.enabled){
		/* plain socket path */
		sent = tnet_sockfd_send(from, buf, size, 0);
		if(sent <= 0){
			TNET_PRINT_LAST_ERROR("send have failed.");

			//tnet_sockfd_close(&from);
		}
		return sent;
	}
	else{
		/* TLS path: only sockets carrying a TLS handle can send */
		const transport_socket_xt* entry = getSocket(transport->context, from);
		if(entry && entry->tlshandle){
			if(tnet_tls_socket_send(entry->tlshandle, buf, size) == 0){
				sent = size;
			}
			else{
				sent = 0;
			}
		}
	}

	return sent;
}

/**
 * Sends a datagram to a remote address using a socket of the transport.
 *
 * @param handle Transport handle; its master socket must be a datagram socket.
 * @param from   Descriptor to send on.
 * @param to     Destination address.
 * @param buf    Data to send.
 * @param size   Number of bytes in buf.
 * @return The tnet_sockfd_sendto() result converted to tsk_size_t, or 0 when
 *         the arguments are invalid or the transport is not datagram based.
 */
tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_t from, const struct sockaddr *to, const void* buf, tsk_size_t size)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	int numberOfBytesSent = 0;
	
	if(!transport){
		TSK_DEBUG_ERROR("Invalid server handle.");
		goto bail;
	}

	/* The master socket only exists between prepare() and unprepare(). */
	if(!transport->master){
		TSK_DEBUG_ERROR("Invalid master socket.");
		goto bail;
	}
	
	if(!TNET_SOCKET_TYPE_IS_DGRAM(transport->master->type)){
		TSK_DEBUG_ERROR("In order to use sendto() you must use an udp transport.");
		goto bail;
	}
	
	if((numberOfBytesSent = tnet_sockfd_sendto(from, to, buf, size)) <= 0){
		TNET_PRINT_LAST_ERROR("sendto have failed.");
		goto bail;
	}
		
bail:
	return numberOfBytesSent;
}

/**
 * Tells whether a descriptor is registered in the transport's context.
 *
 * @param handle Transport handle.
 * @param fd     Descriptor to look for.
 * @return 1 when the descriptor is registered, 0 otherwise (also for a null handle).
 */
int tnet_transport_have_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	transport_socket_xt* entry;

	if(transport){
		entry = getSocket((transport_context_t*)transport->context, fd);
		return entry ? 1 : 0;
	}

	TSK_DEBUG_ERROR("Invalid server handle.");
	return 0;
}

/**
 * Returns the TLS handle attached to a registered socket.
 *
 * @param handle Transport handle.
 * @param fd     Descriptor of the socket.
 * @return The socket's TLS handle, or null when the handle is invalid, the
 *         descriptor is not registered or the socket carries no TLS handle.
 */
const tnet_tls_socket_handle_t* tnet_transport_get_tlshandle(const tnet_transport_handle_t *handle, tnet_fd_t fd)
{
	tnet_transport_t *transport = (tnet_transport_t*)handle;
	const transport_socket_xt *entry;

	if(!transport){
		TSK_DEBUG_ERROR("Invalid parameter");
		return 0;
	}

	entry = getSocket((transport_context_t*)transport->context, fd);
	if(!entry){
		return 0;
	}
	return entry->tlshandle;
}


/*== Get socket ==*/
/* Returns the first socket description whose descriptor equals fd, searching
 * under the context lock; null when the context is null or nothing matches. */
static transport_socket_xt* getSocket(transport_context_t *context, tnet_fd_t fd)
{
	transport_socket_xt* match = 0;
	tsk_size_t idx;

	if(!context){
		return match;
	}

	tsk_safeobj_lock(context);
	for(idx = 0; !match && idx < context->count; ++idx){
		if(context->sockets[idx]->fd == fd){
			match = context->sockets[idx];
		}
	}
	tsk_safeobj_unlock(context);

	return match;
}

/*== Add new socket ==*/
/**
 * Allocates a socket description and appends it, with its poll entry, to the
 * transport's context.
 *
 * @param fd              Descriptor to register.
 * @param type            Socket type; stream sockets also poll for POLLOUT.
 * @param transport       Owning transport (its context receives the entry).
 * @param take_ownership  Stored as the entry's "owner" flag.
 * @param is_client       Selects the client or server TLS context when a TLS
 *                        handle has to be created.
 * @param tlsHandle       Optional TLS handle to reference instead of creating one.
 * @return 0 on success, -1 when the context is null, -2 when the allocation
 *         fails, -3 when the context already holds TNET_MAX_FDS entries.
 *         The descriptor itself is never closed on failure.
 */
int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *transport, tsk_bool_t take_ownership, tsk_bool_t is_client, tnet_tls_socket_handle_t* tlsHandle)
{
	transport_context_t *context = transport?transport->context:0;
	transport_socket_xt *sock;
	int count;

	if(!context){
		TSK_DEBUG_ERROR("Context is Null.");
		return -1;
	}

#if HAVE_CRT //Debug memory
	sock = calloc(1, sizeof(transport_socket_xt));
#else
	sock = tsk_calloc(1, sizeof(transport_socket_xt));
#endif //HAVE_CRT
	if(!sock){
		TSK_DEBUG_ERROR("Failed to allocate socket description.");
		return -2;
	}

	sock->fd = fd;
	sock->type = type;
	sock->owner = take_ownership;

	if((TNET_SOCKET_TYPE_IS_TLS(sock->type) || TNET_SOCKET_TYPE_IS_WSS(sock->type)) && transport->tls.enabled){
		if(tlsHandle){
			sock->tlshandle = tsk_object_ref(tlsHandle);
		}
		else{
#if HAVE_OPENSSL
			sock->tlshandle = tnet_tls_socket_create(sock->fd, is_client ? transport->tls.ctx_client : transport->tls.ctx_server);
#endif
		}
	}

	tsk_safeobj_lock(context);

	/* ufds[] and sockets[] are fixed-size arrays: refuse to write past their end */
	if(context->count >= TNET_MAX_FDS){
		tsk_safeobj_unlock(context);
		TSK_DEBUG_ERROR("Too many sockets, cannot add fd=%d", fd);
		TSK_OBJECT_SAFE_FREE(sock->tlshandle);
		TSK_FREE(sock);
		return -3;
	}

	context->ufds[context->count].fd = fd;
	context->ufds[context->count].events = (fd == context->pipeR) ? TNET_POLLIN : (TNET_POLLIN | TNET_POLLNVAL | TNET_POLLERR);
	if(TNET_SOCKET_TYPE_IS_STREAM(sock->type)){
		context->ufds[context->count].events |= TNET_POLLOUT; // emulate WinSock2 FD_CONNECT event
	}
	context->ufds[context->count].revents = 0;
	context->sockets[context->count] = sock;

	context->count++;
	count = (int)context->count; /* snapshot taken under the lock for the log below */

	tsk_safeobj_unlock(context);

	TSK_DEBUG_INFO("Socket added[%s]: fd=%d, tail.count=%d", transport->description, fd, count);

	return 0;
}

/*== change connection state ==*/
/*
static void setConnected(tnet_fd_t fd, transport_context_t *context, int connected)
{
	tsk_size_t i;

	for(i=0; i<context->count; i++)
	{
		if(context->sockets[i]->fd == fd){
			context->sockets[i]->connected = connected;
		}
	}
}
*/

/*== Remove socket ==*/
/**
 * Removes the entry at position index from the context and compacts the
 * ufds[]/sockets[] arrays.
 *
 * If the entry owns its descriptor and the context is currently inside poll(),
 * the descriptor is only shut down and the entry is kept (removal is deferred);
 * otherwise an owned descriptor is closed before the entry is freed.
 *
 * @param index   Position of the entry; values outside [0, count) are ignored.
 * @param context Context holding the entry.
 * @return -1 when context is null, 0 otherwise.
 */
int removeSocket(int index, transport_context_t *context)
{
	tsk_size_t i;

	if(!context){
		TSK_DEBUG_ERROR("Invalid parameter");
		return -1;
	}
	
	tsk_safeobj_lock(context);

	if(index >= 0 && (tsk_size_t)index < context->count){
		/* Close the socket if we are the owner. */
		TSK_DEBUG_INFO("Socket to remove: fd=%d, index=%d, tail.count=%d", context->sockets[index]->fd, index, (int)context->count);
		if(context->sockets[index]->owner){
			// do not close the socket while it's being poll()ed
			// http://stackoverflow.com/questions/5039608/poll-cant-detect-event-when-socket-is-closed-locally
			if(context->polling){
				TSK_DEBUG_INFO("RemoveSocket(fd=%d) has been requested but we are poll()ing the socket. ShutdownSocket(fd) called on the socket and we deferred the request.", context->sockets[index]->fd);
				TSK_DEBUG_INFO("ShutdownSocket(fd=%d)", context->sockets[index]->fd);
				tnet_sockfd_shutdown(context->sockets[index]->fd);
				goto done;
			}
			tnet_sockfd_close(&(context->sockets[index]->fd));
		}
		
		/* Free tls context */
		TSK_OBJECT_SAFE_FREE(context->sockets[index]->tlshandle);
		
		// Free socket
		TSK_FREE(context->sockets[index]);
		
		/* Shift the following entries down by one; count >= 1 is guaranteed here */
		for(i=(tsk_size_t)index; i+1<context->count; i++){
			context->sockets[i] = context->sockets[i+1];
			context->ufds[i] = context->ufds[i+1];
		}
		
		/* Reset the slot that became free at the tail */
		context->sockets[context->count-1] = tsk_null;
		context->ufds[context->count-1].fd = TNET_INVALID_FD;
		context->ufds[context->count-1].events = 0;
		context->ufds[context->count-1].revents = 0;
		
		context->count--;
	}
done:
	tsk_safeobj_unlock(context);
	
	return 0;
}

/**
 * Stops the transport's runnable, wakes the polling thread through the pipe
 * and joins the main thread.
 *
 * @param transport Transport to stop.
 * @return -1 for a null transport, the tsk_runnable_stop() error if it fails,
 *         the tsk_thread_join() result when a main thread exists, 0 otherwise.
 */
int tnet_transport_stop(tnet_transport_t *transport)
{
	transport_context_t *context;
	int ret;

	if(!transport){
		return -1;
	}

	context = transport->context;

	ret = tsk_runnable_stop(TSK_RUNNABLE(transport));
	if(ret){
		return ret;
	}

	if(context){
		static char wakeup = '\0';

		// signal
		tsk_safeobj_lock(context); // =>MUST
		// to avoid SIGPIPE=> check that there is at least one reader
		if(tnet_transport_have_socket(transport, context->pipeR)){
			write(context->pipeW, &wakeup, 1);
		}
		tsk_safeobj_unlock(context);
	}

	if(!transport->mainThreadId[0]){
		/* already stopped */
		return 0;
	}
	return tsk_thread_join(transport->mainThreadId);
}

/**
 * Prepares the transport: creates the master socket if needed, starts
 * listening on stream transports, creates the wake-up pipe and registers the
 * pipe's read side and the master socket in the context.
 *
 * @param transport Transport to prepare.
 * @return 0 on success, -1 for invalid parameters, -2 when already prepared,
 *         -3 when the master socket cannot be created, otherwise the error
 *         returned by listen, pipe() or addSocket().
 */
int tnet_transport_prepare(tnet_transport_t *transport)
{
	transport_context_t *context;
	tnet_fd_t pipes[2];
	int ret;

	TSK_DEBUG_INFO("tnet_transport_prepare()");

	if(!transport || !transport->context){
		TSK_DEBUG_ERROR("Invalid parameter.");
		return -1;
	}
	context = transport->context;

	if(transport->prepared){
		TSK_DEBUG_ERROR("Transport already prepared.");
		return -2;
	}

	/* Prepare master */
	if(!transport->master){
		transport->master = tnet_socket_create(transport->local_host, transport->req_local_port, transport->type);
		if(!transport->master){
			TSK_DEBUG_ERROR("Failed to create master socket");
			return -3;
		}
		tsk_strupdate(&transport->local_ip, transport->master->ip);
		transport->bind_local_port = transport->master->port;
	}

	/* Start listening */
	if(TNET_SOCKET_TYPE_IS_STREAM(transport->master->type)){
		ret = tnet_sockfd_listen(transport->master->fd, TNET_MAX_FDS);
		if(ret){
			TNET_PRINT_LAST_ERROR("listen have failed.");
			return ret;
		}
	}

	/* Create and add pipes to the fd_set */
	ret = pipe(pipes);
	if(ret){
		TNET_PRINT_LAST_ERROR("Failed to create new pipes.");
		return ret;
	}

	/* set both R and W sides */
	context->pipeR = pipes[0];
	context->pipeW = pipes[1];

	/* add R side */
	TSK_DEBUG_INFO("pipeR fd=%d, pipeW=%d", context->pipeR, context->pipeW);
	ret = addSocket(context->pipeR, transport->master->type, transport, tsk_true, tsk_false, tsk_null);
	if(ret){
		return ret;
	}

	/* Add the master socket to the context. */
	TSK_DEBUG_INFO("master fd=%d", transport->master->fd);
	// don't take ownership: will be closed by the dctor() when refCount==0
	// otherwise will be closed twice: dctor() and removeSocket()
	ret = addSocket(transport->master->fd, transport->master->type, transport, tsk_false, tsk_false, tsk_null);
	if(ret){
		TSK_DEBUG_ERROR("Failed to add master socket");
		return ret;
	}

	transport->prepared = tsk_true;
	return ret;
}

/**
 * Reverts tnet_transport_prepare(): removes every registered socket, closes
 * the pipe's write side, resets both pipe descriptors to -1 and releases the
 * master socket.
 *
 * @param transport Transport to unprepare.
 * @return -1 for invalid parameters, 0 otherwise (also when not prepared).
 */
int tnet_transport_unprepare(tnet_transport_t *transport)
{
	transport_context_t *context;

	if(!transport || !transport->context){
		TSK_DEBUG_ERROR("Invalid parameter.");
		return -1;
	}
	context = transport->context;

	if(!transport->prepared){
		return 0;
	}
	transport->prepared = tsk_false;

	/* drain the socket list from the head */
	while(context->count != 0){
		removeSocket(0, context); // safe
	}

	/* reset both R and W sides */
	if(context->pipeW != -1){
		if(close(context->pipeW) != 0){
			TSK_DEBUG_ERROR("Failed to close pipeW:%d", context->pipeW);
		}
		context->pipeW = -1;
	}
	context->pipeR = -1;

	// destroy master as it has been closed by removeSocket()
	TSK_OBJECT_SAFE_FREE(transport->master);

	return 0;
}

/*=== Main thread */
/**
 * Transport worker loop. Repeatedly poll()s every registered descriptor and
 * dispatches the reported events until the runnable is neither running nor
 * started:
 *  - pipe read side: drains the wake-up bytes (POLLHUP on the pipe ends the loop);
 *  - POLLHUP / POLLERR / POLLNVAL: removes the socket and queues a closed/error event;
 *  - POLLIN: accepts incoming connections on listening stream sockets, otherwise
 *    reads the pending data and queues it as an event_data event;
 *  - POLLOUT: marks the socket connected (queues event_connected once) and stops
 *    polling for POLLOUT.
 *
 * Note: when a socket is removed while iterating, the entries after it shift
 * down by one and the entry that takes its place is only examined on the next
 * poll() round.
 *
 * @param param The tnet_transport_t to serve.
 * @return Always 0 (null).
 */
void *tnet_transport_mainthread(void *param)
{
	tnet_transport_t *transport = param;
	transport_context_t *context;
	int ret;
	tsk_size_t i;
	tsk_bool_t is_stream;
	tnet_fd_t fd;

	struct sockaddr_storage remote_addr = {0};
	transport_socket_xt* active_socket;

	if(!transport || !(context = transport->context)){
		TSK_DEBUG_ERROR("Invalid parameter");
		return 0;
	}

	/* check whether the transport is already prepared */
	if(!transport->prepared){
		TSK_DEBUG_ERROR("Transport must be prepared before strating.");
		goto bail;
	}
	
	is_stream = TNET_SOCKET_TYPE_IS_STREAM(transport->master->type);
	
	TSK_DEBUG_INFO("Starting [%s] server with IP {%s} on port {%d} using fd {%d} with type {%d}...", 
			transport->description, 
			transport->master->ip, 
			transport->master->port,
			transport->master->fd,
			transport->master->type);

	while(TSK_RUNNABLE(transport)->running || TSK_RUNNABLE(transport)->started){
		/* "polling" tells removeSocket() to defer closing descriptors while poll() is in progress */
		context->polling = tsk_true;
		ret = tnet_poll(context->ufds, context->count, -1);
		context->polling = tsk_false;
		if(ret < 0){
			TNET_PRINT_LAST_ERROR("poll() have failed.");
			goto bail;
		}

		if(!TSK_RUNNABLE(transport)->running && !TSK_RUNNABLE(transport)->started){
			TSK_DEBUG_INFO("Stopping [%s] server with IP {%s} on port {%d} with type {%d}...", transport->description, transport->master->ip, transport->master->port, transport->master->type);
			goto bail;
		}
		
		/* lock context */
		tsk_safeobj_lock(context);

		/* == == */
		for(i=0; i<context->count; i++)
		{
			if(!context->ufds[i].revents){
				continue;
			}
			
			// TSK_DEBUG_INFO("REVENTS(i=%d) = %d", i, context->ufds[i].revents);

			if(context->ufds[i].fd == context->pipeR){
				TSK_DEBUG_INFO("PipeR event = %d", context->ufds[i].revents);
				if(context->ufds[i].revents & TNET_POLLIN){
					static char __buffer[1024];
					if(read(context->pipeR, __buffer, sizeof(__buffer)) < 0){
						TNET_PRINT_LAST_ERROR("Failed to read from the Pipe");
					}
				}
				else if(context->ufds[i].revents & TNET_POLLHUP){
					TNET_PRINT_LAST_ERROR("Pipe Error");
					/* release the context lock before leaving the loop */
					tsk_safeobj_unlock(context);
					goto bail;
				}
				context->ufds[i].revents = 0;
				continue;
			}

			/* Get active event and socket */
			active_socket = context->sockets[i];

			/*================== TNET_POLLHUP ==================*/
			if(context->ufds[i].revents & (TNET_POLLHUP)){
				if(context->ufds[i].revents & TNET_POLLOUT){
					TSK_DEBUG_INFO("POLLOUT and POLLHUP are exclusive");
				}
				else{
					fd = active_socket->fd;
					TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLHUP(%d)", transport->description, fd);

					tnet_transport_remove_socket(transport, &active_socket->fd);
					TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, fd);
					continue;
				}
			}

			/*================== TNET_POLLERR ==================*/
			if(context->ufds[i].revents & (TNET_POLLERR)){
				fd = active_socket->fd;
				TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLERR(%d)", transport->description, fd);

				tnet_transport_remove_socket(transport, &active_socket->fd);
				TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, fd);
				continue;
			}
			
			/*================== TNET_POLLNVAL ==================*/
			if(context->ufds[i].revents & (TNET_POLLNVAL)){
				fd = active_socket->fd;
				TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLNVAL(%d)", transport->description, fd);
				
				tnet_transport_remove_socket(transport, &active_socket->fd);
				TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, fd);
				continue;
			}
			
			/*================== POLLIN ==================*/
			if(context->ufds[i].revents & TNET_POLLIN)
			{
				tsk_size_t len = 0;
				void* buffer = tsk_null;
				tnet_transport_event_t* e;
				
				// TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLIN(%d)", transport->description, active_socket->fd);
				
				/* check whether the socket is paused or not */
				if(active_socket->paused){
					TSK_DEBUG_INFO("Socket is paused");
					goto TNET_POLLIN_DONE;
				}

				/* Retrieve the amount of pending data.
				 * IMPORTANT: If you are using Symbian please update your SDK to the latest build (August 2009) to have 'FIONREAD'.
				 * This apply whatever you are using the 3rd or 5th edition.
				 * Download link: http://wiki.forum.nokia.com/index.php/Open_C/C%2B%2B_Release_History
				 */
				ret = tnet_ioctlt(active_socket->fd, FIONREAD, &len);
				if((ret < 0 || !len) && is_stream){
					/* It's probably an incoming connection --> try to accept() it */
					int listening = 0, remove_socket = 0;
					socklen_t socklen = sizeof(listening);
					
					TSK_DEBUG_INFO("ioctlt(%d), len=%u returned zero or failed", active_socket->fd, len);
					
					// check if socket is listening
					if(getsockopt(active_socket->fd, SOL_SOCKET, SO_ACCEPTCONN, &listening, &socklen) != 0){
#if defined(BSD) /* old FreeBSD versions (and OSX up to Lion) do not support SO_ACCEPTCONN */
						listening = 1;
#else
						TNET_PRINT_LAST_ERROR("getsockopt(SO_ACCEPTCONN, %d) failed\n", active_socket->fd);
						/* not socket accepted -> no socket to remove */
						goto TNET_POLLIN_DONE;
#endif
					}
					if (listening){
						if((fd = accept(active_socket->fd, tsk_null, tsk_null)) != TNET_INVALID_SOCKET){
							TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_ACCEPT(fd=%d)", transport->description, fd);
							addSocket(fd, transport->master->type, transport, tsk_true, tsk_false, tsk_null);
							TSK_RUNNABLE_ENQUEUE(transport, event_accepted, transport->callback_data, fd);
							if(active_socket->tlshandle){
								transport_socket_xt* tls_socket;
								if((tls_socket = getSocket(context, fd))){
									if(tnet_tls_socket_accept(tls_socket->tlshandle) != 0){
										TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, fd);
										tnet_transport_remove_socket(transport, &fd);
										TNET_PRINT_LAST_ERROR("SSL_accept() failed");
										continue;
									}
								}
							}
						}
						else{
							TNET_PRINT_LAST_ERROR("accept(%d) failed", active_socket->fd);
							remove_socket = 1;
						}
					}
					else{
						TSK_DEBUG_INFO("Closing socket with fd = %d because ioctlt() returned zero or failed", active_socket->fd);
						remove_socket = 1;
					}
					
					if(remove_socket){
						fd = active_socket->fd;
						tnet_transport_remove_socket(transport, &active_socket->fd);
						TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, fd);
						continue;
					}
					goto TNET_POLLIN_DONE;
				}
				
				if(len <= 0){
#if ANDROID
					// workaround for indoona OSX which sends bodiless UDP packets
					// vand Android requires to call recv() even if len is equal to zero
					if(len == 0 && ret == 0){
						static char __fake_buff[1];
						ret = recv(active_socket->fd, __fake_buff, len, 0);
					}
#endif
					goto TNET_POLLIN_DONE;
				}

				if (!(buffer = tsk_calloc(len, sizeof(uint8_t)))) {
					TSK_DEBUG_ERROR("TSK_CALLOC FAILED");
					goto TNET_POLLIN_DONE;
				}
				
				// Retrieve the remote address
				if (TNET_SOCKET_TYPE_IS_STREAM(transport->master->type)) {
					ret = tnet_getpeername(active_socket->fd, &remote_addr);
				}
				
				// Receive the waiting data
				if (active_socket->tlshandle) {
					int isEncrypted;
					tsk_size_t tlslen = len;
					if ((ret = tnet_tls_socket_recv(active_socket->tlshandle, &buffer, &tlslen, &isEncrypted)) == 0) {
						if (isEncrypted) {
							TSK_FREE(buffer);
							goto TNET_POLLIN_DONE;
						}
						if (ret == 0) {
							len = ret = tlslen;
						}
					}
				}
				else {
					if (is_stream) {
						ret = tnet_sockfd_recv(active_socket->fd, buffer, len, 0);
					}
					else {
						ret = tnet_sockfd_recvfrom(active_socket->fd, buffer, len, 0, (struct sockaddr*)&remote_addr);
					}
				}
				
				if(ret < 0){
					TSK_FREE(buffer);

					/* report the receive error before removal can overwrite it */
					TNET_PRINT_LAST_ERROR("recv/recvfrom have failed.");
					removeSocket((int)i, context);
					/* active_socket has been freed and ufds[i] now describes another
					 * socket: nothing more may be done for this entry. */
					continue;
				}
				
				if((len != (tsk_size_t)ret) && len){
					len = (tsk_size_t)ret;
					// buffer = tsk_realloc(buffer, len);
				}
					
				if(len > 0){
					e = tnet_transport_event_create(event_data, transport->callback_data, active_socket->fd);
					if(e){
						/* the event takes over the buffer */
						e->data = buffer, buffer = tsk_null;
						e->size = len;
						e->remote_addr = remote_addr;
				
						TSK_RUNNABLE_ENQUEUE_OBJECT_SAFE(TSK_RUNNABLE(transport), e);
					}
					else{
						TSK_DEBUG_ERROR("Failed to create data event");
					}
				}
				TSK_FREE(buffer);

TNET_POLLIN_DONE:
				/*context->ufds[i].revents &= ~TNET_POLLIN*/;
			}


			/*================== TNET_POLLOUT ==================*/
			if(context->ufds[i].revents & TNET_POLLOUT){
				TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLOUT", transport->description);
				if(!active_socket->connected){
					active_socket->connected = tsk_true;
					TSK_RUNNABLE_ENQUEUE(transport, event_connected, transport->callback_data, active_socket->fd);
				}
				//else{
					context->ufds[i].events &= ~TNET_POLLOUT;
				//}
			}


			/*================== TNET_POLLPRI ==================*/
			if(context->ufds[i].revents & TNET_POLLPRI){
				TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- TNET_POLLPRI", transport->description);
			}

			context->ufds[i].revents = 0;
		}/* for */

		/* unlock context */
		tsk_safeobj_unlock(context);

	} /* while */

bail:
	/* the master socket may be missing when the transport was never prepared */
	if(transport->master){
		TSK_DEBUG_INFO("Stopped [%s] server with IP {%s} on port {%d}", transport->description, transport->master->ip, transport->master->port);
	}
	else{
		TSK_DEBUG_INFO("Stopped [%s] server", transport->description);
	}
	return 0;
}








/* Creates a new transport context object from tnet_transport_context_def_t. */
void* tnet_transport_context_create()
{
	void* context = tsk_object_new(tnet_transport_context_def_t);
	return context;
}


//=================================================================================================
//	Transport context object definition
//
/* Constructor: marks both pipe descriptors as not created (-1) and
 * initializes the context's lock. Returns self unchanged. */
static tsk_object_t* transport_context_ctor(tsk_object_t * self, va_list * app)
{
	transport_context_t *context = self;

	if(!context){
		return self;
	}

	context->pipeW = -1;
	context->pipeR = -1;
	tsk_safeobj_init(context);

	return self;
}

/* Destructor: removes every socket still registered in the context, then
 * releases the context's lock. Returns self unchanged. */
static tsk_object_t* transport_context_dtor(tsk_object_t * self)
{
	transport_context_t *context = self;

	if(!context){
		return self;
	}

	/* always drop the head entry until the list is empty */
	while(context->count != 0){
		removeSocket(0, context);
	}
	tsk_safeobj_deinit(context);

	return self;
}

/* Object definition used by tsk_object_new() to build transport contexts. */
static const tsk_object_def_t tnet_transport_context_def_s = 
{
sizeof(transport_context_t), /* size of the object to allocate */
transport_context_ctor, /* constructor */
transport_context_dtor, /* destructor */
tsk_null, /* last slot left unset (presumably the comparator -- confirm against tsk_object_def_t) */
};
/* Public handle on the definition above; passed to tsk_object_new() by tnet_transport_context_create(). */
const tsk_object_def_t *tnet_transport_context_def_t = &tnet_transport_context_def_s;

#endif /* USE_POLL && !(__IPHONE_OS_VERSION_MIN_REQUIRED >= 40000) */