ecere/net/HTTPFile: (#997) Multiple fixes and improvements
authorJerome St-Louis <jerome@ecere.com>
Mon, 30 Sep 2013 07:31:12 +0000 (03:31 -0400)
committerJerome St-Louis <jerome@ecere.com>
Mon, 30 Sep 2013 07:31:12 +0000 (03:31 -0400)
- Requesting HEAD only on Open() and GET on first Read() contents
- Added support for Content-Type and Content-Disposition
- Using List container to hold connections with reference counting fixes
- Fixed hangs upon closing the file caused by improper tracking of position
- Fixed hangs caused by improper Locking/Unlocking of connectionMutex
- Added Seek() support for small files fitting within the buffer

ecere/src/net/HTTPFile.ec

index 2ae06a4..9deeb20 100644 (file)
@@ -1,12 +1,27 @@
 #ifndef ECERE_NONET
 
+#include <stdio.h>
+
+import "List"
 import "network"
 
-static OldList connections { };
+class ConnectionsHolder
+{
+   List<HTTPConnection> connections { };
+
+   ~ConnectionsHolder()
+   {
+      HTTPConnection c;
+      while((c = connections[0]))
+         delete c;   // The HTTPConnection destructor will take out from the list
+   }
+}
+
+static ConnectionsHolder holder { };
 
 namespace net;
 
-#define BUFFERSIZE   65536
+/*static */define HTTPFILE_BUFFERSIZE = 65536;
 
 static Mutex connectionsMutex { };
 
@@ -18,7 +33,7 @@ static class ServerNode : BTNode
    {
       char * name = (char *)key;
       delete name;
-   }   
+   }
 }
 
 static class ServerNameCache
