ecere/net; compiler/ecs: Distributed Objects Improvements
authorJerome St-Louis <jerome@ecere.com>
Sat, 27 Feb 2016 21:28:10 +0000 (16:28 -0500)
committerJerome St-Louis <jerome@ecere.com>
Thu, 28 Jul 2016 22:23:15 +0000 (18:23 -0400)
- Better mutexing around socket reference counting
- Thread awareness when locking/unlocking GuiApplication mutex
- net/Socket: Only setting leftOver to true on non-zero recvBytes

compiler/ecs/ecs.ec
ecere/src/gui/GuiApplication.ec
ecere/src/net/Socket.ec
ecere/src/net/dcom.ec
ecere/src/sys/Mutex.ec

index f979662..680f7c6 100644 (file)
@@ -1105,7 +1105,8 @@ static void BindDCOMClient()
                         f.Printf(" = 0");
                      f.Printf(";\n\n");
                   }
-                  f.Printf("      incref this;\n");
+                  // f.Printf("      incref this;\n");
+                  f.Printf("      safeIncRef();\n");
                   for(param = method.dataType.params.first; param; param = param.next)
                   {
                      char type[1024] = "";
@@ -1165,7 +1166,8 @@ static void BindDCOMClient()
                   }
                   f.Printf("      }\n");
                   f.Printf("      __ecereBuffer.Free();\n");
-                  f.Printf("      delete this;\n");
+                  //f.Printf("      delete this;\n");
+                  f.Printf("      safeDecRef();\n");
                   if(method.dataType.returnType.kind != voidType)
                   {
                      f.Printf("      return __ecereResult;\n");
index 9827b8f..4f79ecf 100644 (file)
@@ -1474,6 +1474,39 @@ public:
 #endif
    }
 
+   void LockEx(int count)
+   {
+#if !defined(__EMSCRIPTEN__)
+      int i;
+      for(i = 0; i < count; i++)
+      {
+         lockMutex.Wait();
+#if (defined(__unix__) || defined(__APPLE__)) && !defined(__ANDROID__) && !defined(__EMSCRIPTEN__)
+         if(xGlobalDisplay)
+            XLockDisplay(xGlobalDisplay);
+#endif
+      }
+#endif
+   }
+
+   int UnlockEx(void)
+   {
+      int count = 0;
+#if !defined(__EMSCRIPTEN__)
+      int i;
+      count = lockMutex.owningThread == GetCurrentThreadID() ? lockMutex.lockCount : 0;
+      for(i = 0; i < count; i++)
+      {
+#if (defined(__unix__) || defined(__APPLE__)) && !defined(__ANDROID__) && !defined(__EMSCRIPTEN__)
+         if(xGlobalDisplay)
+            XUnlockDisplay(xGlobalDisplay);
+#endif
+         lockMutex.Release();
+      }
+#endif
+      return count;
+   }
+
    Cursor GetCursor(SystemCursor cursor)
    {
       return systemCursors[cursor];
index aec0562..96b25cf 100644 (file)
@@ -312,6 +312,8 @@ public:
    #if defined(__WIN32__) || defined(__unix__) || defined(__APPLE__)
       if(this)
       {
+         safeIncRef();
+         {
          SOCKET s = this.s;
          int count;
          fd_set ws, es;
@@ -335,8 +337,12 @@ public:
 
             // This is what was making eCom jam...
             // select(s+1, null, &ws, &es, null);
+
+            safeDecRef();
             return true;
          }
+         }
+         safeDecRef();
       }
    #endif
       return false;
@@ -434,6 +440,21 @@ public:
    }
    virtual bool OnEstablishConnection(int s);
 
