Challenge #3c: Fault Tolerant Broadcast

Implementation

The third part adds network partitions as a failure mode.

Your node should propagate values it sees from broadcast messages to the other nodes in the cluster—even in the face of network partitions! Values should propagate to all other nodes by the end of the test.

To achieve this, we can modify the broadcast to retry until an acknowledgement is received.

Let's extract the broadcast logic to its own function.

// BroadcastHandler (excerpt): forwarding to the neighbours is now delegated
// to broadcastToNeighbours; the rest of the handler is elided.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    // ...

    s.broadcastToNeighbours(req.Src, body.Message) // New!

    // ...
}

// broadcastToNeighbours sends a "broadcast" message carrying message to each
// neighbour listed for this node in s.Topology, skipping src (the node the
// value came from). Each send is attempted once and its result is not checked.
func (s *Server) broadcastToNeighbours(src string, message int) { // New!
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }
        s.Node.Send(neighbour, map[string]any{
            "type":    "broadcast",
            "message": message,
        })
    }
}

Let's run the test to ensure things are still working.

    ...
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:833)

Uh oh, there was a crash during the test. Looking at the logs, it looks like there was a data race on the messages store, Server.Values.

fatal error: concurrent map writes

Let's fix this.

First, let's add a mutex.

// Server bundles the Maelstrom node, the cluster topology, and the set of
// broadcast values stored so far.
type Server struct {
    Node     *maelstrom.Node
    Topology map[string][]string

    // messagesMu guards messages, a set whose keys are the stored values.
    messagesMu sync.RWMutex // New!
    messages   map[int]struct{}
}

Note that I've renamed Server.Values to Server.messages, because (1) it feels more appropriate, and (2) unexporting it makes it clearer that all data access should go through helper methods.

Now, let's define the helper methods for Server.messages.

// readMessages returns the keys of s.messages as a slice, holding the read
// lock while the map is traversed.
func (s *Server) readMessages() []int {
    s.messagesMu.RLock()
    defer s.messagesMu.RUnlock()
    return maps.Keys(s.messages)
}

// storeMessage adds msg to the s.messages set under the write lock.
func (s *Server) storeMessage(msg int) {
    s.messagesMu.Lock()
    defer s.messagesMu.Unlock()
    s.messages[msg] = struct{}{}
}

// messageExists reports whether msg is already a key of s.messages, holding
// the read lock for the lookup.
func (s *Server) messageExists(msg int) bool {
    s.messagesMu.RLock()
    defer s.messagesMu.RUnlock()
    _, ok := s.messages[msg]
    return ok
}

Note that the mutex unlocks are deferred rather than called directly. Deferring the unlock ensures that the mutex is always released, even if the function panics (see extra).

Let's change our handlers to use these helper methods.

// BroadcastHandler decodes a broadcast request. If the value is already
// stored it returns immediately without replying; otherwise it stores the
// value, forwards it to the neighbours, and replies with "broadcast_ok" only
// when the request carried a non-zero msg_id.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int `json:"message"`
        MessageID int `json:"msg_id"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    if s.messageExists(body.Message) { // New!
        return nil
    }

    s.storeMessage(body.Message) // New!

    s.broadcastToNeighbours(req.Src, body.Message)

    if body.MessageID != 0 {
        return s.Node.Reply(req, map[string]any{
            "type": "broadcast_ok",
        })
    }

    return nil
}

// ReadHandler replies to req with a "read_ok" message whose "messages" field
// holds the values returned by readMessages.
func (s *Server) ReadHandler(req maelstrom.Message) error {
    return s.Node.Reply(req, map[string]any{
        "type":     "read_ok",
        "messages": s.readMessages(), // New!
    })
}

After re-running the tests (without the network partitions), we're back to green! Phew.

Everything looks good! ヽ(‘ー`)ノ

Now, let's implement the retry loop in our broadcastToNeighbours method.

// broadcastToNeighbours sends message to each neighbour of this node except
// src, repeating Send for a neighbour until it returns a nil error before
// moving on to the next one.
func (s *Server) broadcastToNeighbours(src string, message int) {
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }
        for { // Retry loop!
            err := s.Node.Send(neighbour, map[string]any{
                "type":    "broadcast",
                "message": message,
            })
            if err != nil {
                continue
            }
            break
        }
    }
}

We'll run the tests with the network partitions this time.

Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

Hmm, that didn't do it. What went wrong?

:lost-count 47

We lost 47 messages. Let's re-evaluate what went wrong.

Send() sends a fire-and-forget message and doesn't expect a response. As such, it does not attach a message ID. — Challenge #3a: Single-Node Broadcast

A nil error from Node.Send only guarantees that the sending node has sent the message, but it tells us nothing about whether the receiving node actually received the message.

To get a response, we have to use Node.RPC.

RPC() sends a message and accepts a response handler. The message will be decorated with a message ID so the handler can be invoked when a response message is received. — Challenge #3a: Single-Node Broadcast

Let's make the modification.

