ecere/net; compiler/ecs: Distributed Objects Improvements
[sdk] / ecere / src / net / Socket.ec
1 #define _Noreturn
2
3 namespace net;
4
5 #include <stdarg.h>
6
7 #if defined(__WIN32__)
8
9 #define WIN32_LEAN_AND_MEAN
10 #define String _String
11 #include <winsock.h>
12 #undef String
13 #define SOCKLEN_TYPE int
14
15 #elif defined(__unix__) || defined(__APPLE__)
16
17 default:
18 #define SOCKLEN_TYPE socklen_t
19 #define set _set
20 #define uint _uint
21 #include <sys/time.h>
22 #include <unistd.h>
23
24 #include <netinet/in.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/wait.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <arpa/inet.h>
31 #undef set
32 #undef uint
33 private:
34
// POSIX-only aliases (this is the __unix__ / __APPLE__ branch) so the rest of the file can use
// the same upper-case type names and closesocket() spelling on every platform.
// NOTE(review): presumably these mirror the names <winsock.h> provides in the Windows branch.
35 typedef int SOCKET;
36 typedef struct hostent HOSTENT;
37 typedef struct sockaddr SOCKADDR;
38 typedef struct sockaddr_in SOCKADDR_IN;
39 typedef struct in_addr IN_ADDR;
40 #define closesocket(s) close(s)
41
42 #endif
43
44 import "network"
45
// Reads a 32-bit value stored least-significant-byte first in the 4 bytes at b.
// NOTE(review): (b)[3] << 24 shifts a value promoted to int; for b[3] >= 0x80 that overflows a
// signed int before the uint32 cast. Consider casting each byte to uint32 first.
46 #define GETLEDWORD(b) (uint32)(((b)[3] << 24) | ((b)[2] << 16) | ((b)[1] << 8) | (b)[0])
47
// Stores d least-significant-byte first into the 4 bytes at b.
// NOTE(review): expands to four separate statements (and ends with its own ';'), so it is only
// safe where a statement sequence is allowed, e.g. not as the body of an unbraced if/else.
48 #define PUTLEDWORD(b, d) \
49    (b)[3] = (byte)(((d) >> 24) & 0xFF); \
50    (b)[2] = (byte)(((d) >> 16) & 0xFF); \
51    (b)[1] = (byte)(((d) >> 8)  & 0xFF); \
52    (b)[0] = (byte)( (d)        & 0xFF);
53
// Transport of a Socket: tcp sockets are created with SOCK_STREAM (Connect), udp sockets with
// SOCK_DGRAM (DatagramConnect / DatagramHost).
54 public enum SocketType { tcp, udp };
// Reason handed to OnDisconnect(): remoteLost = receive error or exceptional condition,
// remoteClosed = receive returned 0, resolveFailed = host lookup failed, connectFailed =
// connect() or OnEstablishConnection() failed. Elsewhere in this file -1 (cast to this type)
// marks "no disconnect pending".
55 public enum DisconnectCode { remoteLost = 1, remoteClosed = 2, resolveFailed = 3, connectFailed = 4 };
56
// Header of a length-prefixed message exchanged through SendPacket() / OnReceivePacket().
57 public class Packet : struct
58 {
59 public:
      // Total number of bytes in the packet, counted from the start of this field: SendPacket()
      // transmits exactly `size` bytes starting at the packet, and OnReceive() waits for `size`
      // bytes before dispatching. It is little-endian on the wire.
60    uint size;
61 };
62
// Worker that resolves a Socket's host name and establishes its connection. _Connect() either
// starts it as a thread (OnConnect overridden) or calls Main() directly (OnConnect not overridden).
63 static class SocketConnectThread : Thread
64 {
65    Socket socket;   // the socket being connected; assigned when the thread object is created
66
      // Resolves socket.address, connects (connect() is skipped when type is udp), calls
      // OnEstablishConnection(), then publishes the outcome: _connected becomes 1 on success;
      // otherwise disconnectCode is set and a pending (-2) state becomes -1. Finally raises
      // network.connectEvent and signals guiApp if there is one. Always returns 0.
      // Locking invariant: every path reaches the final Release() holding network.mutex once.
67    uint Main()
68    {
69       bool result = false;
         // Blocking lookup, performed without holding network.mutex.
70       HOSTENT * host = gethostbyname(socket.address);
71       if(host)
72       {
73          network.mutex.Wait();
74
75          if(!socket.destroyed)
76          {
77             socket.a.sin_addr = *((IN_ADDR *)host->h_addr);
               // Drop the lock around the potentially blocking connect().
78             network.mutex.Release();
79             if(socket.type == udp ||
80                !connect(socket.s,(SOCKADDR *)&socket.a,sizeof(socket.a)))
81             {
82                network.mutex.Wait();
83                strcpy(socket.inetAddress, inet_ntoa(socket.a.sin_addr));
84                socket.inetPort = ntohs(socket.a.sin_port);
85                network.mutex.Release();
86
                  // The user hook runs unlocked; both outcomes re-acquire the lock afterwards.
87                if(socket.OnEstablishConnection((int)socket.s))
88                {
89                   network.mutex.Wait();
90                   result = true;
91                }
92                else
93                {
94                   network.mutex.Wait();
95                   socket.disconnectCode = connectFailed;
96                }
97             }
98             else
99             {
100                network.mutex.Wait();
101                socket.disconnectCode = connectFailed;
102             }
103          }
104       }
105       else
             // Take the lock here as well: the state updates below and the final Release()
             // require it, and previously this path released a mutex it had never acquired.
106          { network.mutex.Wait(); socket.disconnectCode = resolveFailed; }
107
108    #ifdef DEBUG_SOCKETS
109       Log("[C] Signaling connect event (%X)\n", socket);
110    #endif
111       if(result && !socket.destroyed)
112          socket._connected = 1;
113       else if(socket._connected == -2)
114          socket._connected = -1;
115
116    #ifdef DEBUG_SOCKETS
117       Log("[C] Getting out of connect thread (%X)\n", socket);
118    #endif
119       network.connectEvent = true;
120       if(guiApp)
121          guiApp.SignalEvent();
122       network.mutex.Release();
123       return 0;
124    }
125
126 };
127
128 public class Socket
129 {
130 public:
// Owning Service of an accepted connection. Setting it to a non-null Service accepts one
// pending connection on that service's descriptor (value.s) and initializes this Socket around
// the accepted descriptor. Getting it returns the stored Service, or null when this is null.
131    property Service service
132    {
133       set
134       {
135          if(value)
136          {
137             SOCKET s;
138             SOCKADDR_IN a;
139             SOCKLEN_TYPE addrLen = sizeof(a);
140
141             value.accepted = true;
142             s = accept(value.s,(SOCKADDR *)&a, &addrLen);
143             if(s != -1)
144             {
145                int sendsize=65536;
146                int recvsize=65536;
147
                   // The service's socket list holds a reference to this socket.
148                value.sockets.Add(this);
149                incref this;
150
                   // Request 64 KB send/receive buffers; the setsockopt results are not checked.
151                setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&sendsize, (int)sizeof(sendsize));
152                setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *)&recvsize, (int)sizeof(recvsize));
153
                   // Reset the per-connection state: connected, no host name, peer address and
                   // port recorded from accept(), no connect thread, no disconnect pending.
154                destroyed = false;
155                // TESTING THIS HERE
156                _connected = 1;
157                address = null;
158                this.a = a;
159                strcpy(inetAddress, inet_ntoa(this.a.sin_addr));
160                inetPort = ntohs(a.sin_port);
161                this.s = s;
                   // NOTE(review): presumably assigns the private `service` member rather than
                   // re-entering this setter - confirm against eC property scoping rules.
162                service = value;
163                connectThread = null;
164                disconnectCode = (DisconnectCode)-1;
165                disconnected = false;
166
                   // Watch the descriptor for readability and exceptional conditions, and raise
                   // network.ns when s is the highest descriptor seen so far.
167                network.mutex.Wait();
168                FD_SET(s, &network.exceptSet);
169                FD_SET(s, &network.readSet);
170                if(s >= network.ns)
171                {
172                   network.ns = (int)(s+1);
173                   network.socketsSemaphore.Release();
174                }
175                network.mutex.Release();
176             }
                // accept() failed: destroy this socket if nothing holds a reference to it.
177             else if(!_refCount)
178             {
179                delete this;
180             }
181          }
182       }
183       get { return this ? service : null; }
184    };
185
// Read-only view of the peer address text that is filled in with inet_ntoa() elsewhere in this class.
186    property const char * inetAddress { get { return (char *)inetAddress; } };
// Peer port in host byte order (stored via ntohs()).
187    property int inetPort { get { return inetPort; } }
// Next socket link (the `next` member).
188    property Socket next { get { return next; } };
// True while _connected is 1 (established) or -2 (connection attempt still in progress).
189    property bool connected { get { return _connected == 1 || _connected == -2; } };
// When set, _Connect() does not raise network.ns for this socket and ProcessSocket() does not
// flag network.leftOverBytes; see Process()/ProcessTimeOut() for per-socket polling.
190    property bool processAlone { get { return processAlone; } set { processAlone = value; } };
191
// Overridable connection hook. _Connect() compares it with Socket::OnConnect: when it is not
// overridden the connection is made synchronously, otherwise on a background thread. The place
// where the hook itself is invoked is not part of this file section.
192    virtual void OnConnect(void);
// Default receive handler: interprets the data as length-prefixed Packets and dispatches one
// complete packet to OnReceivePacket(). It always returns 0 because it removes the packet from
// recvBuffer / recvBytes itself instead of reporting a consumed byte count to the caller.
// NOTE(review): the memmove below works on recvBuffer, so this assumes buffer == recvBuffer
// (i.e. it is called with the start of the receive buffer) - confirm for all callers.
193    virtual uint OnReceive(const byte * buffer, uint count)
194    {
         // Need at least the size field before the length can be read.
195       if(count >= sizeof(class Packet))
196       {
197          Packet packet = (Packet)buffer;
198          uint size = GETLEDWORD((byte *)&packet.size);
            // Dispatch only once the whole packet has arrived.
199          if(count >= size)
200          {
201             byte * tempBuffer = null;
202             if(size)
203             {
                  // More data follows this packet: copy the packet out, then shift the
                  // remainder to the front of recvBuffer. When the packet is the only content,
                  // it is dispatched in place without a copy.
204                if(recvBytes - size)
205                {
206                   tempBuffer = new byte[size];
207                   packet = (Packet)tempBuffer;
208                   memcpy(tempBuffer, buffer, size);
209                   memmove(recvBuffer, recvBuffer + size, recvBytes - size);
210                }
211                recvBytes -= size;
212             }
               // NOTE(review): with size == 0 nothing is removed from the buffer, so the same
               // bytes would be dispatched again on the next call - confirm this cannot occur.
213             OnReceivePacket(packet);
214             delete tempBuffer;
215             return 0;
216          }
217       }
218       return 0;
219    }
220
// Overridable notification called with the socket's disconnectCode by Disconnect() and Free(),
// in both cases while network.mutex is temporarily released.
221    virtual void OnDisconnect(int code);
// Overridable handler for one complete Packet, called by the default OnReceive(). The packet
// memory belongs to the caller (receive buffer or a temporary freed right after the call).
222    virtual void OnReceivePacket(Packet packet);
223
// Creates a stream (SOCK_STREAM) descriptor and starts connecting it to address:port through
// _Connect(). Returns _Connect()'s result, or false if the descriptor could not be created or
// the platform is not one of the supported ones.
// NOTE(review): when _Connect() reports failure the descriptor created here does not appear to
// be closed anywhere - confirm.
224    bool Connect(const char * address, int port)
225    {
226       bool result = false;
227    #if defined(__WIN32__) || defined(__unix__) || defined(__APPLE__)
228       SOCKET s = socket(AF_INET,SOCK_STREAM,0);
229    #ifdef DEBUG_SOCKETS
230       Log("\n[P] [NConnect]\n");
231    #endif
232       if(s != -1)
233       {
234          result = _Connect(s, address, port);
235       }
236    #endif
237       return result;
238    }
239
// Disconnects from the local side with the given code. Under network.mutex it marks the socket
// destroyed, waits for an in-progress connect thread, removes the socket from whichever list
// holds it, calls OnDisconnect() once, clears the descriptor from the network fd sets, shuts
// the descriptor down and finally drops one reference. Safe to call on a null instance.
240    void Disconnect(DisconnectCode code)
241    {
242       if(this)
243       {
            // Remembered so the reference is dropped only by the call that actually disconnects.
244          bool wasDisconnected = disconnected;
245    #if defined(__WIN32__) || defined(__unix__) || defined(__APPLE__)
246          SOCKET s = this.s;
247
248    #ifdef DEBUG_SOCKETS
249          Logf("[P] [NDisconnect (%X, %x)]\n", this, this.s);
250    #endif
251          network.mutex.Wait();
252          disconnectCode = code;
            // The connect thread checks this flag and will not mark the socket connected.
253          destroyed = true;
254
255          if(!disconnected)
256          {
257             if(_connected == -2 && connectThread)
258             {
                  // Keep this socket alive and release the lock so the connect thread, which
                  // needs network.mutex to finish, can complete before it is deleted.
259                incref this;
260                network.mutex.Release();
261                connectThread.Wait();
262                delete connectThread;
263                network.mutex.Wait();
                  // Undo the temporary reference directly, without triggering destruction.
264                _refCount--;
265             }
266             disconnected = true;
267             if(!service)
268             {
                  // Still-connecting sockets (-1 / -2) live in connectSockets, established ones
                  // in sockets; 0 means the socket is in neither list.
269                if(_connected)
270                   ((_connected == -1 || _connected == -2) ? network.connectSockets : network.sockets).Remove(this);
271             }
272             else
273             {
274                service.sockets.Remove(this);
275                service = null;
276             }
277             _connected = 0;
               // The user callback runs without the network lock held.
278             network.mutex.Release();
279             OnDisconnect(disconnectCode);
280             network.mutex.Wait();
281          }
282
            // This descriptor was the highest one tracked: recompute network.ns.
283          if(s == network.ns - 1)
284             Network_DetermineMaxSocket();
285
286          if(s != -1)
287          {
288             FD_CLR(s, &network.readSet);
289             FD_CLR(s, &network.writeSet);
290             FD_CLR(s, &network.exceptSet);
291          }
292          // Why wasn't this here? Don't want it here :) Hmm why don't we want it here? Service created socket not getting freed in DICOMTest...
293          // Trying >= 1 instead of > 1
294          // Free(code);
295          //if(_refCount > 1)
296          /*if(_refCount >= 1)
297             _refCount--;*/
298
            // 2 = both directions. The descriptor itself is closed later, in Free().
            // NOTE(review): this is also called when s is -1; the error return is ignored.
299          shutdown(s, 2);
300
            // Drop the reference held by the list the socket was removed from above.
301          if(!wasDisconnected)
302             delete this;
303
            // Only the global network object is touched after the delete above.
304          network.mutex.Release();
305    #endif
306       }
307    }
308
309    // --- Transfer ---
// Sends size bytes from buffer: through SendData() for tcp sockets, or with sendto() to the
// stored address `a` for udp sockets. Returns true only when the transport reports that at
// least one byte was sent; returns false when this is null, the descriptor is -1, the send
// fails, or the platform is unsupported.
310    bool Send(const void * buffer, int size)
311    {
312    #if defined(__WIN32__) || defined(__unix__) || defined(__APPLE__)
313       if(this)
314       {
            // Acquires this.mutex and a reference; safeDecRef() undoes both on every exit path.
315          safeIncRef();
316          {
317          SOCKET s = this.s;
318          int count;
319          fd_set ws, es;
320
            // send()/sendto() return -1 on error, so the result must be compared with > 0:
            // a plain truth test would report a failed send as success.
321          if(s != -1 && ((type == tcp && (count = SendData(buffer, size, 0)) > 0) ||
322             (type == udp && (count = (int)sendto(s, buffer, size,0, (SOCKADDR *)&a, sizeof(a))) > 0)))
323          {
               // The fd sets and error value below are leftovers of the select() call that is
               // commented out further down; they have no effect on the result.
324    #if defined(__WIN32__)
325             int error = WSAGetLastError();
326    #endif
327             FD_ZERO(&ws);
328             FD_ZERO(&es);
329             FD_SET(s, &ws);
330             FD_SET(s, &es);
331    #if defined(__WIN32__)
332             if(error)
333    #endif
334             {
335                //Print("~");
336             }
337
338             // This is what was making eCom jam...
339             // select(s+1, null, &ws, &es, null);
340
341             safeDecRef();
342             return true;
343          }
344          }
345          safeDecRef();
346       }
347    #endif
348       return false;
349    }
350
// Sends packet.size bytes starting at the packet. Returns Send()'s result, or false when this
// or packet is null.
351    bool SendPacket(Packet packet)
352    {
353       if(this && packet)
354       {
355          bool result;
356          uint size = packet.size;
            // Put the size field into little-endian wire order for the duration of the send...
357          PUTLEDWORD((byte *)&packet.size, size);
358          result = Send(packet, size);
            // ...then restore the caller's value in native byte order.
359          packet.size = size;
360          return result;
361       }
362       return false;
363    }
364
// Sends the characters of a null-terminated string; the terminator itself is not sent.
// string must not be null (it is passed straight to strlen()).
365    bool SendString(const char * string)
366    {
367       return Send(string, (int)strlen(string));
368    }
369
// printf-style send: formats into a MAX_F_STRING-byte stack buffer and sends the resulting
// text without its terminator. Output longer than the buffer is truncated.
370    bool Sendf(const char * format, ...)
371    {
372       bool result;
373       va_list args;
374       char string[MAX_F_STRING];
375       va_start(args, format);
376       vsnprintf(string, sizeof(string), format, args);
         // Force termination in case the vsnprintf implementation does not terminate on truncation.
377       string[sizeof(string)-1] = 0;
378       result = Send(string, (int)strlen(string));
379       va_end(args);
380       return result;
381    }
382
// Creates a datagram (SOCK_DGRAM) descriptor, runs _Connect() on it for sendAddress:port and
// marks the socket as udp. Returns true whenever the descriptor could be created.
// NOTE(review): the result of _Connect() is ignored, so a failed host lookup still yields true.
// NOTE(review): type is set to udp only after _Connect(); when _Connect() runs the connect
// step synchronously that step still sees the previous type and calls connect() on the
// descriptor - confirm this ordering is intended.
383    bool DatagramConnect(const char * sendAddress, int port)
384    {
385       SOCKET s = socket(AF_INET,SOCK_DGRAM,0);
386       if(s != -1)
387       {
388          _Connect(s, sendAddress, port);
389          type = udp;
390          return true;
391       }
392       return false;
393    }
394
// Binds a datagram socket to `port` on all local interfaces (INADDR_ANY) and registers this
// Socket with the network for reading. Returns true on success; false when the socket is
// already in a connection list (_connected non-zero), or when socket() or bind() fails.
395    bool DatagramHost(int port)
396    {
         // Do not create a descriptor at all when the socket is already in use: previously one
         // was created and then never closed in that case.
397       SOCKET s = _connected ? (SOCKET)-1 : socket(AF_INET,SOCK_DGRAM,0);
398       if(s != -1)
399       {
400          SOCKADDR_IN a;
401          bool val = true;
402          a.sin_family=AF_INET;
403          a.sin_port = htons((uint16)port);
404          a.sin_addr.s_addr=INADDR_ANY;
            // Allow rebinding the port; the setsockopt result is not checked.
405          setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&val, sizeof(val));
406          if(!bind(s,(SOCKADDR *)&a, sizeof(a)))
407          {
408             network.mutex.Wait();
409             type = udp;
410             this.s = s;
               // The network's socket list holds a reference to this socket.
411             network.sockets.Add(this);
412             incref this;
413
414             disconnectCode = (DisconnectCode)-1;
415             _connected = 1;
416
               // Watch for readability and exceptional conditions only, and raise network.ns
               // when s is the highest descriptor seen so far.
417             FD_CLR(s, &network.writeSet);
418             FD_SET(s, &network.readSet);
419             FD_SET(s, &network.exceptSet);
420             if(s >= network.ns)
421             {
422                network.ns = (int)(s+1);
423                network.socketsSemaphore.Release();
424             }
425             network.mutex.Release();
426             return true;
427          }
            // bind() failed: the descriptor is not kept.
428          closesocket(s);
429       }
430       return false;
431    }
432
// Overridable raw receive used by ProcessSocket() for tcp sockets. The default forwards to
// recv() on the socket's descriptor and returns its result unchanged.
433    virtual int ReceiveData(byte * buffer, int count, uint flags)
434    {
435       return (int)recv(s, (char *)buffer, count, flags);
436    }
// Overridable raw send used by Send() for tcp sockets. The default forwards to send() on the
// socket's descriptor and returns its result unchanged.
437    virtual int SendData(const byte * buffer, int count, uint flags)
438    {
439       return (int)send(s, (const char *)buffer, count, flags);
440    }
// Overridable hook called by the connect thread, without network.mutex held, once the
// descriptor s is connected. A false result makes the attempt fail with connectFailed.
// Only the declaration is visible here; the default behavior is defined elsewhere.
441    virtual bool OnEstablishConnection(int s);
442
// Acquires this socket's own mutex and adds a reference. The mutex is deliberately left held;
// the matching safeDecRef() releases it, so the two calls bracket a critical section.
443    dllexport void safeIncRef()
444    {
445       mutex.Wait();
446       incref this;
447       //mutex.Release();
448    }
449
// Counterpart of safeIncRef(): drops the reference, then releases the mutex that
// safeIncRef() left held.
450    dllexport void safeDecRef()
451    {
         // Copy the handle first so `this` is not dereferenced after the delete below.
452       Mutex mutex = this.mutex;
453       //mutex.Wait();
454       delete this;
455       mutex.Release();
456    }
457
458 private:
// Constructor: makes sure the network layer is initialized and starts the socket in the
// disconnected state.
459    Socket()
460    {
461       Network_Initialize();
462       disconnected = true;
463    }
464
// Destructor: releases everything through Free(), taking network.mutex.
465    ~Socket()
466    {
         // Free() may call OnDisconnect() and adjusts the count around it; pinning the count at
         // MAXINT keeps those adjustments from reaching zero while destruction is under way.
467       _refCount = MAXINT;
468       Free(true);
469       _refCount = 0;
470    }
471
472 #ifndef ECERE_NONET
// Releases all resources of the socket: removes it from the network or service list, calls
// OnDisconnect() if that has not happened yet, closes the descriptor, frees the address and
// receive buffer, and clears the descriptor from the network fd sets.
// mustLock: true when the caller does not already hold network.mutex.
473    void Free(bool mustLock)
474    {
475       SOCKET s = this.s;
476
477       if(mustLock) network.mutex.Wait();
478
         // Not owned by a Service: still-connecting sockets (-1 / -2) are in connectSockets,
         // established ones in sockets.
479       if(!service && _connected)
480       {
481          ((_connected == -1 || _connected == -2) ? network.connectSockets : network.sockets).Remove(this);
482          _connected = 0;
483       }
484
485       if(!disconnected)
486       {
487          disconnected = true;
            // The callback runs without the network lock; a temporary reference keeps this
            // socket alive during it and is then removed without triggering destruction.
488          network.mutex.Release();
489          incref this;
490          OnDisconnect(disconnectCode);
491          // if(_refCount > 1) _refCount--;
492          _refCount--;
493          network.mutex.Wait();
494       }
495
496       if(service)
497       {
498          service.sockets.Remove(this);
499          service = null;
500          _connected = 0;
501       }
502
503       if(s != -1) { closesocket(s); this.s = -1; }
504
505       delete address;
506       delete recvBuffer;
507
508       recvBufferSize = 0;
509       recvBytes = 0;
510
         // s is the local copy taken on entry, so this still refers to the closed descriptor.
511       if(s != -1)
512       {
513          FD_CLR(s, &network.readSet);
514          FD_CLR(s, &network.writeSet);
515          FD_CLR(s, &network.exceptSet);
516       }
517
518       disconnectCode = 0;
519
520       // COMMENTED THIS OUT SINCE IT WAS INVALIDATING PROTECTION FOR HTTP FILE CONNECTION REUSE...
521       // WATCH FOR LEAKS IN OTHER PROJECTS?
522       //if(_refCount > 1) _refCount--;
523       if(mustLock) network.mutex.Release();
524    }
525
// Internal disconnect used by ProcessSocket() when the remote side closed or was lost: records
// the code, frees the socket's resources, drops one reference and recomputes network.ns if
// this descriptor was the highest one.
526    void _Disconnect(DisconnectCode code)
527    {
         // Capture the descriptor first: Free() resets this.s to -1.
528       SOCKET s = this.s;
529       network.mutex.Wait();
530
531       disconnectCode = code;
532
         // The lock is already held here, hence mustLock = false.
533       Free(false);
534       delete this;
535
         // Only the local s and the global network object are used after the delete above.
536       if(s == network.ns - 1)
537          Network_DetermineMaxSocket();
538       network.mutex.Release();
539    }
540
// Common connection setup for Connect() and DatagramConnect(): stores the target address and
// descriptor, resets the connection state to "connecting" (-2) and runs the connect step.
// When OnConnect is not overridden the step runs synchronously and the return value tells
// whether the connection was established; otherwise it runs on a background thread and true
// is returned immediately. Returns false when called on a null instance.
// address must not be null (it is passed to strlen()).
// NOTE(review): on a synchronous failure this.s is reset to -1 without closing s or clearing
// it from network.writeSet, so the descriptor appears to be leaked - confirm.
541    bool _Connect(SOCKET s, const char * address, int port)
542    {
543       bool result = false;
544       if(this)
545       {
546          SOCKADDR_IN a;
547          a.sin_family = AF_INET;
548          a.sin_port = htons((uint16)port);
549
550          network.mutex.Wait();
551
552          delete this.address;
553
554          // incref this;
555          destroyed = false;
556          this.address = new char[strlen(address)+1];
557          strcpy(this.address, address);
            // sin_addr is filled in later by the connect step, after the host lookup.
558          this.a = a;
559          this.s = s;
560          service = null;
561          disconnected = false;
562          disconnectCode = (DisconnectCode)-1;
563          connectThread = null;
564          _connected = -2;
565
566          FD_SET(s, &network.writeSet);
567          if(s >= network.ns && !processAlone)
568          {
569             network.ns = (int)(s+1);
570             network.socketsSemaphore.Release();
571          }
572          connectThread = SocketConnectThread { socket = this };
573
574          if(OnConnect == Socket::OnConnect)
575          {
576             result = true;
577
               // Run the connect step inline. It takes network.mutex itself, so the lock is
               // released around the call and re-acquired afterwards.
578             network.mutex.Release();
579             connectThread.Main();
580             network.mutex.Wait();
581
582             if(_connected == -1 || destroyed)
583             {
                  // Failed, or disconnected while connecting: the socket is in no list.
584                _connected = 0;
585
586                if(s == network.ns - 1)
587                   Network_DetermineMaxSocket();
588 #if 0
589                if(this.disconnectCode == resolveFailed)
590                   Logf("Error resolving address %s\n", this.address);
591 #endif
592                // Free();
593                this.s = -1;
594                result = false;
595             }
596             else if(_connected == 1)
597             {
                  // Established: watch for reads/exceptions instead of writes; the network's
                  // socket list holds a reference.
598                FD_CLR(s, &network.writeSet);
599                FD_SET(s, &network.readSet);
600                FD_SET(s, &network.exceptSet);
601                network.sockets.Add(this);
602
603                incref this;
604                result = true;
605             }
606             else
607                this.s = -1;
608
609             delete connectThread;
610          }
611          else
612          {
               // Asynchronous connect: the pending list and the running thread each hold a
               // reference; completion is reported through network.connectEvent.
613             network.connectSockets.Add(this);
614             incref this;
615             incref connectThread;
616             connectThread.Create();
617             result = true;
618          }
619          /*if(_refCount > 1)
620             delete this;*/
            // Release inside the guarded block: the mutex is only acquired when this is
            // non-null, so it must not be released on the null path.
621          network.mutex.Release();
622       }
623       return result;
624    }
625
// Minimum free space (in bytes) ProcessSocket() keeps in the receive buffer before each read,
// and the increment by which that buffer grows.
626    #define MAX_RECEIVE  65536
627
// Handles one round of events for this socket given the fd sets produced by select(): reads
// incoming data into recvBuffer, feeds it to OnReceive() until nothing more is consumed, and
// disconnects the socket when the peer closed, an error occurred, or an exceptional condition
// was reported. Returns true when there was something to process.
// ws is accepted but not used in this function.
628    bool ProcessSocket(fd_set * rs, fd_set * ws, fd_set * es)
629    {
630       bool result = false;
631       SOCKET s;
         // Keep a copy of the mutex handle: it is released after the final `delete this`.
632       Mutex mutex = this.mutex;
633
         // Hold the socket's own mutex and a reference for the whole call.
634       mutex.Wait();
635       incref this;
636       // network.mutex.Wait();
637       s = this.s;
         // leftOver: a previous round left unconsumed bytes in recvBuffer.
638       if(FD_ISSET(s, rs) || leftOver)
639       {
640          int count = 0;
641
642          result = true;
            // Guarantee at least MAX_RECEIVE bytes of free space before reading.
643          if((int)recvBufferSize - recvBytes < MAX_RECEIVE)
644          {
645             recvBuffer = renew recvBuffer byte[recvBufferSize + MAX_RECEIVE];
646             recvBufferSize += MAX_RECEIVE;
647          }
648
            // Read only when data is pending and no disconnect has been recorded yet
            // (-1 means "none").
649          if(FD_ISSET(s, rs) && disconnectCode == (DisconnectCode)-1)
650          {
651             if(type == tcp /*|| _connected*/)
652                count = ReceiveData(recvBuffer + recvBytes, recvBufferSize - recvBytes, 0);
653             else
654             {
                  // Datagram: remember the sender so that Send() replies to it.
655                SOCKLEN_TYPE len = sizeof(a);
656                count = (int)recvfrom(s, (char *)recvBuffer + recvBytes,
657                   recvBufferSize - recvBytes, 0, (SOCKADDR *)&a, &len);
658                strcpy(inetAddress, inet_ntoa(this.a.sin_addr));
659                inetPort = ntohs((uint16)a.sin_port);
660             }
661             switch(count)
662             {
663                case 0:
664                   disconnectCode = remoteClosed;
665                   break;
666                case -1:
667                {
668                   /*int yo = errno;
669                   printf("Errno is %d", errno);*/
670                   disconnectCode = remoteLost;
671                   break;
672                }
673             }
674          }
675
            // New data arrived, or nothing was read but earlier data is still waiting.
676          if(count > 0 || (leftOver && !count))
677          {
678             uint flushCount;
679             leftOver = false;
680             recvBytes += count;
               // Offer the buffered data to OnReceive() until it stops consuming.
681             for(flushCount = 0; flushCount < recvBytes; )
682             {
683                uint recvCount = OnReceive(recvBuffer + flushCount, recvBytes - flushCount);
684                if(!recvCount)
685                {
                     // Nothing consumed: remember the remainder for the next round.
686                   if(recvBytes)
687                      leftOver = true;
688                   if(!processAlone)
689                      network.leftOverBytes = true;
690                   break;
691                }
692                flushCount += recvCount;
693             }
694
695             if(flushCount < recvBytes)
696             {
697                if(flushCount)
698                {
                     // Move the unconsumed tail to the front of the buffer.
699                   memmove(recvBuffer, recvBuffer + flushCount, recvBytes - flushCount);
700                   recvBytes -= flushCount;
701                }
702                else
703                {
704                   // If nothing was acknowledged, clear socket so that OnReceive doesn't keep getting called
705                   if(disconnectCode > -1)
706                      recvBytes = 0;
707                }
708             }
709             else
710                recvBytes = 0;
711          }
712       }
713       else if(FD_ISSET(s, es))
714       {
715          result = true;
716 #if 0
717          Logf("Remote Lost %s\n", (void *)inet_ntoa(a.sin_addr));
718 #endif
719          if(type != udp)
720             _Disconnect(remoteLost);
721       }
722
723       // Disconnect it here
         // Only once all buffered data has been delivered and the socket is still in a list.
724       if(!recvBytes && disconnectCode > -1 && _connected)
725       {
726          result = true;
727 #if 0
728          if(disconnectCode)
729             Logf("Disconnected (%d) %s\n", disconnectCode, (void *)inet_ntoa(a.sin_addr));
730 #endif
731          if(type != udp)
732             _Disconnect(disconnectCode);
733       }
734       // network.mutex.Release();
         // Drop the reference taken at the top, then release through the saved handle.
735       delete this;
736       mutex.Release();
737
738       return result;
739    }
740 #endif
741
// Processes this socket's pending events via ProcessTimeOut(0). With a zero time-out and no
// left-over data, that call passes no time limit to select().
742    public bool Process()
743    {
744       return ProcessTimeOut(0);
745    }
746
// Polls this socket on its own: waits with select() for readability or an exceptional
// condition and, if there is anything to do, handles it through ProcessSocket().
// timeOut: maximum wait in seconds (fractions allowed); 0 means no time limit. When left-over
// data is buffered the wait is skipped (zero time-out).
// Returns true when an event was processed; false when the socket has no descriptor, is
// already disconnected with nothing left to deliver, or nothing happened.
747    public bool ProcessTimeOut(Seconds timeOut)
748    {
749       bool gotEvent = false;
750       struct timeval tv = {0, 0};
         // Whole seconds plus the fractional part converted to microseconds.
751       struct timeval tvTO = {(uint)timeOut, (uint)((timeOut -(uint)timeOut)* 1000000)};
752       fd_set rs, ws, es;
753       int selectResult;
754       Mutex mutex;
755
         // Bail out before touching the fd sets when there is no descriptor: FD_SET() on -1 is
         // undefined, and select() on an empty set without a time-out would never return.
756       if(s == -1 || (disconnectCode > 0 && !leftOver)) return false;
757       FD_ZERO(&rs);
758       FD_ZERO(&ws);
759       FD_ZERO(&es);
760       FD_SET(s, &rs);
761       //FD_SET(s, &ws);
762       FD_SET(s, &es);
763
         // Hold a reference across the wait, which is performed without any lock held.
764       mutex = this.mutex;
765       mutex.Wait();
766       incref this;
767       mutex.Release();
768       selectResult = select((int)(s+1), &rs, &ws, &es, leftOver ? &tv : (timeOut ? &tvTO : null));
769
         // Re-check the state: the socket may have been freed while waiting.
770       if(s != -1 && _refCount && (leftOver || selectResult))
771       {
772          gotEvent |= ProcessSocket(&rs, &ws, &es);
773       }
774       mutex.Wait();
775       delete this;
776       mutex.Release();
777       return gotEvent;
778    }
779
// Service this socket was accepted from; null for sockets created with Connect()/Datagram*().
780    Service service;
// True when OnReceive() left unconsumed bytes in recvBuffer that must be offered again.
781    bool leftOver;
782
// Peer address as text (written with inet_ntoa()) and peer port in host byte order.
783    char inetAddress[20];
784    int inetPort;
785
// Links to neighbouring sockets. NOTE(review): presumably maintained by the socket lists
// (network.sockets, service.sockets, ...) - they are not written in this file section.
786    Socket prev, next;
787
// Descriptor; -1 once closed by Free() or after a failed synchronous connect.
788    SOCKET s;
// Heap copy of the host name given to _Connect(); null for accepted sockets.
789    char * address;
// Connect worker created by _Connect(); null otherwise.
790    Thread connectThread;
// Pending disconnect reason; -1 means none, and Free() resets it to 0.
791    DisconnectCode disconnectCode;
// Set by Disconnect(); tells the connect thread not to mark the socket as connected.
792    bool destroyed;
793    int _connected;         // -2: Initial value when calling Connect(), -1: Disconnected or otherwise destroyed while connecting, 1: succesfully connected, 0: no longer in any connect/sockets list
// True until a connection is set up and again once OnDisconnect() has been issued.
794    bool disconnected;
795
796    // Receiving Buffer
// recvBytes = bytes currently buffered, recvBufferSize = allocated size of recvBuffer.
797    byte * recvBuffer;
798    uint recvBytes;
799    uint recvBufferSize;
800    SocketType type;
801    bool processAlone;
// Peer address: connect target, accepted peer, or sender of the last datagram received.
802    SOCKADDR_IN a;
// Per-socket lock used by safeIncRef()/safeDecRef(), ProcessSocket() and ProcessTimeOut().
803    Mutex mutex { };
804 };