Challenge #3b: Multi-Node Broadcast

The second part of this challenge switches things up. We now have to "propagate values it sees from broadcast messages to the other nodes in the cluster."

What we would like to do is modify our broadcast handler for value propagation. We could use Node.NodeIDs to get a list of all other nodes in the cluster, but this discards the topology information given to our nodes.

A topology is represented by the following JSON:

{
    "n1": ["n2", "n3"],
    "n2": ["n1"],
    "n3": ["n1"]
}

In this case, our topology is an adjacency list.

The following diagram illustrates the topology.

         ┌────┐
    ┌────┤ n1 ├─────┐
    │    └────┘     │
    │               │
    │               │
 ┌──┴─┐           ┌─┴──┐
 │ n2 │           │ n3 │
 └────┘           └────┘

With this topology, nodes can only broadcast to its neighbouring nodes, which is precisely the information given by the adjacency list.

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message int `json:"message"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    s.Values = append(s.Values, body.Message)

    for _, neighbour := range s.Topology[s.Node.ID()] { // New!
        s.Node.Send(neighbour, map[string]any{
            "type":    "broadcast",
            "message": body.Message,
        })
    }

    return s.Node.Reply(req, struct {
        Type string `json:"type"`
    }{
        Type: "broadcast_ok",
    })
}

Let's test this code using two nodes instead of the full five nodes.

go build -o broadcast . && maelstrom test -w broadcast --bin ./broadcast --node-count 2 --time-limit 20 --rate 10
...
    at clojure.core$bound_fn_STAR_$fn__5818.doInvoke(core.clj:2020)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:833)

It doesn't look like we're successful.

Logs from one of the nodes give us a clue.

2023/03/05 11:10:30 No handler for {"id":14,"src":"n0","dest":"n1","body":{"in_reply_to":0,"type":"broadcast_ok"}}

Since we didn't define a handler for broadcast_ok, a sending node has no idea what to do when it receives a reply from the receiving node.

We have two approaches for dealing with this.

  1. Create a no-op broadcast_ok handler.

  2. Check if an incoming message has a message ID before replying.

The second approach is possible because Node.Send does not attach a message ID. Let's go with the second approach.

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int    `json:"message"`
        MessageID int    `json:"msg_id"` // New!
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    s.Values = append(s.Values, body.Message)

    for _, neighbour := range s.Topology[s.Node.ID()] {
        s.Node.Send(neighbour, map[string]any{
            "type":    "broadcast",
            "message": body.Message,
        })
    }

    if body.MessageID != 0 { // New!
        return s.Node.Reply(req, struct {
            Type string `json:"type"`
        }{
            Type: "broadcast_ok",
        })
    }

    return nil
}

Another round of tests with two nodes.

Everything looks good! ヽ(‘ー`)ノ

It looks like we're good with two nodes. Onward to the full test with five nodes.

Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

Looking at the maelstrom generated messages diagram, it looks like the nodes are stuck in an infinite loop.

Using the previous three-node topology, we start to see the problem with our current approach.

         ┌────┐
    ┌────┤ n1 ├─────┐
    │    └────┘     │
    │               │
    │               │
 ┌──┴─┐           ┌─┴──┐
 │ n2 │           │ n3 │
 └────┘           └────┘

When n1 receives a broadcast, it will send broadcast to n2 and n3. When n2 receives a broadcast, it will send broadcast to n1. When n3 receives a broadcast, it will send broadcast to n1. This goes on and on.

Let's make our logic more sophisticated by skipping the broadcast if a neighbouring node is the sender.

