diff --git a/src/engine.go b/src/engine.go index 3d2c318758a1796b871b71a84ddaf852b3330d5e..d1c35f2f9b02fea854b2e9e9ff6afb59c5876cdb 100644 --- a/src/engine.go +++ b/src/engine.go @@ -157,6 +157,9 @@ type Connection struct { MaxPayload int // Maximum payload size, may not be used in websockets, idk Context context.Context // Context of the connection Cancellable context.CancelFunc // Cancel function of the context + timer time.Timer // Timer for the ping messages + send func(ctx context.Context, frame EngineIOFrame) error + receive func(ctx context.Context) (EngineIOFrame, error) } func (c *Connection) Done() <-chan struct{} { @@ -174,26 +177,70 @@ func (c *Connection) GetSid() string { return c.Sid } -func (c *Connection) send(ctx context.Context, frame EngineIOFrame) error { - return errors.New("Not implemented") -} - -func (c *Connection) receive(ctx context.Context) (EngineIOFrame, error) { - return EngineIOFrame{}, errors.New("Not implemented") -} +//func (c *Connection) send(ctx context.Context, frame EngineIOFrame) error { +// return errors.New("Not implemented") +//} +// +//func (c *Connection) receive(ctx context.Context) (EngineIOFrame, error) { +// //return EngineIOFrame{}, errors.New("Not implemented") +// return EngineIOFrame{}, nil +//} func (c *Connection) Close(ctx context.Context) error { c.Cancellable() return c.send(utils.AddAttr(ctx, slog.Attr(slog.String("SID", c.Sid))), EngineIOFrame{Type: CLOSE, Data: &EngineUTF8Data{}}) } +func (c *Connection) receiveLoop() { +out: + for { + select { + case <-c.Context.Done(): + break out + default: + frame, err := c.receive(c.Context) + if err != nil { + utils.WithContext(c.Context).Error("Receive failed: " + err.Error()) + continue + } + switch frame.Type { + case PING: + c.timer.Reset(time.Duration(c.PingTimeout+c.PingInterval) * time.Millisecond) + c.send(c.Context, EngineIOFrame{Type: PONG, Data: &EngineUTF8Data{}}) + case MESSAGE: + c.Inbound <- frame.Data + case NOOP: + case CLOSE: + utils.WithContext(c.Context).Warn("Connection closed by server") + c.Cancellable() + return + default: + } + } + } + close(c.Inbound) +} + +func (c *Connection) sendLoop() { +out: + for { + select { + case <-c.Context.Done(): + break out + case data := <-c.Outbound: + c.send(c.Context, EngineIOFrame{Type: MESSAGE, Data: data}) + } + } + close(c.Outbound) +} + type PollingConnection struct { Connection frameBuffer []EngineIOFrame sendMutex sync.Mutex } -func (p *PollingConnection) send(ctx context.Context, frame EngineIOFrame) error { +func (p *PollingConnection) sender(ctx context.Context, frame EngineIOFrame) error { p.sendMutex.Lock() defer p.sendMutex.Unlock() client := http.Client{ @@ -205,12 +252,12 @@ func (p *PollingConnection) send(ctx context.Context, frame EngineIOFrame) error } req.Header.Set("Content-Type", "text/plain") _, err = client.Do(req.WithContext(ctx)) - utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Sending message: " + string(frame.Data.GetUTF8())) + utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Sending Frame: " + string(frame.Data.GetUTF8())) _, err = http.Post(p.Url+"/socket.io/?EIO=4&transport=polling&sid="+p.Sid, "text/plain", bytes.NewReader(marshalPollingFrame(frame))) return err } -func (p *PollingConnection) receive(ctx context.Context) (EngineIOFrame, error) { +func (p *PollingConnection) receiver(ctx context.Context) (EngineIOFrame, error) { if len(p.frameBuffer) > 0 { frame := p.frameBuffer[0] p.frameBuffer = p.frameBuffer[1:] @@ -237,13 +284,13 @@ func (p *PollingConnection) receive(ctx context.Context) (EngineIOFrame, error) //println(string(data)) frames := parsePollingFrame(data) for _, frame := range frames { - utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Message received: " + string(frame.Data.GetUTF8())) + utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Received Frame: " + string(frame.Data.GetUTF8())) } p.frameBuffer = frames[1:] return frames[0], nil } -func NewConnection(ctx context.Context, url string) IConnection { +func NewPollingConnection(ctx context.Context, url string) IConnection { resp, err := http.Get(url + "/socket.io/?EIO=4&transport=polling") if err != nil { return nil @@ -276,6 +323,9 @@ func NewConnection(ctx context.Context, url string) IConnection { sync.Mutex{}, } + connection.send = connection.sender + connection.receive = connection.receiver + utils.WithContext(ctx).Info(`Creating connection with SID: ` + openResponse.Sid) ct, canceller := context.WithCancel(utils.AddAttr(ctx, slog.String("SID", openResponse.Sid))) @@ -292,53 +342,11 @@ func NewConnection(ctx context.Context, url string) IConnection { connection.Context = ct connection.Cancellable = canceller + connection.timer = *timer - go func() { - out: - for { - frame, err := connection.receive(ct) - if err != nil { - utils.WithContext(ct).Error("Receive failed: " + err.Error()) - } else { - switch frame.Type { - case PING: - timer.Reset(time.Duration(connection.PingTimeout+connection.PingInterval) * time.Millisecond) - connection.send(ct, EngineIOFrame{Type: PONG, Data: &EngineUTF8Data{}}) - case MESSAGE: - connection.Inbound <- frame.Data - case NOOP: - case CLOSE: - utils.WithContext(ct).Warn("Connection closed by server") - canceller() - break out - case 75: - utils.WithContext(ct).Error("Connection Died") - break out - default: - } - } - select { - case <-ct.Done(): - break out - default: - continue - } - } - close(connection.Inbound) - }() - - go func() { - out: - for { - select { - case <-ct.Done(): - break out - case data := <-connection.Outbound: - connection.send(ct, EngineIOFrame{Type: MESSAGE, Data: data}) - } - } - close(connection.Outbound) - }() + go connection.receiveLoop() + + go connection.sendLoop() // Set the session ID return &connection @@ -362,31 +370,85 @@ func marshalPollingFunction(frame EngineIOFrame) []byte { return append([]byte{byte(frame.Type + '0')}, []byte(frame.Data.GetUTF8())...) } -func (w *WebSocketConnection) send(ctx context.Context, frame EngineIOFrame) error { +func (w *WebSocketConnection) sender(ctx context.Context, frame EngineIOFrame) error { w.mutex.Lock() defer w.mutex.Unlock() utils.WithContext(ctx).Info("Sending frame: " + frame.Data.GetUTF8()) return w.conn.WriteMessage(websocket.TextMessage, marshalPollingFrame(frame)) } -func UpgradeConnection(ctx context.Context, connection *PollingConnection) (*IConnection, error) { +func (w *WebSocketConnection) receiver(ctx context.Context) (EngineIOFrame, error) { + _, data, err := w.conn.ReadMessage() + if err == nil { + utils.WithContext(ctx).Info("Received frame: " + string(data)) + } + return parseWebSocketFrame(data), err +} + +// This is a background connection +func UpgradeConnection(ctx context.Context, connection *PollingConnection) (IConnection, error) { connection.Cancellable() + + utils.WithContext(ctx).Info(`Upgrading connection with SID: ` + connection.Sid) + + ct, canceller := context.WithCancel(utils.AddAttr(ctx, slog.String("SID", connection.Sid))) + + timer := time.AfterFunc(time.Duration(connection.PingTimeout+connection.PingInterval)*time.Millisecond, func() { + select { + case <-ct.Done(): + return + default: + utils.WithContext(ct).Error("Connection timed out") + canceller() + } + }) + url := strings.Replace(connection.Url, "http", "ws", 1) - conn, _, err := websocket.DefaultDialer.DialContext(ctx, url+"/socket.io/?EIO=4&transport=websocket&sid="+connection.Sid, nil) + + conn, piss, err := websocket.DefaultDialer.DialContext(ctx, url+"/socket.io/?EIO=4&transport=websocket&sid="+connection.Sid, nil) if err != nil { - utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error()) + utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error() + " " + piss.Status) return nil, err } - conn.WriteMessage(websocket.TextMessage, []byte("2probe")) - _, data, err := conn.ReadMessage() + + sock := WebSocketConnection{ + Connection: Connection{ + Url: url, + Sid: connection.Sid, + Inbound: make(chan IEngineData), + Outbound: make(chan IEngineData), + PingInterval: connection.PingInterval, + PingTimeout: connection.PingTimeout, + MaxPayload: connection.MaxPayload, + Context: ct, + Cancellable: canceller, + timer: *timer, + }, + conn: conn, + mutex: sync.Mutex{}, + } + + sock.send = sock.sender + sock.receive = sock.receiver + + err = sock.send(ct, EngineIOFrame{Type: PING, Data: &EngineUTF8Data{"probe"}}) if err != nil { - utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error()) + utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error()) return nil, err } - if string(data) != "3probe" { - utils.WithContext(ctx).Error("Failed to upgrade connection: Invalid response") - return nil, errors.New("Invalid response") + resp, err := sock.receive(ct) + if err != nil || resp.Data.GetUTF8() != "probe" { + utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error()) + return nil, err } - conn.WriteMessage(websocket.TextMessage, []byte("5")) - return nil, nil + err = sock.send(ct, EngineIOFrame{Type: UPGRADE, Data: &EngineUTF8Data{}}) + if err != nil { + utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error()) + return nil, err + } + + go sock.receiveLoop() + go sock.sendLoop() + + return &sock, nil } diff --git a/src/main.go b/src/main.go index 9c45a0c397fdc6bfc615276fa8cafad8b55567ef..5464011f0e777e0d3b11b258505b43b4015af7f6 100644 --- a/src/main.go +++ b/src/main.go @@ -5,17 +5,14 @@ import ( ) func main() { - connection := NewConnection(context.Background(), "http://localhost:3000") + connection := NewPollingConnection(context.Background(), "http://localhost:3000") connect := connection.(*PollingConnection) - UpgradeConnection(context.Background(), connect) - //in, out := connection.GetMessageChannels() - //data := EngineUTF8Data{Data: "0"} - //out <- &data - //<-in - ////println(message.GetUTF8()) - ////time.Sleep(120 * time.Millisecond) - //out <- &EngineUTF8Data{Data: "2[\"chat message\",\"abcd\"]"} - //<-in - //println(message.GetUTF8()) - //time.Sleep(120 * time.Second) + sock, err := UpgradeConnection(context.Background(), connect) + if err != nil { + panic(err) + } + in, out := sock.GetMessageChannels() + data := EngineUTF8Data{Data: "0"} + out <- &data + <-in }