doubango/tinyNET/src/tnet_transport_cfsocket.c
c732d49e
 #if HAVE_CRT
 #define _CRTDBG_MAP_ALLOC 
 #include <stdlib.h> 
 #include <crtdbg.h>
 #endif //HAVE_CRT
 /*
 * Copyright (C) 2017, University of the Basque Country (UPV/EHU)
 * Contact for licensing options: <licensing-mcpttclient(at)mcopenplatform(dot)com>
 *
 * The original file was part of Open Source Doubango Framework
 * Copyright (C) 2010-2011 Mamadou Diop.
 * Copyright (C) 2012 Doubango Telecom <http://doubango.org>
 *
 * This file is part of Open Source Doubango Framework.
 *
 * DOUBANGO is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * DOUBANGO is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with DOUBANGO.
 *
 */
 
 /**@file tnet_transport_cfsocket.c
  * @brief Network transport layer using CFSocket. Used for iOS devices.
  *
  * @author Laurent Etiemble <laurent.etiemble(at)gmail.com>
  * @author Mamadou Diop <diopmamadou(at)doubango.org> 
  */
 
 #include "tnet_transport.h"
 #include "tsk_memory.h"
 #include "tsk_string.h"
 #include "tsk_debug.h"
 #include "tsk_thread.h"
 #include "tsk_buffer.h"
 #include "tsk_safeobj.h"
 
 #if (__IPHONE_OS_VERSION_MIN_REQUIRED >= 40000)
 
 
 #ifdef __OBJC__
 #import <Foundation/Foundation.h>
 #endif /* __OBJC__ */
 #import <Security/Security.h>
 #import <Security/SecureTransport.h>
 #import <CFNetwork/CFNetwork.h>
 
 #define TNET_MAX_FDS                    64
 #define TNET_BUFFER_STREAM_MIN_SIZE    1024
 
 /*== Socket description ==*/
 typedef struct transport_socket_xs
 {
 	/* Native socket descriptor; getSocket() looks entries up by this value. */
 	tnet_fd_t fd;
 	/* When set, removeSocketAtIndex() closes `fd` while tearing the entry down. */
 	tsk_bool_t owner;
 	/* Set by the read-stream callback once the stream reports "open completed". */
 	tsk_bool_t readable;
     /* Set by the write-stream callback once the stream reports "can accept bytes". */
     tsk_bool_t writable;
 	/* While set, recvData() returns without reading; toggled by tnet_transport_pause_socket(). */
 	tsk_bool_t paused;
     /* recvData() only tries accept() on stream sockets for which this flag is false. */
     tsk_bool_t is_client;
     
 	/* Socket type, tested with the TNET_SOCKET_TYPE_IS_*() macros. */
 	tnet_socket_type_t type;
     
     /* CoreFoundation wrappers around `fd`; each one is released in removeSocketAtIndex(). */
     CFSocketRef cf_socket;
     CFReadStreamRef cf_read_stream;
     CFWriteStreamRef cf_write_stream;
     CFRunLoopSourceRef cf_run_loop_source;
 }
 transport_socket_xt;
 
 /*== Transport context structure definition ==*/
 typedef struct transport_context_s
 {
 	TSK_DECLARE_OBJECT;
 	
 	/* Number of valid entries at the front of `sockets`. */
 	tsk_size_t count;
 	/* Registered sockets; the array has room for TNET_MAX_FDS entries. */
 	transport_socket_xt* sockets[TNET_MAX_FDS];
     
     /* Run loop the socket sources are attached to. It is woken with CFRunLoopWakeUp() whenever the
        socket set changes; may be NULL (callers check it before use). Assigned outside this chunk. */
     CFRunLoopRef cf_run_loop;
     
 	/* Lock used through tsk_safeobj_lock()/tsk_safeobj_unlock() around accesses to `sockets`. */
 	TSK_DECLARE_SAFEOBJ;
 }
 transport_context_t;
 
 /* Forward declarations of the helpers defined further down in this file.
  * Note that removeSocket() is the only one declared without `static`. */
 static int recvData(tnet_transport_t *transport, transport_socket_xt* active_socket);
 static const transport_socket_xt* getSocket(transport_context_t *context, tnet_fd_t fd);
 int removeSocket(transport_socket_xt *value, transport_context_t *context);
 static int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *transport, tsk_bool_t take_ownership, tsk_bool_t is_client);
 static int removeSocketAtIndex(int index, transport_context_t *context);
 static int wrapSocket(tnet_transport_t *transport, transport_socket_xt *sock);
 
 /**
  * Loads "<name>.der" from the main bundle and turns it into a certificate.
  *
  * @param name  Resource name without extension.
  * @param label Short tag ("CA" or "PUB") used in the "cannot find" log message.
  * @return A certificate the caller must CFRelease(), or NULL when the file is
  *         missing or cannot be parsed.
  */
 static SecCertificateRef loadBundledCertificate(NSString *name, const char *label)
 {
     NSString *path = [[NSBundle mainBundle] pathForResource:name ofType:@"der"];
     NSData *certData;
     SecCertificateRef cert;
     
     if (!path || ![[NSFileManager defaultManager] fileExistsAtPath:path]) {
         // Log the resource name: the path is nil when the resource does not exist.
         TSK_DEBUG_WARN("Cannot find SSL %s file '%s.der'", label, [name UTF8String]);
         return NULL;
     }
     
     certData = [[NSData alloc] initWithContentsOfFile:path];
     cert = certData ? SecCertificateCreateWithData(NULL, (CFDataRef)certData) : NULL;
     [certData release];
     if (!cert) {
         TSK_DEBUG_WARN("Cannot create SecCertificateRef object from '%s' file", [path UTF8String]);
         return NULL;
     }
     TSK_DEBUG_INFO("Using SecCertificateRef object created from '%s' for SSL validation", [path UTF8String]);
     return cert;
 }
 
 /**
  * Evaluates the peer trust object of a TLS stream against the certificates
  * named by transport->tls.ca and transport->tls.pbk (looked up as ".der"
  * resources in the main bundle).
  *
  * @param transport   Transport whose tls.ca / tls.pbk paths name the anchor certificates.
  * @param cfStream    The CFReadStreamRef or CFWriteStreamRef to inspect.
  * @param bReadStream YES when cfStream is a read stream, NO for a write stream.
  * @return YES when SecTrustEvaluate() reports kSecTrustResultProceed or
  *         kSecTrustResultUnspecified, NO otherwise (including on any failure).
  */
 static BOOL isTrusted(tnet_transport_t *transport, id cfStream, BOOL bReadStream)
 {
     BOOL bTrusted = NO;
     SecTrustRef trust = NULL;
     OSStatus status = 0;
     SecTrustResultType result;
     SecCertificateRef certArray[2] = { NULL, NULL };
     CFArrayRef refCertArray = NULL;
     CFIndex certArrayCount = 0;
     CFIndex i;
     NSString *caName = nil, *pbName = nil;
     SecCertificateRef cert;
     
     trust = bReadStream
         ? (SecTrustRef)CFReadStreamCopyProperty((CFReadStreamRef)cfStream, kCFStreamPropertySSLPeerTrust)
         : (SecTrustRef)CFWriteStreamCopyProperty((CFWriteStreamRef)cfStream, kCFStreamPropertySSLPeerTrust);
     if (!trust) {
         TSK_DEBUG_ERROR("Failed to get SecTrustRef object from '%s' stream", bReadStream ? "read" : "write");
         goto bail;
     }
     
     if (!tsk_strnullORempty(transport->tls.ca)) {
         caName = [[[NSString stringWithCString:transport->tls.ca encoding: NSUTF8StringEncoding] lastPathComponent] stringByDeletingPathExtension];
     }
     if (!tsk_strnullORempty(transport->tls.pbk)) {
         pbName = [[[NSString stringWithCString:transport->tls.pbk encoding: NSUTF8StringEncoding] lastPathComponent] stringByDeletingPathExtension];
     }
     // Never hand a NULL pointer to "%s".
     TSK_DEBUG_INFO("SSL::isTrusted(ca=%s, pb=%s)", caName ? [caName UTF8String] : "(null)", pbName ? [pbName UTF8String] : "(null)");
     
     if (caName && (cert = loadBundledCertificate(caName, "CA"))) {
         certArray[certArrayCount++] = cert;
     }
     if (pbName && (cert = loadBundledCertificate(pbName, "PUB"))) {
         certArray[certArrayCount++] = cert;
     }
     if (certArrayCount > 0) {
         // kCFTypeArrayCallBacks makes the array retain its elements, so the
         // certificates can be released unconditionally at `bail`.
         refCertArray = CFArrayCreate(NULL, (const void **)certArray, certArrayCount, &kCFTypeArrayCallBacks);
     }
     status = SecTrustSetAnchorCertificates(trust, refCertArray);
     if (status != noErr) {
         TSK_DEBUG_ERROR("SecTrustSetAnchorCertificates failed with error code = %d", (int)status);
         goto bail;
     }
     status = SecTrustSetAnchorCertificatesOnly(trust, YES);
     if (status != noErr) {
         TSK_DEBUG_ERROR("SecTrustSetAnchorCertificatesOnly failed with error code = %d", (int)status);
         goto bail;
     }
     status = SecTrustEvaluate(trust, &result);
     if (status != noErr) {
         TSK_DEBUG_ERROR("SecTrustEvaluate failed with error code = %d", (int)status);
         goto bail;
     }
     bTrusted = (result == kSecTrustResultProceed || result == kSecTrustResultUnspecified);
     TSK_DEBUG_INFO("SecTrustEvaluate result = %d", result);
     
 bail:
     // CFRelease(NULL) crashes, so every release is guarded.
     for (i = 0; i < certArrayCount; ++i) {
         CFRelease(certArray[i]);
     }
     if (refCertArray) {
         CFRelease(refCertArray);
     }
     if (trust) {
         CFRelease(trust);
     }
     return bTrusted;
 }
 
 
 /**
  * Reads whatever is pending on `active_socket` and enqueues it on the transport
  * as an `event_data` event. For listening stream sockets with nothing to read,
  * a pending connection is accepted and registered instead.
  *
  * @param transport     Transport owning the socket; its context must be set.
  * @param active_socket Socket that was signalled as readable.
  * @return -1 on invalid parameters, 0 in every other case (errors are logged).
  */
 static int recvData(tnet_transport_t *transport, transport_socket_xt* active_socket)
 {
     int ret = 0; /* must start defined: several paths below test it before any read happens */
     void* buffer = tsk_null;
     tsk_size_t len = 0;
     struct sockaddr_storage remote_addr = {0};
     tsk_bool_t is_stream;
     tnet_transport_event_t* e;
     
     if(!transport || !transport->context || !active_socket){
         TSK_DEBUG_ERROR("Invalid parameter");
         return -1;
     }
     
     /* check whether the socket is paused or not */
     if(active_socket->paused){
         TSK_DEBUG_INFO("Socket is paused");
         goto bail;
     }
     
     is_stream = TNET_SOCKET_TYPE_IS_STREAM(active_socket->type);
     
     if(tnet_ioctlt(active_socket->fd, FIONREAD, &len) < 0){
         TNET_PRINT_LAST_ERROR("ioctl() failed");
         goto bail;
     }
     
     if(!len){
         // probably incoming connection
         if(is_stream && !active_socket->is_client){
             tnet_fd_t fd;
             if((fd = accept(active_socket->fd, tsk_null, tsk_null)) != TNET_INVALID_SOCKET){
                 TSK_DEBUG_INFO("NETWORK EVENT FOR SERVER [%s] -- FD_ACCEPT(fd=%d)", transport->description, fd);
                 addSocket(fd, transport->master->type, transport, tsk_true, tsk_false);
                 TSK_RUNNABLE_ENQUEUE(transport, event_accepted, transport->callback_data, fd);
                 goto bail;
             }
         }
         
         // The stream may hold bytes the kernel no longer reports through FIONREAD.
         if(is_stream && active_socket->cf_read_stream && CFReadStreamHasBytesAvailable(active_socket->cf_read_stream)){
 #if HAVE_CRT //Debug memory
             buffer = calloc(TNET_BUFFER_STREAM_MIN_SIZE, sizeof(uint8_t));
 #else
             buffer = tsk_calloc(TNET_BUFFER_STREAM_MIN_SIZE, sizeof(uint8_t));
 #endif //HAVE_CRT
             if(buffer){
                 CFIndex nread = CFReadStreamRead(active_socket->cf_read_stream, buffer, (CFIndex)TNET_BUFFER_STREAM_MIN_SIZE);
                 ret = (int)nread;
                 len = (nread > 0) ? (tsk_size_t)nread : 0;
             }
         }
         
         if(ret <= 0){
             TSK_DEBUG_WARN("ioctl() returned zero for fd=%d", active_socket->fd);
             goto bail;
         }
     }
     
     if(len && !buffer){
 #if HAVE_CRT //Debug memory
         buffer = calloc(len, sizeof(uint8_t));
 #else
         buffer = tsk_calloc(len, sizeof(uint8_t));
 #endif //HAVE_CRT
         if(!buffer){
             TSK_DEBUG_ERROR("calloc(%zu) failed", len);
             goto bail;
         }
         
         // Receive the waiting data
         if(is_stream){
             // Only used to fill remote_addr; the read result below decides success.
             (void)tnet_getpeername(active_socket->fd, &remote_addr);
             if(active_socket->cf_read_stream){
                 ret = (int)CFReadStreamRead(active_socket->cf_read_stream, buffer, (CFIndex)len);
             }
             else {
                 ret = tnet_sockfd_recv(active_socket->fd, buffer, len, 0);
             }
         }
         else {
             ret = tnet_sockfd_recvfrom(active_socket->fd, buffer, len, 0, (struct sockaddr*)&remote_addr);
         }
     }
     
     if(ret < 0){
         // Report first: removing the socket may overwrite the last error code.
         TNET_PRINT_LAST_ERROR("recv/recvfrom have failed.");
         removeSocket(active_socket, transport->context);
         goto bail;
     }
     
     // Trim the reported size to what was actually read.
     if((len != (tsk_size_t)ret) && len){
         len = (tsk_size_t)ret;
     }
     
     e = tnet_transport_event_create(event_data, transport->callback_data, active_socket->fd);
     if(e){
         e->data = buffer; buffer = NULL; /* ownership moves to the event */
         e->size = len;
         e->remote_addr = remote_addr;
         
         TSK_RUNNABLE_ENQUEUE_OBJECT_SAFE(TSK_RUNNABLE(transport), e);
     }
     
 bail:
     TSK_FREE(buffer);
     return 0;
 }
 
 /**
  * Registers `fd` with the transport and wakes the run loop (when there is one)
  * so the new socket is taken into account.
  *
  * @return 0 on success, -1 when the handle is null, -2 when the transport has
  *         no context, otherwise the error returned by addSocket().
  *         `tlsHandle` is ignored by this implementation.
  */
 int tnet_transport_add_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd, tnet_socket_type_t type, tsk_bool_t take_ownership, tsk_bool_t isClient, tnet_tls_socket_handle_t* tlsHandle)
 {
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     transport_context_t *ctx;
     int status;
     
     (void)(tlsHandle);
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid server handle.");
         return -1;
     }
     
     ctx = (transport_context_t*)transport->context;
     if (!ctx) {
         TSK_DEBUG_ERROR("Invalid context.");
         return -2;
     }
     
     // Secure socket types switch TLS on for the whole transport.
     if (TNET_SOCKET_TYPE_IS_TLS(type) || TNET_SOCKET_TYPE_IS_WSS(type)) {
         transport->tls.enabled = 1;
     }
     
     status = addSocket(fd, type, transport, take_ownership, isClient);
     if (status != 0) {
         TSK_DEBUG_ERROR("Failed to add new Socket.");
         return status;
     }
     
     // Signal the run-loop
     if (ctx->cf_run_loop) {
         CFRunLoopWakeUp(ctx->cf_run_loop);
     }
     
     return 0;
 }
 
 /**
  * Sets or clears the `paused` flag of the socket registered under `fd`.
  *
  * @return -1 when the handle or its context is null, 0 otherwise (an unknown
  *         fd is only logged).
  */
 int tnet_transport_pause_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd, tsk_bool_t pause){
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     transport_context_t *ctx = transport ? (transport_context_t *)transport->context : tsk_null;
     transport_socket_xt *entry;
     
     if (!ctx) {
         TSK_DEBUG_ERROR("Invalid parameter");
         return -1;
     }
     
     entry = (transport_socket_xt*)getSocket(ctx, fd);
     if (!entry) {
         TSK_DEBUG_WARN("Failed to find socket with fd=%d", (int)fd);
         return 0;
     }
     
     entry->paused = pause;
     return 0;
 }
 
 /* Remove socket */
 /**
  * Removes the socket registered under `*fd` from the transport.
  *
  * On success `*fd` is overwritten with TNET_INVALID_FD and the run loop, when
  * one is attached, is woken up.
  *
  * @return 0 when the socket was found and removed, -1 on invalid parameters or
  *         when no socket matches, -2 when the transport has no context.
  */
 int tnet_transport_remove_socket(const tnet_transport_handle_t *handle, tnet_fd_t *fd)
 {
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     transport_context_t *context;
     tsk_size_t i;
     tsk_bool_t found = tsk_false;
     
     if (!transport || !fd) {
         TSK_DEBUG_ERROR("Invalid parameter");
         return -1;
     }
     
     TSK_DEBUG_INFO("Removing socket %d", *fd);
     
     if (!(context = (transport_context_t*)transport->context)) {
         TSK_DEBUG_ERROR("Invalid context.");
         return -2;
     }
     
     for (i = 0; i < context->count; ++i) {
         if (context->sockets[i]->fd == *fd) {
             removeSocketAtIndex((int)i, context);
             found = tsk_true;
             *fd = TNET_INVALID_FD;
             break;
         }
     }
     
     if (!found) {
         return -1;
     }
     
     // Signal the run-loop. A missing run loop must not turn a successful
     // removal into an error.
     if (context->cf_run_loop) {
         CFRunLoopWakeUp(context->cf_run_loop);
     }
     
     return 0;
 }
 
 /**
  * Sends `size` bytes from `buf` on the socket `from`.
  *
  * Stream sockets that have a CFWriteStream are written through that stream in
  * chunks of at most 1024 bytes; every other socket goes through send().
  *
  * @return The number of bytes actually sent; 0 when nothing could be sent
  *         (invalid handle, stream open failure, or send() error).
  */
 tsk_size_t tnet_transport_send(const tnet_transport_handle_t *handle, tnet_fd_t from, const void* buf, tsk_size_t size)
 {
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     tsk_size_t numberOfBytesSent = 0;
     const transport_socket_xt* sock;
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid transport handle.");
         return 0;
     }
     
     sock = getSocket(transport->context, from);
     if (sock && TNET_SOCKET_TYPE_IS_STREAM(sock->type) && sock->cf_write_stream) {
         // on iOS when TLS is enabled sending more than 1024 bytes could fails
         static const tsk_size_t max_size_to_send = 1024;
         const uint8_t* buff_ptr = (const uint8_t*)buf;
         CFIndex sent = 0;
         tsk_size_t to_send = TSK_MIN(max_size_to_send, size);
         
         if (CFWriteStreamGetStatus(sock->cf_write_stream) == kCFStreamStatusNotOpen) {
             if (!CFWriteStreamOpen(sock->cf_write_stream)) {
                 TSK_DEBUG_ERROR("CFWriteStreamOpen() failed");
                 return numberOfBytesSent;
             }
         }
         // CoreFoundation calls crash on NULL, so only touch the read stream when it exists.
         if (sock->cf_read_stream && CFReadStreamGetStatus(sock->cf_read_stream) == kCFStreamStatusNotOpen) {
             if (!CFReadStreamOpen(sock->cf_read_stream)) {
                 TSK_DEBUG_ERROR("CFReadStreamOpen() failed");
                 return numberOfBytesSent;
             }
         }
         while (to_send > 0 && (sent = CFWriteStreamWrite(sock->cf_write_stream, &buff_ptr[numberOfBytesSent], (CFIndex)to_send)) > 0) {
             numberOfBytesSent += (tsk_size_t)sent;
             to_send = TSK_MIN(max_size_to_send, (size - numberOfBytesSent));
         }
         if (sent < 0) {
             TNET_PRINT_LAST_ERROR("Send have failed");
         }
     }
     else {
         // Keep the result signed until it has been checked: comparing -1 with
         // an unsigned size would hide the failure and return a huge count.
         ssize_t sent = send(from, buf, size, 0);
         if (sent < 0) {
             TNET_PRINT_LAST_ERROR("Send have failed");
             return 0;
         }
         numberOfBytesSent = (tsk_size_t)sent;
         if (numberOfBytesSent < size) {
             TNET_PRINT_LAST_ERROR("Send have failed");
         }
     }
     
     return numberOfBytesSent;
 }
 
 /**
  * Sends a datagram of `size` bytes from `buf` to the address `to` using the
  * socket `from`. Only valid on transports whose master socket is a datagram socket.
  *
  * @return The number of bytes sent; 0 on invalid handle, non-datagram
  *         transport, or sendto() error.
  */
 tsk_size_t tnet_transport_sendto(const tnet_transport_handle_t *handle, tnet_fd_t from, const struct sockaddr *to, const void* buf, tsk_size_t size)
 {
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     ssize_t sent;
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid server handle");
         return 0;
     }
     
     if (!transport->master || !TNET_SOCKET_TYPE_IS_DGRAM(transport->master->type)) {
         TSK_DEBUG_ERROR("In order to use sendto you must use an udp transport");
         return 0;
     }
     
     // Check the signed result before converting: -1 compared against an
     // unsigned size would never be seen as an error.
     sent = sendto(from, buf, size, 0, to, tnet_get_sockaddr_size(to));
     if (sent < 0) {
         TNET_PRINT_LAST_ERROR("sendto have failed");
         return 0;
     }
     if ((tsk_size_t)sent < size) {
         TNET_PRINT_LAST_ERROR("sendto have failed");
     }
     
     return (tsk_size_t)sent;
 }
 
 /**
  * Tells whether `fd` is currently registered with the transport.
  *
  * @return 1 when a socket with this descriptor is found, 0 otherwise
  *         (including when the handle is null).
  */
 int tnet_transport_have_socket(const tnet_transport_handle_t *handle, tnet_fd_t fd)
 {
     tnet_transport_t *transport = (tnet_transport_t*)handle;
     const transport_socket_xt *entry;
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid server handle.");
         return 0;
     }
     
     entry = getSocket((transport_context_t*)transport->context, fd);
     return entry ? 1 : 0;
 }
 
 /**
  * Returns the TLS handle bound to `fd`.
  *
  * This implementation does not use OpenSSL, so there is never a handle to
  * return: the result is always null (an error is logged for a null transport).
  */
 const tnet_tls_socket_handle_t* tnet_transport_get_tlshandle(const tnet_transport_handle_t *handle, tnet_fd_t fd)
 {
     const tnet_transport_t *transport = (const tnet_transport_t*)handle;
     
     (void)fd; /* no per-socket TLS state is kept here */
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid parameter");
         return tsk_null;
     }
     // not using openssl
     return tsk_null;
 }
 
 /*== Get socket ==*/
 /**
  * Finds the registered socket whose descriptor equals `fd`.
  * The context lock is held while the list is scanned.
  *
  * @return The first matching entry, or null when `context` is null or nothing matches.
  */
 static const transport_socket_xt* getSocket(transport_context_t *context, tnet_fd_t fd)
 {
     transport_socket_xt* match = tsk_null;
     tsk_size_t idx;
     
     if (!context) {
         return tsk_null;
     }
     
     tsk_safeobj_lock(context);
     for (idx = 0; !match && idx < context->count; ++idx) {
         if (context->sockets[idx]->fd == fd) {
             match = context->sockets[idx];
         }
     }
     tsk_safeobj_unlock(context);
     
     return match;
 }
 /**
  * Finds the registered socket whose read or write stream is `cf_stream`.
  * The context lock is held while the list is scanned.
  *
  * @return The first matching entry, or null when `context` is null or nothing matches.
  */
 static const transport_socket_xt* getSocketByStream(transport_context_t *context, void* cf_stream)
 {
     transport_socket_xt* match = tsk_null;
     tsk_size_t idx;
     
     if (!context) {
         return tsk_null;
     }
     
     tsk_safeobj_lock(context);
     for (idx = 0; !match && idx < context->count; ++idx) {
         transport_socket_xt* candidate = context->sockets[idx];
         if (candidate->cf_read_stream == cf_stream || candidate->cf_write_stream == cf_stream) {
             match = candidate;
         }
     }
     tsk_safeobj_unlock(context);
     
     return match;
 }
 
 /*== Add new socket ==*/
 /*== Add new socket ==*/
 /**
  * Allocates a socket descriptor entry for `fd`, wraps it for the run loop via
  * wrapSocket() and appends it to the context's socket list.
  *
  * @param fd             Native socket to register.
  * @param type           Socket type stored in the entry.
  * @param transport      Transport whose context receives the entry.
  * @param take_ownership When true the fd is closed when the entry is removed.
  * @param is_client      Stored in the entry's `is_client` flag.
  * @return 0 on success, -1 when the context is null, the allocation fails or
  *         the list already holds TNET_MAX_FDS entries.
  */
 static int addSocket(tnet_fd_t fd, tnet_socket_type_t type, tnet_transport_t *transport, tsk_bool_t take_ownership, tsk_bool_t is_client)
 {
     transport_context_t *context = transport ? transport->context : 0;
     transport_socket_xt *sock;
     
     if (!context) {
         TSK_DEBUG_ERROR("Context is Null.");
         return -1;
     }
     
 #if HAVE_CRT //Debug memory
     sock = calloc(1, sizeof(transport_socket_xt));
 #else
     sock = tsk_calloc(1, sizeof(transport_socket_xt));
 #endif //HAVE_CRT
     // Check the allocation before the first write through the pointer.
     if (!sock) {
         TSK_DEBUG_ERROR("Failed to allocate socket");
         return -1;
     }
     
     sock->fd = fd;
     sock->type = type;
     sock->owner = take_ownership;
     sock->is_client = is_client;
     
     tsk_safeobj_lock(context);
     
     // The list is a fixed-size array: refuse the socket rather than write past its end.
     if (context->count >= TNET_MAX_FDS) {
         tsk_safeobj_unlock(context);
         TSK_DEBUG_ERROR("Failed to add socket: already managing %d sockets", TNET_MAX_FDS);
         TSK_FREE(sock);
         return -1;
     }
     
     wrapSocket(transport, sock);
     context->sockets[context->count] = sock;
     context->count++;
     
     tsk_safeobj_unlock(context);
     
     TSK_DEBUG_INFO("Socket added");
     
     return 0;
 }
 
 /*== Remove socket ==*/
 /*== Remove socket ==*/
 /**
  * Tears down and frees the socket entry stored at `index`, then closes the gap
  * in the context's socket list. An out-of-range index (negative or not below
  * `count`) is ignored.
  *
  * Teardown order: run-loop source, CFSocket, write stream, read stream, and
  * finally the native fd when the entry owns it.
  *
  * @return Always 0.
  */
 static int removeSocketAtIndex(int index, transport_context_t *context)
 {
     tsk_size_t i;
     
     tsk_safeobj_lock(context);
     
     if (index >= 0 && (tsk_size_t)index < context->count) {
         transport_socket_xt *sock = context->sockets[index];
         
         // Remove from runloop
         if (context->cf_run_loop && sock->cf_run_loop_source) {
             CFRunLoopRemoveSource(context->cf_run_loop, sock->cf_run_loop_source, kCFRunLoopCommonModes);
             CFRelease(sock->cf_run_loop_source);
             sock->cf_run_loop_source = NULL;
         }
         
         // Invalidate CFSocket
         if (sock->cf_socket) {
             if (CFSocketIsValid(sock->cf_socket)) {
                 CFSocketInvalidate(sock->cf_socket);
             }
             CFRelease(sock->cf_socket);
             sock->cf_socket = NULL;
         }
         
         // Close and free write stream
         if (sock->cf_write_stream) {
             if (CFWriteStreamGetStatus(sock->cf_write_stream) != kCFStreamStatusClosed) {
                 CFWriteStreamClose(sock->cf_write_stream);
             }
             CFRelease(sock->cf_write_stream);
             sock->cf_write_stream = NULL;
         }
         
         // Close and free read stream
         if (sock->cf_read_stream) {
             if (CFReadStreamGetStatus(sock->cf_read_stream) != kCFStreamStatusClosed) {
                 CFReadStreamClose(sock->cf_read_stream);
             }
             CFRelease(sock->cf_read_stream);
             sock->cf_read_stream = NULL;
         }
         
         // Close the socket if we are the owner.
         if (sock->owner) {
             tnet_sockfd_close(&(sock->fd));
         }
         
         TSK_FREE(sock);
         
         // Shift the remaining entries down by one slot.
         for (i = (tsk_size_t)index; i + 1 < context->count; i++) {
             context->sockets[i] = context->sockets[i + 1];
         }
         
         context->sockets[context->count - 1] = tsk_null;
         context->count--;
         
         TSK_DEBUG_INFO("Socket removed");
     }
     
     tsk_safeobj_unlock(context);
     
     return 0;
 }
 
 /**
  * Removes the entry `value` from the context's socket list, if it is present.
  * The context lock is held for the whole search-and-remove sequence.
  *
  * @return Always 0.
  */
 int removeSocket(transport_socket_xt *value, transport_context_t *context)
 {
     tsk_size_t idx = 0;
     
     tsk_safeobj_lock(context);
     
     while (idx < context->count) {
         if (context->sockets[idx] == value) {
             removeSocketAtIndex((int)idx, context);
             break;
         }
         ++idx;
     }
     
     tsk_safeobj_unlock(context);
     
     return 0;
 }
 
 /**
  * Stops the transport's runnable, wakes the run loop (when there is one) and
  * joins the transport's main thread.
  *
  * @return -1 on a null transport, the tsk_runnable_stop() error when stopping
  *         fails, 0 when the main thread id is already cleared, otherwise the
  *         result of tsk_thread_join().
  */
 int tnet_transport_stop(tnet_transport_t *transport)
 {
     transport_context_t *ctx;
     int status;
     
     if (!transport) {
         TSK_DEBUG_ERROR("Invalid parameter");
         return -1;
     }
     
     ctx = transport->context;
     
     status = tsk_runnable_stop(TSK_RUNNABLE(transport));
     if (status != 0) {
         return status;
     }
     
     // already stopped
     if (!transport->mainThreadId[0]) {
         return 0;
     }
     
     // Signal the run-loop
     if (ctx && ctx->cf_run_loop) {
         CFRunLoopWakeUp(ctx->cf_run_loop);
     }
     return tsk_thread_join(transport->mainThreadId);
 }
 
 /**
  * Prepares the transport: creates the master socket when missing, starts
  * listening on it for stream transports and registers it with the context.
  *
  * @return 0 on success; -1 on invalid parameters, -2 when already prepared,
  *         -3 when the master socket cannot be created, otherwise the error
  *         returned by tnet_sockfd_listen() or addSocket().
  */
 int tnet_transport_prepare(tnet_transport_t *transport)
 {
     transport_context_t *context;
     int status;
     
     if (!transport || !(context = transport->context)) {
         TSK_DEBUG_ERROR("Invalid parameter.");
         return -1;
     }
     
     if (transport->prepared) {
         TSK_DEBUG_ERROR("Transport already prepared.");
         return -2;
     }
     
     /* Prepare master */
     if (!transport->master) {
         transport->master = tnet_socket_create(transport->local_host, transport->req_local_port, transport->type);
         if (!transport->master) {
             TSK_DEBUG_ERROR("Failed to create master socket");
             return -3;
         }
         tsk_strupdate(&transport->local_ip, transport->master->ip);
         transport->bind_local_port = transport->master->port;
     }
     
     /* Start listening */
     if (TNET_SOCKET_TYPE_IS_STREAM(transport->master->type)) {
         status = tnet_sockfd_listen(transport->master->fd, TNET_MAX_FDS);
         if (status) {
             TNET_PRINT_LAST_ERROR("listen have failed.");
             return status;
         }
     }
     
     /* Add the master socket to the context. */
     // don't take ownership: will be closed by the dtor() when refCount==0
     // otherwise will be closed twice: dtor() and removeSocket
     status = addSocket(transport->master->fd, transport->master->type, transport, tsk_false, tsk_false);
     if (status) {
         TSK_DEBUG_ERROR("Failed to add master socket");
         return status;
     }
     
     transport->prepared = tsk_true;
     
     return status;
 }
 
 /**
  * Undoes tnet_transport_prepare(): removes every registered socket and
  * releases the master socket object. Does nothing when the transport is not prepared.
  *
  * @return -1 on invalid parameters, 0 otherwise.
  */
 int tnet_transport_unprepare(tnet_transport_t *transport){
     transport_context_t *ctx = transport ? transport->context : tsk_null;
     
     if (!ctx) {
         TSK_DEBUG_ERROR("Invalid parameter.");
         return -1;
     }
     
     if (transport->prepared) {
         transport->prepared = tsk_false;
         
         // Always drop the head entry until the list is empty.
         while (ctx->count > 0) {
             removeSocketAtIndex(0, ctx); // safe
         }
         
         // destroy master as it has been closed by removeSocket()
         TSK_OBJECT_SAFE_FREE(transport->master);
     }
     
     return 0;
 }
 
 /**
  * CFReadStream client callback. `clientCallBackInfo` is the owning transport.
  *
  * The socket entry is resolved from the stream's native handle (or, for error
  * events where no handle is available, from the stream pointer itself); events
  * for unknown sockets are dropped. The context lock is held for the whole call.
  */
 void __CFReadStreamClientCallBack(CFReadStreamRef stream, CFStreamEventType eventType, void *clientCallBackInfo) {
     tnet_transport_t *transport = (tnet_transport_t *) clientCallBackInfo;
     transport_context_t *context = transport->context;
     transport_socket_xt *sock = tsk_null;
     CFDataRef handleData;
     
     tsk_safeobj_lock(context);
     
     // Resolve the socket entry this stream belongs to.
     handleData = CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle);
     if (handleData) {
         CFSocketNativeHandle nativeFd;
         CFDataGetBytes(handleData, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &nativeFd);
         CFRelease(handleData);
         sock = (transport_socket_xt *) getSocket(context, nativeFd);
     }
     else if (eventType == kCFStreamEventErrorOccurred) {
         // error events come without a native handle
         sock = (transport_socket_xt *) getSocketByStream(context, stream);
     }
     
     if (sock) {
         switch (eventType) {
             case kCFStreamEventOpenCompleted:
             {
                 TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", sock->fd);
 #if 0
                 // Check SSL certificates
                 if (TNET_SOCKET_TYPE_IS_TLS(sock->type) && transport->tls.verify) {
                     if (!isTrusted(transport, (__bridge id)stream, YES/*YES read stream*/)) {
                         TSK_DEBUG_ERROR("Remote SSL certs not trusted...closing the write stream");
                         TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, sock->fd);
                         removeSocket(sock, context);
                         break;
                     }
                 }
 #endif
                 // First "open completed": mark readable and report the
                 // connection once the write side is ready as well.
                 if (!sock->readable) {
                     sock->readable = tsk_true;
                     if (sock->writable) {
                         TSK_RUNNABLE_ENQUEUE(transport, event_connected, transport->callback_data, sock->fd);
                     }
                 }
                 break;
             }
             case kCFStreamEventHasBytesAvailable:
             {
                 recvData(transport, sock);
                 break;
             }
             case kCFStreamEventEndEncountered:
             {
                 TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", sock->fd);
                 TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, sock->fd);
                 removeSocket(sock, context);
                 break;
             }
             case kCFStreamEventErrorOccurred:
             {
                 // Log the error details when the stream provides them.
                 CFErrorRef streamError = CFReadStreamCopyError(stream);
                 if (streamError) {
                     TSK_DEBUG_INFO("__CFReadStreamClientCallBack --> Error=%lu -> %s, fd=%d", CFErrorGetCode(streamError), CFStringGetCStringPtr(CFErrorGetDomain(streamError), kCFStringEncodingUTF8), sock->fd);
                     CFRelease(streamError);
                 }
                 
                 TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, sock->fd);
                 removeSocket(sock, context);
                 break;
             }
             default:
             {
                 // Not Implemented
                 TSK_DEBUG_WARN("Not implemented");
                 break;
             }
         }
     }
     
     tsk_safeobj_unlock(context);
 }
 
 /**
  * CFWriteStream client callback. `clientCallBackInfo` is the owning transport.
  *
  * The socket entry is resolved from the stream's native handle (or, for error
  * events where no handle is available, from the stream pointer itself); events
  * for unknown sockets are dropped. The context lock is held for the whole call.
  */
 void __CFWriteStreamClientCallBack(CFWriteStreamRef stream, CFStreamEventType eventType, void *clientCallBackInfo) {
     tnet_transport_t *transport = (tnet_transport_t *) clientCallBackInfo;
     transport_context_t *context = transport->context;
     transport_socket_xt *sock = tsk_null;
     CFDataRef handleData;
     
     tsk_safeobj_lock(context);
     
     // Resolve the socket entry this stream belongs to.
     handleData = CFWriteStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle);
     if (handleData) {
         CFSocketNativeHandle nativeFd;
         CFDataGetBytes(handleData, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*) &nativeFd);
         CFRelease(handleData);
         sock = (transport_socket_xt *) getSocket(context, nativeFd);
     }
     else if (eventType == kCFStreamEventErrorOccurred) {
         // error events come without a native handle
         sock = (transport_socket_xt *) getSocketByStream(context, stream);
     }
     
     if (sock) {
         switch (eventType) {
             case kCFStreamEventOpenCompleted:
             {
                 TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventOpenCompleted(fd=%d)", sock->fd);
                 // still not connected, see kCFStreamEventCanAcceptBytes
                 break;
             }
             case kCFStreamEventCanAcceptBytes:
             {
                 // The stream can now be written to without blocking.
                 TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventCanAcceptBytes(fd=%d)", sock->fd);
                 // Check SSL certificates
                 if (TNET_SOCKET_TYPE_IS_TLS(sock->type) && transport->tls.verify) {
                     if (!isTrusted(transport, (__bridge id)stream, FALSE/*NOT read stream*/)) {
                         TSK_DEBUG_ERROR("Remote SSL certs not trusted...closing the write stream");
                         removeSocket(sock, context);
                         break;
                     }
                 }
                 // First time writable: mark it and report the connection once
                 // the read side is ready as well.
                 if (!sock->writable) {
                     sock->writable = tsk_true;
                     if (sock->readable) {
                         TSK_RUNNABLE_ENQUEUE(transport, event_connected, transport->callback_data, sock->fd);
                     }
                 }
                 break;
             }
             case kCFStreamEventEndEncountered:
             {
                 TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> kCFStreamEventEndEncountered(fd=%d)", sock->fd);
                 TSK_RUNNABLE_ENQUEUE(transport, event_closed, transport->callback_data, sock->fd);
                 removeSocket(sock, context);
                 break;
             }
             case kCFStreamEventErrorOccurred:
             {
                 // Log the error details when the stream provides them.
                 CFErrorRef streamError = CFWriteStreamCopyError(stream);
                 if (streamError) {
                     TSK_DEBUG_INFO("__CFWriteStreamClientCallBack --> Error=%lu -> %s, fd=%d", CFErrorGetCode(streamError), CFStringGetCStringPtr(CFErrorGetDomain(streamError), kCFStringEncodingUTF8), sock->fd);
                     CFRelease(streamError);
                 }
                 
                 TSK_RUNNABLE_ENQUEUE(transport, event_error, transport->callback_data, sock->fd);
                 removeSocket(sock, context);
                 break;
             }
             default:
             {
                 // Not Implemented
                 TSK_DEBUG_ERROR("Not implemented");
                 break;
             }
         }
     }
     
     tsk_safeobj_unlock(context);
 }
 
 /**
  * CFSocket callback. `info` is the owning transport.
  *
  * - read: pending data is pulled with recvData().
  * - accept / connect / write: wrapSocket() is called for the socket.
  * - data: the payload delivered by CoreFoundation is copied into an
  *   `event_data` event together with the sender address.
  *
  * Events for sockets that are not registered with the transport are ignored.
  * The context lock is held from the socket lookup until the function returns.
  */
 void __CFSocketCallBack(CFSocketRef s, CFSocketCallBackType callbackType, CFDataRef address, const void *data, void *info) {
     // Extract the context
     tnet_transport_t *transport = (tnet_transport_t *) info;
     transport_context_t *context = transport->context;
     transport_socket_xt *sock;
     int fd;
     
     /* lock context: taken BEFORE any path that reaches `bail`, so the unlock
        there is always balanced */
     tsk_safeobj_lock(context);
     
     // Extract the native socket
     fd = CFSocketGetNative(s);
     sock = (transport_socket_xt *) getSocket(context, fd);
     if (!sock) {
         goto bail;
     }
     
     switch (callbackType) {
         case kCFSocketReadCallBack:
         {
             recvData(transport, sock);
             break;
         }
         case kCFSocketAcceptCallBack:
         case kCFSocketConnectCallBack:
         case kCFSocketWriteCallBack:
         {
             TSK_DEBUG_INFO("__CFSocketCallBack(fd=%d), callbackType=%lu", sock->fd, callbackType);
             wrapSocket(transport, sock);
             break;
         }
         case kCFSocketDataCallBack:
         {
             if (data) {
                 const UInt8 *ptr = CFDataGetBytePtr((CFDataRef)data);
                 int len = (int)CFDataGetLength((CFDataRef)data);
                 if (ptr && len > 0) {
                     tnet_transport_event_t* e = tnet_transport_event_create(event_data, transport->callback_data, sock->fd);
                     if (e) {
 #if HAVE_CRT //Debug memory
                         e->data = malloc(len);
 #else
                         e->data = tsk_malloc(len);
 #endif //HAVE_CRT
                         
                         if (e->data) {
                             memcpy(e->data, ptr, len);
                             e->size = len;
                         }
                         // `address` is a CFData object wrapping the sockaddr bytes,
                         // not a sockaddr itself: copy its payload, bounded by the
                         // destination size.
                         if (address) {
                             const UInt8 *addr_ptr = CFDataGetBytePtr(address);
                             CFIndex addr_len = CFDataGetLength(address);
                             if (addr_len > (CFIndex)sizeof(e->remote_addr)) {
                                 addr_len = (CFIndex)sizeof(e->remote_addr);
                             }
                             if (addr_ptr && addr_len > 0) {
                                 memcpy(&e->remote_addr, addr_ptr, (size_t)addr_len);
                             }
                         }
                         TSK_RUNNABLE_ENQUEUE_OBJECT_SAFE(TSK_RUNNABLE(transport), e);
                     }
                 }
             }
             break;
         }
         
         default:
         {
             // Not Implemented
             TSK_DEBUG_ERROR("Not implemented");
             break;
         }
     }
     
     /* unlock context */
 bail:
     tsk_safeobj_unlock(context);
 }
 
 
 
 /**
  * Attaches CoreFoundation event sources to a native socket.
  *
  * A CFSocket (plus its run-loop source) is created for sockets that have neither
  * a CFSocket nor CF streams yet. For stream sockets a read/write stream pair is
  * additionally created, configured (VoIP service type, optional TLS settings)
  * and scheduled; once the descriptor is readable or writable the CFSocket is
  * dropped and the streams are opened.
  *
  * @param transport The transport owning the socket; passed as context to the CF callbacks.
  * @param sock The socket description to wrap.
  * @retval 0 on success, or when the context has no run loop yet (nothing is done).
  * @retval -1 on invalid parameters or when a CoreFoundation object cannot be created/opened.
  */
 int wrapSocket(tnet_transport_t *transport, transport_socket_xt *sock)
 {
     transport_context_t *context;
     if (!transport || !(context = transport->context) || !sock) {
         TSK_DEBUG_ERROR("Invalid parameter");
         return -1;
     }

     // Nothing can be scheduled until the main thread has published its run loop
     if (!context->cf_run_loop) {
         return 0;
     }

     // Put a reference to the transport context
     const CFSocketContext socket_context = { 0, transport, NULL, NULL, NULL };

     // Wrap socket and listen to events
     if (!sock->cf_socket && !sock->cf_read_stream && !sock->cf_write_stream) {
         sock->cf_socket = CFSocketCreateWithNative(kCFAllocatorDefault,
                                                    sock->fd,
                                                    kCFSocketReadCallBack | kCFSocketConnectCallBack | kCFSocketWriteCallBack | kCFSocketAcceptCallBack | kCFSocketDataCallBack,
                                                    &__CFSocketCallBack,
                                                    &socket_context);
         // CoreFoundation calls below do not accept NULL, so bail out on failure
         if (!sock->cf_socket) {
             TSK_DEBUG_ERROR("CFSocketCreateWithNative(fd=%d) failed", sock->fd);
             return -1;
         }

         // Don't close the socket if the CFSocket is invalidated
         CFOptionFlags flags = CFSocketGetSocketFlags(sock->cf_socket);
         flags = flags & ~kCFSocketCloseOnInvalidate;
         CFSocketSetSocketFlags(sock->cf_socket, flags);

         // Create a new RunLoopSource and register it with the main thread RunLoop
         sock->cf_run_loop_source = CFSocketCreateRunLoopSource(kCFAllocatorDefault, sock->cf_socket, 0);
         if (!sock->cf_run_loop_source) {
             TSK_DEBUG_ERROR("CFSocketCreateRunLoopSource(fd=%d) failed", sock->fd);
             // Safe: close-on-invalidate was cleared above, the native fd stays open
             CFSocketInvalidate(sock->cf_socket);
             CFRelease(sock->cf_socket);
             sock->cf_socket = NULL;
             return -1;
         }
         CFRunLoopAddSource(context->cf_run_loop, sock->cf_run_loop_source, kCFRunLoopCommonModes);
     }

     if (TNET_SOCKET_TYPE_IS_DGRAM(sock->type)) {
         // Nothing to do

     } else if (TNET_SOCKET_TYPE_IS_STREAM(sock->type)) {
         if (!sock->cf_read_stream && !sock->cf_write_stream) {
             // Create a pair of streams (read/write) from the socket
             CFStreamCreatePairWithSocket(kCFAllocatorDefault, sock->fd, &sock->cf_read_stream, &sock->cf_write_stream);
             if (!sock->cf_read_stream || !sock->cf_write_stream) {
                 TSK_DEBUG_ERROR("CFStreamCreatePairWithSocket(fd=%d) failed", sock->fd);
                 // Never leave a half-created pair behind
                 if (sock->cf_read_stream) {
                     CFRelease(sock->cf_read_stream);
                     sock->cf_read_stream = NULL;
                 }
                 if (sock->cf_write_stream) {
                     CFRelease(sock->cf_write_stream);
                     sock->cf_write_stream = NULL;
                 }
                 return -1;
             }

             // Don't close underlying socket
             CFReadStreamSetProperty(sock->cf_read_stream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
             CFWriteStreamSetProperty(sock->cf_write_stream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);

             // Mark the stream for VoIP usage
             CFReadStreamSetProperty(sock->cf_read_stream, kCFStreamNetworkServiceType, kCFStreamNetworkServiceTypeVoIP);
             CFWriteStreamSetProperty(sock->cf_write_stream, kCFStreamNetworkServiceType, kCFStreamNetworkServiceTypeVoIP);

             // Setup a context for the streams
             CFStreamClientContext streamContext = { 0, transport, NULL, NULL, NULL };

             // Set the client callback for the stream
             CFReadStreamSetClient(sock->cf_read_stream,
                                   kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
                                   &__CFReadStreamClientCallBack,
                                   &streamContext);
             CFWriteStreamSetClient(sock->cf_write_stream,
                                    kCFStreamEventOpenCompleted | kCFStreamEventErrorOccurred | kCFStreamEventCanAcceptBytes |kCFStreamEventEndEncountered,
                                    &__CFWriteStreamClientCallBack,
                                    &streamContext);

             if (TNET_SOCKET_TYPE_IS_TLS(sock->type)) {
                 CFWriteStreamSetProperty(sock->cf_write_stream, kCFStreamPropertySocketSecurityLevel, kCFStreamSocketSecurityLevelNegotiatedSSL);
                 CFReadStreamSetProperty(sock->cf_read_stream, kCFStreamPropertySocketSecurityLevel, kCFStreamSocketSecurityLevelNegotiatedSSL);
                 CFWriteStreamSetProperty(sock->cf_write_stream, kCFStreamSSLLevel, kCFStreamSocketSecurityLevelNegotiatedSSL);
                 CFReadStreamSetProperty(sock->cf_read_stream, kCFStreamSSLLevel, kCFStreamSocketSecurityLevelNegotiatedSSL);

                 CFMutableDictionaryRef settings = CFDictionaryCreateMutable(kCFAllocatorDefault, 0, &kCFTypeDictionaryKeyCallBacks, &kCFTypeDictionaryValueCallBacks);
                 if (settings) {
 #if (__IPHONE_OS_VERSION_MIN_REQUIRED < 40000) // @Deprecated
                     CFDictionaryAddValue(settings, kCFStreamSSLAllowsExpiredCertificates, kCFBooleanTrue);
                     CFDictionaryAddValue(settings, kCFStreamSSLAllowsAnyRoot, kCFBooleanTrue); // self-signed? - deprecated
 #endif
                     // Set "kCFStreamSSLValidatesCertificateChain" to false to accept self-signed certs. The validation will be done manually using "isTrusted()" to check cert matching if "verify" option is enabled.
                     CFDictionaryAddValue(settings, kCFStreamSSLValidatesCertificateChain, kCFBooleanFalse);
                     CFDictionaryAddValue(settings, kCFStreamSSLIsServer, sock->is_client ? kCFBooleanFalse : kCFBooleanTrue);
                     CFDictionaryAddValue(settings, kCFStreamSSLPeerName, kCFNull);

                     // Set the SSL settings
                     CFReadStreamSetProperty(sock->cf_read_stream, kCFStreamPropertySSLSettings, settings);
                     CFWriteStreamSetProperty(sock->cf_write_stream, kCFStreamPropertySSLSettings, settings);

                     CFRelease(settings);
                 } else {
                     TSK_DEBUG_ERROR("CFDictionaryCreateMutable(fd=%d) failed", sock->fd);
                 }
             }

             // Enroll streams in the run-loop
             CFReadStreamScheduleWithRunLoop(sock->cf_read_stream, context->cf_run_loop, kCFRunLoopCommonModes);
             CFWriteStreamScheduleWithRunLoop(sock->cf_write_stream, context->cf_run_loop, kCFRunLoopCommonModes);
         }

         // Open streams only if ready (otherwise, fails on iOS8)
         if (tnet_sockfd_waitUntilReadable(sock->fd, 1) == 0 || tnet_sockfd_waitUntilWritable(sock->fd, 1) == 0) {
             // Both streams are required; keep the CFSocket untouched if the pair is incomplete
             if (!sock->cf_read_stream || !sock->cf_write_stream) {
                 TSK_DEBUG_ERROR("Incomplete stream pair (fd=%d)", sock->fd);
                 return -1;
             }

             // switch from cf_socket to streams
             if (sock->cf_run_loop_source) {
                 CFRunLoopRemoveSource(context->cf_run_loop, sock->cf_run_loop_source, kCFRunLoopCommonModes);
                 CFRelease(sock->cf_run_loop_source), sock->cf_run_loop_source = NULL;
             }
             if (sock->cf_socket) {
                 CFSocketInvalidate(sock->cf_socket);
                 CFRelease(sock->cf_socket);
                 sock->cf_socket = NULL;
             }

             // Open streams
             if (!CFReadStreamOpen(sock->cf_read_stream)) {
                 TSK_DEBUG_ERROR("CFReadStreamOpen(fd=%d) failed", sock->fd);
                 return -1;
             }
             if (!CFWriteStreamOpen(sock->cf_write_stream)) {
                 TSK_DEBUG_ERROR("CFWriteStreamOpen(fd=%d) failed", sock->fd);
                 return -1;
             }
         }
     }

     return 0;
 }
 
 /*=== Main thread */
 void *tnet_transport_mainthread(void *param)
 {
 	tnet_transport_t *transport = param;
     transport_context_t *context = transport->context;
     int i;
 
 	/* check whether the transport is already prepared */
 	if (!transport->prepared) {
 		TSK_DEBUG_ERROR("Transport must be prepared before strating.");
 		goto bail;
 	}
 	
 	TSK_DEBUG_INFO("Starting [%s] server with IP {%s} on port {%d} with fd {%d}...", transport->description, transport->master->ip, transport->master->port, transport->master->fd);
     
     // Set the RunLoop of the context
     context->cf_run_loop = CFRunLoopGetCurrent();
     CFRetain(context->cf_run_loop);
 	// Wrap sockets now that the runloop is defined
 	tsk_safeobj_lock(context);
 	for (i = 0; i < context->count; ++i) {
 		wrapSocket(transport, context->sockets[i]);
 	}
 	tsk_safeobj_unlock(context);
     
 	while(TSK_RUNNABLE(transport)->running)
 	{
         // Give some time to process sources
         CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false);
         
 		if (!TSK_RUNNABLE(transport)->running) {
             goto bail;
 		}
     }
     
     // Remove all the sockets, streams and sources from the run loop
 	tsk_safeobj_lock(context);
     for(i = 0; i < context->count; i++) {
         transport_context_t *context = transport->context;
         transport_socket_xt *sock = context->sockets[i];
         
         if (!sock) {
             continue;
         }
         if (sock->cf_run_loop_source) {
             CFRunLoopRemoveSource(context->cf_run_loop, sock->cf_run_loop_source, kCFRunLoopDefaultMode);
         }
         if (sock->cf_read_stream) {
             //CFReadStreamClose(sock->cf_read_stream);
             CFReadStreamUnscheduleFromRunLoop(sock->cf_read_stream, context->cf_run_loop, kCFRunLoopDefaultMode);
         }
         if (sock->cf_write_stream) {
             //CFWriteStreamClose(sock->cf_write_stream);
             CFWriteStreamUnscheduleFromRunLoop(sock->cf_write_stream, context->cf_run_loop, kCFRunLoopDefaultMode);
         }
     }
 	tsk_safeobj_unlock(context);
 
     
 bail:
 	TSK_DEBUG_INFO("Stopped [%s] server with IP {%s} on port {%d}...", transport->description, transport->master->ip, transport->master->port);
     if(context->cf_run_loop){
         CFRelease(context->cf_run_loop);
         context->cf_run_loop = NULL;
     }
 	return 0;
 }
 
 
 
 
 
 
 
 
 /**
  * Allocates a new transport context object.
  *
  * @return The object produced by tsk_object_new() for the transport context
  *         definition (whatever that call returns, including on failure).
  */
 void* tnet_transport_context_create()
 {
     void *new_context = tsk_object_new(tnet_transport_context_def_t);
     return new_context;
 }
 
 
 //=================================================================================================
 //	Transport context object definition
 //
 /**
  * Constructor registered in the transport context object definition.
  * Initializes the context's safe-object state; @a app is not used.
  *
  * @return @a self, unchanged.
  */
 static tsk_object_t* transport_context_ctor(tsk_object_t * self, va_list * app)
 {
     transport_context_t *ctx = (transport_context_t *)self;

     if (!ctx) {
         return self;
     }

     tsk_safeobj_init(ctx);
     return self;
 }
 
 /**
  * Destructor registered in the transport context object definition.
  * Removes every remaining socket, then tears down the safe-object state.
  *
  * @return @a self, unchanged.
  */
 static tsk_object_t* transport_context_dtor(tsk_object_t * self)
 {
     transport_context_t *ctx = (transport_context_t *)self;

     if (!ctx) {
         return self;
     }

     /* Always drop the head entry; the loop ends once the count reaches zero
        (relies on removeSocketAtIndex() shrinking the count - defined elsewhere). */
     for (; ctx->count; ) {
         removeSocketAtIndex(0, ctx);
     }

     tsk_safeobj_deinit(ctx);
     return self;
 }
 
 /* Object definition for transport contexts: instance size plus the
    constructor/destructor pair defined above. */
 static const tsk_object_def_t tnet_transport_context_def_s = 
 {
     sizeof(transport_context_t),
     transport_context_ctor, 
     transport_context_dtor,
     tsk_null, /* last slot left unset - presumably the comparator; confirm against tsk_object_def_t */
 };
 /* Public handle to the definition above; this is what tnet_transport_context_create() passes to tsk_object_new(). */
 const tsk_object_def_t *tnet_transport_context_def_t = &tnet_transport_context_def_s;
 
 #endif /* __IPHONE_OS_VERSION_MIN_REQUIRED >= 40000 */