Challenge #3b: Multi-Node Broadcast
The second part of this challenge switches things up. We now have to "propagate values it sees from broadcast
messages to the other nodes in the cluster."
What we would like to do is modify our broadcast handler to propagate values. We could use Node.NodeIDs to get a list of all nodes in the cluster, but this discards the topology information given to our nodes.
A topology is represented by the following JSON:
{
  "n1": ["n2", "n3"],
  "n2": ["n1"],
  "n3": ["n1"]
}
In this case, our topology is an adjacency list.
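To make the adjacency list concrete, here is a minimal sketch of decoding a topology body and looking up one node's neighbours. The helper name neighboursOf and the type topologyBody are mine, for illustration only; the "topology" field name is the one the topology handler in this post decodes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topologyBody mirrors the body of a "topology" message.
// The type name is hypothetical; only the JSON field name matters.
type topologyBody struct {
	Topology map[string][]string `json:"topology"`
}

// neighboursOf decodes a raw topology body and returns the neighbours
// listed for nodeID. It returns nil if nodeID is not in the map.
func neighboursOf(raw []byte, nodeID string) ([]string, error) {
	var body topologyBody
	if err := json.Unmarshal(raw, &body); err != nil {
		return nil, err
	}
	return body.Topology[nodeID], nil
}

func main() {
	raw := []byte(`{"type":"topology","topology":{"n1":["n2","n3"],"n2":["n1"],"n3":["n1"]}}`)
	for _, id := range []string{"n1", "n2", "n3"} {
		neighbours, err := neighboursOf(raw, id)
		if err != nil {
			panic(err)
		}
		fmt.Println(id, "->", neighbours)
	}
}
```

Each key is a node and each value is the list of nodes it may talk to directly, so a lookup by our own node ID is all we need.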
The following diagram illustrates the topology.
        ┌────┐
   ┌────┤ n1 ├─────┐
   │    └────┘     │
   │               │
   │               │
┌──┴─┐           ┌─┴──┐
│ n2 │           │ n3 │
└────┘           └────┘
With this topology, a node can only broadcast to its neighbouring nodes, which is precisely the information given by the adjacency list.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message int `json:"message"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}
	s.Values = append(s.Values, body.Message)
	for _, neighbour := range s.Topology[s.Node.ID()] { // New!
		s.Node.Send(neighbour, map[string]any{
			"type":    "broadcast",
			"message": body.Message,
		})
	}
	return s.Node.Reply(req, struct {
		Type string `json:"type"`
	}{
		Type: "broadcast_ok",
	})
}
Let's test this code using two nodes instead of the full five nodes.
go build -o broadcast . && maelstrom test -w broadcast --bin ./broadcast --node-count 2 --time-limit 20 --rate 10
...
at clojure.core$bound_fn_STAR_$fn__5818.doInvoke(core.clj:2020)
at clojure.lang.RestFn.invoke(RestFn.java:397)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:833)
It doesn't look like we're successful.
Logs from one of the nodes give us a clue.
2023/03/05 11:10:30 No handler for {"id":14,"src":"n0","dest":"n1","body":{"in_reply_to":0,"type":"broadcast_ok"}}
Since we didn't define a handler for broadcast_ok, a sending node has no idea what to do when it receives a reply from the receiving node.
We have two approaches for dealing with this.
1. Create a no-op broadcast_ok handler.
2. Check if an incoming message has a message ID before replying.
The second approach is possible because Node.Send does not attach a message ID. Let's go with the second approach.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"` // New!
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}
	s.Values = append(s.Values, body.Message)
	for _, neighbour := range s.Topology[s.Node.ID()] {
		s.Node.Send(neighbour, map[string]any{
			"type":    "broadcast",
			"message": body.Message,
		})
	}
	if body.MessageID != 0 { // New!
		return s.Node.Reply(req, struct {
			Type string `json:"type"`
		}{
			Type: "broadcast_ok",
		})
	}
	return nil
}
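The msg_id check leans on how encoding/json treats missing fields, so here is a small sketch of that behaviour in isolation. The helper hasMsgID is a name I made up for illustration; it is not part of the maelstrom library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hasMsgID reports whether a raw message body carries a non-zero msg_id.
// A body without msg_id leaves the field at Go's zero value for int, 0.
func hasMsgID(raw []byte) (bool, error) {
	var body struct {
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(raw, &body); err != nil {
		return false, err
	}
	return body.MessageID != 0, nil
}

func main() {
	// A body that carries a msg_id, as a request expecting a reply would.
	withID := []byte(`{"type":"broadcast","message":7,"msg_id":3}`)
	// A body without msg_id, as produced by a fire-and-forget send.
	withoutID := []byte(`{"type":"broadcast","message":7}`)

	for _, raw := range [][]byte{withID, withoutID} {
		ok, err := hasMsgID(raw)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> reply? %v\n", raw, ok)
	}
}
```

One caveat with this approach: a message that explicitly carries "msg_id": 0 looks the same as one with no msg_id at all. If that distinction ever matters, decoding into a *int and checking for nil would tell the two apart.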
Another round of tests with two nodes.
Everything looks good! ヽ(‘ー`)ノ
It looks like we're good with two nodes. Onward to the full test with five nodes.
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
Looking at the messages diagram generated by maelstrom, it looks like the nodes are stuck in an infinite loop.
Using the previous three-node topology, we start to see the problem with our current approach.
        ┌────┐
   ┌────┤ n1 ├─────┐
   │    └────┘     │
   │               │
   │               │
┌──┴─┐           ┌─┴──┐
│ n2 │           │ n3 │
└────┘           └────┘
When n1 receives a broadcast, it will send a broadcast to n2 and n3. When n2 receives a broadcast, it will send a broadcast to n1. When n3 receives a broadcast, it will send a broadcast to n1. This goes on and on.
Let's make our logic more sophisticated by skipping the broadcast if a neighbouring node is the sender.
for _, neighbour := range s.Topology[s.Node.ID()] {
	if neighbour == req.Src { // New!
		continue
	}
	s.Node.Send(neighbour, map[string]any{
		"type":    "broadcast",
		"message": body.Message,
	})
}
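To convince myself of what the sender check does, here is a toy simulation of the forwarding logic without maelstrom. It is only a sketch under simplifying assumptions: messages are delivered one at a time from a queue, and the names message and flood, as well as the message limit, are mine.

```go
package main

import "fmt"

// message is one node-to-node (or client-to-node) delivery.
type message struct{ src, dest string }

// flood simulates forwarding a single broadcast value through topology,
// starting at the node that hears it from a client. It returns how many
// node-to-node messages were sent and whether forwarding died out on its
// own before reaching limit.
func flood(topology map[string][]string, entry string, skipSender bool, limit int) (int, bool) {
	queue := []message{{src: "client", dest: entry}}
	sent := 0
	for len(queue) > 0 {
		m := queue[0]
		queue = queue[1:]
		for _, neighbour := range topology[m.dest] {
			if skipSender && neighbour == m.src {
				continue // do not echo the value back to whoever sent it
			}
			if sent == limit {
				return sent, false // still forwarding: give up
			}
			sent++
			queue = append(queue, message{src: m.dest, dest: neighbour})
		}
	}
	return sent, true
}

func main() {
	tree := map[string][]string{"n1": {"n2", "n3"}, "n2": {"n1"}, "n3": {"n1"}}
	triangle := map[string][]string{"n1": {"n2", "n3"}, "n2": {"n1", "n3"}, "n3": {"n1", "n2"}}

	sent, done := flood(tree, "n1", false, 100)
	fmt.Println("tree, naive:", sent, done) // tree, naive: 100 false

	sent, done = flood(tree, "n1", true, 100)
	fmt.Println("tree, skip sender:", sent, done) // tree, skip sender: 2 true

	sent, done = flood(triangle, "n1", true, 100)
	fmt.Println("triangle, skip sender:", sent, done) // triangle, skip sender: 100 false
}
```

In this model the sender check is enough for the three-node topology above, but a topology containing a cycle keeps forwarding forever, because the value can travel around the cycle without ever going straight back to its sender.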
Everything looks good! ヽ(‘ー`)ノ
Great! It looks like that worked. However, there is another problem I noticed. From a node's logs, we can see that there's redundant work being performed.
2023/03/05 11:28:20 Received {n1 n2 {"message":1,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":0,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":0,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":1,"type":"broadcast"}}
The same message is getting propagated multiple times.
What we want is for a given message to get propagated to each of the neighbours once. Since message values are unique across all messages, we could switch to a set data structure instead of a list.
Go doesn't natively have a set data structure, so the best approximation is a map[int]struct{} (or a map[int]bool, but I prefer the empty struct). Why do we use struct{} as the value? That's because "the empty struct has a width of zero. It occupies zero bytes of storage." (source).
Here's what that looks like.
type Server struct {
	Node     *maelstrom.Node
	Topology map[string][]string
	Values   map[int]struct{} // New!
}

func NewServer(node *maelstrom.Node) *Server {
	s := &Server{
		Node:   node,
		Values: make(map[int]struct{}), // New!
	}
	node.Handle("broadcast", s.BroadcastHandler)
	node.Handle("read", s.ReadHandler)
	node.Handle("topology", s.TopologyHandler)
	return s
}

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}
	if _, ok := s.Values[body.Message]; ok { // New!
		return nil
	}
	s.Values[body.Message] = struct{}{} // New!
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == req.Src {
			continue
		}
		s.Node.Send(neighbour, map[string]any{
			"type":    "broadcast",
			"message": body.Message,
		})
	}
	if body.MessageID != 0 {
		return s.Node.Reply(req, struct {
			Type string `json:"type"`
		}{
			Type: "broadcast_ok",
		})
	}
	return nil
}
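One thing worth keeping in mind: Go maps are not safe for concurrent writes, and as far as I can tell the maelstrom Go library runs each handler in its own goroutine. Treat that as an assumption on my part rather than a documented guarantee. If it holds, the check-then-insert on Values is a data race. Below is a minimal sketch of a mutex-guarded set that performs the check and the insert as one step. The valueSet type and its methods are hypothetical and are not used in the rest of this post.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// valueSet is a hypothetical concurrency-safe set of ints.
type valueSet struct {
	mu     sync.Mutex
	values map[int]struct{}
}

func newValueSet() *valueSet {
	return &valueSet{values: make(map[int]struct{})}
}

// Add inserts v and reports whether it was new. The lookup and the insert
// happen under the same lock, so two goroutines adding the same value
// cannot both be told that it is new.
func (s *valueSet) Add(v int) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.values[v]; ok {
		return false
	}
	s.values[v] = struct{}{}
	return true
}

// Len returns the number of distinct values stored.
func (s *valueSet) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.values)
}

func main() {
	set := newValueSet()
	var fresh int64 // how many Add calls reported a new value
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			if set.Add(v % 10) { // only 10 distinct values overall
				atomic.AddInt64(&fresh, 1)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(fresh, set.Len()) // 10 10
}
```

In a handler, the early return would then become "if !set.Add(body.Message) { return nil }", so a value is forwarded only by the goroutine that first recorded it.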
We also have to modify Server.ReadHandler now that Server.Values is not a []int.
func (s *Server) ReadHandler(req maelstrom.Message) error {
	return s.Node.Reply(req, struct {
		Type     string `json:"type"`
		Messages []int  `json:"messages"`
	}{
		Type:     "read_ok",
		Messages: maps.Keys(s.Values), // New!
	})
}
I'm using the experimental maps package to simplify this code.
Onward to the test.
Everything looks good! ヽ(‘ー`)ノ
Our node logs no longer indicate the same message getting propagated multiple times.
See you in the next one!
Complete code.
package main

import (
	"encoding/json"
	"log"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
	"golang.org/x/exp/maps"
)

func main() {
	srv := NewServer(maelstrom.NewNode())
	if err := srv.Run(); err != nil {
		log.Fatal(err)
	}
}

type Server struct {
	Node     *maelstrom.Node
	Topology map[string][]string
	Values   map[int]struct{}
}

func (s *Server) Run() error {
	return s.Node.Run()
}

func NewServer(node *maelstrom.Node) *Server {
	s := &Server{
		Node:   node,
		Values: make(map[int]struct{}),
	}
	node.Handle("broadcast", s.BroadcastHandler)
	node.Handle("read", s.ReadHandler)
	node.Handle("topology", s.TopologyHandler)
	return s
}

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}
	if _, ok := s.Values[body.Message]; ok {
		return nil
	}
	s.Values[body.Message] = struct{}{}
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == req.Src {
			continue
		}
		s.Node.Send(neighbour, map[string]any{
			"type":    "broadcast",
			"message": body.Message,
		})
	}
	if body.MessageID != 0 {
		return s.Node.Reply(req, struct {
			Type string `json:"type"`
		}{
			Type: "broadcast_ok",
		})
	}
	return nil
}

func (s *Server) ReadHandler(req maelstrom.Message) error {
	return s.Node.Reply(req, struct {
		Type     string `json:"type"`
		Messages []int  `json:"messages"`
	}{
		Type:     "read_ok",
		Messages: maps.Keys(s.Values),
	})
}

func (s *Server) TopologyHandler(req maelstrom.Message) error {
	var body struct {
		Topology map[string][]string `json:"topology"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}
	s.Topology = body.Topology
	return s.Node.Reply(req, struct {
		Type string `json:"type"`
	}{
		Type: "topology_ok",
	})
}