// broadcastToNeighbours issues a "broadcast" RPC to each neighbour of this
// node except src. For each neighbour the RPC is re-issued in a loop, with no
// pause between attempts, until the response callback has set acked.
func (s *Server) broadcastToNeighbours(src string, message int) {
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }
        var acked bool
        for !acked {
            s.Node.RPC(
                neighbour,
                map[string]any{
                    "type":    "broadcast",
                    "message": message,
                },
                func(msg maelstrom.Message) error {
                    acked = true
                    return nil
                },
            )
        }
    }
}

Now a node will retry the broadcast until it receives an acknowledgement from the receiving node.

Let's re-run the tests.

Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

Uh oh.

2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328135,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328136,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328137,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328138,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328139,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328140,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328141,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328142,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328143,"type":"broadcast"}}

The nodes are receiving the same message multiple times. In addition, nodes are sending the same message multiple times.

// BroadcastHandler decodes a broadcast request. When the value is already
// stored, the early return below skips the reply entirely; only a new value
// is stored, forwarded, and (if msg_id is non-zero) acknowledged.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int `json:"message"`
        MessageID int `json:"msg_id"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    if s.messageExists(body.Message) {
        return nil
    }

    s.storeMessage(body.Message)

    s.broadcastToNeighbours(req.Src, body.Message)

    if body.MessageID != 0 {
        return s.Node.Reply(req, map[string]any{
            "type": "broadcast_ok",
        })
    }

    return nil
}

Looking at Server.BroadcastHandler, we find the source of the bug. If a message already exists in a node's message store, no acknowledgement is sent back, so the sender keeps retrying forever.

Let's fix this.

// BroadcastHandler replies "broadcast_ok" to every request it can decode,
// before checking for duplicates. Only a value that is not yet stored is then
// saved and forwarded to the neighbours. The error returned by Reply is not
// checked.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message   int `json:"message"`
        MessageID int `json:"msg_id"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    s.Node.Reply(req, map[string]any{
        "type": "broadcast_ok",
    })

    if s.messageExists(body.Message) {
        return nil
    }

    s.storeMessage(body.Message)

    s.broadcastToNeighbours(req.Src, body.Message)

    return nil
}

Now a node always sends an acknowledgement reply, regardless of whether it has already seen the value and regardless of whether the incoming message has a message ID.

Let's run the tests again.

Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

To add to this bummer, we still have nodes repeatedly sending the same message.

What if we added a delay between each retry run?

// broadcastToNeighbours issues a "broadcast" RPC to each neighbour of this
// node except src, re-issuing it after a 500ms sleep for as long as the
// response callback has not set acked.
func (s *Server) broadcastToNeighbours(src string, message int) {
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }

        var acked bool

        for !acked {
            s.Node.RPC(
                neighbour,
                map[string]any{
                    "type":    "broadcast",
                    "message": message,
                },
                func(msg maelstrom.Message) error {
                    acked = true
                    return nil
                },
            )

            time.Sleep(500 * time.Millisecond) // New!
        }
    }
}

Re-run the tests.

