repos / hub.go.git


commit
11fcf63
parent
11fcf63
author
Evgenii Akentev
date
2024-08-29 21:13:41 +0400 +04
basic subscription works
7 files changed,  +343, -0
A go.mod
A go.sum
A hub.go
A .gitignore
+2, -0
1@@ -0,0 +1,2 @@
2+.jj
3+main
A client.go
+134, -0
  1@@ -0,0 +1,134 @@
  2+package main
  3+
  4+import (
  5+	"bytes"
  6+	"log"
  7+	"net/http"
  8+	"time"
  9+
 10+	"github.com/gorilla/websocket"
 11+)
 12+
 13+const (
 14+	// Time allowed to write a message to the peer.
 15+	writeWait = 10 * time.Second
 16+
 17+	// Time allowed to read the next pong message from the peer.
 18+	pongWait = 60 * time.Second
 19+
 20+	// Send pings to peer with this period. Must be less than pongWait.
 21+	pingPeriod = (pongWait * 9) / 10
 22+
 23+	// Maximum message size allowed from peer.
 24+	maxMessageSize = 512
 25+)
 26+
 27+var (
 28+	newline = []byte{'\n'}
 29+	space   = []byte{' '}
 30+)
 31+
 32+var upgrader = websocket.Upgrader{
 33+	ReadBufferSize:  1024,
 34+	WriteBufferSize: 1024,
 35+  CheckOrigin: func(r *http.Request) bool{ return true },
 36+}
 37+
 38+// Client is a middleman between the websocket connection and the hub.
 39+type Client struct {
 40+	hub *Hub
 41+
 42+	// The websocket connection.
 43+	conn *websocket.Conn
 44+
 45+	// Buffered channel of outbound messages.
 46+	send chan []byte
 47+}
 48+
 49+// readPump pumps messages from the websocket connection to the hub.
 50+//
 51+// The application runs readPump in a per-connection goroutine. The application
 52+// ensures that there is at most one reader on a connection by executing all
 53+// reads from this goroutine.
 54+func (c *Client) readPump() {
 55+	defer func() {
 56+		c.hub.unregister <- c
 57+		c.conn.Close()
 58+	}()
 59+	c.conn.SetReadLimit(maxMessageSize)
 60+	c.conn.SetReadDeadline(time.Now().Add(pongWait))
 61+	c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
 62+	for {
 63+		_, message, err := c.conn.ReadMessage()
 64+		if err != nil {
 65+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
 66+				log.Printf("error: %v", err)
 67+			}
 68+			break
 69+		}
 70+		message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
 71+		c.hub.broadcast <- message
 72+	}
 73+}
 74+
 75+// writePump pumps messages from the hub to the websocket connection.
 76+//
 77+// A goroutine running writePump is started for each connection. The
 78+// application ensures that there is at most one writer to a connection by
 79+// executing all writes from this goroutine.
 80+func (c *Client) writePump() {
 81+	ticker := time.NewTicker(pingPeriod)
 82+	defer func() {
 83+		ticker.Stop()
 84+		c.conn.Close()
 85+	}()
 86+	for {
 87+		select {
 88+		case message, ok := <-c.send:
 89+			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
 90+			if !ok {
 91+				// The hub closed the channel.
 92+				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
 93+				return
 94+			}
 95+
 96+			w, err := c.conn.NextWriter(websocket.TextMessage)
 97+			if err != nil {
 98+				return
 99+			}
100+			w.Write(message)
101+
102+			// Add queued chat messages to the current websocket message.
103+			n := len(c.send)
104+			for i := 0; i < n; i++ {
105+				w.Write(newline)
106+				w.Write(<-c.send)
107+			}
108+
109+			if err := w.Close(); err != nil {
110+				return
111+			}
112+		case <-ticker.C:
113+			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
114+			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
115+				return
116+			}
117+		}
118+	}
119+}
120+
121+func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
122+	conn, err := upgrader.Upgrade(w, r, nil)
123+	if err != nil {
124+		log.Println(err)
125+		return
126+	}
127+	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
128+	client.hub.register <- client
129+
130+	// Allow collection of memory referenced by the caller by doing all work in
131+	// new goroutines.
132+	go client.writePump()
133+	go client.readPump()
134+}
135+
A entities.go
+45, -0
 1@@ -0,0 +1,45 @@
 2+package main
 3+
 4+
 5+import (
 6+//  "encoding/json"
 7+)
 8+
 9+type Message struct {
10+  Event string `json:"event"`
11+  Addrs []string `json:"addresses"`
12+}
13+
14+type PersonDetails struct {
15+  name string
16+  address string
17+}
18+
19+type DocumentData struct {
20+}
21+
22+type DeclarationStatus string
23+const (
24+   Stale DeclarationStatus = "stale"
25+   MoneyFrozen = "money_frozen"
26+   FilesSent = "files_sent"
27+   ViewedByClient = "viewed_by_client"
28+)
29+
30+type ExclusiveRightsClaim struct {
31+}
32+
33+type Copyright struct {
34+  address string
35+  author string
36+  owner string
37+  ownerDetails PersonDetails
38+  royalty *string
39+
40+  document DocumentData
41+  status DeclarationStatus
42+  balance int
43+  price *int
44+  assignmentHash *string
45+  claim *ExclusiveRightsClaim
46+}
A go.mod
+5, -0
1@@ -0,0 +1,5 @@
2+module main
3+
4+go 1.22.5
5+
6+require github.com/gorilla/websocket v1.5.3
A go.sum
+2, -0
1@@ -0,0 +1,2 @@
2+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
3+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
A hub.go
+49, -0
 1@@ -0,0 +1,49 @@
 2+package main
 3+
 4+// Hub maintains the set of active clients and broadcasts messages to the
 5+// clients.
 6+type Hub struct {
 7+	// Registered clients.
 8+	clients map[*Client]bool
 9+
10+	// Inbound messages from the clients.
11+	broadcast chan []byte
12+
13+	// Register requests from the clients.
14+	register chan *Client
15+
16+	// Unregister requests from clients.
17+	unregister chan *Client
18+}
19+
20+func newHub() *Hub {
21+	return &Hub{
22+		broadcast:  make(chan []byte),
23+		register:   make(chan *Client),
24+		unregister: make(chan *Client),
25+		clients:    make(map[*Client]bool),
26+	}
27+}
28+
29+func (h *Hub) run() {
30+	for {
31+		select {
32+		case client := <-h.register:
33+			h.clients[client] = true
34+		case client := <-h.unregister:
35+			if _, ok := h.clients[client]; ok {
36+				delete(h.clients, client)
37+				close(client.send)
38+			}
39+		case message := <-h.broadcast:
40+			for client := range h.clients {
41+				select {
42+				case client.send <- message:
43+				default:
44+					close(client.send)
45+					delete(h.clients, client)
46+				}
47+			}
48+		}
49+	}
50+}
A main.go
+106, -0
  1@@ -0,0 +1,106 @@
  2+package main
  3+
  4+import (
  5+	"flag"
  6+	"log"
  7+	"net/http"
  8+
  9+  "github.com/gorilla/websocket"
 10+)
 11+
 12+type Subscribers = map[string]([](*websocket.Conn))
 13+
 14+type State struct {
 15+  copyrights map[string]Copyright
 16+  copyrightSubs map[string]([]*websocket.Conn)
 17+  walletSubs map[string]([]*websocket.Conn)
 18+}
 19+
 20+func newState() *State {
 21+  return &State {
 22+    copyrights: make(map[string]Copyright),
 23+    copyrightSubs: make(map[string]([]*websocket.Conn)),
 24+    walletSubs: make(map[string]([]*websocket.Conn)),
 25+  }
 26+}
 27+
 28+func subscribe(m Subscribers, k string, c *websocket.Conn)  {
 29+  var newSubs [](*websocket.Conn)
 30+  if subs, ok := m[k]; ok {
 31+    newSubs = append(subs, c)
 32+    m[k] = newSubs
 33+  } else {
 34+    newSubs = make([](*websocket.Conn), 1)
 35+    newSubs[0] = c
 36+    m[k] = newSubs
 37+  }
 38+}
 39+
 40+func unsubscribe(m Subscribers, k string, c *websocket.Conn)  {
 41+  var newSubs [](*websocket.Conn)
 42+
 43+  if subs, ok := m[k]; ok {
 44+    for _, sub := range subs {
 45+      if (sub != c) {
 46+        newSubs = append(newSubs, sub)
 47+      }
 48+    }
 49+    m[k] = newSubs
 50+  }
 51+}
 52+
 53+func listenConnection(state *State, c *websocket.Conn) {
 54+  defer func() {
 55+    c.Close()
 56+  }()
 57+
 58+  for {
 59+    var m Message 
 60+    err := c.ReadJSON(&m)
 61+
 62+  	if err != nil {
 63+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
 64+				log.Printf("error: %v", err)
 65+			}
 66+      log.Printf("error: %v", err)
 67+			break
 68+		}
 69+
 70+    switch m.Event {
 71+    case "subscribe-wallets":
 72+      for _, addr := range m.Addrs {
 73+        subscribe((*state).walletSubs, addr, c)
 74+      }
 75+    default:
 76+      log.Printf("error: Unsupported event: %v", m)
 77+    }
 78+  }
 79+}
 80+
 81+func serve(state *State, w http.ResponseWriter, r *http.Request) {
 82+  conn, err := upgrader.Upgrade(w, r, nil)
 83+  if err != nil {
 84+    log.Println(err)
 85+    return
 86+  }
 87+
 88+  go listenConnection(state, conn)
 89+}
 90+
 91+func main() {
 92+	flag.Parse()
 93+
 94+  state := newState()
 95+
 96+  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 97+		serve(state, w, r)
 98+	})
 99+  
100+  log.Println("Started websocket server at :3000")
101+
102+  err := http.ListenAndServe(":3000", nil)
103+	if err != nil {
104+		log.Fatal("ListenAndServe: ", err)
105+	}
106+}
107+