Challenge #3c: Fault Tolerant Broadcast
Implementation
The third part adds network partitions as a failure mode.
Your node should propagate values it sees from broadcast messages to the other nodes in the cluster—even in the face of network partitions! Values should propagate to all other nodes by the end of the test.
To achieve this, we can modify the broadcast to retry until an acknowledgement is received.
Let's extract the broadcast logic to its own function.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	// ...
	s.broadcastToNeighbours(req.Src, body.Message) // New!
	// ...
}

func (s *Server) broadcastToNeighbours(src string, message int) { // New!
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		s.Node.Send(neighbour, map[string]any{
			"type":    "broadcast",
			"message": message,
		})
	}
}
Let's run the test to ensure things are still working.
...
at clojure.lang.RestFn.invoke(RestFn.java:397)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:833)
Uh oh, there was a crash during the test. Looking at the logs, it looks like there was a data race on the messages store, Server.Values.
fatal error: concurrent map writes
Let's fix this.
First, let's add a mutex.
type Server struct {
	Node     *maelstrom.Node
	Topology map[string][]string

	messagesMu sync.RWMutex // New!
	messages   map[int]struct{}
}
Note that I've renamed Server.Values to Server.messages, because (1) it feels more appropriate, and (2) unexporting it makes it clearer that all data access should go through helper methods.
Now, let's define the helper methods for Server.messages.
func (s *Server) readMessages() []int {
	s.messagesMu.RLock()
	defer s.messagesMu.RUnlock()
	return maps.Keys(s.messages)
}

func (s *Server) storeMessage(msg int) {
	s.messagesMu.Lock()
	defer s.messagesMu.Unlock()
	s.messages[msg] = struct{}{}
}

func (s *Server) messageExists(msg int) bool {
	s.messagesMu.RLock()
	defer s.messagesMu.RUnlock()
	_, ok := s.messages[msg]
	return ok
}
Note that the mutex unlocks are deferred rather than called directly. Deferring the unlock ensures the mutex is always released, even in the event of a panic (see extra).
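As a quick illustration (a toy sketch, not part of the server), here's why the deferred form is safer when the code between lock and unlock can panic:

func update(mu *sync.Mutex, doWork func()) {
	mu.Lock()
	// The deferred Unlock runs even if doWork panics, so a caller that
	// recovers can still acquire the mutex afterwards. An explicit
	// mu.Unlock() placed after doWork() would be skipped by the panic,
	// leaving the mutex locked forever.
	defer mu.Unlock()
	doWork()
}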
Let's change our handlers to use these helper methods.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}

	if s.messageExists(body.Message) { // New!
		return nil
	}

	s.storeMessage(body.Message) // New!
	s.broadcastToNeighbours(req.Src, body.Message)

	if body.MessageID != 0 {
		return s.Node.Reply(req, map[string]any{
			"type": "broadcast_ok",
		})
	}

	return nil
}
func (s *Server) ReadHandler(req maelstrom.Message) error {
	return s.Node.Reply(req, map[string]any{
		"type":     "read_ok",
		"messages": s.readMessages(), // New!
	})
}
After re-running the tests (without the network partitions), we're back to green! Phew.
Everything looks good! ヽ(‘ー`)ノ
Now, let's implement the retry loop in our broadcastToNeighbours method.
func (s *Server) broadcastToNeighbours(src string, message int) {
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		for { // Retry loop!
			err := s.Node.Send(neighbour, map[string]any{
				"type":    "broadcast",
				"message": message,
			})
			if err != nil {
				continue
			}
			break
		}
	}
}
We'll run the tests with the network partitions this time.
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
Hmm, that didn't do it. What went wrong?
:lost-count 47
We lost 47 messages. Let's re-evaluate what went wrong.
Send() sends a fire-and-forget message and doesn't expect a response. As such, it does not attach a message ID. — Challenge #3a: Single-Node Broadcast
A nil error from Node.Send only guarantees that the sending node has sent the message; it tells us nothing about whether the receiving node actually received it.
To get a response, we have to use Node.RPC.
RPC() sends a message and accepts a response handler. The message will be decorated with a message ID so the handler can be invoked when a response message is received. — Challenge #3a: Single-Node Broadcast
Let's make the modification.
func (s *Server) broadcastToNeighbours(src string, message int) {
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		var acked bool
		for !acked {
			s.Node.RPC(
				neighbour,
				map[string]any{
					"type":    "broadcast",
					"message": message,
				},
				func(msg maelstrom.Message) error {
					acked = true
					return nil
				},
			)
		}
	}
}
Now a node will retry the broadcast until it receives an acknowledgement from the receiving node.
Let's re-run the tests.
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
Uh oh.
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328135,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328136,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328137,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328138,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328139,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328140,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328141,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328142,"type":"broadcast"}}
2023/03/06 21:50:58 Received {n3 n0 {"message":7,"msg_id":1328143,"type":"broadcast"}}
The nodes are receiving the same message multiple times. In addition, nodes are also sending the same message multiple times.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}

	if s.messageExists(body.Message) {
		return nil
	}

	s.storeMessage(body.Message)
	s.broadcastToNeighbours(req.Src, body.Message)

	if body.MessageID != 0 {
		return s.Node.Reply(req, map[string]any{
			"type": "broadcast_ok",
		})
	}

	return nil
}
Looking at Server.BroadcastHandler, we find the source of the bug: if a message already exists in a node's message store, no acknowledgement is sent back.
Let's fix this.
func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}

	s.Node.Reply(req, map[string]any{
		"type": "broadcast_ok",
	})

	if s.messageExists(body.Message) {
		return nil
	}

	s.storeMessage(body.Message)
	s.broadcastToNeighbours(req.Src, body.Message)

	return nil
}
Now, regardless of whether the incoming message has a message ID, a node will always send an acknowledgement reply.
Let's run the tests again.
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
To add to this bummer, we still have nodes repeatedly sending the same message.
What if we added a delay between each retry run?
func (s *Server) broadcastToNeighbours(src string, message int) {
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		var acked bool
		for !acked {
			s.Node.RPC(
				neighbour,
				map[string]any{
					"type":    "broadcast",
					"message": message,
				},
				func(msg maelstrom.Message) error {
					acked = true
					return nil
				},
			)

			time.Sleep(500 * time.Millisecond) // New!
		}
	}
}
Re-run the tests.
Everything looks good! ヽ(‘ー`)ノ
That did it! A good takeaway here is that you should wait between each retry attempt. In production, this is normally implemented via exponential backoff. In this case, our backoff is constant, 500 milliseconds between each run.
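For illustration, here is a minimal sketch of what an exponential backoff could look like; the retryWithBackoff helper, its base delay, and its cap are hypothetical names and are not part of this solution.

// Hypothetical sketch: retry attempt() with an exponentially growing delay.
// attempt should report whether the operation has succeeded (e.g. acked).
func retryWithBackoff(attempt func() bool) {
	delay := 100 * time.Millisecond // assumed base delay
	for !attempt() {
		time.Sleep(delay)
		if delay < 2*time.Second { // assumed cap
			delay *= 2 // double the wait after each failed attempt
		}
	}
}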
Are we done yet? Not quite! Node.RPC's callback is called asynchronously, and it modifies a shared flag, acked. Let's protect it with a mutex.
func (s *Server) broadcastToNeighbours(src string, message int) {
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		var ackedMu sync.Mutex // New!
		var acked bool
		for !acked {
			s.Node.RPC(
				neighbour,
				map[string]any{
					"type":    "broadcast",
					"message": message,
				},
				func(msg maelstrom.Message) error {
					ackedMu.Lock() // New!
					defer ackedMu.Unlock()
					acked = true
					return nil
				},
			)

			time.Sleep(500 * time.Millisecond)
		}
	}
}
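As an aside, the same flag could be expressed with an atomic.Bool from sync/atomic (Go 1.19+) instead of a mutex-guarded bool. This is only an alternative sketch of the per-neighbour retry loop, not the approach used here; it also synchronizes the read in the loop condition.

// Alternative sketch (requires importing sync/atomic): a drop-in for the
// inner retry loop above, using an atomic flag instead of a mutex.
var acked atomic.Bool
for !acked.Load() {
	s.Node.RPC(
		neighbour,
		map[string]any{
			"type":    "broadcast",
			"message": message,
		},
		func(msg maelstrom.Message) error {
			acked.Store(true)
			return nil
		},
	)

	time.Sleep(500 * time.Millisecond)
}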
See you in the next one!
Attribution
This part stumped me for a few days, and my solution is not entirely my own. I drew inspiration from these sources.
Complete code
package main

import (
	"encoding/json"
	"log"
	"sync"
	"time"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
	"golang.org/x/exp/maps"
)

func main() {
	srv := NewServer(maelstrom.NewNode())
	if err := srv.Run(); err != nil {
		log.Fatal(err)
	}
}

type Server struct {
	Node     *maelstrom.Node
	Topology map[string][]string

	messagesMu sync.RWMutex
	messages   map[int]struct{}
}

func (s *Server) Run() error {
	return s.Node.Run()
}

func NewServer(node *maelstrom.Node) *Server {
	s := &Server{
		Node:     node,
		messages: make(map[int]struct{}),
	}

	node.Handle("broadcast", s.BroadcastHandler)
	node.Handle("read", s.ReadHandler)
	node.Handle("topology", s.TopologyHandler)

	return s
}

func (s *Server) BroadcastHandler(req maelstrom.Message) error {
	var body struct {
		Message   int `json:"message"`
		MessageID int `json:"msg_id"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}

	s.Node.Reply(req, map[string]any{
		"type": "broadcast_ok",
	})

	if s.messageExists(body.Message) {
		return nil
	}

	s.storeMessage(body.Message)
	s.broadcastToNeighbours(req.Src, body.Message)

	return nil
}

func (s *Server) broadcastToNeighbours(src string, message int) {
	for _, neighbour := range s.Topology[s.Node.ID()] {
		if neighbour == src {
			continue
		}

		var ackedMu sync.Mutex
		var acked bool
		for !acked {
			s.Node.RPC(
				neighbour,
				map[string]any{
					"type":    "broadcast",
					"message": message,
				},
				func(msg maelstrom.Message) error {
					ackedMu.Lock()
					defer ackedMu.Unlock()
					acked = true
					return nil
				},
			)

			time.Sleep(500 * time.Millisecond)
		}
	}
}

func (s *Server) ReadHandler(req maelstrom.Message) error {
	return s.Node.Reply(req, map[string]any{
		"type":     "read_ok",
		"messages": s.readMessages(),
	})
}

func (s *Server) TopologyHandler(req maelstrom.Message) error {
	var body struct {
		Topology map[string][]string `json:"topology"`
	}
	if err := json.Unmarshal(req.Body, &body); err != nil {
		return err
	}

	s.Topology = body.Topology

	return s.Node.Reply(req, map[string]any{
		"type": "topology_ok",
	})
}

func (s *Server) readMessages() []int {
	s.messagesMu.RLock()
	defer s.messagesMu.RUnlock()
	return maps.Keys(s.messages)
}

func (s *Server) storeMessage(msg int) {
	s.messagesMu.Lock()
	defer s.messagesMu.Unlock()
	s.messages[msg] = struct{}{}
}

func (s *Server) messageExists(msg int) bool {
	s.messagesMu.RLock()
	defer s.messagesMu.RUnlock()
	_, ok := s.messages[msg]
	return ok
}