+   dllexport void safeIncRef()
+   {
+      mutex.Wait();
+      incref this;
+      //mutex.Release();
+   }
+
+   dllexport void safeDecRef()
+   {
+      Mutex mutex = this.mutex;
+      //mutex.Wait();
+      delete this;
+      mutex.Release();
+   }
+
 private:
    Socket()
    {
@@ -608,9 +629,10 @@ private:
    {
       bool result = false;
       SOCKET s;
+      Mutex mutex = this.mutex;
 
-      incref this;
       mutex.Wait();
+      incref this;
       // network.mutex.Wait();
       s = this.s;
       if(FD_ISSET(s, rs) || leftOver)
@@ -661,7 +683,8 @@ private:
                uint recvCount = OnReceive(recvBuffer + flushCount, recvBytes - flushCount);
                if(!recvCount)
                {
-                  leftOver = true;
+                  if(recvBytes)
+                     leftOver = true;
                   if(!processAlone)
                      network.leftOverBytes = true;
                   break;
@@ -709,9 +732,9 @@ private:
             _Disconnect(disconnectCode);
       }
       // network.mutex.Release();
+      delete this;
       mutex.Release();
 
-      delete this;
       return result;
    }
 #endif
@@ -728,6 +751,7 @@ private:
       struct timeval tvTO = {(uint)timeOut, (uint)((timeOut -(uint)timeOut)* 1000000)};
       fd_set rs, ws, es;
       int selectResult;
+      Mutex mutex;
 
       if(disconnectCode > 0 && !leftOver) return false;
       FD_ZERO(&rs);
@@ -737,15 +761,19 @@ private:
       //FD_SET(s, &ws);
       FD_SET(s, &es);
 
+      mutex = this.mutex;
+      mutex.Wait();
       incref this;
+      mutex.Release();
       selectResult = select((int)(s+1), &rs, &ws, &es, leftOver ? &tv : (timeOut ? &tvTO : null));
-      mutex.Wait();
+
       if(s != -1 && _refCount && (leftOver || selectResult))
       {
          gotEvent |= ProcessSocket(&rs, &ws, &es);
       }
-      mutex.Release();
+      mutex.Wait();
       delete this;
+      mutex.Release();
       return gotEvent;
    }
 
index 5110a66..4bd3d9b 100644 (file)
@@ -153,6 +153,7 @@ public:
 
       if(serverSocket && serverSocket.connected)
       {
+         int lockCount;
          int64 currentThreadID = GetCurrentThreadID();
          int callID = nextCallID++;
          DCOMServerSocket socket = serverSocket;
@@ -206,13 +207,13 @@ public:
             if((ack = VirtualCallAcknowledged(methodID, id, callID)))
                break;
 
-            guiApp.Unlock();
+            lockCount = guiApp.UnlockEx();
             mutex.Release();
             if(processingSocket && processingSocket.connected)
                processingSocket.ProcessTimeOut(0.01);
             else
                ecere::sys::Sleep(0.01);//serverSocket.thread.semaphore.Wait();
-            guiApp.Lock();
+            guiApp.LockEx(lockCount);
             mutex.Wait();
          }
 
@@ -223,7 +224,7 @@ public:
             delete ack;
          }
 
-         guiApp.Unlock();
+         lockCount = guiApp.UnlockEx();
          mutex.Release();
 
          if(socket._refCount > 1)
@@ -234,7 +235,7 @@ public:
             processingSocket._refCount--;
          delete processingSocket;
 
-         guiApp.Lock();
+         guiApp.LockEx(lockCount);
          mutex.Wait();
 
          if(_refCount > 1)
@@ -308,13 +309,18 @@ class DCOMClientThread : Thread
    bool connected;
    unsigned int Main()
    {
+      socket.mutex.Wait();
       socket._refCount += 2;
+      socket.mutex.Release();
       while(connected)
       {
          socket.ProcessTimeOut(0.01);
          semaphore.Release();
       }
-      if(socket._refCount > 1) socket._refCount--;
+      socket.mutex.Wait();
+      if(socket._refCount > 1)
+         socket._refCount--;
+      socket.mutex.Release();
       delete socket;
       return 0;
    }