Everything looks good! ヽ(‘ー`)ノ

That did it! A good takeaway here is that you should wait between each retry attempt. In production, this is normally implemented via exponential backoff. In this case, our backoff is constant, 500 milliseconds between each run.

Are we done yet? Not quite! Node.RPC's callback is called asynchronously, and it modifies the flag acked while the retry loop is reading it. Let's protect that flag with a mutex.

// broadcastToNeighbours forwards message to every neighbour of this node
// except src, re-sending every 500ms until that neighbour acknowledges.
func (s *Server) broadcastToNeighbours(src string, message int) {
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }

        var ackedMu sync.Mutex // New!
        var acked bool

        // The retry loop reads acked while the RPC callback writes it, so
        // the read has to take the same lock as the write; locking only the
        // write would still be a data race.
        isAcked := func() bool { // New!
            ackedMu.Lock()
            defer ackedMu.Unlock()
            return acked
        }

        for !isAcked() {
            // The error from RPC is deliberately ignored: a failed send is
            // retried on the next iteration, exactly like a lost message.
            s.Node.RPC(
                neighbour,
                map[string]any{
                    "type":    "broadcast",
                    "message": message,
                },
                func(msg maelstrom.Message) error {
                    ackedMu.Lock() // New!
                    defer ackedMu.Unlock()
                    acked = true
                    return nil
                },
            )

            time.Sleep(500 * time.Millisecond)
        }
    }
}

See you in the next one!

Attribution

This part stumped me for a few days. My solution is not entirely my own. I took inspiration from these sources.

  1. github.com/jepsen-io/maelstrom/blob/main/do..
  2. github.com/teivah/gossip-glomers/blob/5970b..

Complete code

package main

import (
    "encoding/json"
    "log"
    "sync"
    "time"

    maelstrom "github.com/jepsen-io/maelstrom/demo/go"
    "golang.org/x/exp/maps"
)

func main() {
    srv := NewServer(maelstrom.NewNode())

    if err := srv.Run(); err != nil {
        log.Fatal(err)
    }
}

// Server bundles the Maelstrom node, the cluster topology, and the set of
// broadcast values stored so far.
type Server struct {
    Node     *maelstrom.Node
    Topology map[string][]string

    // messagesMu guards messages, a set whose keys are the stored values.
    messagesMu sync.RWMutex
    messages   map[int]struct{}
}

// Run delegates to the underlying node's Run and returns its error.
func (s *Server) Run() error {
    return s.Node.Run()
}

// NewServer returns a Server that wraps node, starts with an empty message
// set, and has its broadcast, read and topology handlers registered on node.
func NewServer(node *maelstrom.Node) *Server {
    srv := new(Server)
    srv.Node = node
    srv.messages = map[int]struct{}{}

    // Registration order does not matter; each type maps to one handler.
    node.Handle("topology", srv.TopologyHandler)
    node.Handle("read", srv.ReadHandler)
    node.Handle("broadcast", srv.BroadcastHandler)

    return srv
}

// BroadcastHandler handles a "broadcast" request. It acknowledges every
// decodable request with "broadcast_ok" (so a retrying sender stops
// re-sending even when the value is a duplicate), and forwards the value to
// the neighbours only the first time it is seen.
//
// It returns an error only when the request body cannot be decoded.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
    var body struct {
        Message int `json:"message"`
    }
    if err := json.Unmarshal(req.Body, &body); err != nil {
        return err
    }

    // Acknowledge before forwarding: broadcastToNeighbours can block for a
    // long time, and the sender keeps retrying until it gets this reply.
    // A failed reply is logged rather than returned so the value is still
    // stored and forwarded; the sender will simply retry.
    if err := s.Node.Reply(req, map[string]any{
        "type": "broadcast_ok",
    }); err != nil {
        log.Printf("replying broadcast_ok to %s: %v", req.Src, err)
    }

    if !s.storeMessageIfNew(body.Message) {
        return nil
    }

    s.broadcastToNeighbours(req.Src, body.Message)

    return nil
}

// storeMessageIfNew records msg and reports whether it was absent before the
// call. The check and the insert happen under one write lock, so when the
// same value arrives concurrently from two senders exactly one caller sees
// true and re-broadcasts it.
func (s *Server) storeMessageIfNew(msg int) bool {
    s.messagesMu.Lock()
    defer s.messagesMu.Unlock()

    if _, seen := s.messages[msg]; seen {
        return false
    }
    s.messages[msg] = struct{}{}
    return true
}

// broadcastToNeighbours forwards message to every neighbour of this node
// except src, re-sending every 500ms until that neighbour acknowledges.
// Neighbours are handled one after another, so the call returns only once
// all of them have acknowledged.
func (s *Server) broadcastToNeighbours(src string, message int) {
    for _, neighbour := range s.Topology[s.Node.ID()] {
        if neighbour == src {
            continue
        }

        var ackedMu sync.Mutex
        var acked bool

        // The retry loop reads acked while the RPC callback writes it, so
        // the read has to take the same lock as the write; locking only the
        // write would still be a data race.
        isAcked := func() bool {
            ackedMu.Lock()
            defer ackedMu.Unlock()
            return acked
        }

        for !isAcked() {
            // The error from RPC is deliberately ignored: a failed send is
            // retried on the next iteration, exactly like a lost message.
            s.Node.RPC(
                neighbour,
                map[string]any{
                    "type":    "broadcast",
                    "message": message,
                },
                func(msg maelstrom.Message) error {
                    ackedMu.Lock()
                    defer ackedMu.Unlock()
                    acked = true
                    return nil
                },
            )

            time.Sleep(500 * time.Millisecond)
        }
    }
}

// ReadHandler answers req with a "read_ok" message whose "messages" field
// holds the values returned by readMessages.
func (s *Server) ReadHandler(req maelstrom.Message) error {
    reply := map[string]any{
        "type":     "read_ok",
        "messages": s.readMessages(),
    }
    return s.Node.Reply(req, reply)
}

// TopologyHandler decodes the "topology" field of req, replaces s.Topology
// with it, and replies "topology_ok". A body that fails to decode is returned
// as an error and leaves s.Topology untouched.
func (s *Server) TopologyHandler(req maelstrom.Message) error {
    var payload struct {
        Topology map[string][]string `json:"topology"`
    }
    err := json.Unmarshal(req.Body, &payload)
    if err != nil {
        return err
    }

    s.Topology = payload.Topology

    ack := map[string]any{
        "type": "topology_ok",
    }
    return s.Node.Reply(req, ack)
}

// readMessages returns the keys of s.messages as a slice. The read lock is
// held while maps.Keys walks the map.
func (s *Server) readMessages() []int {
    s.messagesMu.RLock()
    defer s.messagesMu.RUnlock()

    values := maps.Keys(s.messages)
    return values
}

// storeMessage adds msg to the s.messages set under the write lock.
func (s *Server) storeMessage(msg int) {
    s.messagesMu.Lock()
    defer s.messagesMu.Unlock()
    s.messages[msg] = struct{}{}
}

// messageExists reports whether msg is already a key of s.messages. The
// lookup is done under the read lock.
func (s *Server) messageExists(msg int) bool {
    s.messagesMu.RLock()
    defer s.messagesMu.RUnlock()

    _, seen := s.messages[msg]
    return seen
}