From 1abde451b80a3f44fa5e85e0f8a12ede4b3df255 Mon Sep 17 00:00:00 2001 From: Jerome St-Louis Date: Sat, 27 Feb 2016 16:28:10 -0500 Subject: [PATCH] ecere/net; compiler/ecs: Distributed Objects Improvements - Better mutexing around socket reference counting - Thread awareness when locking/unlocking GuiApplication mutex - net/Socket: Only setting leftOver to true on non-zero recvBytes --- compiler/ecs/ecs.ec | 6 ++++-- ecere/src/gui/GuiApplication.ec | 33 +++++++++++++++++++++++++++++++ ecere/src/net/Socket.ec | 38 +++++++++++++++++++++++++++++++----- ecere/src/net/dcom.ec | 32 +++++++++++++++++++----------- ecere/src/sys/Mutex.ec | 43 +++++++++++++++++++++-------------------- 5 files changed, 113 insertions(+), 39 deletions(-) diff --git a/compiler/ecs/ecs.ec b/compiler/ecs/ecs.ec index f979662..680f7c6 100644 --- a/compiler/ecs/ecs.ec +++ b/compiler/ecs/ecs.ec @@ -1105,7 +1105,8 @@ static void BindDCOMClient() f.Printf(" = 0"); f.Printf(";\n\n"); } - f.Printf(" incref this;\n"); + // f.Printf(" incref this;\n"); + f.Printf(" safeIncRef();\n"); for(param = method.dataType.params.first; param; param = param.next) { char type[1024] = ""; @@ -1165,7 +1166,8 @@ static void BindDCOMClient() } f.Printf(" }\n"); f.Printf(" __ecereBuffer.Free();\n"); - f.Printf(" delete this;\n"); + //f.Printf(" delete this;\n"); + f.Printf(" safeDecRef();\n"); if(method.dataType.returnType.kind != voidType) { f.Printf(" return __ecereResult;\n"); diff --git a/ecere/src/gui/GuiApplication.ec b/ecere/src/gui/GuiApplication.ec index 9827b8f..4f79ecf 100644 --- a/ecere/src/gui/GuiApplication.ec +++ b/ecere/src/gui/GuiApplication.ec @@ -1474,6 +1474,39 @@ public: #endif } + void LockEx(int count) + { +#if !defined(__EMSCRIPTEN__) + int i; + for(i = 0; i < count; i++) + { + lockMutex.Wait(); +#if (defined(__unix__) || defined(__APPLE__)) && !defined(__ANDROID__) && !defined(__EMSCRIPTEN__) + if(xGlobalDisplay) + XLockDisplay(xGlobalDisplay); +#endif + } +#endif + } + + int UnlockEx(void) + { + int count = 0; +#if !defined(__EMSCRIPTEN__) + int i; + count = lockMutex.owningThread == GetCurrentThreadID() ? lockMutex.lockCount : 0; + for(i = 0; i < count; i++) + { +#if (defined(__unix__) || defined(__APPLE__)) && !defined(__ANDROID__) && !defined(__EMSCRIPTEN__) + if(xGlobalDisplay) + XUnlockDisplay(xGlobalDisplay); +#endif + lockMutex.Release(); + } +#endif + return count; + } + Cursor GetCursor(SystemCursor cursor) { return systemCursors[cursor]; diff --git a/ecere/src/net/Socket.ec b/ecere/src/net/Socket.ec index aec0562..96b25cf 100644 --- a/ecere/src/net/Socket.ec +++ b/ecere/src/net/Socket.ec @@ -312,6 +312,8 @@ public: #if defined(__WIN32__) || defined(__unix__) || defined(__APPLE__) if(this) { + safeIncRef(); + { SOCKET s = this.s; int count; fd_set ws, es; @@ -335,8 +337,12 @@ public: // This is what was making eCom jam... // select(s+1, null, &ws, &es, null); + + safeDecRef(); return true; } + } + safeDecRef(); } #endif return false; @@ -434,6 +440,21 @@ public: } virtual bool OnEstablishConnection(int s); + dllexport void safeIncRef() + { + mutex.Wait(); + incref this; + //mutex.Release(); + } + + dllexport void safeDecRef() + { + Mutex mutex = this.mutex; + //mutex.Wait(); + delete this; + mutex.Release(); + } + private: Socket() { @@ -608,9 +629,10 @@ private: { bool result = false; SOCKET s; + Mutex mutex = this.mutex; - incref this; mutex.Wait(); + incref this; // network.mutex.Wait(); s = this.s; if(FD_ISSET(s, rs) || leftOver) @@ -661,7 +683,8 @@ private: uint recvCount = OnReceive(recvBuffer + flushCount, recvBytes - flushCount); if(!recvCount) { - leftOver = true; + if(recvBytes) + leftOver = true; if(!processAlone) network.leftOverBytes = true; break; @@ -709,9 +732,9 @@ private: _Disconnect(disconnectCode); } // network.mutex.Release(); + delete this; mutex.Release(); - delete this; return result; } #endif @@ -728,6 +751,7 @@ private: struct timeval tvTO = {(uint)timeOut, (uint)((timeOut -(uint)timeOut)* 1000000)}; fd_set rs, ws, es; int selectResult; + Mutex mutex; if(disconnectCode > 0 && !leftOver) return false; FD_ZERO(&rs); @@ -737,15 +761,19 @@ private: //FD_SET(s, &ws); FD_SET(s, &es); + mutex = this.mutex; + mutex.Wait(); incref this; + mutex.Release(); selectResult = select((int)(s+1), &rs, &ws, &es, leftOver ? &tv : (timeOut ? &tvTO : null)); - mutex.Wait(); + if(s != -1 && _refCount && (leftOver || selectResult)) { gotEvent |= ProcessSocket(&rs, &ws, &es); } - mutex.Release(); + mutex.Wait(); delete this; + mutex.Release(); return gotEvent; } diff --git a/ecere/src/net/dcom.ec b/ecere/src/net/dcom.ec index 5110a66..4bd3d9b 100644 --- a/ecere/src/net/dcom.ec +++ b/ecere/src/net/dcom.ec @@ -153,6 +153,7 @@ public: if(serverSocket && serverSocket.connected) { + int lockCount; int64 currentThreadID = GetCurrentThreadID(); int callID = nextCallID++; DCOMServerSocket socket = serverSocket; @@ -206,13 +207,13 @@ public: if((ack = VirtualCallAcknowledged(methodID, id, callID))) break; - guiApp.Unlock(); + lockCount = guiApp.UnlockEx(); mutex.Release(); if(processingSocket && processingSocket.connected) processingSocket.ProcessTimeOut(0.01); else ecere::sys::Sleep(0.01);//serverSocket.thread.semaphore.Wait(); - guiApp.Lock(); + guiApp.LockEx(lockCount); mutex.Wait(); } @@ -223,7 +224,7 @@ public: delete ack; } - guiApp.Unlock(); + lockCount = guiApp.UnlockEx(); mutex.Release(); if(socket._refCount > 1) @@ -234,7 +235,7 @@ public: processingSocket._refCount--; delete processingSocket; - guiApp.Lock(); + guiApp.LockEx(lockCount); mutex.Wait(); if(_refCount > 1) @@ -308,13 +309,18 @@ class DCOMClientThread : Thread bool connected; unsigned int Main() { + socket.mutex.Wait(); socket._refCount += 2; + socket.mutex.Release(); while(connected) { socket.ProcessTimeOut(0.01); semaphore.Release(); } - if(socket._refCount > 1) socket._refCount--; + socket.mutex.Wait(); + if(socket._refCount > 1) + socket._refCount--; + socket.mutex.Release(); delete socket; return 0; } @@ -612,6 +618,7 @@ public: bool result = false; if(Socket::Connect(server, port)) { + int lockCount; int len = (int)(strlen(_class.name) + 4 - strlen("DCOMClient_")); unsigned int size = sizeof(class CreateInstancePacket) + len; CreateInstancePacket packet = (CreateInstancePacket)new0 byte[size]; @@ -625,7 +632,7 @@ public: thread.socket = this; thread.connected = true; thread.Create(); - guiApp.Unlock(); + lockCount = guiApp.UnlockEx(); while(!answered && thread && connected) { //guiApp.WaitNetworkEvent(); @@ -637,7 +644,7 @@ public: else thread.semaphore.Wait(); } - guiApp.Lock(); + guiApp.LockEx(lockCount); result = connected; } return result; @@ -728,8 +735,8 @@ public: break; } } - guiApp.Unlock(); } + guiApp.Unlock(); } void OnDisconnect(int code) @@ -760,20 +767,23 @@ public: while(true) { + int lockCount; if(!thread || !connected) break; if((ack = CallAcknowledged(methodID, objectID, callID))) break; - guiApp.Unlock(); + lockCount = guiApp.UnlockEx(); //guiApp.WaitNetworkEvent(); //guiApp.ProcessNetworkEvents(); //Process(); + mutex.Release(); if(GetCurrentThreadID() == (int64)thread.id) ProcessTimeOut(0.01); else ecere::sys::Sleep(0.01);//thread.semaphore.Wait(); - guiApp.Lock(); + mutex.Wait(); + guiApp.LockEx(lockCount); } if(ack) @@ -807,7 +817,7 @@ public class DCOMSendControl public: void Stop() { - while(sendingOut) guiApp.Unlock(), ecere::sys::Sleep(0.01), guiApp.Lock(); + while(sendingOut) { int lockCount = guiApp.UnlockEx(); ecere::sys::Sleep(0.01); guiApp.LockEx(lockCount); } sendingOut = true; } diff --git a/ecere/src/sys/Mutex.ec b/ecere/src/sys/Mutex.ec index 84f167e..1ea1d06 100644 --- a/ecere/src/sys/Mutex.ec +++ b/ecere/src/sys/Mutex.ec @@ -54,10 +54,10 @@ public class Mutex : struct #endif #endif -#ifdef _DEBUG - int64 owningThread; -#endif - int lockCount; +//#ifdef _DEBUG + int64 _owningThread; +//#endif + int _lockCount; Mutex() { @@ -84,10 +84,10 @@ public class Mutex : struct #endif #endif - lockCount = 0; -#ifdef _DEBUG - owningThread = 0; -#endif + _lockCount = 0; +//#ifdef _DEBUG + _owningThread = 0; +//#endif return true; } @@ -138,10 +138,10 @@ public: #endif -#ifdef _DEBUG - owningThread = GetCurrentThreadID(); -#endif - lockCount++; +//#ifdef _DEBUG + _owningThread = GetCurrentThreadID(); +//#endif + _lockCount++; } } @@ -155,16 +155,16 @@ public: printf("[%d] Releasing Mutex %x\n", (int)GetCurrentThreadID(), this); */ #ifdef _DEBUG - if(lockCount && owningThread != GetCurrentThreadID()) + if(_lockCount && _owningThread != GetCurrentThreadID()) PrintLn("WARNING: Not in owning thread!!"); #endif - if(!--lockCount) -#ifdef _DEBUG - owningThread = 0; -#else - ; -#endif + if(!--_lockCount) + { +//#ifdef _DEBUG + _owningThread = 0; +//#endif + } #if defined(__WIN32__) #ifdef _DEBUG ReleaseMutex(mutex); @@ -186,11 +186,12 @@ public: #endif #ifdef _DEBUG - if(lockCount < 0) + if(_lockCount < 0) PrintLn("WARNING: lockCount < 0"); #endif } } - property int lockCount { get { return lockCount; } } + property int lockCount { get { return _lockCount; } } + property int64 owningThread { get { return _owningThread; } } }; -- 1.8.3.1