@@ -71,7 +86,7 @@ static char * GetString(char * string, char * what, int count)
 
 private class HTTPConnection : Socket
 {
-   HTTPConnection prevConnection, nextConnection;
+   class_no_expansion;
    char * server;
    int port;
    HTTPFile file;
@@ -80,24 +95,31 @@ private class HTTPConnection : Socket
 
    ~HTTPConnection()
    {
-      if(prevConnection || nextConnection || connections.first == this)
-         printf("");      
-   }
-   
-   HTTPConnection()
-   {
-      if(!connections.offset)
-         connections.offset = (uint)&((HTTPConnection)0).prevConnection;
+      // printf("Before TakeOut we have %d connections:\n", holder.connections.count); for(c : holder.connections) ::PrintLn(c.server); ::PrintLn("");
+      holder.connections.TakeOut(this);
+      /*
+      PrintLn(server, " Connection Closed (", holder.connections.count, ") opened");
+      printf("Now we have %d connections:\n", holder.connections.count);
+      for(c : holder.connections)
+      {
+         ::PrintLn(c.server);
+      }
+      ::PrintLn("");
+      */
+      delete server;
    }
 
    void OnDisconnect(int code)
    {
       connectionsMutex.Wait();
       if(file)
-         file.connection = null;
-      delete server;
-      connections.Remove(this);
+         delete file.connection; // This decrements the file's reference
+
       connectionsMutex.Release();
+
+      // PrintLn(server, " Disconnected Us");
+
+      delete this;         // The 'connections' reference
    }
 
    uint Open_OnReceive(const byte * buffer, uint count)
@@ -108,14 +130,14 @@ private class HTTPConnection : Socket
       while(!file.done)
       {
          bool gotEndLine = false;
-         for(c = 0; c<count-1; c++)
+         for(c = 0; c<(int)count-1; c++)
          {
             if(buffer[c] == '\r' && buffer[c+1] == '\n')
             {
                gotEndLine = true;
                break;
             }
-         }   
+         }
          if(!gotEndLine)
             // Incomplete packet
             return pos;
@@ -123,17 +145,10 @@ private class HTTPConnection : Socket
          {
             char * string = (char *)buffer;
 
-            /*
-            char line[1024];
-            memcpy(line, buffer, c+2);
-            line[c+2] = 0;
-            Log(line);
-            */
-            
-            /*
+#ifdef _DEBUG
             fwrite(buffer, 1, c, stdout);
             puts("");
-            */
+#endif
 
             if(!c)
             {
@@ -144,7 +159,7 @@ private class HTTPConnection : Socket
             {
                //file.openStarted = true;
                if((string = GetString((char *)buffer, "HTTP/1.1 ", count)) ||
-                       (string = GetString((char *)buffer, "HTTP/1.0 ", count)))
+                  (string = GetString((char *)buffer, "HTTP/1.0 ", count)))
                {
                   file.status = atoi(string);
                }
@@ -160,6 +175,38 @@ private class HTTPConnection : Socket
                   file.totalSize = atoi(string);
                   file.totalSizeSet = true;
                }
+               else if(string = GetString((char *)buffer, "Content-Type: ", count))
+               {
+                  char * cr = strstr(string, "\r");
+                  char * lf = strstr(string, "\n");
+                  int len;
+                  if(cr)
+                     len = cr - string;
+                  else if(lf)
+                     len = lf - string;
+                  else
+                     len = strlen(string);
+
+                  file.contentType = new char[len+1];
+                  memcpy(file.contentType, string, len);
+                  file.contentType[len] = 0;
+               }
+               else if(string = GetString((char *)buffer, "Content-disposition: ", count))
+               {
+                  char * cr = strstr(string, "\r");
+                  char * lf = strstr(string, "\n");
+                  int len;
+                  if(cr)
+                     len = cr - string;
+                  else if(lf)
+                     len = lf - string;
+                  else
+                     len = strlen(string);
+
+                  file.contentDisposition = new char[len+1];
+                  memcpy(file.contentDisposition, string, len);
+                  file.contentDisposition[len] = 0;
+               }
                else if(string = GetString((char *)buffer, "Connection: ", count))
                {
                   if(!strnicmp(string, "close", strlen("close")))
@@ -231,15 +278,14 @@ private class HTTPConnection : Socket
                file.chunkSize = strtol(string, null, 16);
                if(!file.chunkSize)
                {
-                  //connection.file = null;
                   file.connection.file = null;
-                  file.connection = null;
+                  delete file.connection; // This decrements the file's reference
                }
             }
             return pos;
          }
 
-         read = Min(count, BUFFERSIZE - file.bufferCount);
+         read = Min(count, HTTPFILE_BUFFERSIZE - file.bufferCount);
          if(file.chunked)
          {
             read = Min(read, file.chunkSize);
@@ -248,27 +294,56 @@ private class HTTPConnection : Socket
          if(read)
          {
             memcpy(file.buffer + file.bufferCount, buffer, read);
-            // fwrite(file.buffer, 1, read, stdout);
             file.bufferCount += read;
-            /*
-            if(GetCurrentThreadID() == network.mainThreadID)
-               fwrite(buffer, 1, read, stdout);
-            */
          }
          return read;
       }
       else
-         return count;   
+         return count;
    }
 }
 
 public class HTTPFile : File
 {
+   bool reuseConnection;
+   reuseConnection = true;
 public:
+   property bool reuseConnection
+   {
+      set { reuseConnection = value; }
+      get { return reuseConnection; }
+   }
+   property String contentType
+   {
+      get { return contentType; }
+   }
+   property String contentDisposition
+   {
+      get { return contentDisposition; }
+   }
+
    bool OpenURL(char * name, char * referer, char * relocation)
    {
+      return RetrieveHead(name, referer, relocation, false);
+   }
+
+private:
+
+   bool RetrieveHead(char * name, char * referer, char * relocation, bool askBody)
+   {
       bool result = false;
-      if(this && strstr(name, "http://") == name)
+      String http;
+      if(!this || !name) return false;
+      http = strstr(name, "http://");
+      if(http != name) http = null;
+
+      askedBody = askBody;
+
+      done = false;
+      delete contentType;
+      delete contentDisposition;
+      // ::PrintLn("Opening ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID(), "\n");
+      if(this && http)
       {
          char server[1024];
          char msg[1024];
@@ -309,54 +384,100 @@ public:
          {
             this.connection.file = null;
             if(close)
-            {
                this.connection.Disconnect(0);
-               delete this.connection;
-            }
-            this.connection = null;
+            delete this.connection;
          }
 
          if(chunked)
          {
-            connectionsMutex.Release();
-            while(this.connection && this.connection.file)
+            while(this.connection && this.connection.connected && this.connection.file)
             {
+               connectionsMutex.Release();
                this.connection.Process();
+               connectionsMutex.Wait();
             }
-            connectionsMutex.Wait();
-         }
-         
-         for(connection = connections.first; connection; connection = connection.nextConnection)
-         {
-            if(!strcmpi(connection.server, server) && connection.port == port && !connection.file)
-               break;
          }
 
-         if(connection)
+         if(reuseConnection)
          {
-            incref connection;
-            reuse = true;
-
-            connection.file = this;
+            bool retry = true;
+            while(retry)
+            {
+               retry = false;
+               connection = null;
+               for(c : holder.connections)
+               {
+                  if(!strcmpi(c.server, server) && c.port == port)
+                  {
+                     if(!c.file && c.connected)
+                     {
+                        connection = c;      // TOFIX: 'incref c' doesn't work
+                        incref connection;      // HTTPFile reference if we keep it
+                        connectionsMutex.Release();
+                        connection.ProcessTimeOut(0.000001);
+                        connectionsMutex.Wait();
+                        if(!connection.connected || connection.file)
+                        {
+                           // We're disconnected or reused already...
+                           retry = true;
+                           delete connection;
+                        }
+                        break;
+                     }
+                  }
+               }
+            }
+            if(connection)
+            {
+               // ::PrintLn("Reusing Connection ", (uint64)connection, " for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID(), "\n");
+               reuse = true;
+               connection.file = this;
+            }
          }
 
+      tryagain:
          if(!connection)
          {
             char ipAddress[1024];
             connection = HTTPConnection { };
-            //incref connection;
+            incref connection;      // HTTPFile reference on success
             
             connection.file = this;
 
             connectionsMutex.Release();
 
-            if(serverNameCache.Resolve(server, ipAddress) && connection.Connect(ipAddress /*server*/, port))
+            if(serverNameCache.Resolve(server, ipAddress))
             {
-               connectionsMutex.Wait();
+               // ::PrintLn("No Connection - Connecting ", (uint64)connection, " for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
+               if(connection.Connect(ipAddress /*server*/, port))
+               {
+                  //::PrintLn("Successfully Connected for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
+                  //::PrintLn("Waiting on connectionsMutex for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
+                  connectionsMutex.Wait();
+
+                  connection.server = CopyString(server);
+                  connection.port = port;
 
-               connections.Add(connection);
-               connection.server = CopyString(server);
-               connection.port = port;
+                  //::PrintLn("Got connectionsMutex for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
+                  holder.connections.Add(connection);
+                  /*
+                  printf("Now we have %d connections:\n", holder.connections.count);
+                  for(c : holder.connections)
+                  {
+                     String s = c.server;
+                     ::Print("Server: ");
+                     ::PrintLn(c.server);
+                  }
+                  ::PrintLn("");
+                  */
+                  incref connection;   // Global List Reference
+               }
+               else
+               {
+                  // ::PrintLn("Connection Failed for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
+                  connectionsMutex.Wait();
+                  delete connection;
+               }
             }
             else
             {
@@ -367,13 +488,16 @@ public:
 
          if(connection)
          {
+            incref connection;      // local reference
+
             connection.OnReceive = HTTPConnection::Open_OnReceive;
             connection.file = this;
             this.connection = connection;
             this.relocation = relocation;
             //openStarted = false;
                   
-            strcpy(msg, "GET /");
+            totalSizeSet = false;   // HEAD will sometimes give you 0!
+            strcpy(msg, askBody ? "GET /" : "HEAD /");
 
             if(fileName)
             {
@@ -401,8 +525,8 @@ public:
             //strcat(msg, " HTTP/1.0\r\nHost: ");
             strcat(msg, server);
             strcat(msg, "\r\n");
-            strcat(msg, "Accept-Charset: ISO-8859-1\r\n");
-            //strcat(msg, "Accept-Charset: UTF-8\r\n");
+            strcat(msg, "Accept-Charset: UTF-8\r\n");
+            //strcat(msg, "Accept-Charset: ISO-8859-1\r\n");
             strcat(msg, "Connection: Keep-Alive\r\n");
             if(referer)
             {
@@ -413,50 +537,63 @@ public:
             strcat(msg, "\r\n");
             len = strlen(msg);
             
+            //::PrintLn("Releasing connectionsMutex before GET for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID());
             connectionsMutex.Release();
 
+            // ::PrintLn("Sending GET for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID(), "\n");
             connection.Send(msg, len);
 
-            while(this.connection && !done)
+            while(this.connection && this.connection.connected && !done)
             {
                this.connection.Process();
             }
+            //::PrintLn("Got DONE for ", name, " ", (uint64)this, " in thread ", GetCurrentThreadID(), "\n");
 
             if(this.connection)
             {
+               if(name != location)
+               {
+                  delete location;
+                  location = CopyString(name);
+               }
                if(status == 200 || (!status && totalSizeSet))
                {
-                  this.connection.OnReceive = HTTPConnection::Read_OnReceive;
+                  if(askBody)
+                     this.connection.OnReceive = HTTPConnection::Read_OnReceive;
+
                   result = true;
                   connectionsMutex.Wait();
                }
                else
                {
-                  if(chunked)
+                  if(askBody)
                   {
-                     bool wait = false;
-                     this.connection.OnReceive = HTTPConnection::Read_OnReceive;
-                     while(!eof)
+                     if(chunked)
                      {
-                        if(!this.connection)
-                           eof = true;
-                        else
+                        bool wait = false;
+                        this.connection.OnReceive = HTTPConnection::Read_OnReceive;
+                        while(!eof)
                         {
-                           // First time check if we already have bytes, second time wait for an event
-                           this.connection.Process();
-                           wait = true;
-                        }   
+                           if(!this.connection)
+                              eof = true;
+                           else
+                           {
+                              // First time check if we already have bytes, second time wait for an event
+                              this.connection.Process();
+                              wait = true;
+                           }
+                        }
                      }
-                  }
-                  else if(totalSizeSet)
-                  {
-                     done = false;
-                     this.connection.OnReceive = HTTPConnection::Read_OnReceive;
-                     while(this.connection && position < totalSize)
+                     else if(totalSizeSet)
                      {
-                        connection.Process();
-                        position += bufferCount;
-                        bufferCount = 0;
+                        done = false;
+                        this.connection.OnReceive = HTTPConnection::Read_OnReceive;
+                        while(this.connection && this.connection.connected && position < totalSize)
+                        {
+                           connection.Process();
+                           position += bufferCount;
+                           bufferCount = 0;
+                        }
                      }
                   }
                   
@@ -469,14 +606,12 @@ public:
                      if(close)
                      {
                         this.connection.Disconnect(0);
-                        delete this.connection;
                         connection = null;
                      }
                   }
                   
-
                   status = 0;
-                  this.connection = null;
+                  delete this.connection; // This decrements the file's reference
                   this.relocation = null;
                   totalSize = 0;
                   totalSizeSet = false;
@@ -492,7 +627,13 @@ public:
             {
                connectionsMutex.Wait();
             }
-            if(reuse)
+            if(reuse && !status && connection && !connection.connected)
+            {
+               delete connection;
+               reuse = false;
+               goto tryagain;
+            }
+            else
                delete connection;
          }
          connectionsMutex.Release();
@@ -500,21 +641,24 @@ public:
       return result;
    }
 
-private:
-
    ~HTTPFile()
    {
+      delete location;
+      delete contentType;
+      delete contentDisposition;
       {
          connectionsMutex.Wait();
          if(connection)
          {
-            if(totalSizeSet)
+            if(totalSizeSet && askedBody)
             {
                done = false;
                this.connection.OnReceive = HTTPConnection::Read_OnReceive;
-               while(this.connection && position < totalSize)
+               while(this.connection && this.connection.connected && position + (bufferCount - bufferPos) < totalSize)
                {
+                  connectionsMutex.Release();
                   connection.Process();
+                  connectionsMutex.Wait();
                   position += bufferCount;
                   bufferCount = 0;
                }
@@ -525,22 +669,22 @@ private:
             {
                connection.file = null;
                if(close)
-               {
                   connection.Disconnect(0);
-                  delete connection;
-               }
-               connection = null;
+               delete connection;
             }
          }
          connectionsMutex.Release();
 
          if(chunked)
          {
-            while(connection && connection.file)
+            while(connection && connection.connected && connection.file)
             {
+               connectionsMutex.Release();
                connection.Process();
+               connectionsMutex.Wait();
             }
          }
+         //::PrintLn("Done with ", (uint64)this);
       }
    }
 
@@ -549,6 +693,14 @@ private:
       uint readSize = size * count;
       uint read = 0;
       bool wait = false;
+      Time lastTime = GetTime();
+
+      if(!askedBody)
+      {
+         askedBody = true;
+         if(!RetrieveHead(this.location, null, null, true))
+            return 0;
+      }
 
       if(totalSizeSet && position >= totalSize)
          eof = true;
@@ -561,12 +713,15 @@ private:
          
          if(numbytes)
          {
+            lastTime = GetTime();
             memcpy(buffer + read, this.buffer + bufferPos, numbytes);
             bufferPos += numbytes;
             position += numbytes;
             read += numbytes;
 
-            if(bufferPos > BUFFERSIZE / 2)
+            lastTime = GetTime();
+
+            if(bufferPos > HTTPFILE_BUFFERSIZE / 2)
             {
                // Shift bytes back to beginning of buffer
                uint shift = bufferCount - bufferPos;
@@ -578,14 +733,16 @@ private:
          }
          else
          {
-            if(!connection)
+            if(!connection || !connection.connected)
                eof = true;
             else
             {
                // First time check if we already have bytes, second time wait for an event
                connection.Process();
+               if(wait && bufferCount - bufferPos == 0 && GetTime() - lastTime > 5)
+                  eof = true;
                wait = true;
-            }   
+            }
          }
          if(totalSizeSet && position >= totalSize)
             eof = true;
@@ -616,6 +773,21 @@ private:
 
    bool Seek(int pos, FileSeekMode mode)
    {
+      if(mode == start && bufferPos == 0 && pos <= bufferCount & pos >= 0)
+      {
+         bufferPos = pos;
+         return true;
+      }
+      else if(mode == current && bufferPos == 0 && (position + pos) <= bufferCount & (position + pos) >= 0)
+      {
+         bufferPos = position + pos;
+         return true;
+      }
+      else if(mode == end && totalSizeSet && bufferPos == 0 && bufferCount == totalSize && (totalSize - pos) <= bufferCount & (totalSize - pos) >= 0)
+      {
+         bufferPos = totalSize - pos;
+         return true;
+      }
       return false;
    }
 
@@ -638,6 +810,9 @@ private:
    {
       aborted = true;
    }
+private:
+
+   bool askedBody;
 
    HTTPConnection connection;
    uint position;
@@ -649,13 +824,17 @@ private:
    bool close;
    uint chunkSize;
    char * relocation;
+   String location;
 
    // Buffering...
-   byte buffer[BUFFERSIZE];
+   byte buffer[HTTPFILE_BUFFERSIZE];
    uint bufferPos;
    uint bufferCount;
    bool aborted;
    bool totalSizeSet;
+
+   String contentType;
+   String contentDisposition;
 }
 
 public HTTPFile FileOpenURL(char * name)
@@ -667,7 +846,7 @@ public HTTPFile FileOpenURL(char * name)
    {
       delete f;
       return null;
-   }   
+   }
 }
 
 #endif