for _, neighbour := range s.Topology[s.Node.ID()] {
    if neighbour == req.Src { // New!
        continue
    }
    s.Node.Send(neighbour, map[string]any{
        "type":    "broadcast",
        "message": body.Message,
    })
}
Everything looks good! ヽ(‘ー`)ノ

Great! It looks like that worked. However, there is another problem I noticed. From a node's logs, we can see that there's redundant work being performed.

2023/03/05 11:28:20 Received {n1 n2 {"message":1,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":0,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":0,"type":"broadcast"}}
2023/03/05 11:28:20 Received {n1 n2 {"message":1,"type":"broadcast"}}

The same message is getting propagated multiple times.

What we want is for a given message to get propagated to each of the neighbours once. Since message values are unique across all messages, we could switch to a set data structure instead of a list.

Go doesn't natively have a set data structure, so the best approximation is a map[int]struct{} (or a map[int]bool, but I prefer the empty struct). Why do we use struct{} as the value? That's because "the empty struct has a width of zero. It occupies zero bytes of storage." (source).

Here's what that looks like.

type Server struct {
    Node     *maelstrom.Node
    Topology map[string][]string
    Values   map[int]struct{} // New!
}

func NewServer(node *maelstrom.Node) *Server {
    s := &Server{
        Node: node,
        Values: make(map[int]struct{}), // New!
    }

    node.Handle("broadcast", s.BroadcastHandler)
    node.Handle("read", s.ReadHandler)
    node.Handle("topology", s.TopologyHandler)

    return s
}

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int    `json:"message"`
        MessageID int    `json:"msg_id"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    if _, ok := s.Values[body.Message]; ok { // New!
        return nil
    }

    s.Values[body.Message] = struct{}{} // New!

    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == req.Src {
            continue
        }
        s.Node.Send(neighbour, map[string]any{
            "type":    "broadcast",
            "message": body.Message,
        })
    }

    if body.MessageID != 0 {
        return s.Node.Reply(req, struct {
            Type string `json:"type"`
        }{
            Type: "broadcast_ok",
        })
    }

    return nil
}

We also have to modify Server.ReadHandler now that Server.Values is not a []int.

func (s *Server) ReadHandler(req maelstrom.Message) error {
    return s.Node.Reply(req, struct {
        Type     string `json:"type"`
        Messages []int  `json:"messages"`
    }{
        Type:     "read_ok",
        Messages: maps.Keys(s.Values), // New!
    })
}

I'm using the experimental maps package to simplify this code.

Onward to the test.

Everything looks good! ヽ(‘ー`)ノ

Our node logs no longer indicate the same message getting propagated multiple times.

See you in the next one!


Complete code.

package main

import (
    "encoding/json"
    "log"

    maelstrom "github.com/jepsen-io/maelstrom/demo/go"
    "golang.org/x/exp/maps"
)

func main() {
    srv := NewServer(maelstrom.NewNode())

    if err := srv.Run(); err != nil {
        log.Fatal(err)
    }
}

type Server struct {
    Node     *maelstrom.Node
    Topology map[string][]string
    Values   map[int]struct{}
}

func (s *Server) Run() error {
    return s.Node.Run()
}

func NewServer(node *maelstrom.Node) *Server {
    s := &Server{
        Node: node,
        Values: make(map[int]struct{}),
    }

    node.Handle("broadcast", s.BroadcastHandler)
    node.Handle("read", s.ReadHandler)
    node.Handle("topology", s.TopologyHandler)

    return s
}

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int `json:"message"`
        MessageID int `json:"msg_id"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    if _, ok := s.Values[body.Message]; ok {
        return nil
    }

    s.Values[body.Message] = struct{}{}

    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == req.Src {
            continue
        }
        s.Node.Send(neighbour, map[string]any{
            "type":    "broadcast",
            "message": body.Message,
        })
    }

    if body.MessageID != 0 {
        return s.Node.Reply(req, struct {
            Type string `json:"type"`
        }{
            Type: "broadcast_ok",
        })
    }

    return nil
}

func (s *Server) ReadHandler(req maelstrom.Message) error {
    return s.Node.Reply(req, struct {
        Type     string `json:"type"`
        Messages []int  `json:"messages"`
    }{
        Type:     "read_ok",
        Messages: maps.Keys(s.Values),
    })
}

func (s *Server) TopologyHandler(req maelstrom.Message) error {
    var body struct {
        Topology map[string][]string `json:"topology"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    s.Topology = body.Topology

    return s.Node.Reply(req, struct {
        Type string `json:"type"`
    }{
        Type: "topology_ok",
    })
}