Skip to content
Snippets Groups Projects
Commit 8f0da398 authored by Tóth Elíz's avatar Tóth Elíz
Browse files

Added websockets

parent b44bd648
Branches
No related tags found
No related merge requests found
...@@ -157,6 +157,9 @@ type Connection struct { ...@@ -157,6 +157,9 @@ type Connection struct {
MaxPayload int // Maximum payload size, may not be used in websockets, idk MaxPayload int // Maximum payload size, may not be used in websockets, idk
Context context.Context // Context of the connection Context context.Context // Context of the connection
Cancellable context.CancelFunc // Cancel function of the context Cancellable context.CancelFunc // Cancel function of the context
timer time.Timer // Timer for the ping messages
send func(ctx context.Context, frame EngineIOFrame) error
receive func(ctx context.Context) (EngineIOFrame, error)
} }
func (c *Connection) Done() <-chan struct{} { func (c *Connection) Done() <-chan struct{} {
...@@ -174,26 +177,70 @@ func (c *Connection) GetSid() string { ...@@ -174,26 +177,70 @@ func (c *Connection) GetSid() string {
return c.Sid return c.Sid
} }
func (c *Connection) send(ctx context.Context, frame EngineIOFrame) error { //func (c *Connection) send(ctx context.Context, frame EngineIOFrame) error {
return errors.New("Not implemented") // return errors.New("Not implemented")
} //}
//
func (c *Connection) receive(ctx context.Context) (EngineIOFrame, error) { //func (c *Connection) receive(ctx context.Context) (EngineIOFrame, error) {
return EngineIOFrame{}, errors.New("Not implemented") // //return EngineIOFrame{}, errors.New("Not implemented")
} // return EngineIOFrame{}, nil
//}
func (c *Connection) Close(ctx context.Context) error { func (c *Connection) Close(ctx context.Context) error {
c.Cancellable() c.Cancellable()
return c.send(utils.AddAttr(ctx, slog.Attr(slog.String("SID", c.Sid))), EngineIOFrame{Type: CLOSE, Data: &EngineUTF8Data{}}) return c.send(utils.AddAttr(ctx, slog.Attr(slog.String("SID", c.Sid))), EngineIOFrame{Type: CLOSE, Data: &EngineUTF8Data{}})
} }
func (c *Connection) receiveLoop() {
out:
for {
select {
case <-c.Context.Done():
break out
default:
frame, err := c.receive(c.Context)
if err != nil {
utils.WithContext(c.Context).Error("Receive failed: " + err.Error())
continue
}
switch frame.Type {
case PING:
c.timer.Reset(time.Duration(c.PingTimeout+c.PingInterval) * time.Millisecond)
c.send(c.Context, EngineIOFrame{Type: PONG, Data: &EngineUTF8Data{}})
case MESSAGE:
c.Inbound <- frame.Data
case NOOP:
case CLOSE:
utils.WithContext(c.Context).Warn("Connection closed by server")
c.Cancellable()
return
default:
}
}
}
close(c.Inbound)
}
func (c *Connection) sendLoop() {
out:
for {
select {
case <-c.Context.Done():
break out
case data := <-c.Outbound:
c.send(c.Context, EngineIOFrame{Type: MESSAGE, Data: data})
}
}
close(c.Outbound)
}
type PollingConnection struct { type PollingConnection struct {
Connection Connection
frameBuffer []EngineIOFrame frameBuffer []EngineIOFrame
sendMutex sync.Mutex sendMutex sync.Mutex
} }
func (p *PollingConnection) send(ctx context.Context, frame EngineIOFrame) error { func (p *PollingConnection) sender(ctx context.Context, frame EngineIOFrame) error {
p.sendMutex.Lock() p.sendMutex.Lock()
defer p.sendMutex.Unlock() defer p.sendMutex.Unlock()
client := http.Client{ client := http.Client{
...@@ -205,12 +252,12 @@ func (p *PollingConnection) send(ctx context.Context, frame EngineIOFrame) error ...@@ -205,12 +252,12 @@ func (p *PollingConnection) send(ctx context.Context, frame EngineIOFrame) error
} }
req.Header.Set("Content-Type", "text/plain") req.Header.Set("Content-Type", "text/plain")
_, err = client.Do(req.WithContext(ctx)) _, err = client.Do(req.WithContext(ctx))
utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Sending message: " + string(frame.Data.GetUTF8())) utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Sending Frame: " + string(frame.Data.GetUTF8()))
_, err = http.Post(p.Url+"/socket.io/?EIO=4&transport=polling&sid="+p.Sid, "text/plain", bytes.NewReader(marshalPollingFrame(frame))) _, err = http.Post(p.Url+"/socket.io/?EIO=4&transport=polling&sid="+p.Sid, "text/plain", bytes.NewReader(marshalPollingFrame(frame)))
return err return err
} }
func (p *PollingConnection) receive(ctx context.Context) (EngineIOFrame, error) { func (p *PollingConnection) receiver(ctx context.Context) (EngineIOFrame, error) {
if len(p.frameBuffer) > 0 { if len(p.frameBuffer) > 0 {
frame := p.frameBuffer[0] frame := p.frameBuffer[0]
p.frameBuffer = p.frameBuffer[1:] p.frameBuffer = p.frameBuffer[1:]
...@@ -237,13 +284,13 @@ func (p *PollingConnection) receive(ctx context.Context) (EngineIOFrame, error) ...@@ -237,13 +284,13 @@ func (p *PollingConnection) receive(ctx context.Context) (EngineIOFrame, error)
//println(string(data)) //println(string(data))
frames := parsePollingFrame(data) frames := parsePollingFrame(data)
for _, frame := range frames { for _, frame := range frames {
utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Message received: " + string(frame.Data.GetUTF8())) utils.WithContext(ctx).With(slog.Int("TYPE", int(frame.Type))).Info("Received Frame: " + string(frame.Data.GetUTF8()))
} }
p.frameBuffer = frames[1:] p.frameBuffer = frames[1:]
return frames[0], nil return frames[0], nil
} }
func NewConnection(ctx context.Context, url string) IConnection { func NewPollingConnection(ctx context.Context, url string) IConnection {
resp, err := http.Get(url + "/socket.io/?EIO=4&transport=polling") resp, err := http.Get(url + "/socket.io/?EIO=4&transport=polling")
if err != nil { if err != nil {
return nil return nil
...@@ -276,6 +323,9 @@ func NewConnection(ctx context.Context, url string) IConnection { ...@@ -276,6 +323,9 @@ func NewConnection(ctx context.Context, url string) IConnection {
sync.Mutex{}, sync.Mutex{},
} }
connection.send = connection.sender
connection.receive = connection.receiver
utils.WithContext(ctx).Info(`Creating connection with SID: ` + openResponse.Sid) utils.WithContext(ctx).Info(`Creating connection with SID: ` + openResponse.Sid)
ct, canceller := context.WithCancel(utils.AddAttr(ctx, slog.String("SID", openResponse.Sid))) ct, canceller := context.WithCancel(utils.AddAttr(ctx, slog.String("SID", openResponse.Sid)))
...@@ -292,53 +342,11 @@ func NewConnection(ctx context.Context, url string) IConnection { ...@@ -292,53 +342,11 @@ func NewConnection(ctx context.Context, url string) IConnection {
connection.Context = ct connection.Context = ct
connection.Cancellable = canceller connection.Cancellable = canceller
connection.timer = *timer
go func() { go connection.receiveLoop()
out:
for {
frame, err := connection.receive(ct)
if err != nil {
utils.WithContext(ct).Error("Receive failed: " + err.Error())
} else {
switch frame.Type {
case PING:
timer.Reset(time.Duration(connection.PingTimeout+connection.PingInterval) * time.Millisecond)
connection.send(ct, EngineIOFrame{Type: PONG, Data: &EngineUTF8Data{}})
case MESSAGE:
connection.Inbound <- frame.Data
case NOOP:
case CLOSE:
utils.WithContext(ct).Warn("Connection closed by server")
canceller()
break out
case 75:
utils.WithContext(ct).Error("Connection Died")
break out
default:
}
}
select {
case <-ct.Done():
break out
default:
continue
}
}
close(connection.Inbound)
}()
go func() { go connection.sendLoop()
out:
for {
select {
case <-ct.Done():
break out
case data := <-connection.Outbound:
connection.send(ct, EngineIOFrame{Type: MESSAGE, Data: data})
}
}
close(connection.Outbound)
}()
// Set the session ID // Set the session ID
return &connection return &connection
...@@ -362,31 +370,85 @@ func marshalPollingFunction(frame EngineIOFrame) []byte { ...@@ -362,31 +370,85 @@ func marshalPollingFunction(frame EngineIOFrame) []byte {
return append([]byte{byte(frame.Type + '0')}, []byte(frame.Data.GetUTF8())...) return append([]byte{byte(frame.Type + '0')}, []byte(frame.Data.GetUTF8())...)
} }
func (w *WebSocketConnection) send(ctx context.Context, frame EngineIOFrame) error { func (w *WebSocketConnection) sender(ctx context.Context, frame EngineIOFrame) error {
w.mutex.Lock() w.mutex.Lock()
defer w.mutex.Unlock() defer w.mutex.Unlock()
utils.WithContext(ctx).Info("Sending frame: " + frame.Data.GetUTF8()) utils.WithContext(ctx).Info("Sending frame: " + frame.Data.GetUTF8())
return w.conn.WriteMessage(websocket.TextMessage, marshalPollingFrame(frame)) return w.conn.WriteMessage(websocket.TextMessage, marshalPollingFrame(frame))
} }
func UpgradeConnection(ctx context.Context, connection *PollingConnection) (*IConnection, error) { func (w *WebSocketConnection) receiver(ctx context.Context) (EngineIOFrame, error) {
_, data, err := w.conn.ReadMessage()
if err == nil {
utils.WithContext(ctx).Info("Received frame: " + string(data))
}
return parseWebSocketFrame(data), err
}
// This is a background connection
func UpgradeConnection(ctx context.Context, connection *PollingConnection) (IConnection, error) {
connection.Cancellable() connection.Cancellable()
utils.WithContext(ctx).Info(`Upgrading connection with SID: ` + connection.Sid)
ct, canceller := context.WithCancel(utils.AddAttr(ctx, slog.String("SID", connection.Sid)))
timer := time.AfterFunc(time.Duration(connection.PingTimeout+connection.PingInterval)*time.Millisecond, func() {
select {
case <-ct.Done():
return
default:
utils.WithContext(ct).Error("Connection timed out")
canceller()
}
})
url := strings.Replace(connection.Url, "http", "ws", 1) url := strings.Replace(connection.Url, "http", "ws", 1)
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url+"/socket.io/?EIO=4&transport=websocket&sid="+connection.Sid, nil)
conn, piss, err := websocket.DefaultDialer.DialContext(ctx, url+"/socket.io/?EIO=4&transport=websocket&sid="+connection.Sid, nil)
if err != nil { if err != nil {
utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error()) utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error() + " " + piss.Status)
return nil, err return nil, err
} }
conn.WriteMessage(websocket.TextMessage, []byte("2probe"))
_, data, err := conn.ReadMessage() sock := WebSocketConnection{
Connection: Connection{
Url: url,
Sid: connection.Sid,
Inbound: make(chan IEngineData),
Outbound: make(chan IEngineData),
PingInterval: connection.PingInterval,
PingTimeout: connection.PingTimeout,
MaxPayload: connection.MaxPayload,
Context: ct,
Cancellable: canceller,
timer: *timer,
},
conn: conn,
mutex: sync.Mutex{},
}
sock.send = sock.sender
sock.receive = sock.receiver
err = sock.send(ct, EngineIOFrame{Type: PING, Data: &EngineUTF8Data{"probe"}})
if err != nil { if err != nil {
utils.WithContext(ctx).Error("Failed to upgrade connection: " + err.Error()) utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error())
return nil, err return nil, err
} }
if string(data) != "3probe" { resp, err := sock.receive(ct)
utils.WithContext(ctx).Error("Failed to upgrade connection: Invalid response") if err != nil || resp.Data.GetUTF8() != "probe" {
return nil, errors.New("Invalid response") utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error())
return nil, err
} }
conn.WriteMessage(websocket.TextMessage, []byte("5")) err = sock.send(ct, EngineIOFrame{Type: UPGRADE, Data: &EngineUTF8Data{}})
return nil, nil if err != nil {
utils.WithContext(ct).Error("Failed to upgrade connection: " + err.Error())
return nil, err
}
go sock.receiveLoop()
go sock.sendLoop()
return &sock, nil
} }
...@@ -5,17 +5,14 @@ import ( ...@@ -5,17 +5,14 @@ import (
) )
func main() { func main() {
connection := NewConnection(context.Background(), "http://localhost:3000") connection := NewPollingConnection(context.Background(), "http://localhost:3000")
connect := connection.(*PollingConnection) connect := connection.(*PollingConnection)
UpgradeConnection(context.Background(), connect) sock, err := UpgradeConnection(context.Background(), connect)
//in, out := connection.GetMessageChannels() if err != nil {
//data := EngineUTF8Data{Data: "0"} panic(err)
//out <- &data }
//<-in in, out := sock.GetMessageChannels()
////println(message.GetUTF8()) data := EngineUTF8Data{Data: "0"}
////time.Sleep(120 * time.Millisecond) out <- &data
//out <- &EngineUTF8Data{Data: "2[\"chat message\",\"abcd\"]"} <-in
//<-in
//println(message.GetUTF8())
//time.Sleep(120 * time.Second)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment