Skip to content

Commit dc88810

Browse files
committed
Implementation of hub client handling
1 parent 8d34b41 commit dc88810

File tree

4 files changed

+152
-26
lines changed

4 files changed

+152
-26
lines changed

app/src/App.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import './App.css';
66
const socket = new WebSocket("ws://127.0.0.1:8081/ws");
77
socket.onopen = (event) => {
88
console.log('Opened socket');
9+
console.log(event);
910
};
1011

1112
socket.onerror = (event) => {

server/conn_hub.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,46 @@
11
package main
2+
3+
type ConnHub struct {
4+
clients map[*Client]bool
5+
broadcast chan []byte
6+
register chan *Client
7+
unregister chan *Client
8+
}
9+
10+
// init ConnHub
11+
func newConnHub() *ConnHub {
12+
return &ConnHub{
13+
clients: make(map[*Client]bool),
14+
broadcast: make(chan []byte),
15+
register: make(chan *Client),
16+
unregister: make(chan *Client),
17+
}
18+
}
19+
20+
// handles channel actions of a ConnHub
21+
func (hub *ConnHub) run() {
22+
for {
23+
select {
24+
// register client to hub
25+
case client := <-hub.register:
26+
hub.clients[client] = true
27+
// unregister client to hub
28+
case client := <-hub.unregister:
29+
if _, ok := hub.clients[client]; ok {
30+
delete(hub.clients, client)
31+
close(client.send)
32+
}
33+
// loop through registered clients and send message to their send channel
34+
case message := <-hub.broadcast:
35+
for client := range hub.clients {
36+
select {
37+
case client.send <- message:
38+
// if send buffer is full, assume client is dead or stuck and unregister
39+
default:
40+
close(client.send)
41+
delete(hub.clients, client)
42+
}
43+
}
44+
}
45+
}
46+
}

server/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ func main() {
99
fmt.Println("Launching server...")
1010

1111
// start conn_hub
12-
// connHub := newConnHub
13-
// go connHub.run()
12+
hub := newConnHub()
13+
go hub.run()
1414

1515
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
16-
wsHandler(/* hub ,*/ w, r)
16+
wsHandler(hub, w, r)
1717
})
1818
err := http.ListenAndServe(":8081", nil)
1919
if err != nil {

server/websocket.go

Lines changed: 103 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package main
22

33
import (
4-
"fmt"
4+
// "fmt"
55
"net/http"
6+
"time"
7+
68
"github.com/gorilla/websocket"
79
)
810

9-
type JsonData struct {
10-
Text string `json:"text"`
11-
Client string `json:"client"`
12-
Timestamp string `json:"timestamp"`
13-
}
11+
const (
12+
// Time allowed to write a message to the peer.
13+
writeWait = 10 * time.Second
14+
// Time allowed to read the next pong message from the peer.
15+
pongWait = 60 * time.Second
16+
// Send pings to peer with this period. Must be less than pongWait.
17+
pingPeriod = (pongWait * 9) / 10
18+
// Maximum message size allowed from peer.
19+
maxMessageSize = 512
20+
)
1421

1522
var upgrader = websocket.Upgrader{
1623
ReadBufferSize: 1024,
@@ -19,32 +26,105 @@ var upgrader = websocket.Upgrader{
1926
CheckOrigin: func(r *http.Request) bool { return true },
2027
}
2128

22-
// handle /ws route, upgrade HTTP request and forward ws conn to worker
23-
func wsHandler(w http.ResponseWriter, r *http.Request) {
24-
conn, err := upgrader.Upgrade(w, r, nil)
25-
if err != nil {
26-
http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
27-
}
29+
type JsonData struct {
30+
Text string `json:"text"`
31+
Client string `json:"client"`
32+
Timestamp string `json:"timestamp"`
33+
}
2834

29-
go handleConnection(conn)
35+
type Client struct {
36+
hub *ConnHub
37+
conn *websocket.Conn
38+
send chan []byte
3039
}
3140

32-
func handleConnection(conn *websocket.Conn) {
33-
fmt.Println("Handling new connection")
41+
func (c *Client) readPump() {
42+
// schedule client to be disconnected
43+
defer func() {
44+
c.hub.unregister <- c
45+
c.conn.Close()
46+
}()
3447

48+
// init client connection
49+
c.conn.SetReadLimit(maxMessageSize)
50+
c.conn.SetReadDeadline(time.Now().Add(pongWait))
51+
c.conn.SetPongHandler(func(string) error {
52+
c.conn.SetReadDeadline(time.Now().Add(pongWait))
53+
return nil
54+
})
55+
56+
// handle connection read
3557
for {
36-
fmt.Println("Awaiting data from connection...")
58+
// read JSON data from connection
59+
// message := JsonData{}
60+
// if err := c.conn.ReadJSON(&message); err != nil {
61+
// fmt.Println("Error reading JSON", err)
62+
// }
63+
// fmt.Printf("Get response: %#v\n", message)
3764

38-
res := JsonData{}
65+
// // queue message for writing
66+
// c.hub.broadcast <- message
67+
}
68+
}
3969

40-
if err := conn.ReadJSON(&res); err != nil {
41-
fmt.Println("Error reading JSON", err)
42-
}
70+
func (c *Client) writePump() {
71+
ticker := time.NewTicker(pingPeriod)
72+
defer func() {
73+
ticker.Stop()
74+
c.conn.Close()
75+
}()
4376

44-
fmt.Printf("Get response: %#v\n", res)
77+
for {
78+
select {
79+
case message, ok := <-c.send:
80+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
81+
if !ok {
82+
// channel has been closed by the hub
83+
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
84+
return
85+
}
86+
87+
w, err := c.conn.NextWriter(websocket.TextMessage)
88+
if err != nil {
89+
return
90+
}
91+
w.Write(message)
4592

46-
if err := conn.WriteJSON(res); err != nil {
47-
fmt.Println("Error writing JSON", err)
93+
// coalesce pending messages into one message
94+
n := len(c.send)
95+
for i := 0; i < n; i++ {
96+
w.Write(<-c.send)
97+
}
98+
99+
if err := w.Close(); err != nil {
100+
return
101+
}
102+
// send ping over websocket
103+
case <-ticker.C:
104+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
105+
if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
106+
return
107+
}
48108
}
49109
}
50110
}
111+
112+
// handle /ws route, upgrade HTTP request and begin handling of client conn
113+
func wsHandler(hub *ConnHub, w http.ResponseWriter, r *http.Request) {
114+
conn, err := upgrader.Upgrade(w, r, nil)
115+
if err != nil {
116+
http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
117+
}
118+
119+
// init new client, register to hub
120+
client := &Client{
121+
hub: hub,
122+
conn: conn,
123+
send: make(chan []byte, 256),
124+
}
125+
client.hub.register <- client
126+
127+
// separate reads and writes to conform to WebSocket standard of one concurrent reader and writer
128+
// go client.writePump()
129+
// go client.readPump()
130+
}

0 commit comments

Comments
 (0)