@@ -612,6 +618,7 @@ public:
       bool result = false;
       if(Socket::Connect(server, port))
       {
+         int lockCount;
          int len = (int)(strlen(_class.name) + 4 - strlen("DCOMClient_"));
          unsigned int size = sizeof(class CreateInstancePacket) + len;
          CreateInstancePacket packet = (CreateInstancePacket)new0 byte[size];
@@ -625,7 +632,7 @@ public:
          thread.socket = this;
          thread.connected = true;
          thread.Create();
-         guiApp.Unlock();
+         lockCount = guiApp.UnlockEx();
          while(!answered && thread && connected)
          {
             //guiApp.WaitNetworkEvent();
@@ -637,7 +644,7 @@ public:
             else
                thread.semaphore.Wait();
          }
-         guiApp.Lock();
+         guiApp.LockEx(lockCount);
          result = connected;
       }
       return result;
@@ -728,8 +735,8 @@ public:
                break;
             }
          }
-         guiApp.Unlock();
       }
+      guiApp.Unlock();
    }
 
    void OnDisconnect(int code)
@@ -760,20 +767,23 @@ public:
 
          while(true)
          {
+            int lockCount;
             if(!thread || !connected)
                break;
             if((ack = CallAcknowledged(methodID, objectID, callID)))
                break;
-            guiApp.Unlock();
+            lockCount = guiApp.UnlockEx();
 
             //guiApp.WaitNetworkEvent();
             //guiApp.ProcessNetworkEvents();
             //Process();
+            mutex.Release();
             if(GetCurrentThreadID() == (int64)thread.id)
                ProcessTimeOut(0.01);
             else
                ecere::sys::Sleep(0.01);//thread.semaphore.Wait();
-            guiApp.Lock();
+            mutex.Wait();
+            guiApp.LockEx(lockCount);
          }
 
          if(ack)
@@ -807,7 +817,7 @@ public class DCOMSendControl
 public:
    void Stop()
    {
-      while(sendingOut) guiApp.Unlock(), ecere::sys::Sleep(0.01), guiApp.Lock();
+      while(sendingOut) { int lockCount = guiApp.UnlockEx(); ecere::sys::Sleep(0.01); guiApp.LockEx(lockCount); }
       sendingOut = true;
    }
 
index 84f167e..1ea1d06 100644 (file)
@@ -54,10 +54,10 @@ public class Mutex : struct
 #endif
 #endif
 
-#ifdef _DEBUG
-   int64 owningThread;
-#endif
-   int lockCount;
+//#ifdef _DEBUG
+   int64 _owningThread;
+//#endif
+   int _lockCount;
 
    Mutex()
    {
@@ -84,10 +84,10 @@ public class Mutex : struct
 #endif
 #endif
 
-      lockCount = 0;
-#ifdef _DEBUG
-      owningThread = 0;
-#endif
+      _lockCount = 0;
+//#ifdef _DEBUG
+      _owningThread = 0;
+//#endif
       return true;
    }
 
@@ -138,10 +138,10 @@ public:
 
 #endif
 
-#ifdef _DEBUG
-         owningThread = GetCurrentThreadID();
-#endif
-         lockCount++;
+//#ifdef _DEBUG
+         _owningThread = GetCurrentThreadID();
+//#endif
+         _lockCount++;
       }
    }
 
@@ -155,16 +155,16 @@ public:
             printf("[%d] Releasing Mutex %x\n", (int)GetCurrentThreadID(), this);
          */
 #ifdef _DEBUG
-         if(lockCount && owningThread != GetCurrentThreadID())
+         if(_lockCount && _owningThread != GetCurrentThreadID())
             PrintLn("WARNING: Not in owning thread!!");
 #endif
 
-         if(!--lockCount)
-#ifdef _DEBUG
-            owningThread = 0;
-#else
-            ;
-#endif
+         if(!--_lockCount)
+         {
+//#ifdef _DEBUG
+            _owningThread = 0;
+//#endif
+         }
 #if defined(__WIN32__)
 #ifdef _DEBUG
          ReleaseMutex(mutex);
@@ -186,11 +186,12 @@ public:
 #endif
 
 #ifdef _DEBUG
-         if(lockCount < 0)
+         if(_lockCount < 0)
             PrintLn("WARNING: lockCount < 0");
 #endif
       }
    }
 
-   property int lockCount { get { return lockCount; } }
+   property int lockCount { get { return _lockCount; } }
+   property int64 owningThread { get { return _owningThread; } }
 };